1. 检查点(Checkpoint)

  flink将之前某个时间点所有的状态保存下来,这份存档就是所谓的检查点(checkpoint)。检查点是 Flink 容错机制的核心。检查是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致。所以有时又会把 checkpoint 叫作一致性检查点


1.1 检查点的保存

(1) 周期性的触发保存

  检查点作为应用状态的一份存档,其实就是所有任务状态在同一时间点的一个快照(snapshot),它的触发是周期性的。每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点

(2) 保存的时间点

  当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建一个事务(transaction)。如果出现故障,恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;(典型应用 Kafka)


1.2 从检查点恢复状态

  当发生故障时,需要找到最近一次成功保存的检查点来恢复状态
在这里插入图片描述
具体的步骤为:

(1)重启应用

  遇到故障之后,第一步当然是重启,所有任务的状态会清空
在这里插入图片描述

(2)读取检查点,重置状态

  找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻
在这里插入图片描述

(3)重放数据

  从检查点恢复状态后如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据就相当于丢掉了;这会造成计算结果的错误。为了不丢数据,应该从保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现。这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻
在这里插入图片描述

(4)继续处理数据

在这里插入图片描述

注:
  想要正确地从检查点中读取并恢复状态,必须知道每个算子任务状态的类型和先后顺序(拓扑结构);因此为了可以从之前的检查点中恢复状态,在改动程序、修复 bug 时要保证状态的拓扑顺序和类型不变。状态的拓扑结构在 JobManager 上可以由 JobGraph 分析得到,而检查点保存的定期触发也是由 JobManager 控制的;所以故障恢复的过程需要 JobManager 的参与


1.3 检查点算法

  在不暂停整体流处理的前提下,将状态备份保存到检查点。在 Flink中,采用了基于 Chandy-Lamport 算法的分布式快照

(1)检查点分界线(Barrier)

  借鉴水位线(watermark)的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source 任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的分界线(Checkpoint Barrier)。检查点分界线中带有一个检查点 ID,这是当前要保存的检查点的唯一标识
在这里插入图片描述
  在 JobManager 中有一个检查点协调器(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会定期向 TaskManager 发出指令,要求保存检查点(带着检查点 ID);TaskManager 会让所有的 Source 任务把自己的偏移量(算子状态)保存起来,并将带有检查点 ID 的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递;之后 Source 任务就可以继续读入新的数据了。每个算子任务只要处理到这个 barrier,就把当前的状态进行快照;在收到 barrier 之前,还是正常地处理之前的数据,完全不受影响

(2)分布式快照算法

  通过在流中插入分界线(barrier)需要保持顺序一致,在一条单一的流上,数据依次进行处理,顺序保持不变;不过对于分布式流处理来说,想要保持数据的顺序就不是那么容易了。
  Flink 使用了 Chandy-Lamport 算法的一种变体,被称为异步分界线快照(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时,需要在下游任务执行分界线对齐(barrier alignment)操作,也就是需要等到所有并行分区的 barrier 都到齐,才可以开始状态的保存

在这里插入图片描述
具体过程:
(1)JobManager 发送指令,触发检查点的保存;Source 任务保存状态,插入分界线
  JobManager 会周期性地向每个 TaskManager 发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。收到指令后,TaskManger 会在所有 Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中
在这里插入图片描述
(2)状态快照保存完成,分界线向下游传递
  状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后像数据一样把 barrier 向下游任务传递
在这里插入图片描述
(3)向下游多个并行子任务广播分界线,执行分界线对齐
  对于下一个检查点要保存的内容,不应立即处理,而是要缓存起来、等到状态保存之后再做处理
在这里插入图片描述
(4)分界线对齐后,保存状态到持久化存储
在这里插入图片描述
(5)先处理缓存数据,然后正常继续处理
  完成检查点保存之后,任务就可以继续正常处理数据。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据

注:
  当 JobManager 收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复
  由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Flink 1.11 之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区 barrier 时就不需等待对齐,而是可以直接启动状态的保存了


1.4 检查点配置

(1) 启用检查点

  默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);
(2)检查点存储(Checkpoint Storage)

  检查点具体的持久化存储位置,取决于检查点存储(CheckpointStorage)的设置。默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage。具 体 可以通过调用检查点配置的 .setCheckpointStorage() 来 配 置 , 需 要 传 入 一 个CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)

// 配置存储检查点到 JobManager 堆内存
env.getCheckpointConfig().setCheckpointStorage(new 
JobManagerCheckpointStorage());
// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new 
FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
(3)其他高级配置

(1)检查点模式(CheckpointingMode)
  设置检查点一致性的保证级别,有精确一次(exactly-once)和至少一次(at-least-once)两个选项。默认级别为 exactly-once,而对于大多数低延迟的流处理程序,at-least-once 就够用了,而且处理效率会更高

(2)超时时间(checkpointTimeout)
  用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间

(3)最小间隔时间(minPauseBetweenCheckpoints)
  用于指定在上一个检查点完成之后,检查点协调器(checkpoint coordinator)最快等多久可以发出保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,maxConcurrentCheckpoints 的值强制为 1

(4)最大并发检查点数量(maxConcurrentCheckpoints)
  用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。如果前面设置了 minPauseBetweenCheckpoints,则 maxConcurrentCheckpoints 这个参数就不起作用了

(5)开启外部持久化存储(enableExternalizedCheckpoints)
  用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数 ExternalizedCheckpointCleanup 指定了当作业取消的时候外部的检查点该如何清理。
DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。
RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点

(6)检查点异常时是否让整个任务失败(failOnCheckpointingErrors)
  用于指定在检查点发生异常的时候,是否应该让任务直接失败退出。默认为 true,如果设置为 false,则任务会丢弃掉检查点然后继续运行

(7)不对齐检查点(enableUnalignedCheckpoints)
  不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为 exctly-once,并且并发的检查点个数为 1


1.5 保存点(Savepoint)

  原理和算法与检查点完全相同,只是多了一些额外的元数据。事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜像(consistent image)的。保存点中的状态快照,是以算子 ID 和状态名称组织起来的,相当于一个键值对。从保存点启动应用程序时,Flink 会将保存点的状态数据重新分配给相应的算子任务

(1)使用保存点

(1)创建保存点
  要在命令行中为运行的作业创建一个保存点镜像,只需要执行:

bin/flink savepoint :jobId [:targetDirectory]

  对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定:

state.savepoints.dir: hdfs:///flink/savepoints

  对于单独的作业,可以在程序代码中通过执行环境来设置:

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

  由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,可以在停掉一个作业时直接创建保存点:

bin/flink stop --savepointPath [:targetDirectory] :jobId
(2)从保存点重启应用

  提交启动一个 Flink 作业,使用的命令是 flink run;现在要从保存点重启一个应用,其实本质是一样的:

bin/flink run -s :savepointPath [:runArgs]

2. 状态一致性

2.1 状态一致性的三种级别

最多一次(AT-MOST-ONCE)
  当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态,也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是最多处理一次

至少一次(AT-LEAST-ONCE)
  在实际应用中,希望至少不要丢掉数据。这种一致性级别就叫作至少一次(at-least-once),就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,有些数据会被重复处理。
  在有些场景下,重复处理数据是不影响结果的正确性的,这种操作具有幂等性。如 UV
  为了保证达到 at-least-once 的状态一致性,需要在发生故障时能够重放数据。最常见的做法是,可以用持久化的事件日志系统,把所有的事件写入到持久化存储中。这时只要记录一个偏移量,当任务发生故障重启后,重置偏移量就可以重放检查点之后的数据了。Kafka 就是这种架构的一个典型实现

精确一次(EXACTLY-ONCE)
  最难实现的状态一致性语义。exactly-once 意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一次统计
  


2.2 端到端的状态一致性

  在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从 Flink 内部的状态读取数据,往往需要将处理结果写入外部存储中。这就要求不仅要考虑 Flink 内部数据的处理转换,还涉及从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫作端到端(end-to-end)的状态一致性,它取决于三个组件中最弱的那一环。一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制


3. 端到端精确一次(end-to-end exactly-once)

  由于检查点保存的是之前所有任务处理完某个数据后的状态快照,所以重放的数据引起的状态改变一定不会包含在里面,最终结果中只处理了一次。所以,端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端

3.1 输入端保证

  数据源可重放数据,或者说可重置读取数据偏移量,加上 Flink 的 Source 算子将偏移量作为状态保存进检查点,就可以保证数据不丢。这是达到 at-least-once 一致性语义的基本要求,也是实现端到端 exactly-once 的基本要求


3.2 输出端保证

  为了实现端到端 exactly-once,我们还需要对外部存储系统、以及 Sink 连接器有额外的要求。保证 exactly-once 一致性的写入方式有两种:

(1)幂等(idempotent)写入

  一个操作可以重复执行很多次,但只导致一次结果更改

(2)事务(transactional)写入

  事务(transaction)是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所做的所有更改都会被撤消。事务有四个基本特性:原子性(Atomicity)、一致性(Correspondence)、隔离性(Isolation)和持久性(Durability)。
  用一个事务来进行数据向外部系统的写入,事务与检查点绑定。当 Sink 任务遇到 barrier 时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了

(1)预写日志(write-ahead-log,WAL)
①先把结果数据作为日志(log)状态保存起来
②进行检查点保存时,也会将这些结果数据一并做持久化存储
③在收到检查点完成的通知时,将所有结果一次性写入外部系统

(2)两阶段提交(two-phase-commit,2PC)
真正基于事务的,它需要外部系统提供事务支持
①当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务。
②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是预提交的状态。
③当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果真正可用


Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐