大数据之Spark调优:Job 优化之 整体优化
在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 Task 数据本地化的级别,如果大部分都是PROCESS_LOCAL、NODE_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是RACK_LOCAL、ANY,那么需要对本地化的等待时长进行调节,应该是反复调节,每次调节完以后,再来运行观察日志,看看大部分的
调节数据本地化等待时长
在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 Task 数据本地化的级别,如果大部分都是PROCESS_LOCAL、NODE_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是RACK_LOCAL、ANY,那么需要对本地化的等待时长进行调节,应该是反复调节,每次调节完以后,再来运行观察日志,看看大部分的 task 的本地化级别有没有提升;看看,整个spark 作业的运行时间有没有缩短。
注意过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark 作业的运行时间反而增加了。
下面几个参数,默认都是 3s,可以改成如下:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num executors 3 --executor-cores 2 --executor-memory 6g --class
com.atguigu.sparktuning.job.LocalityWaitTuning spark-tuning-1.0-SNAPSHOT jar-with-dependencies.jar
具体代码:
package com.atguigu.sparktuning.job
import com.atguigu.sparktuning.bean.CoursePay
import com.atguigu.sparktuning.utils.InitUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
object LocalityWaitTuning {
def main( args: Array[String] ): Unit = {
val sparkConf = new SparkConf().setAppName("LocalityWaitTuning")
// 分别打包测试
// .set("spark.locality.wait", "1")
// .set("spark.locality.wait.process", "1")
// .set("spark.locality.wait.node", "1")
// .set("spark.locality.wait.rack", "1")
// 分别打包测试
.set("spark.locality.wait", "6s")
.set("spark.locality.wait.process", "60s")
.set("spark.locality.wait.node", "30s")
.set("spark.locality.wait.rack", "20s")
val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
import sparkSession.implicits._
val ds: Dataset[CoursePay] = sparkSession
.read.json("/sparkdata/coursepay.log").as[CoursePay]
ds.cache()
ds.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))
}
}
使用堆外内存
1、堆外内存参数
讲到堆外内存,就必须去提一个东西,那就是去 yarn 申请资源的单位,容器。Spark on yarn 模式,一个容器到底申请多少内存资源。
一个容器最多可以申请多大资源,是由 yarn 参数 yarn.scheduler.maximum-allocation mb 决定, 需要满足:
spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size≤ yarn.scheduler.maximum-allocation-mb
参数解释:
➢ spark.executor.memory:提交任务时指定的堆内内存。
➢ spark.executor.memoryOverhead:堆外内存参数,内存额外开销。
默认开启,默认值为 spark.executor.memory*0.1 并且会与最小值 384mb 做对比,取最大值。所以 spark on yarn 任务堆内内存申请 1 个 g,而实际去 yarn 申请的内存大于 1 个 g 的原因。
➢ spark.memory.offHeap.size : 堆 外 内 存 参 数 , spark 中 默 认 关 闭 , 需 要 将spark.memory.enable.offheap.enable 参数设置为 true。
注意:很多网上资料说 spark.executor.memoryOverhead 包含 spark.memory.offHeap.size,这是由版本区别的,仅限于 spark3.0 之前的版本。3.0 之后就发生改变,实际去 yarn 申请
的内存资源由三个参数相加。
测试申请容器上限:
yarn.scheduler.maximum-allocation-mb 修改为 7G,将三个参数设为如下,大于 7G,会报错:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --
num-executors 3 --executor-cores 4 --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g --
executor-memory 5g --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
将 spark.memory.offHeap.size 修改为 1g 后再次提交:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --
num-executors 3 --executor-cores 4 --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g --
executor-memory 5g --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
2、使用堆外缓存
使用堆外内存可以减轻垃圾回收的工作,也加快了复制的速度。
当需要缓存非常大的数据量时,虚拟机将承受非常大的 GC 压力,因为虚拟机必须检查每个对象是否可以收集并必须访问所有内存页。本地缓存是最快的,但会给虚拟机带来GC 压力,所以,当你需要处理非常多 GB 的数据量时可以考虑使用堆外内存来进行优化,因为这不会给 Java 垃圾收集器带来任何压力。让 JAVA GC 为应用程序完成工作,缓存操作交给堆外。
spark-submit --master yarn --deploy-mode client --driver-memory 1g --
num-executors 3 --executor-cores 4 --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g --
executor-memory 5g --class com.atguigu.sparktuning.job.OFFHeapCache spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
具体代码:
package com.atguigu.sparktuning.job
import com.atguigu.sparktuning.bean.CoursePay
import com.atguigu.sparktuning.utils.InitUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
object OFFHeapCache {
def main( args: Array[String] ): Unit = {
val sparkConf = new SparkConf().setAppName("OFFHeapCache")
// .setMaster("local[*]")
val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
useOFFHeapMemory(sparkSession)
}
def useOFFHeapMemory( sparkSession: SparkSession ): Unit = {
import sparkSession.implicits._
val result = sparkSession.sql("select * from sparktuning.course_pay").as[CoursePay]
// TODO 指定持久化到 堆外内存
result.persist(StorageLevel.OFF_HEAP)
result.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))
// while (true) {}
}
}
调节连接等待时长
在 Spark 作业运行过程中,Executor 优先从自己本地关联的 BlockManager 中获取某份数据,如果本地 BlockManager 没有的话,会通过 TransferService 远程连接其他节点上Executor 的 BlockManager 来获取数据。
如果 task 在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark 的 Executor 进程就会停止工作,无法提供相应,此时,由于没有响应,无法
建立网络连接,会导致网络连接超时。
在生产环境下,有时会遇到 file not found、file lost 这类错误,在这种情况下,很有可能是 Executor 的 BlockManager 在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长 120s 后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致 Spark 作
业的崩溃。这种情况也可能会导致 DAGScheduler 反复提交几次 stage,TaskScheduler 反复提交几次 task,大大延长了我们的 Spark 作业的运行时间。
为了避免长时间暂停(如 GC)导致的超时,可以考虑调节连接的超时时长,连接等待时长需要在 spark-submit 脚本中进行设置,设置方式可以在提交时指定:
--conf spark.core.connection.ack.wait.timeout=300s
调节连接等待时长后,通常可以避免部分的 XX 文件拉取失败、XX 文件 lost 等报错。
spark-submit --master yarn --deploy-mode client --driver-memory 1g --numexecutors 3 --executor-cores 2 --executor-memory 1g --conf
spark.core.connection.ack.wait.timeout=300s --class com.atguigu.sparktuning.job.AckWaitTuning spark-tuning-1.0-SNAPSHOT-jar with-dependencies.jar
具体代码:
package com.atguigu.sparktuning.job
import com.atguigu.sparktuning.bean.CoursePay
import com.atguigu.sparktuning.utils.InitUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
object AckWaitTuning {
def main( args: Array[String] ): Unit = {
val sparkConf = new SparkConf().setAppName("AckWaitTuning")
// .set("spark.core.connection.ack.wait.timeout", "2s") // 连接超时时间,默认等于spark.network.timeout的值,默认120s
// .setMaster("local[*]")
val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
useOFFHeapMemory(sparkSession)
}
def useOFFHeapMemory( sparkSession: SparkSession ): Unit = {
import sparkSession.implicits._
val result = sparkSession.sql("select * from sparktuning.course_pay").as[CoursePay]
result.cache()
result.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))
// while (true) {}
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)