Reference: https://github.com/seahboonsiew/pyspark-csv


Sample CSV data

# blah.csv

Name, Model, Size, Width, Dt
Jag, 63, 4, 4, '2014-12-23'
Pog, 7.0, 5, 5, '2014-12-23'
Peek, 68 xp, 5, 5.5, ''
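
Given this sample, the type inference in pyspark_csv.py (listed below) should resolve the columns roughly as follows: Name becomes string; Model becomes string, because the mixed value '68 xp' forces the whole column down to string; Size becomes int; Width becomes double, since the 5.5 widens the integer cells; and Dt becomes a date/timestamp column per the upstream project's description of this sample, with the empty value mapping to null.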


Usage

#!/usr/bin/python
# -*- coding: UTF-8 -*-

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SQLContext, HiveContext

sc = SparkContext(conf=SparkConf().setAppName("The first example"))
# sqlCtx can be either a SQLContext or a HiveContext
sqlCtx = SQLContext(sc)

import pyspark_csv as pycsv
# Ship pyspark_csv.py to the executors so that worker tasks can import it
sc.addPyFile('pyspark_csv.py')

# Read CSV data via SparkContext and convert it to a DataFrame
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
# plaintext_rdd = sc.textFile('file:///x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

# By default, pyspark-csv parses the first line as column names. To supply your own column names:
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd, columns=['Name','Model','Size','Width','Dt'])
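# Note (based on csvToDataFrame below): supplying columns does not drop the first
# line, so pass your own names only when the file itself has no header row;
# otherwise the header line is parsed as data and every column degrades to string.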

# To change the separator:
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd, sep=",")

# Skipping date and time parsing can yield a significant performance gain on large datasets:
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd, parseDate=False)
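
Once the DataFrame is built it behaves like any other Spark SQL table. A minimal sketch of inspecting and querying it (the table name blah and the filter are illustrative; registerTempTable is the Spark 1.x API matching the SQLContext used above):

# Inspect the inferred schema, then query the data with SQL
dataframe.printSchema()
dataframe.registerTempTable('blah')
wide = sqlCtx.sql("SELECT Name, Width FROM blah WHERE Width > 4.5")
for row in wide.collect():
    print(row)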


pyspark_csv.py

import csv
import sys
import dateutil.parser
from pyspark.sql.types import (StringType, DoubleType, TimestampType, NullType,
                               IntegerType, StructType, StructField)

py_version = sys.version_info[0]


def csvToDataFrame(sqlCtx, rdd, columns=None, sep=",", parseDate=True):
    """Converts CSV plain text RDD into SparkSQL DataFrame (former SchemaRDD)
    using PySpark. If columns not given, assumes first row is the header.
    If separator not given, assumes comma separated
    """
    if py_version < 3:
        # Python 2's csv module cannot handle unicode, so encode to UTF-8 bytes
        def toRow(line):
            return toRowSep(line.encode('utf-8'), sep)
    else:
        def toRow(line):
            return toRowSep(line, sep)

    rdd_array = rdd.map(toRow)
    rdd_sql = rdd_array

    if columns is None:
        # Treat the first row as the header and drop it from the data
        columns = rdd_array.first()
        rdd_sql = rdd_array.zipWithIndex().filter(
            lambda r_i: r_i[1] > 0).keys()
    column_types = evaluateType(rdd_sql, parseDate)

    def toSqlRow(row):
        return toSqlRowWithType(row, column_types)

    schema = makeSchema(zip(columns, column_types))

    return sqlCtx.createDataFrame(rdd_sql.map(toSqlRow), schema=schema)


def makeSchema(columns):
    struct_field_map = {'string': StringType(),
                        'date': TimestampType(),
                        'double': DoubleType(),
                        'int': IntegerType(),
                        'none': NullType()}
    fields = [StructField(k, struct_field_map[v], True) for k, v in columns]

    return StructType(fields)


def toRowSep(line, d):
    """Parses one row using csv reader"""
    for r in csv.reader([line], delimiter=d):
        return r


def toSqlRowWithType(row, col_types):
    """Converts the string cells of a row to their inferred Python types"""
    d = row  # mutated in place: each cell is replaced with a typed value
    for col, data in enumerate(row):
        typed = col_types[col]
        if isNone(data):
            d[col] = None
        elif typed == 'string':
            d[col] = data
        elif typed == 'int':
            d[col] = int(round(float(data)))
        elif typed == 'double':
            d[col] = float(data)
        elif typed == 'date':
            d[col] = toDate(data)
    return d


# Type converter
def isNone(d):
    return (d is None or d == 'None' or
            d == '?' or
            d == '' or
            d == 'NULL' or
            d == 'null')


def toDate(d):
    return dateutil.parser.parse(d)


def getRowType(row):
    """Infers types for each row"""
    d = row
    for col, data in enumerate(row):
        try:
            if isNone(data):
                d[col] = 'none'
            else:
                num = float(data)
                if num.is_integer():
                    d[col] = 'int'
                else:
                    d[col] = 'double'
        except (ValueError, TypeError):
            try:
                toDate(data)
                d[col] = 'date'
            except (ValueError, TypeError, OverflowError):
                d[col] = 'string'
    return d


def getRowTypeNoDate(row):
    """Infers types for each row"""
    d = row
    for col, data in enumerate(row):
        try:
            if isNone(data):
                d[col] = 'none'
            else:
                num = float(data)
                if num.is_integer():
                    d[col] = 'int'
                else:
                    d[col] = 'double'
        except (ValueError, TypeError):
            d[col] = 'string'
    return d


def reduceTypes(a, b):
    """Reduces column types among rows to find common denominator"""
    type_order = {'string': 0, 'date': 1, 'double': 2, 'int': 3, 'none': 4}
    reduce_map = {'int': {0: 'string', 1: 'string', 2: 'double'},
                  'double': {0: 'string', 1: 'string'},
                  'date': {0: 'string'}}
    d = a
    for col, a_type in enumerate(a):
        b_type = b[col]
        if a_type == 'none':
            d[col] = b_type
        elif b_type == 'none':
            d[col] = a_type
        else:
            order_a = type_order[a_type]
            order_b = type_order[b_type]
            if order_a == order_b:
                d[col] = a_type
            elif order_a > order_b:
                d[col] = reduce_map[a_type][order_b]
            elif order_a < order_b:
                d[col] = reduce_map[b_type][order_a]
    return d


def evaluateType(rdd_sql, parseDate):
    """Reduces per-row type labels across the whole RDD to one type per column"""
    if parseDate:
        return rdd_sql.map(getRowType).reduce(reduceTypes)
    else:
        return rdd_sql.map(getRowTypeNoDate).reduce(reduceTypes)
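
The reduce_map table above encodes the widening rules: int against double widens to double, date or numeric against string collapses to string, and none defers to the other side. A quick local check of reduceTypes (a hypothetical snippet; it only needs pyspark and pyspark_csv.py on the Python path, not a running cluster):

from pyspark_csv import reduceTypes

# 'int' vs 'double' widens to 'double'; 'none' defers to 'int';
# 'date' vs 'string' collapses to 'string'
print(reduceTypes(['int', 'none', 'date'], ['double', 'int', 'string']))
# -> ['double', 'int', 'string']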

