《Spark系列》Spark快速入门教程
Spark和Hadoop的根本差异是多个作业之间的数据通信问题:Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘的。
Spark快速入门
一、Spark框架
1.Spark介绍
1.1 spark和Hadoop
Spark和Hadoop的根本差异是多个作业之间的数据通信问题:
Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘的
1.2 spark和mapreduce
在绝大多数的数据计算场景中,spark确实会比mapreduce更有优势,但是Spark是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致Job执行失败,此时,Mapreduce其实是一个更好的选择,所以Spark并不能完全替代MR。
1.3 spark组件
-
1.Spark Core
spark core中提供了Spark最基础和最核心的功能,spark其他功能如:Spark SQL,spark Streaming,GraphX,Mlib都是在spark core上拓展的
-
2.Spark SQL
spark sql是spark用来操作结构化数据的组件。通过spark sql,用户可以使用SQL或者hive
-
3.Spark Streaming
Spark Streaming是spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API
2.快速入门
2.1 WordCount项目
采用了Spark特有方法的写法
package com.zxy.SparkCore
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount2{
def main(args: Array[String]): Unit = {
//建立和Spark框架的连接
val wordCount: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val context: SparkContext = new SparkContext(wordCount)
//读取指定文件目录数据
val lines: RDD[String] = context.textFile("spark-core\\dates")
//切分数据
val words: RDD[String] = lines.flatMap(_.split("\\s+"))
//数据分组
val WordToOne: RDD[(String, Int)] = words.map(
word => (word, 1)
)
//spark提供的方法,将分组和聚合通过一个方法实现
//reduceByKey:相同的饿数据,可以对value进行reduce聚合
val WordToCount: RDD[(String, Int)] = WordToOne.reduceByKey(_ + _)
//数据收集
val array: Array[(String, Int)] = WordToCount.collect()
//数据打印
array.foreach(println)
//关闭连接
context.stop()
}
}
简化版
package com.zxy.SparkCore
import org.apache.spark.{SparkConf, SparkContext}
object WordCount4{
def main(args: Array[String]): Unit = {
//建立和Spark框架的连接
val wordCount: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val context: SparkContext = new SparkContext(wordCount)
context.textFile("spark-core\\dates").flatMap(_.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).collect().foreach(println)
//关闭连接
context.stop()
}
}
控制台效果
2.2 Maven的POM文件
我这里采用的Scala2.11.8
使用的Spark2.4.7
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.7</version>
</dependency>
</dependencies>
2.3 log4j.properties
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR
3.Linux安装Spark
Hadoop版本采用2.8.1,Spark版本采用3.0.2
## 解压缩
[root@hadoop software]# tar -zxvf spark-3.0.2-bin-hadoop3.2.tgz -C /opt/
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
## Web UI
http://192.168.130.129:4040/jobs/
快速入门
scala> sc.textFile("data/date.txt").flatMap(_.split("\\s+")).groupBy(word => word).map(vk => (vk._1,vk._2.size)).collect().foreach(println)
(Spark,1)
(Hello,2)
(Scala,1)
scala> sc.textFile("data/date.txt").flatMap(_.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).collect().foreach(println)
(Spark,1)
(Hello,2)
(Scala,1)
4.IDEA项目上传到Spark终端
## Maven项目打包上传到Spark,
bin/spark-submit \
--class com.zxy.SparkCore.WordCount4 \
--master local[2] \
/opt/apps/spark-3.0.2/data/spark-core-1.0-SNAPSHOT.jar \
10
5.Spark资源申请架构
6.提交流程(资源申请和运算)
二、Spark-Core
1.Driver和Executor通信
Driver相当于Client,Executor相当于Server
- Driver代码
package com.zxy.Socket
import java.io.OutputStream
import java.net.Socket
object Driver {
def main(args: Array[String]): Unit = {
//连接服务器
val client: Socket = new Socket("localhost",9999)
//发送数据
val out: OutputStream = client.getOutputStream
out.write(2)
out.flush()
out.close()
client.close()
}
}
- Executor代码
package com.zxy.Socket
import java.io.InputStream
import java.net.{ServerSocket, Socket}
object Executor {
def main(args: Array[String]): Unit = {
//启动服务器,接受数据
val server: ServerSocket = new ServerSocket(9999)
println("服务器启动,等待数据")
//等待客户端连接接收数据
val client: Socket = server.accept()
val in: InputStream = client.getInputStream
val i: Int = in.read()
println(s"接收到客户端数据 + ${i}")
client.close()
server.close()
}
}
先启动服务端Executor,等待数据
启动客户端Driver,建立连接发送数据
2.案例引入Spark三大数据结构
2.1 案例
修改以上案例,使用两个服务端Executor接收数据,将Task中的数据分开计算
package com.zxy.Socket
import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}
object Executor1 {
def main(args: Array[String]): Unit = {
//启动服务器,接受数据
val server: ServerSocket = new ServerSocket(8888)
println("服务器启动,等待数据")
//等待客户端连接接收数据
val client: Socket = server.accept()
val in: InputStream = client.getInputStream
val TaskOBJ2: ObjectInputStream = new ObjectInputStream(in)
val task: SubTask = TaskOBJ2.readObject().asInstanceOf[SubTask]
val ints: List[Int] = task.computer()
println(s"计算[8888]后的结果是: ${ints}")
TaskOBJ2.close()
client.close()
server.close()
}
}
package com.zxy.Socket
import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}
object Executor2 {
def main(args: Array[String]): Unit = {
//启动服务器,接受数据
val server: ServerSocket = new ServerSocket(9999)
println("服务器启动,等待数据")
//等待客户端连接接收数据
val client: Socket = server.accept()
val in: InputStream = client.getInputStream
val TaskOBJ1: ObjectInputStream = new ObjectInputStream(in)
val task: SubTask = TaskOBJ1.readObject().asInstanceOf[SubTask]
val ints: List[Int] = task.computer()
println(s"计算[9999]后的结果是: ${ints}")
TaskOBJ1.close()
client.close()
server.close()
}
}
package com.zxy.Socket
import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket
object Driver {
def main(args: Array[String]): Unit = {
//连接服务器
val client1: Socket = new Socket("localhost",8888)
val client2: Socket = new Socket("localhost",9999)
val task: Task = new Task()
//server1发送数据
val out1: OutputStream = client1.getOutputStream
val TaskOBJ1: ObjectOutputStream = new ObjectOutputStream(out1)
val subTask1 = new SubTask()
subTask1.logic = task.logic
subTask1.datas = task.datas.take(2)
TaskOBJ1.writeObject(subTask1)
TaskOBJ1.flush()
TaskOBJ1.close()
client1.close()
//server2发送数据
val out2: OutputStream = client2.getOutputStream
val TaskOBJ2: ObjectOutputStream = new ObjectOutputStream(out2)
val subTask2 = new SubTask()
subTask2.logic = task.logic
subTask2.datas = task.datas.takeRight(2)
TaskOBJ2.writeObject(subTask2)
TaskOBJ2.flush()
TaskOBJ2.close()
client2.close()
println("数据发送完毕")
}
}
package com.zxy.Socket
class Task extends Serializable {
val datas = List(1,2,3,4)
val logic:Int => Int = _ * 2
}
package com.zxy.Socket
class SubTask extends Serializable {
//初始值
var datas:List[Int] = _
var logic:Int => Int = _
//计算
def computer()={
datas.map(logic)
}
}
先启动Executor1,Executor2;
再启动Driver
Executor1:
服务器启动,等待数据
计算[8888]后的结果是: List(2, 4)
Executor2:
服务器启动,等待数据
计算[9999]后的结果是: List(6, 8)
Driver:
数据发送完毕
2.2 Spark三大数据结构
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
RDD: 弹性分布式数据集
累加器:分布式共享只写变量
广播变量:分布式共享只读变量
3.RDD:弹性分布式数据集
3.1 创建RDD
3.1.1 从内存创建RDD
package Rdd.Rebuilder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* file:Spark_RDD_Memary
* author:zxy
* date:2021-06-07
* desc:从内存创建RDD
*/
object Spark_RDD_Memary {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext = new SparkContext(conf)
//TODO 创建RDD
//从内存中创建RDD,将内存中集合的数据作为处理的数据源
val seq = Seq[Int](1,2,3,4)
//parallelize:并行
//val rdd: RDD[Int] = sc.parallelize(seq)
//makeRDD实现功能和parallelize相同,其底层实现时调用了rdd对象的parallelize方法
val rdd: RDD[Int] = sc.makeRDD(seq)
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
3.1.2 从外部文件创建RDD
- textFile
package Rdd.Rebuilder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* file:Spark_RDD_File
* author:zxy
* date:2021-06-07
* desc:从外部文件创建RDD
*/
object Spark_RDD_File {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext = new SparkContext(conf)
//TODO 创建RDD
//从文件中创建RDD,将文件中集合的数据作为处理的数据源
//1.path路径默认为根路径为基准,可以写绝对路径也可以写相对路径
//2.path可以是文件,也可以是一个路径
//3.path可以使用通配符
//4.path可以是分布式存储系统的路径:HDFS
val rdd: RDD[String] = sc.textFile("spark-core/dates/1.txt")
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
- wholeTextFiles
package Rdd.Rebuilder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* file:Spark_RDD_Files
* author:zxy
* date:2021-06-07
* desc:从外部文件创建RDD
*/
object Spark_RDD_Files {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext = new SparkContext(conf)
//TODO 创建RDD
//从文件中创建RDD,将文件中的数据作为处理的数据源
//textFile:以行为单位来读取数据,读取的数据都是字符串
//wholeTextFiles:以文件为单位读取数据
val rdd: RDD[(String, String)] = sc.wholeTextFiles("spark-core/dates")
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
3.2 RDD并行度与分区
package Rdd.Rebuilder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* file:Spark_RDD_File_Par
* author:zxy
* date:2021-06-07
* desc:从外部文件创建RDD
*/
object Spark_RDD_File_Par {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
//2.设置使用核数为5核,即产生5个分区
conf.set("spark.default.parallelism","5")
val sc: SparkContext = new SparkContext(conf)
//TODO 创建RDD
//RDD的并行度 & 分区
//makeRDD方法可以传递第二个参数,这个参数表示分区的数量
//第二个参数可以不传递,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度)
//scheduler.conf.getInt("spark.default.parallelism",totalCores)
//spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
//如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
//1.设置两个分区
// val rdd = sc.makeRDD(
// List(1,2,3,4),2
// )
//2.选择默认分区
val rdd = sc.makeRDD(
List(1,2,3,4)
)
//将处理的数据保存成分区文件
rdd.saveAsTextFile("spark-core/dates/output")
//TODO 关闭环境
sc.stop()
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)