Flink 容错机制
Checkpoints(检查点)
Flink中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制,Checkpoints可以将同一时间点作业/算子的状态数据全局统一快照处理,包括前面提到的算子状态和键值分区状态。当发生了故障后,Flink会将所有任务的状态恢复至最后一次Checkpoint中的状态,并从那里重新开始执行。
那么Checkpoints的生成策略是什么样的呢?它会在什么时候进行快照的生成呢?
其实就是在所有任务都处理完同一个输入数据流的时候,这时就会对当前全部任务的状态进行一个拷贝,生成Checkpoints。
为了方便理解,这里先简单的用一个朴素算法来解释这一生成过程(Flink的Checkpoints算法实际要更加复杂,在下面会详细讲解)
- 暂停接受所有输入流。
- 等待已经流入系统的数据被完全处理,即所有任务已经处理完所有的输入数据。
- 将所有任务的状态拷贝到远程持久化,生成Checkpoints。在所有任务完成自己的拷贝工作后,Checkpoints生成完毕。
- 恢复所有数据流的接收
恢复流程
为了方便进行实例的讲解,假设当前有一个Source任务,负责从一个递增的数字流(1、2、3、4……)中读取数据,读取到的数据会分为奇数流和偶数流,求和算子的两个任务会分别对它们进行求和。在当前任务中,数据源算子的任务会将输入流的当前偏移量存为状态,求和算子的任务会将当前和存为状态。
如上图,在当前生成的Checkpoints中保存的输入偏移为5,偶数求和为6,奇数求和为9。
假设在下一轮计算中,任务sum_odd计算出现了问题,任务sum_odd的时候产生了问题,导致结果出现错误。由于出现问题,为了防止从头开始重复计算,此时会通过Checkpoints来进行快照的恢复。
Checkpoints恢复应用需要以下三个步骤
- 重启整个应用
- 利用最新的检查点重置任务状态
- 恢复所有任务的处理
- 第一步我们需要先重启整个应用,恢复到最原始的状态。
- 紧接着从检查点的快照信息中读取出输入源的偏移量以及算子计算的结果,进行状态的恢复
- 状态恢复完成后,继续Checkpoints恢复的位置开始继续处理。
从检查点恢复后,它的内部状态会和生成检查点的时候完全一致,并且会紧接着重新处理那些从之前检查点完成开始,到发生系统故障之间已经处理过的数据。虽然这意味着Flink会重复处理部分消息,但上述机制仍然可以实现精确一次的状态一致性,因为所有的算子都会恢复到那些数据处理之前的时间点。
但这个机制仍然面临一些问题,因为Checkpoints和恢复机制仅能重置应用内部的状态,而应用所使用的Sink可能在恢复期间将结果向下游系统(如事件日志系统、文件系统或数据库)重复发送多次。为了解决这个问题,对于某些存储系统,Flink提供的Sink函数支持精确一次输出 (在检查点完成后才会把写出的记录正式提交)。另一种方法则是适用于大多数存储系统的幂等更新。
生成策略
Flink中的Checkpoints是基于Chandy-Lamport分布式快照算法 实现的,该算法不会暂停整个应用,而是会将生成Checkpoints的过程和处理过程分离,这样在部分任务持久化状态的过程中,其他任务还可以继续执行。
在介绍生成策略之前,首先需要介绍一下Checkpoints barrier(屏障) 这一种特殊记录。
如上图,与水位线相同,Flink会在Source中间隔性地生成barrier,通过barrier把一条流上的数据划分到不同的Checkpoints中,在barrier之前到来的数据导致的状态更改,都会被包含在当前所属的Checkpoints中;而基于barrier之后的数据导致的所有更改,就会被包含在之后的Checkpoints中。
- 假设当前有两个Source任务,各自消费一个递增的数字流(1、2、3、4……),读取到的数据会分为奇数流和偶数流,求和算子的两个任务会分别对它们进行求和,并将结果值更新至下游Sink。
- 此时JobManager向每一个Source任务发送一个新的Checkpoints编号,以此启动Checkpoints生成流程。
- 在Source任务收到消息后,会暂停发出记录,紧接着利用状态后端生成本地状态的Checkpoints,并把barrier连同编号广播给所有传出的数据流分区。
- 状态后端在状态存入Checkpoints后通知Source任务,并向JobManager发送确认消息。
- 在所有barrier发出后,Source将恢复正常工作。
- Source任务会广播barrier至所有与之相连的任务,确保这些任务能从它们的每个输入都收到一个barrier
- 在等待过程中,对于barrier未到达的分区,数据会继续正常处理。而barrier已经到达的分区,它们新到来的记录会被缓冲起来,不能处理。这个等待所有barrier到来的过程被称为barrier对齐
- 任务中收齐全部输入分区发送的barrier后,就会通知状态后端开始生成Checkpoints,同时继续把Checkpoints barrier广播转发到下游相连的任务。
- 任务在发出所有的Checkpoints barrier后就会开始处理缓冲的记录。等到所有缓冲记录处理完后,任务就会继续处理Source。
- Sink任务在收到分隔符后会依次进行barrier对齐,然后将自身状态写入Checkpoints,最终向JobManager发送确认信息。
- JobManager在接收到所有任务返回的Checkpoints确认信息后,就说明此次Checkpoints生成结束。
Savepoints(保存点)
- 由于Cheakpoints是周期性自动生成的,但有些时候我们需要手动的去进行镜像保存功能,于是Flink同时还为我们提供了Savepoints来完成这个功能,Savepoints不仅可以做到故障恢复,还可以用于手动备份、版本迁移、暂停或重启应用等。
- Savepoints是Checkpoints的一种特殊实现,底层也是使用Checkpoint机制,因此Savepoints可以认为是具有一些额外元数据的Checkpoints。
- Savepoints的生成和清理都无法由Flink自动进行,因此都需要用户自己来显式触发。