Spark快速入门

一、Spark框架

1.Spark介绍

1.1 spark和Hadoop

Spark和Hadoop的根本差异是多个作业之间的数据通信问题:
Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘的

1.2 spark和mapreduce

在绝大多数的数据计算场景中,spark确实会比mapreduce更有优势,但是Spark是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致Job执行失败,此时,Mapreduce其实是一个更好的选择,所以Spark并不能完全替代MR。

1.3 spark组件

  • 1.Spark Core

    spark core中提供了Spark最基础和最核心的功能,spark其他功能如:Spark SQL,spark Streaming,GraphX,Mlib都是在spark core上拓展的

  • 2.Spark SQL

    spark sql是spark用来操作结构化数据的组件。通过spark sql,用户可以使用SQL或者hive

  • 3.Spark Streaming

    Spark Streaming是spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API

在这里插入图片描述

2.快速入门

2.1 WordCount项目

在这里插入图片描述

采用了Spark特有方法的写法

package com.zxy.SparkCore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount2{
    def main(args: Array[String]): Unit = {
        //建立和Spark框架的连接
        val wordCount: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val context: SparkContext = new SparkContext(wordCount)
        
        //读取指定文件目录数据
        val lines: RDD[String] = context.textFile("spark-core\\dates")
        
        //切分数据
        val words: RDD[String] = lines.flatMap(_.split("\\s+"))
        
        //数据分组
        val WordToOne: RDD[(String, Int)] = words.map(
            word => (word, 1)
        )
        
        //spark提供的方法,将分组和聚合通过一个方法实现
        //reduceByKey:相同的饿数据,可以对value进行reduce聚合
        val WordToCount: RDD[(String, Int)] = WordToOne.reduceByKey(_ + _)
        
        //数据收集
        val array: Array[(String, Int)] = WordToCount.collect()
        
        //数据打印
        array.foreach(println)
        
        //关闭连接
        context.stop()
    }
}

简化版

package com.zxy.SparkCore

import org.apache.spark.{SparkConf, SparkContext}

object WordCount4{
    def main(args: Array[String]): Unit = {
        //建立和Spark框架的连接
        val wordCount: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val context: SparkContext = new SparkContext(wordCount)
        
        context.textFile("spark-core\\dates").flatMap(_.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).collect().foreach(println)
        
        //关闭连接
        context.stop()
    }
}

控制台效果

在这里插入图片描述
在这里插入图片描述

2.2 Maven的POM文件

我这里采用的Scala2.11.8
使用的Spark2.4.7

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scalap</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
    </dependencies>

2.3 log4j.properties


log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.apache.spark.repl.Main=WARN

log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR

3.Linux安装Spark

Hadoop版本采用2.8.1,Spark版本采用3.0.2

Spark下载官方地址

## 解压缩
[root@hadoop software]# tar -zxvf spark-3.0.2-bin-hadoop3.2.tgz -C /opt/

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.

## Web UI
http://192.168.130.129:4040/jobs/

快速入门

scala> sc.textFile("data/date.txt").flatMap(_.split("\\s+")).groupBy(word => word).map(vk => (vk._1,vk._2.size)).collect().foreach(println)
(Spark,1)                                                                       
(Hello,2)
(Scala,1)

scala> sc.textFile("data/date.txt").flatMap(_.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).collect().foreach(println)
(Spark,1)
(Hello,2)
(Scala,1)

4.IDEA项目上传到Spark终端

## Maven项目打包上传到Spark,
bin/spark-submit \
--class com.zxy.SparkCore.WordCount4 \
--master local[2] \
/opt/apps/spark-3.0.2/data/spark-core-1.0-SNAPSHOT.jar \
10

5.Spark资源申请架构

在这里插入图片描述

6.提交流程(资源申请和运算)

在这里插入图片描述

二、Spark-Core

1.Driver和Executor通信

Driver相当于Client,Executor相当于Server

  • Driver代码
package com.zxy.Socket

import java.io.OutputStream
import java.net.Socket


object Driver {
    def main(args: Array[String]): Unit = {
        //连接服务器
        val client: Socket = new Socket("localhost",9999)
        
        //发送数据
        val out: OutputStream = client.getOutputStream
        
        out.write(2)
        out.flush()
        out.close()
        client.close()
    }
}
  • Executor代码
package com.zxy.Socket

import java.io.InputStream
import java.net.{ServerSocket, Socket}

object Executor {
    def main(args: Array[String]): Unit = {
        //启动服务器,接受数据
        val server: ServerSocket = new ServerSocket(9999)
        
        println("服务器启动,等待数据")
        //等待客户端连接接收数据
        val client: Socket = server.accept()
        
        val in: InputStream = client.getInputStream
        
        val i: Int = in.read()
        
        println(s"接收到客户端数据 + ${i}")
        client.close()
        server.close()
    }
}

先启动服务端Executor,等待数据

启动客户端Driver,建立连接发送数据

2.案例引入Spark三大数据结构

2.1 案例

修改以上案例,使用两个服务端Executor接收数据,将Task中的数据分开计算

  • Executor1

package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor1 {
    def main(args: Array[String]): Unit = {
        //启动服务器,接受数据
        val server: ServerSocket = new ServerSocket(8888)
        
        println("服务器启动,等待数据")
        //等待客户端连接接收数据
        val client: Socket = server.accept()
        
        val in: InputStream = client.getInputStream
    
        val TaskOBJ2: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ2.readObject().asInstanceOf[SubTask]
        
        val ints: List[Int] = task.computer()

        println(s"计算[8888]后的结果是: ${ints}")
        TaskOBJ2.close()
        client.close()
        server.close()
        
    }
}

  • Executor2

package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor2 {
    def main(args: Array[String]): Unit = {
        //启动服务器,接受数据
        val server: ServerSocket = new ServerSocket(9999)
        
        println("服务器启动,等待数据")
        //等待客户端连接接收数据
        val client: Socket = server.accept()
        
        val in: InputStream = client.getInputStream
    
        val TaskOBJ1: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ1.readObject().asInstanceOf[SubTask]
        
        val ints: List[Int] = task.computer()

        println(s"计算[9999]后的结果是: ${ints}")
        TaskOBJ1.close()
        client.close()
        server.close()
        
    }
}

  • Driver

package com.zxy.Socket

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket


object Driver {
    def main(args: Array[String]): Unit = {
        //连接服务器
        val client1: Socket = new Socket("localhost",8888)
        val client2: Socket = new Socket("localhost",9999)
    
        val task: Task = new Task()
        
        //server1发送数据
        val out1: OutputStream = client1.getOutputStream
        val TaskOBJ1: ObjectOutputStream = new ObjectOutputStream(out1)
    
        val subTask1 = new SubTask()
        subTask1.logic = task.logic
        subTask1.datas = task.datas.take(2)
        
        TaskOBJ1.writeObject(subTask1)
        TaskOBJ1.flush()
        TaskOBJ1.close()
        client1.close()
    
        //server2发送数据
        val out2: OutputStream = client2.getOutputStream
        val TaskOBJ2: ObjectOutputStream = new ObjectOutputStream(out2)
    
        val subTask2 = new SubTask()
        subTask2.logic = task.logic
        subTask2.datas = task.datas.takeRight(2)
        
        TaskOBJ2.writeObject(subTask2)
        TaskOBJ2.flush()
        TaskOBJ2.close()
        client2.close()
        println("数据发送完毕")
    }
}

  • Task

package com.zxy.Socket

class Task extends Serializable {
    val datas = List(1,2,3,4)
    
    val logic:Int => Int = _ * 2
}

  • SubTask

package com.zxy.Socket

class SubTask extends Serializable {
    //初始值
    var datas:List[Int] = _
    
    var logic:Int => Int = _
    
    //计算
    def computer()={
        datas.map(logic)
    }
}

  • 执行效果

先启动Executor1,Executor2;
再启动Driver
Executor1:
    服务器启动,等待数据
    计算[8888]后的结果是: List(2, 4)

Executor2:
    服务器启动,等待数据
    计算[9999]后的结果是: List(6, 8)
Driver:
	数据发送完毕

2.2 Spark三大数据结构

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

RDD: 弹性分布式数据集
累加器:分布式共享只写变量
广播变量:分布式共享只读变量

3.RDD:弹性分布式数据集

3.1 创建RDD

3.1.1 从内存创建RDD
package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file:Spark_RDD_Memary
 * author:zxy
 * date:2021-06-07
 * desc:从内存创建RDD
 */
object Spark_RDD_Memary {
    def main(args: Array[String]): Unit = {
        //TODO 准备环境
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    
        val sc: SparkContext = new SparkContext(conf)
        //TODO 创建RDD
        //从内存中创建RDD,将内存中集合的数据作为处理的数据源
        val seq = Seq[Int](1,2,3,4)
        //parallelize:并行
        //val rdd: RDD[Int] = sc.parallelize(seq)
        //makeRDD实现功能和parallelize相同,其底层实现时调用了rdd对象的parallelize方法
        val rdd: RDD[Int] = sc.makeRDD(seq)
        rdd.collect().foreach(println)
        //TODO 关闭环境
        sc.stop()
    }
}
3.1.2 从外部文件创建RDD
  • textFile
package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file:Spark_RDD_File
 * author:zxy
 * date:2021-06-07
 * desc:从外部文件创建RDD
 */
object Spark_RDD_File {
    def main(args: Array[String]): Unit = {
        //TODO 准备环境
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    
        val sc: SparkContext = new SparkContext(conf)
        //TODO 创建RDD
        //从文件中创建RDD,将文件中集合的数据作为处理的数据源
        //1.path路径默认为根路径为基准,可以写绝对路径也可以写相对路径
        //2.path可以是文件,也可以是一个路径
        //3.path可以使用通配符
        //4.path可以是分布式存储系统的路径:HDFS
        val rdd: RDD[String] = sc.textFile("spark-core/dates/1.txt")
        rdd.collect().foreach(println)
        //TODO 关闭环境
        sc.stop()
    }
}
  • wholeTextFiles
package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file:Spark_RDD_Files
 * author:zxy
 * date:2021-06-07
 * desc:从外部文件创建RDD
 */
object Spark_RDD_Files {
    def main(args: Array[String]): Unit = {
        //TODO 准备环境
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    
        val sc: SparkContext = new SparkContext(conf)
        //TODO 创建RDD
        //从文件中创建RDD,将文件中的数据作为处理的数据源
        
        //textFile:以行为单位来读取数据,读取的数据都是字符串
        //wholeTextFiles:以文件为单位读取数据
        val rdd: RDD[(String, String)] = sc.wholeTextFiles("spark-core/dates")
        rdd.collect().foreach(println)
        //TODO 关闭环境
        sc.stop()
    }
}

3.2 RDD并行度与分区

package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file:Spark_RDD_File_Par
 * author:zxy
 * date:2021-06-07
 * desc:从外部文件创建RDD
 */
object Spark_RDD_File_Par {
    def main(args: Array[String]): Unit = {
        //TODO 准备环境
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        //2.设置使用核数为5核,即产生5个分区
        conf.set("spark.default.parallelism","5")
        val sc: SparkContext = new SparkContext(conf)
        //TODO 创建RDD
        //RDD的并行度 & 分区
        //makeRDD方法可以传递第二个参数,这个参数表示分区的数量
        //第二个参数可以不传递,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度)
        //scheduler.conf.getInt("spark.default.parallelism",totalCores)
        //spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
        //如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
        
        
        //1.设置两个分区
//        val rdd = sc.makeRDD(
//            List(1,2,3,4),2
//        )
        
        //2.选择默认分区
        val rdd = sc.makeRDD(
            List(1,2,3,4)
        )
    
        //将处理的数据保存成分区文件
        rdd.saveAsTextFile("spark-core/dates/output")
        
        //TODO 关闭环境
        sc.stop()
    }
}

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐