Flink学习笔记(二):Flink内存模型
Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。如配置总内存中所述,另一种配置 JobManager 内存的方式
文章目录
1、配置总内存
Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。 其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)。详细的配置参数:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/deployment/config.html
配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:
配置项 | TaskManager 配置参数 | JobManager 配置参数 |
---|---|---|
Flink 总内存 | taskmanager.memory.flink.size | jobmanager.memory.flink.size |
进程总内存 | taskmanager.memory.process.size | jobmanager.memory.process.size |
Flink 启动需要明确配置:
TaskManager | JobManager |
---|---|
taskmanager.memory.flink.size | jobmanager.memory.flink.size |
taskmanager.memory.process.size | jobmanager.memory.process.size |
taskmanager.memory.task.heap.size 和 taskmanager.memory.managed.size | jobmanager.memory.heap.size |
不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。
2、JobManager 内存模型
如上图所示,下表中列出了 Flink JobManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。
组成部分 | 配置参数 | 描述 |
---|---|---|
JVM 堆内存 | jobmanager.memory.heap.size | JobManager 的 JVM 堆内存。 |
堆外内存 | jobmanager.memory.off-heap.size | JobManager 的堆外内存(直接内存或本地内存)。 |
JVM Metaspace | jobmanager.memory.jvm-metaspace.size | Flink JVM 进程的 Metaspace。 |
JVM 开销 | jobmanager.memory.jvm-overhead.min、jobmanager.memory.jvm-overhead.max、jobmanager.memory.jvm-overhead.fraction | 用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。 |
如配置总内存中所述,另一种配置 JobManager 内存的方式是明确指定 JVM 堆内存的大小(jobmanager.memory.heap.size)。 通过这种方式,用户可以更好地掌控用于以下用途的 JVM 堆内存大小。
3、TaskManager 内存模型
如上图所示,下表中列出了 Flink TaskManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。
组成部分 | 配置参数 | 描述 |
---|---|---|
框架堆内存(Framework Heap Memory) | taskmanager.memory.framework.heap.size | 用于 Flink 框架的 JVM 堆内存(进阶配置)。 |
任务堆内存(Task Heap Memory) | taskmanager.memory.task.heap.size | 用于 Flink 应用的算子及用户代码的 JVM 堆内存。 |
托管内存(Managed memory) | taskmanager.memory.managed.size、taskmanager.memory.managed.fraction | 由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。 |
框架堆外内存(Framework Off-heap Memory) | taskmanager.memory.framework.off-heap.size | 用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。 |
任务堆外内存(Task Off-heap Memory) | taskmanager.memory.task.off-heap.size | 用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)。 |
网络内存(Network Memory) | taskmanager.memory.network.min、taskmanager.memory.network.max、taskmanager.memory.network.fraction | 用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。 |
JVM Metaspace | taskmanager.memory.jvm-metaspace.size | Flink JVM 进程的 Metaspace。 |
JVM 开销 | taskmanager.memory.jvm-overhead.min、taskmanager.memory.jvm-overhead.max、taskmanager.memory.jvm-overhead.fraction | 用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。 |
我们可以看到,有些内存部分的大小可以直接通过一个配置参数进行设置,有些则需要根据多个参数进行调整。通常情况下,不建议对框架堆内存和框架堆外内存进行调整。 除非你非常肯定 Flink 的内部数据结构及操作需要更多的内存。 这可能与具体的部署环境及作业结构有关,例如非常高的并发度。 此外,Flink 的部分依赖(例如 Hadoop)在某些特定的情况下也可能会需要更多的直接内存或本地内存。
4、WebUI 展示内存
JobManager 内存直观展示
TaskManager 内存直观展示
树状图表示:
5、Flink On YARN 模式下内存分配
如果是 Flink On YARN 模式下:
taskmanager.memory.process.size = 4096 MB = 4G
taskmanager.memory.network.fraction = 0.15
taskmanager.memory.managed.fraction = 0.45
然后根据以上参数,就可以计算得到各部分的内存大小:
taskmanager.memory.jvm-overhead = 4096 * 0.1 = 409.6 MB
taskmanager.memory.flink.size = 4096 - 409.6 - 256 = 3430.4 MB
taskmanager.memory.network = 3430.4 * 0.15 = 514.56 MB
taskmanager.memory.managed = 3430.4 * 0.45 = 1543.68 MB
taskmanager.memory.task.heap.size = 3430.4 - 128 * 2 - 1543.68 - 514.56 = 1116.16 MB
6、Flink On Yarn 集群消耗资源估算
6.1、Flink 集群
Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。
Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程./bin/flink run …中运行。
可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为standalone 集群启动、在容器中启动、或者通过YARN或Mesos等资源框架管理并启动。TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。
(1)JobManager
JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
-
ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。 -
Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。 -
JobMaster
JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。
始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby(请参考 高可用(HA))。
(2)JobManager
TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。
必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子。
(3)Tasks 和算子链
对于分布式执行,Flink 将算子的 subtasks 链接成 tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。
(4)Task Slots 和资源
每个 worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。
6.2、资源分配
- 每一个 Flink Application 都包含 至少一个 JobManager (若 HA 配置则可包含多个 JobManagers)。若有多个 JobManagers ,则 有且仅有一个 JobManager 处于 Running 状态,其他的 JobManager 则处于 Standby 状态;
- 每一个处于 Running 状态的 JobManager 管理着 一个或多个 TaskManager。TaskManager 的本质是一个 JVM 进程,可以执行一个或多个线程。TaskManager 可以用于对 Memory 进行隔离;
- 每一个 TaskManager 可以执行 一个或多个 Slot。Slot 的本质是由 JVM 进程所生成的线程。每个 Slot 可以将 TaskManager 管理的的 Total Memory 进行平均分配,但不会对 CPU 进行隔离。在同一个 TaskManager 中的 Slots 共享 TCP 连接 (through multiplexing) 、心跳信息、数据集和数据结构;
- 每一个 Slot 内部可以执行 零个或一个 Pipeline。 每一个 Pipeline 中又可以包含 任意数量的 有前后关联关系的 Tasks。注意一个 Flink Cluster 所能达到的最大并行度数量等于所有 TaskManager 中全部 Slot 的数量的总和。
6.3、Flink 提交 Yarn 集群的相关命令
在使用 Yarn 作为集群资源管理器时,时常会使用如下命令对 Flink Application 进行提交,主要参数如下:
flink run -m yarn-cluster -ys 2 -p 1 -yjm 1G -ytm 2G
参数 | 解释 | 说明 |
---|---|---|
-yjm,–yarnjobManagerMemory | Memory for JobManager Container with optional unit (default: MB) | JobManager 内存容量 (在一个 Flink Application 中处于 Running 状态的 JobManager 只有一个) |
-ytm,–yarntaskManagerMemory | Memory per TaskManager Container with optional unit (default: MB) | 每一个 TaskManager 的内存容量 |
-ys,–yarnslots | Number of slots per TaskManager | 每一个 TaskManager 中的 Slot 数量 |
-p,–parallelism | The parallelism with which to run the program. Optional flag to override the default value specified in the configuration. | 任务执行的并行度 |
该命令的各个参数表示的含义如下 (使用 flink --help 命令即可阅读)。
Flink 启动参考配置参数(带有 kerberos 认证可根据实际情况需要删减):
/home/dev/soft/flink/bin/flink run \
-m yarn-cluster \
-yD akka.ask.timeout='360 s' \
-yD akka.framesize=20485760b \
-yD blob.fetch.backlog=1000 \
-yD blob.fetch.num-concurrent=500 \
-yD blob.fetch.retries=50 \
-yD blob.storage.directory=/data1/flinkdir \
-yD env.java.opts.jobmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -XX:G1HeapWastePercent=5 -XX:G1ReservePercent=25 -Dfile.encoding=UTF-8' \
-yD env.java.opts.taskmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -Dsun.security.krb5.debug=false -Dfile.encoding=UTF-8' \
-yD env.java.opts='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -Dfile.encoding=UTF-8' \
-yD execution.attached=false \
-yD execution.buffer-timeout='1000 ms' \
-yD execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-yD execution.checkpointing.interval='30 min' \
-yD execution.checkpointing.max-concurrent-checkpoints=1 \
-yD execution.checkpointing.min-pause='2 min' \
-yD execution.checkpointing.mode=EXACTLY_ONCE \
-yD execution.checkpointing.timeout='28 min' \
-yD execution.checkpointing.tolerable-failed-checkpoints=8 \
-yD execution.checkpointing.unaligned=true \
-yD execution.checkpointing.unaligned.forced=true \
-yD heartbeat.interval=60000 \
-yD heartbeat.rpc-failure-threshold=5 \
-yD heartbeat.timeout=340000 \
-yD io.tmp.dirs=/data1/flinkdir \
-yD jobmanager.heap.size=1024m \
-yD jobmanager.memory.jvm-metaspace.size=268435456b \
-yD jobmanager.memory.jvm-overhead.max=1073741824b \
-yD jobmanager.memory.jvm-overhead.min=1073741824b \
-yD jobmanager.memory.network.fraction=0.2 \
-yD jobmanager.memory.network.max=6GB \
-yD jobmanager.memory.off-heap.size=134217728b \
-yD jobmanager.memory.process.size='18360 mb' \
-yD metrics.reporter.promgateway.deleteOnShutdown=true \
-yD metrics.reporter.promgateway.factory.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory \
-yD metrics.reporter.promgateway.filter.includes=\*:dqc\*,uptime,taskSlotsTotal,numRegisteredTaskManagers,taskSlotsAvailable,numberOfFailedCheckpoints,numRestarts,lastCheckpointDuration,Used,Max,Total,Count,Time:gauge,meter,counter,histogram \
-yD metrics.reporter.promgateway.groupingKey="yarn=${yarn};hdfs=${hdfs};job_name=TEST-broadcast-${jobName//./-}-${provId}" \
-yD metrics.reporter.promgateway.host=172.17.xxxx.xxxx \
-yD metrics.reporter.promgateway.interval='60 SECONDS' \
-yD metrics.reporter.promgateway.jobName="TEST-broadcast-${jobName//./-}-${provId}" \
-yD metrics.reporter.promgateway.port=10080 \
-yD metrics.reporter.promgateway.randomJobNameSuffix=true \
-yD pipeline.name="TEST-broadcast-${jobName//./-}-${provId}" \
-yD pipeline.object-reuse=true \
-yD rest.flamegraph.enabled=true \
-yD rest.server.numThreads=20 \
-yD restart-strategy.failure-rate.delay='60 s' \
-yD restart-strategy.failure-rate.failure-rate-interval='3 min' \
-yD restart-strategy.failure-rate.max-failures-per-interval=3 \
-yD restart-strategy=failure-rate \
-yD security.kerberos.krb5-conf.path=/home/dev/kerberos/krb5.conf \
-yD security.kerberos.login.contexts=Client,KafkaClient \
-yD security.kerberos.login.keytab=/home/dev/kerberos/xxxx.keytab \
-yD security.kerberos.login.principal=xxxx \
-yD security.kerberos.login.use-ticket-cache=false \
-yD state.backend.async=true \
-yD state.backend=hashmap \
-yD state.checkpoints.dir=hdfs://xxxx/flink/checkpoint/${jobName//.//}/$provId \
-yD state.checkpoint-storage=filesystem \
-yD state.checkpoints.num-retained=3 \
-yD state.savepoints.dir=hdfs://xxxx/flink/savepoint/${jobName//.//}/$provId \
-yD table.exec.hive.fallback-mapred-writer=false \
-yD task.manager.memory.segment-size=4mb \
-yD taskmanager.memory.framework.off-heap.size=1GB \
-yD taskmanager.memory.managed.fraction=0.2 \
-yD taskmanager.memory.network.fraction=0.075 \
-yD taskmanager.memory.network.max=16GB \
-yD taskmanager.memory.process.size='50 gb' \
-yD taskmanager.network.netty.client.connectTimeoutSec=600 \
-yD taskmanager.network.request-backoff.max=120000 \
-yD taskmanager.network.retries=100 \
-yD taskmanager.numberOfTaskSlots=10 \
-yD web.timeout=900000 \
-yD web.upload.dir=/data1/flinkdir \
-yD yarn.application.name="TEST-broadcast-${jobName//./-}-${provId}" \
-yD yarn.application.queue=$yarnQueue \
-yD yarn.application-attempts=10 \
6.4、Flink On Yarn 集群的资源计算公式
-
JobManager 的内存计算
JobManager 的数量 = 1 (固定,由于一个 Flink Application 只能有一个 JobManager)
JobManager 的内存总量 = 1 * JobManager 的内存大小 = 1 * yjm -
TaskManager 的内存计算
TaskManager 的数量 = (设置的并行度总数 / 每个 TaskManager 的 Slot 数量) = (p / ys) (Ps: p / ys 有可能为非整数,故需要向下取整)
TaskManager 的内存总量 = TaskManager 的数量 * 每个 TaskManager 的内存容量 = TaskManager 的数量 * ytm -
Slot 所占用的内存计算
每个 Slot 的内存容量 = 每个 TaskManager 的内存容量 / 每一个 TaskManager 中的 Slot 数量 = ytm / ys
Slot 的总数量 = 最大并行度数量 = p
Slot 所占用的总内存容量 = TaskManager 的内存总量 = (p / ys) * ytm -
yarn vcore 总数量计算
yarn vcore 总数量 = Slot 的总数量 + JobManager 占用的 vcore 数量 (与 Yarn 的 minimum Allocation 有关) = p + m (不足则取 Yarn 的最小 vcore 分配数量) -
yarn container 的总数量计算
yarn container 的总数量 = TaskManager 的数量 + JobManager 的数量 = 1 + (p / ys) = (p / ys) + 1
yarn 的内存总量 = JobManager 的数量 * yjm (与 Yarn 的 minimum Allocation 有关) + TaskManager 的数量 * ytm = 1 * yjm (不足则取 Yarn 的最小 Memory 分配数量) + (p / ys) * ytm
Ps: 根据实际应用经验,一般 Yarn 的一个 vcore 搭配 2G 内存是最为有效率的配置方法。
实战应用:
flink-conf.yaml 配置项:
# Total size of the JobManager (JobMaster / ResourceManager / Dispatcher) process.
jobmanager.memory.process.size: 2048m
# Total size of the TaskManager process.
taskmanager.memory.process.size: 20480m
# Managed Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.
taskmanager.memory.managed.size: 4096m
# The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.numberOfTaskSlots: 10
Yarn 集群相关配置项:
Minimum Allocation <memory:4096, vCores:2>
Maximum Allocation <memory:163840, vCores:96>
假设 Flink 流任务在 FlinkSQL 中设置的并行度为 10 (parallelism = 10)。根据计算公式:
JobManager 的数量 = 1
TaskManager 的数量 = (p / ys) = 1 (注:ys 配置是 taskmanager.numberOfTaskSlots = 10)
Slot 的总数量 = p = 10
yarn vcore 的总数量 = Slot 的总数量 + 1 = p + 1 = p + 2 (向上取至 Yarn 最小分配 vcore 数) = 12
yarn container 的总数量 = TaskManager 的数量 + JobManager 的数量 = 2
yarn 的内存总量 = JobManager 的数量 * yjm + TaskManager 的数量 * ytm = 1 * 2048m + 1 * 20480m = 1 * 4096m (向上取至 Yarn 最小分配内存数) + 1 * 20480m = 24576m
6.5、Flink On Yarn 集群三种部署模式
Flink 应用程序的作业可以被提交到长期运行的 Flink Session 集群
、专用的 Flink Job 集群
或 Flink Application 集群
。这些选项之间的差异主要与集群的生命周期和资源隔离保证有关。
Flink Session 集群
-
集群生命周期:在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。
-
资源隔离:TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。
-
其他注意事项:拥有一个预先存在的集群可以节省大量时间申请资源和启动 TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面的影响 — 就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。
注意: 以前,Flink Session 集群也被称为 session 模式下的 Flink 集群。
Flink Job 集群
-
集群生命周期:在 Flink Job 集群中,可用的集群管理器(例如 YARN 或 Kubernetes)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。
-
资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。
-
其他注意事项:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。
注意: 以前,Flink Job 集群也被称为 job (or per-job) 模式下的 Flink 集群。
Flink Application 集群
-
集群生命周期:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 main()方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用 main()方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。
-
资源隔离:在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。
注意: Flink Job 集群可以看做是 Flink Application 集群”客户端运行“的替代方案。
模式 | 介绍 | 特点 | 优点 | 缺点 | 应用场景 |
---|---|---|---|---|---|
Session 模式 | JobManager 与 TaskManager 共享;客户端通过 RPC 或者 Rest API 连接集群的管理节点;Deployer 需要上传依赖的 Dependences jar;Deployer 需要生成 job Graph,并提交到管理节点;JobManager 的生命周期不受提交的 Job 影响,会长期运行。 | 需要事先申请资源,使用 Flink 中的 yarn-session(yarn客户端),启动 JobManager 和 TaskManger | 不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率 | 作业执行完成以后,资源不会被释放,因此一直会占用系统资源,资源隔离相对较差 | 适合作业递交比较频繁的场景,小作业比较多的场景 |
Per-Job 模式 | 单个 Job 独享 JobManager 与 TaskManager;TaskManager 中 Slot 资源根据 Job 指定;Deployer 需要上传依赖的 Dependences Jar;客户端生成JobGraph,并提交到管理节点 ;JobManager 的生命周期和 Job 生命周期绑定。 | 每次递交作业都需要申请一次资源 | 作业运行完成,资源会立刻被释放,不会一直占用系统资源。不同Job之间资源隔离充分,每个 Job 的 TaskManager Slots 数量可以不同 | 每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间 | 适合作业比较少的场景、大作业的场景 |
Application模式 | 用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。 | Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止 | Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽 | 客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。尤其在大量用户共享客户端时,问题更加突出 | flink-1.11 支持Kubernetes 的 Application 模式下运行 |
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)