1. Spark介绍

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势:

1) 运行速度快, Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍。

2) 适用场景广泛, 大数据分析统计,实时数据处理,图计算及机器学习

3) 易用性, 编写简单, 支持80种以上的高级算子,支持多种语言,数据源丰富,可部署在多种集群中

4) 容错性高。Spark引进了弹性分布式数据集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即充许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错,而CheckPoint有两种方式:CheckPoint Data,和Logging The Updates,用户可以控制采用哪种方式来实现容错.

Spark的适用场景有以下几个类型:

1) 复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;

2) 基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间

3) 基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间

2. Spark架构以及生态

通常当需要处理的数据量超过了单机尺度(比如我们的计算机有4GB的内存,而我们需要处理100GB以上的数据)这时我们可以选择spark集群进行计算,有时我们可能需要处理的数据量并不大,但是计算很复杂,需要大量的时间,这时我们也可以选择利用spark集群强大的计算资源,并行化地计算,其架构示意图如下:

v2-fb2d2031d3c60ee7bb1454c4905ff512_b.jpg

Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。

Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。Spark提供的sql形式的对接Hive、JDBC、HBase等各种数据渠道的API,用Java开发人员的思想来讲就是面向接口、解耦合,ORMapping、Spring Cloud Stream等都是类似的思想。

Spark Streaming:基于SparkCore实现的可扩展、高吞吐、高可靠性的实时数据流处理。支持从Kafka、Flume等数据源处理后存储到HDFS、DataBase、Dashboard中。对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据。

MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。

GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作

3. Spark的架构设计

(1) 运行流程及特点

Hadoop存在缺陷: 基于磁盘,无论是MapReduce还是YARN都是将数据从磁盘中加载出来,经过DAG,然后重新写回到磁盘中,计算过程的中间数据又需要写入到HDFS的临时文件,这些都使得Hadoop在大数据运算上表现太“慢”,Spark应运而生。

v2-415142f5ad33c6dcd37717e2bee8310d_b.jpg

Cluster Manager在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器负责分配资源,有点像YARN中ResourceManager那个角色,大管家握有所有的干活的资源,属于乙方的总包。

WorkerNode是可以干活的节点,听大管家ClusterManager差遣,是真正有资源干活的主。从节点,负责控制计算节点,启动Executor或者Driver。

Executor是在WorkerNode上起的一个进程,相当于一个包工头,负责准备Task环境和执行

Task,负责内存和磁盘的使用。Task是施工项目里的每一个具体的任务。

Driver是统管Task的产生与发送给Executor的,运行Application 的main()函数,是甲方的司令员。

SparkContext是与ClusterManager打交道的,负责给钱申请资源的,是甲方的接口人。

整个互动流程是这样的:

1) Spark Apllication 创建SparkContext , 通过SparkContext向Cluster manager(资源管理器)申请所需执行的资源(cpu、内存等)

2) Cluster manager分配应用程序执行需要的资源,在Worker(Yarn是NodeManager)节点上创建Executor

3) SparkContext 将程序代码(jar包或者python文件)和Task任务发送给Executor执行,并收集结果给Driver。

4) 释放SparkContext

运行流程及特点

v2-17d855b91a9cc12b17af379173352e3c_b.jpg

v2-4624ce3e6580b9e44d888a33bf110503_b.jpg

涉及的几个定义和详细的运行过程如下:

1) Application:Spark应用程序

指的是用户编写的Spark应用程序,包含了Driver功能代码和分布在集群中多个节点上运行的Executor代码。

Spark应用程序,由一个或多个作业JOB组成,如下图所示。

v2-328ca6307b8173f3ee440a33d0d02b50_b.jpg

2) Driver:驱动程序

Spark中的Driver即运行上述Application的Main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常SparkContext代表Driver,如下图所示。

v2-3b94f4cc8bf36ff2885749eefaedee78_b.jpg

3) Cluster Manager:资源管理器

指的是在集群上获取资源的外部服务,常用的有:Standalone,Spark原生的资源管理器,由Master负责资源的分配;Hadoop Yarn,由Yarn中的ResourceManager负责资源的分配;Messos,由Messos中的Messos Master负责资源管理。

4) Executor:执行器

Executor是Application运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor,如下图所示。

v2-a59c4eced9ba25928a94c6f6f89a22e1_b.jpg

5)Worker:计算节点

集群中任何可以运行Application代码的节点,类似于Yarn中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点,在Spark on Messos模式中指的就是Messos Slave节点,如下图所示。

v2-ba9edf0d53da118f6010b7a1a961b6ac_b.jpg

6) DAGScheduler:有向无环图调度器

在Spark里每一个操作生成一个RDD,RDD之间连一条边,最后这些RDD和他们之间的边组成一个有向无环图,这个就是DAG。

Spark会根据宽依赖窄依赖来划分具体的Stage,而依赖有2个作用:

用来解决数据容错的高效性和用来划分stage。

RDD的依赖关系分为两种:窄依赖(Narrow Dependencies)与宽依赖(Wide Dependencies)

v2-bb1e89c0b1073d00110802944e8c37c2_b.jpg

a. 窄依赖

每个父RDD的一个Partition最多被子RDD的一个Partition所使用(1:1 或 n:1)。例如map、filter、union等操作都会产生窄依赖;

子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)。

b. 宽依赖

一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;(1:m 或 n:m)

子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)

原始的RDD通过一系列的转换就形成了DAG,有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集,也就是Stage,这样可以将任务提交到计算节点进行真正的计算。Spark计算的中间结果默认是保存在内存中的,Spark在划分Stage的时候会充分考虑在分布式计算中可流水线计算(pipeline)的部分来提高计算的效率,而在这个过程中Spark根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。对于窄依赖,partition的转换处理在一个Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

v2-9fcd5cb0a424dad36382cdc835d45ab6_b.jpg

Spark 执行时有下面所列的流程:

a. 用户代码定义RDD的有向无环图

RDD上的操作会创建新的RDD,并引用它们的父节点,这样就创建了一个图。

b. 行动(action)操作把有向无环图强制转译为执行计划stage

当调用RDD的一个行动操作时,这个RDD就必须被计算出来。这也要求计算出该RDD的父节点。Spark调度器提交一个作业来计算出所有必要的RDD。这个作业会包含一个或多个步骤,每个步骤其实也就是一波并行执行的计算任务。一个stage步骤对应有向五环图中的一个或多个RDD,一个步骤对应多个RDD是因为发生了流水线执行。

c. 任务于集群中调度并执行

步骤stage是按顺序处理的,任务则独立的启动来计算出RDD的一部分。一旦作业的最后一个步骤结束,一个行动操作也就执行完了。

DAGScheduler基于DAG划分Stage 并以TaskSet的形势提交Stage给TaskScheduler;负责将作业拆分成不同阶段的具有依赖关系的多批任务;最重要的任务之一就是:计算作业和任务的依赖关系,制定调度逻辑。在SparkContext初始化的过程中被实例化,一个SparkContext对应创建一个DAGScheduler。

v2-a925ae236d57cdd87e33d1e3488e01eb_b.jpg

7) TaskScheduler:任务调度器

将Taskset提交给worker(集群)运行并回报结果;负责每个具体任务的实际物理调度。如图所示。

v2-e5ce94634f0514aad80467d165d9a0f9_b.jpg

8) Job:作业

由一个或多个调度阶段所组成的一次计算作业;包含多个Task组成的并行计算,往往由Spark Action催生,一个JOB包含多个RDD及作用于相应RDD上的各种Operation。如图所示。

v2-8df819207fcc3fa0856f85a9ba2bff72_b.jpg

9) Stage:调度阶段

一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。如图所示。

v2-9be6d05d0c6bbe2e5bd14f1f66c8c4f9_b.jpg

Application多个job多个Stage:Spark Application中可以因为不同的Action触发众多的job,一个Application中可以有很多的job,每个job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。

划分依据:Stage划分的依据就是宽依赖,何时产生宽依赖,reduceByKey, groupByKey等算子,会导致宽依赖的产生。

核心算法:从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分。Spark内核会从触发Action操作的那个RDD开始从后往前推,首先会为最后一个RDD创建一个stage,然后继续倒推,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的stage,那个RDD就是新的stage的最后一个RDD。然后依次类推,继续继续倒推,根据窄依赖或者宽依赖进行stage的划分,直到所有的RDD全部遍历完成为止。

将DAG划分为Stage剖析:如上图,从HDFS中读入数据生成3个不同的RDD,通过一系列transformation操作后再将计算结果保存回HDFS。可以看到这个DAG中只有join操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage. 同时我们可以注意到,在图中Stage2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,通过map操作生成的partition可以不用等待整个RDD计算结束,而是继续进行union操作,这样大大提高了计算的效率。

10) TaskSet:任务集

由一组关联的,但相互之间没有Shuffle依赖关系的任务所组成的任务集。如图所示。

v2-13d499c920d7203a2c1d3e2c67736cb2_b.jpg

提示:

1)一个Stage创建一个TaskSet;

2)为Stage的每个Rdd分区创建一个Task,多个Task封装成TaskSet

v2-b9beb2f87dca68dc02944877ef8febba_b.jpg

summary:

v2-9fd6575052745fdc7b3c557d49bb3095_b.jpg

4. Spark RDD 操作

v2-a369257cbd7a773a5ffdfff3c6b49d9d_b.jpg

map的作用很容易理解就是对rdd之中的元素进行逐一进行函数操作映射为另外一个rdd。

flatMap的操作是将函数应用于rdd之中的每一个元素,将返回的迭代器的所有内容构成新的rdd。通常用来切分单词。

Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象。 而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:

-------flat map------

object fla_map {

def main(args: Array[String]): Unit = {

val rdd1= List(List("A","B"),List("C","D"))

rdd1.map( i => println(i))

println(rdd1)

println("----------------------")

val strings = rdd1.flatMap(f => f)

println(strings)

strings.foreach( i => println(i))

}

}

-----------------

v2-7fe519abc47be393897b095e24cd6be6_b.jpg

map:List里有小的List

flatmap:先flat再map,只能压一次,形成一个新的List集合,把原元素放进新的集合里面

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐