由于Flink亦是内存计算,所以要有一套完善的数据存储机制。Flink利用CheckPoint机制数据持久化,以便于出现异常,应用挂掉时,做数据恢复。所谓CheckPoint(可以理解为CheckPoint是把State数据持久化存储了)则表示了一个FlinkJob在一个特定时刻的一份全局状态快照,即包含了所有Task/Operator的状态。

一、CheckPoint的原理

Flink中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制,分布式快照可以将同一时间点Task/Operator的状态数据全局统一快照处理,包括前面提到的KeyedState和OperatorState。Flink会在输入的数据集上间隔性地生成checkpointbarrier,通过栅栏(barrier)将间隔时间段内的数据划分到相应的checkpoint中。当异常中断重启时,会从最近的一次checkpoint中读取保存的数据,继续原有数据状态的执行。

如下图,是一条数据的流向,flink的Source读取并接收数据,然后data依次流向各个算子并执行计算逻辑。同时JM负责每隔一段时间向数据流中插入一个barrier,两个barrier之间的数据间隔视为一次checkpoint。当data流进Operator Area时候,会依次经过各个算子,每个算子遇到barrier便对之前的数据计算状态做一次checkpoint,直到所有的算子都做完checkpoint,这样一次整体的checkpoint才完成了。遇到第二个barrier重复操作。

 

二、CheckPoint的设置

默认情况下Flink不开启checkpoint的,需要我们自己在程序中开启,也可以调整其他相关参数值

(1)开启Checkpoint并指定检查点间隔

开启检查点并且指定检查点时间间隔为1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值

env.enableCheckpointing(1000);

 (2) 设置Checkpoint的模式

 exactly-once:保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复

 由于一致性要求高,flink内部要做exactly-once检查,因此Flink的性能也相对较弱。对于大多数应用程序,只需一次就可以了,flink的Checkpoint默认模式为exactly-once。

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

at-least-once:语义更适合于时廷和吞吐量要求非常高但对数据的一致性要求不高的场景。flink内部无须exactly-once检查。

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

(3)设置超时时间

 指定了每次Checkpoint 执行过程中的上限时间范围,一旦Checkpoint执行时间超过该阈值,Flink将会中断Checkpoint过程,并按照超时处理。默认为10分钟超时

env.getCheckpointConfig().setCheckpointTimeout(600000);

 (4)检查点之间最小时间间隔

该参数主要目的是设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多,最终Flink应用密集地触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能。

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

 (5)最大并行执行的检查点数量  

设置flink同时最大执行的Checkpoint数量,默认情况下只有一个检查点可以运行。用户可以根据实际情况,适当调整Checkpoint的并行数量,进而提升Checkpoint整体的效率

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

(6)是否删除Checkpoint中保存的数据

RETAIN_ON_CANCELLATION:表示一旦 Flink 处理程序被 cancel 后,会保留CheckPoint数据,以便根据实际需要恢复到指定的CheckPoint。
DELETE_ON_CANCELLATION:表示一旦 Flink 处理程序被 cancel 后,会删除CheckPoint 数据,只有Job执行失败的时候才会保存CheckPoint

 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

(7)设置可以容忍的检查的失败数

  CheckPoint超过这个数量则系统自动关闭和停止任务 

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

(8)是否优先采用更近savepoint恢复Checkpoint

true当有更近的savepoint时,优先采用savepoint来恢复成检查点。false,即使有更新的savepoint,不采用savepoint恢复Checkpoint,默认false

 env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

更多配置see:https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#state-backend-1 

 

三、CheckPoint的存储

默认情况下,State会保存在TaskManager的内存中,CheckPoint会存储在JobManager的内存中。State和CheckPoint的存储位置取决于StateBackend的配置。Flink一共提供了3种StateBackend。包括基于内存的MemoryStateBackend、基于文件系统的FsStateBackend,以及基于RockDB作为存储介质的RocksDBState-Backend

1、基于内存的MemoryStateBackend

基于内存的状态管理具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。适用本地开发和调试,状态比较少的作业场景。

MemoryStateBackend将State作为Java对象保存(在堆内存),存储着key/value状态、window运算符、触发器等的哈希表。在Checkpoint时,State Backend将对State进行快照,并将其作为checkpoint发送到JobManager机器上,JobManager将这个State数据存储在Java堆内存。MemoryStateBackend默认使用异步快照,来避免阻塞管道。如果需要修改,可以在MemoryStateBackend的构造函数将布尔值改为false(仅用于调试)。

快照有两种方式:

异步快照方式时,operator操作符在做快照的同时也会处理新流入的数据,默认异步方式
同步快照方式:operator操作符在做快照的时候,不会处理新流入的数据,同步快照会增加数据处理的延迟度。 

2、基于文件系统的FsStateBackend(生产常用)

FsStateBackend将正在运行的数据保存在TaskManager的内存中。在checkpoint时,它将State的快照写入文件系统对应的目录下的文件中。最小元数据存储在JobManager的内存中(如果是高可用模式下,元数据存储在checkpoint中)。FsStateBackend默认使用异步快照,来避免阻塞处理的管道。如果需要禁用,在FsStateBackend构造方法中将布尔值设为false

和MemoryStateBackend有所不同,FsStateBackend是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统。FsStateBackend更适合任务状态非常大的情况,例如应用中含有时间范围非常长的窗口计算,或Key/value的State状态数据量非常大的场景。

3、基于RocksDB的RocksDBStateBackend 

RocksDBStateBackend是Flink中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend需要单独引入相关的依赖包到工程中。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.1</version>
</dependency>

RocksDBStateBackend采用异步的方式进行状态数据的Snapshot,任务中的状态数据首先被写入本地RockDB中,这样在RockDB仅会存储正在进行计算的热数据,而需要进行CheckPoint的时候,会把本地的数据直接复制到远端的FileSystem中。

与FsStateBackend 相比,RocksDBStateBackend在性能上要比FsStateBackend高一些,主要是因为借助于RocksDB在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。

此种方式kv state需要由rockdb数据库来管理,这是和内存或file backend最大的不同,即状态数据是直接写入到rockdb的,不像前面两种,只有在checkpoint的时候才会将数据保存到指定的backend。RocksDBStateBackend使用RocksDB数据库保存数据,这个数据库保存在TaskManager的数据目录中。注意的是:RocksDB,它是一个高性能的Key-Value数据库。数据会放到先内存当中,在一定条件下触发写到磁盘文件上。
在 checkpoint时, 整个 RocksDB数据库的数据会快照一份, 然后存到配置的文件系统中(一般是 hdfs)。同时, Apache Flink将一些最小的元数据存储在 JobManager 的内存或者 Zookeeper 中(对于高可用性情况)。RocksD始终配置为执行异步快照

4、状态后端的设置

(1)全局配置StateBackend

以上是在代码中设置状态存储配置,是基于单job配置状态后端。三种状态又可以全局配置,在商业生产中通常我们使用的是全局配置,无需开发者设置。通过conf/flink-conf.yaml 中指定state.backend,可以设置默认的状态后端

state.backend: filesystem

filesystem表示使用FsStateBackend,jobmanager表示使用MemoryStateBackend,rocksdb表示使用RocksDBStateBackend。

(2) 代码中设置

        //使用MemoryStateBackend的方式进行存储
        StateBackend memoryStateBackend = new MemoryStateBackend(10*1024*1024);
        env.setStateBackend(memoryStateBackend);

        //使用FsStateBackend的方式进行存储
        StateBackend fsStateBackend = new FsStateBackend("hdfs://test.xxx.local:9000/checkpoint/cp1");
        env.setStateBackend(fsStateBackend);
         
        //使用RocksDBStateBackend的方式进行存储
        StateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://test.xxx.local:9000/checkpoint/cp1");
        env.setStateBackend(rocksDBStateBackend );

四、与SavePoint的区别

1、Checkpoint
检查点的主要目的是在job意外失败时提供恢复机制。
Checkpoint的生命周期由Flink管理,即Flink创建,拥有和发布Checkpoint - 无需用户交互。
作为一种恢复和定期触发的方法,Checkpoint主要的设计目标是:创建checkpoint,是轻量级的和尽可能快地恢复

2、Savepoints
savepoints是检查点的一种特殊实现,底层实现其实也是使用Checkpoints的机制。由用户创建,拥有和删除。
savepoints一般是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中
通常在Flink版本升级或者维护暂停,避免因为升级或维护导致应用中止而无法恢复原有计算状态
从概念上讲,Savepoints的生成和恢复成本可能更高,并且更多地关注可移植性和对前面提到的作业更改的支持

 

五、Checkpoint的应用

1、Checkpoint参数设置

以下为Checkpoint的参数设置, Checkpoint的任务代码不在列举

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkEnvConf {

    public static StreamExecutionEnvironment getEnv(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置每次checkpoint间隔,每隔60s一次
        env.enableCheckpointing(1000 * 60);
        //设置checkpoint模式,exactly_once只处理一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置时间语义,设置事件时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //设置失败重启策略,尝试重启5次,每隔120s重启1次
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, org.apache.flink.api.common.time.Time.seconds(120)));
        //设置checkpoint之心超时时间,设置10分钟超时
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        //设置两次checkpoint的最小间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        //设置最大并行的checkpoint数量
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //设置是够保存checkpoint的数据,RETAIN_ON_CANCELLATION会保留CheckPoint数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //设置checkpoint可以容忍失败的次数
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        //设置若有最新的savepoint,优先采用savepoint来恢复成检查点
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
        return env;
    }
}

2、Flink UI查看设置Checkpoint参数

(1)访问Flink UI查看Checkpoint情况

 

(2)  点击Checkpoint,可以查看Checkpoint整体概况

 (3) 点击History,可以查看历史每次Checkpoint的情况,如完成情况、执行时间、耗费时长、存储大小等

 有测试环境的同学还可以中断flink应用,验证下Checkpoint机制

 

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐