本系列主要总结Spark的使用方法,及注意事项。


1,spark简介

Apache Spark是一个开源、强大的的分布式查询和处理引擎,最初由Matei Zaharia在UC Berkeley读博期间开发的[1]。最早的Spark版本于2012年发布,后来被捐赠给Apache SoftwareFoundation,成为Apache的旗舰项目之一(github链接:https://github.com/apache/spark)。

Apache Spark提供MapReduce的灵活性和可扩展性,但处理速度明显高于MapReduce,内存内处理速度和访问磁盘速度 分别比Hadoop快100倍和10倍。

Apache Spark 允许用户读取、转换和 聚合数据,可以轻松地训练和部署复杂的统计模型,并且Spark API支持Java、Scala、Python、R和SQL的访问。其可以通过如Jupyter、Apache Zeppelin、Spark-Notebook、Databricks notebooks这样的笔记本交互式地执行快速的数据分析。除此之外,Apache Spark还提供有几个已经实现并调优过的算法、统计模型和框架,如用于机器学习的MLlib和ML,用于图形处理的GraphX和GraphFrames,用于处理实时流数据的Spark Streaming。

1.1 什么是RDD?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
在这里插入图片描述
通俗点来讲,可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。

1.2 RDD 的属性

只读:不能修改,只能通过转换操作生成新的 RDD。
分布式:可以分布在多台机器上进行并行处理。
弹性:计算过程中内存不够时它会和磁盘进行数据交换。
基于内存:可以全部或部分缓存在内存中,在多次计算间重用。

(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目

(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算

(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置

通过使用 RDD,用户不必担心底层数据的分布式特性,只需要将具体的应用逻辑表达为一系列转换处理,就可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 I/O 和数据序列化的开销

RDD主要有两组操作:转换操作(transformation,返回指向新RDD的指针)和行动操作(action,在运行计算后向驱动程序返回值)。Spark每个转换操作并行执行,大大提高速度。数据集的转换通常是惰性的,即在transformation过程不会执行程序,只有在action过程才会执行。

2,RDD的转换(transformation)和行动操作(action)

本节总结一部分常用的转换和行动操作,详细全面的内容请参考官方文档

2.1,创建RDD

import pyspark
from pyspark import SparkContext,SparkConf

conf = SparkConf().setAppName('project1').setMaster('local')
sc = SparkContext.getOrCreate(conf)

# 创建RDD
'''
创建RDD的方式有两种,
一是,.parallelize(…) 个collection集合
二是,引用位于本地或HDFS上的某个文件(或多个文件)
'''
data = sc.parallelize([['1','2'],{'m':2}])
data = sc.textFile('..VS14MORT.txt.gz',4)

'''
RDD是无模式的数据结构(不像DataFrames)。
因此,在使用RDD时,并行化数据集对于Spark来说是完美的。
'''
data_heterogenous = sc.parallelize([('Ferrari','fast'),{'Porsche':100000},['Spain','visited',4504]]).collect()

'''
所以,我们可以混合几乎任何东西:一个元组,一个字典,或一个列表。
一旦你.collect()数据集(即,运行一个动作将其返回给驱动程序),你可以像在Python中通常那样访问对象中的数据:
.collect()方法将RDD的所有元素返回到驱动程序,并将其作为列表序列化。
'''
data_heterogenous
output:[('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]

2.2 transformation

1) map:用于RDD每个元素

data_file_1 = data_fromfile.map(lambda row:(row[2],row[2] + 'x' ))
data_file_1.take(10)
[('e', 'ex'),
 ('4', '4x'),
 ('1', '1x'),
 ('7', '7x'),
 ('5', '5x'),
 ('4', '4x'),
 ('6', '6x'),
 ('6', '6x'),
 ('5', '5x'),
 ('5', '5x')]

2) filter 允许选择符合指定条件的数据集元素

data_file_2 = data_fromfile.filter(lambda row:row[0]=='u' and row[1]=='s')
data_file_2.count()
1

flatMap 与map()的工作方式类似,但返回的是平铺的结果而不是列表。

data_file_1 = data_fromfile.flatMap(lambda row:(row[2],row[2] + 'x' ))
data_file_1.take(10)
['e', 'ex', '4', '4x', '1', '1x', '7', '7x', '5', '5x']

3) distinct 此方法返回指定列中不同值的列表。

data_fromfile.map(lambda row:row[2]).distinct().collect()
['e', '4', '1', '7', '5', '6', ',', '0', '2', '3', '8', '9']

4) sample 该方法返回数据集的随机样本,第一个参数指定采样是否应该替换,第二个参数定义返回数据的分数,第三个参数是伪随机数产生器的种子。

fraction = 0.1
data_sample = data_fromfile.sample(False,fraction,1)
data_sample.take(10)
['11,8,2',
 '13,6,2',
 '56,1,1',
 '71,5,1',
 '99,5,2',
 '108,2,1',
 '112,2,1',
 '114,2,1',
 '144,2,1',
 '155,6,2']

5) leftOuterJoin() 左外连接就像SQL一样,根据两个数据集中的值加入两个RDD,并从左RDD中返回从右侧追加两个RDD匹配的记录。

rdd1 = sc.parallelize([('a',1),('b',4),('c',10)])
rdd2 = sc.parallelize([('a',4),('a',1),('b','6'),('d',15)])
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.take(5)
[('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]

6) 如果我们使用.join(…)方法,那么当这两个值在这两个RDD之间相交时,我们只能得到’a’和’b’的值。

rdd4 = rdd1.join(rdd2)
rdd4.collect()
[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

7) 另一个有用的方法是.intersection(…),它返回两个RDD中相同的记录。

rdd5 = rdd1.intersection(rdd2)
rdd5.collect()
[('a', 1)]

2.3 action

1) take() 从单个数据分区返回n个最高行,跟panda的head差不多

data_fromfile.take(5)
['user_id,age,gender', '1,4,1', '2,10,1', '3,7,2', '4,5,1']

2) reduce 使用指定的方法减少RDD的元素

rdd7 = rdd1.map(lambda row: row[0]).reduce(lambda x,y:x+y)
rdd7
'abc'

3) reduceByKey(…) 将key值相同的单元合并在一起

rdd2.reduceByKey(lambda x,y:x+y).collect()
[('a', 5), ('b', '6'), ('d', 15)]

4) count() 统计rdd元素数量

rdd1.count()
3

5) .countByKey() 统计key相同的数量

rdd2.countByKey().items()
dict_items([('a', 2), ('b', 1), ('d', 1)])

6) saveAsTextFile(…) 将RDD保存到文本文件:每个分区保存到一个单独的文件。

rdd2.saveAsTextFile('rdd_data.txt')
sc.textFile('rdd_data.txt').collect()
["('a', 4)", "('a', 1)", "('b', '6')", "('d', 15)"]

7) foreach(…) 将函数应用到RDD的每个元素

def f(x):
    print(x)
rdd1.foreach(f)
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐