容错机制之检查点
在Flink中,有一套完整的容错机制来保证故障后的恢复,其中最重要的就是检查点。在流处理中,将之前某个时间点所有的状态保存下来,这份"存档"就是所谓的"检查点"(checkpoint)。 遇到故障重启的时候,我们可以从检查点中"读档",恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要"检查"结果的正确性。所以,有时又会把checkpoint叫做"一致性检查点"。
1. 检查点的保存
1.1 周期性的触发保存
"随时存档"确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以在Flink中,检查点的保存是周期性触发的,间隔时间可以进行设置。
1.2 保存的时间点
我们应该在所有任务(算子)都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。这样做可以实现一个数据被所有任务(算子)完整地处理完,状态得到了保存。 如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;kafka就是满足这些要求的一个最好的例子。
1.3 保存的具体流程
检查点的保存,最关键的就是要等所有任务将"同一个数据"处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。
回忆一下我们最初实现的统计词频的程序——word count。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的是: "hello","world","hello","flink","hello","world","hello","flink"… 我们所需要的就是每个任务都处理完"hello"之后保存自己的状态。
2. 从检查点恢复状态流程
2.1 正在处理数据流
当我们需要保存检查点时,就是在所有任务处理完同一条数据后,对状态做个快照保存下来。例如我们输入数据为:"hello","world","hello","flink","hello","world","hello","flink"… 我们所需要的就是每个任务都处理完"hello"之后保存自己的状态。 标红的hello表示它正在被处理,偏移量也就是3。
2.2 处理数据过程发生故障
当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。 例如在前面的word count示例中,我们处理完三个数据后保存了一个检查点。之后继续运行,又正常处理了一个数据"flink",在处理第五个数据"hello"时发生了故障。
这里Source任务已经处理完毕,所以偏移量为5;Map任务也处理完成了。而Sum任务在处理中发生了故障,此时状态并未保存。
2.3 重启应用->读取检查点,重置状态
接下来就需要从检查点来恢复状态了。具体的步骤为:
- 重启应用
遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空。 - 读取检查点,重置状态
找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候。
2.4 重置偏移量
从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第4、5个数据("flink""hello")就相当于丢掉了;这会造成计算结果的错误。
为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过Source任务向外部数据源重新提交偏移量(offset)来实现。 这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。
2.5 继续处理数据
接下来,我们就可以正常处理数据了。首先是重放第4、5个数据,然后继续读取后面的数据。 当处理到第5个数据时,就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样;我们既没有丢掉数据、也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫做实现了"精确一次"(exactly-once)的状态一致性保证。
3. 检查点算法
在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点。具体实现上分为了Barrier对齐的精准一次、Barrier对齐的至少一次、非Barrier对齐的精准一次策略。
3.1 检查点分界线(Barrier)
借鉴水位线的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。
这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的"分界线"(Checkpoint Barrier),这也就是Chandy-Lamport算法的实现核心点。 在JobManager中有一个"检查点协调器",专门用来协调处理检查点的相关工作。检查点协调器会定期向TaskManager发出指令,要求保存检查点(带着检查点ID);TaskManager会让所有的source任务把自己的偏移量(算子状态)保存起来,并将带有检查点ID的分界线插入到当前的数据流中,然后像正常的数据一样向下游传递;之后Source任务就可以继续读入新的数据了。
3.2 Barrier对齐的精准一次
watermark指示的是"之前的数据全部到齐了",而barrier指示的是"之前所有数据的状态更改保存入当前检查点":它们都是一个"截止时间"的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。
具体实现上,Flink使用了Chandy-Lamport算法的一种变体,被称为"异步分界线快照"算法。算法的核心就是两个原则:
- 当上游任务向多个并行下游任务发送barrier时,需要广播出去;
- 当多个上游任务向同一个下游任务传递分界线时,需要在下游任务执行"分界线对齐"操作,也就是需要等到所有并行分区的barrier都到齐后,才可以开始状态的保存。
3.2.1 检查点算法的并行场景
为了详细解释检查点算法的原理,我们对之前的word count程序进行扩展,考虑所有算子并行度为2的场景。 我们有两个并行的Source任务,会分别读取两个数据流(或者是一个源的不同分区)。这里每条流中的数据都是一个个的单词:第一条流数据是"hello""hello""hello""flink" "hello",第二条流是交替出现。此时第一条流的Source任务(为了方便,下文中我们直接叫它"Source 1",其它任务类似)读取了3个数据,偏移量为3;而第二条流的Source任务(Source 2)只读取了一个"hello"数据,偏移量为1。
3.2.2 触发检查点保存
JobManager发送指令,触发检查点的保存;Source 任务中插入一个分界线,并将偏移量保存到远程的持久化存储中。
说明
并行的Source任务保存的状态为3和1,表示当前的1号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现Source任务做这些的时候并不影响后面任务的处理,Sum任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。
3.2.3 分界线向下游传递
状态快照保存完成,分界线向下游传递: 状态存入持久化存储之后,会返回通知给Source任务;Source任务就会向JobManager确认检查点完成,然后跟数据一样把分界线向下游任务传递。
说明
由于Source和Map之间是一对一(forward)的传输关系(这里没有考虑算子链),所以barrier可以直接传递给对应的Map任务。之后Source任务就可以继续读取新的数据了。与此同时,Sum 1已经将第二条流传来的(hello,1)处理完毕,更新了状态。
3.2.4 分界线对齐
向下游多个并行子任务广播分界线,执行分界线对齐: Map任务没有状态,所以直接将barrier继续向下游传递。这时由于进行了keyBy分区,所以需要将barrier广播到下游并行的两个Sum任务。同时,Sum任务可能收到来自上游两个并行Map任务的barrier,所以需要执行"分界线对齐"操作。 此时的Sum 2收到了来自上游两个Map任务的barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;
而Sum 1只收到了来自Map 2的barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务Map 1又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum任务应该正常继续处理数据,状态更新为3;而如果分界线已经到达的分区任务Map 2又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来等到状态保存之后再做处理。
3.2.5 有状态算子将状态保存至持久化存储
分界线对齐后,保存状态到持久化存储: 各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将barrier向下游继续传递,并通知JobManager保存完毕。 这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当Sum将当前状态保存完毕时,Source 1任务已经读取到第一条流的第五个数据了。
3.2.6 总结
检查点保存算法过程为:
- 触发检查点:JobManager向Source发送Barrier;
- Barrier发送:向下游广播发送;
- Barrier对齐:下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存;
- 状态保存:有状态的算子将状态保存至持久化;
- 先处理缓存数据,然后正常继续处理;
完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。
由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Barrier对齐中提供了至少一次语义以及Flink 1.11之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据也保存进检查点。这样,当我们遇到一个分区barrier时就不需等待对齐,而是可以直接启动状态的保存了。
提示
背压是指在数据流系统中,数据的生成速度大于数据的处理速度时,为了防止数据在系统中的积压,系统会采取措施来减缓数据的生成速度,以维护系统的稳定性。
3.3 Barrier对齐的至少一次
3.3.1 触发检查点保存
JobManager发送指令,触发检查点的保存;Source 任务中插入一个分界线,并将偏移量保存到远程的持久化存储中。 并行的Source任务保存的状态为3和1,表示当前的1号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现Source任务做这些的时候并不影响后面任务的处理,Sum任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。
3.3.2 分界线向下游传递
状态快照保存完成,分界线向下游传递: 状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后跟数据一样把分界线向下游任务传递。 由于Source和Map之间是一对一(forward)的传输关系(这里没有考虑算子链),所以barrier可以直接传递给对应的Map任务。之后Source任务就可以继续读取新的数据了。与此同时,Sum 1已经将第二条流传来的(hello,1)处理完毕,更新了状态。
3.3.3 分界线对齐
向下游多个并行子任务广播分界线,执行分界线对齐: Map任务没有状态,所以直接将barrier继续向下游传递。这时由于进行了keyBy分区,所以需要将barrier广播到下游并行的两个Sum任务。同时,Sum任务可能收到来自上游两个并行Map任务的barrier,所以需要执行"分界线对齐"操作。 此时的Sum 2收到了来自上游两个Map任务的barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;而Sum 1只收到了来自Map 2的barrier,所以这时需要等待分界线对齐。而如果分界线已经到达的分区任务Map 2又传来数据,直接计算等到下一个Barrier到达时做状态的保存。重新启动时介于两个Barrier之间分界线已经到达的分区任务Map 2传过来的数据会再次计算(至少一次)。
3.3.4 有状态算子将状态保存至持久化存储
分界线对齐后,保存状态到持久化存储: 各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将barrier向下游继续传递,并通知JobManager保存完毕。 这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当Sum将当前状态保存完毕时,Source 1任务已经读取到第一条流的第五个数据了。
3.4 非Barrier对齐的精准一次
3.4.1 触发检查点保存
JobManager发送指令,触发检查点的保存;Source 任务中插入一个分界线,并将偏移量保存到远程的持久化存储中。 并行的Source任务保存的状态为3和1,表示当前的1号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现Source任务做这些的时候并不影响后面任务的处理,Sum 2已经处理完了第一条流中传来(world, 1)对应的状态也有了更改。
3.4.2 分界线向下游传递
状态快照保存完成,分界线向下游传递: 状态存入持久化存储之后,会返回通知给Source任务;Source任务就会向JobManager确认检查点完成,然后跟数据一样把分界线向下游任务传递。 由于Source和Map之间是一对一(forward)的传输关系(这里没有考虑算子链),所以barrier可以直接传递给对应的Map任务。之后Source任务就可以继续读取新的数据了。与此同时,Sum 1已经将第二条流传来的(hello,1)处理完毕,更新了状态。
3.4.3 非Barrier对齐
向下游多个并行子任务广播分界线,执行非Barrier对齐: Map任务没有状态,所以直接将barrier继续向下游传递。这时由于进行了keyBy分区,所以需要将barrier广播到下游并行的两个Sum任务。同时,Sum任务可能收到来自上游两个并行Map任务的barrier,执行"非Barrier对齐"操作。 这里我们我只关注Sum 1的细节,Sum 1在第一个barrier到达时就开始执行非对齐检查点。
核心思想:只要in-flight的数据也存到状态里,barrier就可以越过所有in-flight的数据继续往下游传递。
此时的Sum 1任务在第一个Barrier到达输入缓冲区时:
- 直接将barrier放到输出缓冲区末端,向下游传递。
- 标记数据(图中标颜色部分)
- 一是被第一个barrier越过的输入缓冲区和输出缓冲区的数据
- 二是在其他barrier之前的所有数据
- 把标记数据和状态一起保存到checkpoint中,从checkpoint恢复时这些数据也会一起恢复到对应位置
总结
- Barrier对齐: 一个Task收到所有上游同一个编号的barrier后,才会对自己的本地状态做备份
精准一次:在barrier对齐过程中,barrier后面的数据阻塞等待(不会越过barrier)
至少一次:在barrier对齐过程中,先到的barrier后面的数据会接着计算执行 - 非Barrier对齐: 一个Task收到上游的barrier后,就会对自己的本地状态做备份
先到的barrier,将本地状态备份,其后面的数据接着计算输出
后到的barrier,其前面的数据会计算输出,同时输出保存到备份中
最后到的barrier到达的时候,整个Task的备份结束
2. 配置检查点
检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置。
2.1 启用检查点
默认情况下,Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的enableCheckpointing()
方法:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1秒启动一次检查点保存, 默认的间隔周期为500毫秒
env.enableCheckpointing(1000);
检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小,可以调大间隔时间;而如果希望故障重启后迅速赶上实时的数据处理,就需要将间隔时间设小一些。
2.2 检查点存储
public static void main(String[] args) throws Exception {
// 只要用HDFS,需要指定用户名
System.setProperty("HADOOP_USER_NAME", "jack");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 每隔3秒启动一次检查点保存, 不配置默认是EXACTLY_ONCE
env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 设置检查点保存的路径
checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink/ckps");
// checkpoint的超时时间, 默认10分钟
checkpointConfig.setCheckpointTimeout(60000);
// 同时运行中的checkpoint的最大数量, 推荐为1,避免检查点开始时,上一个检查点还没结束
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔,设置了>0,并发就会变成1
// 如果maxConcurrentCheckpoints>1,该设置有效
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 取消作业时,checkpoint的数据是否保留在外部系统,
// 默认为RETAIN_ON_CANCELLATION, 当作业被取消时,保留checkpoint数据
checkpointConfig.
setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 允许checkpoint 连续失败的次数,默认0--》表示checkpoint一失败,job就挂掉
checkpointConfig.setTolerableCheckpointFailureNumber(3);
// 开启 非对齐检查点(barrier非对齐)
checkpointConfig.enableUnalignedCheckpoints();
// 如果大于0,一开始用 对齐的检查点(barrier对齐), 对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐)
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));
env
.socketTextStream("hadoop103", 7777)
.flatMap(
(String value, Collector<Tuple2<String, Integer>> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
)
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value -> value.f0)
.sum(1)
.print();
String jobName = "检查点应用";
env.execute(jobName);
}
运行结果: 点击Configuration:
点击History:
保存点保存在HDFS上信息:
提示
如果开启非对齐检查点:要求检查点模式(CheckpointingMode)必须为exctly-once,并且最大并发的检查点个数为1。
2.3 增量checkpoint (changelog)
在1.15之前,只有RocksDB支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
从1.15开始,不管hashmap还是rocksdb状态后端都可以通过开启changelog实现通用的增量checkpoint。
提示
目前该特性标记为实验性功能,开启后可能会造成资源消耗增大:
- HDFS上保存的文件数变多
- 消耗更多的IO带宽用于上传变更日志
- 更多的CPU用于序列化状态更改
- TaskManager使用更多内存来缓存状态更改
使用限制:
- Checkpoint的最大并发必须为1
- 从Flink 1.15开始,只有文件系统的存储类型实现可用(memory测试阶段)
- 不支持NO_CLAIM模式
- 开启方式--配置文件指定(推荐方式,有些配置在代码中没有api提供):
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM
- 开启方式--在代码中设置
在pom.xml中添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-changelog</artifactId>
<version>${flink.version}</version>
<scope>runtime</scope>
</dependency>
// 开启changelog
env.enableChangelogStateBackend(true);
2.4 最终检查点
如果数据源是有界的,就可能出现部分Task已经处理完所有数据,变成finished状态,不继续工作。从Flink 1.14开始,这些finished状态的Task,也可以继续执行检查点。自1.15起默认启用此功能。
3. 保存点(Savepoint)
除了检查点外,Flink还提供了另一个非常独特的镜像保存功能——保存点(savepoint)。从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。
3.1 保存点的用途
保存点与检查点最大的区别,就是触发的时机。检查点是由Flink自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个"自动存盘"的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是"手动存盘"。保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:
- 版本管理和归档存储
- 更新Flink版本
- 更新应用程序
- 调整并行度
- 暂停应用程序
需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的,算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定:
DataStream<String> stream = env
.addSource(new StatefulSource()).uid("source-id")
.map(new StatefulMapper()).uid("mapper-id")
.print();
对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID。
3.2 使用保存点
保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。
3.2.1 创建保存点
## 要在命令行中为运行的作业创建一个保存点镜像,只需要执行:
bin/flink savepoint :jobId [:targetDirectory]
## 在停掉一个作业时直接创建保存点
bin/flink stop --savepointPath [:targetDirectory] :jobId
jobId需要填充要做镜像保存的作业ID,目标路径targetDirectory可选,表示保存点存储的路径。对于保存点的默认路径,可以通过配置文件flink-conf.yaml中的state.savepoints.dir
项来设定:
state.savepoints.dir: hdfs:///flink/savepoints
当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:
// 不太建议,涉及改代码就不方便
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
3.2.2 从保存点重启应用
从保存点重启一个应用:
bin/flink run -s :savepointPath [:runArgs]
3.3 使用保存点切换状态后端
使用savepoint恢复状态的时候,也可以更换状态后端。但是有一点需要注意的是,不要在代码中指定状态后端了, 通过配置文件来配置或者-D
参数配置。
## 提交flink作业
bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.rocket.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar
## 停止flink作业时,触发保存点
bin/flink stop -p savepoint路径 job-id -yid application-id(推荐方式--优雅停止)
或者
bin/flink cancel -s savepoint路径 job-id -yid application-id
## 从保存下来的checkpoint恢复作业
bin/flink run-application -d -t yarn-application -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 -c com.rocket.checkpoint.SavepointDemo ./FlinkTutorial-1.0-SNAPSHOT.jar
如果停止作业时,忘了触发保存点也不用担心,现在版本的flink支持从保留在外部系统的checkpoint恢复作业,但是恢复时不支持切换状态后端。