[Flink][3][ApacheFlink架构]

第3章 Apache Flink架构

在这一章中,我们对Flink的架构进行了一个高层次的介绍,并描述了Flink如何解决我们之前讨论过的流处理相关问题。特别地,我们重点解释Flink的分布式架构,展示它在流处理应用中是如何处理时间状态的,并讨论了它的容错机制

3.1 系统架构

Flink是一个用于状态化并行数据流处理分布式系统。Flink设置由多个进程组成,这些进程通常分布在多台机器上运行。

分布式系统需要解决的常见挑战是

  1. 集群中计算资源分配和管理
  2. 进程协调
  3. 持久和高可用性数据存储
  4. 故障恢复

Flink本身并没有实现所有这些功能。它只关注于其核心功能——分布式数据流处理,但是利用了很多现有的开源中间件和框架来实现其他非核心部分。

  • Flink与集群资源管理器(如Apache Mesos、YARN和Kubernetes)集成得很好,但也可以配置为作为独立集群运行。
  • Flink不提供持久的分布式存储。相反,它利用了像HDFS这样的分布式文件系统或S3这样的对象存储。
  • 对于高可用设置中的领导选举,Flink依赖于Apache ZooKeeper。

3.1.1 搭建Flink所需的组件

Flink的搭建由四个不同的组件组成,它们一起工作来执行流应用程序。这些组件是JobManagerResourceManagerTaskManagerDispatcher。由于Flink是用Java和Scala实现的,所以所有组件都运行在Java虚拟机(jvm)上。各组成部分的职责将在下面四个子小节分别介绍。

3.1.1.1 JobManager

应用管理

JobManager是控制 单个应用程序执行主进程,每个应用程序由一个的JobManager控制。(一对一关系)

  • JobManager负责接收要执行的应用程序。该应用程序由一个所谓的JobGraph(一个逻辑数据流图)和一个JAR文件组成(JAR文件捆绑了该应用程序所有必需的类、库和其他资源)。
  • JobManager将JobGraph转换为名为ExecutionGraph的物理数据流图,该数据流图由可并行执行的任务组成
  • JobManager从ResourceManager请求必要的资源(TaskManager槽)来执行任务。一旦它接收到足够数量的TaskManager槽,它就会ExecutionGraph的任务分配给执行它们的TaskManager
  • 执行期间,JobManager负责所有需要集中协调的操作,如检查点的协调。
3.1.1.2 RecourceManager

资源管理

Flink 不同的环境和资源提供者(如YARN、Mesos、Kubernetes和独立部署)提供了多个资源管理器

  • ResourceManager负责管理Flink的处理资源单元TaskManager槽
  • 当JobManager请求TaskManager槽时,ResourceManager会命令某个带有空闲槽的TaskManager将它的空闲槽提供给JobManager
  • 如果ResourceManager没有足够的槽来满足JobManager的请求,则ResourceManager可以与资源提供者对话,让资源提供者尝试启动更多的TaskManager。
  • ResourceManager还负责终止空闲的TaskManager以释放计算资源。
3.1.1.3 TaskManager

工作进程,执行任务的

TaskManager是Flink的工作进程(worker process,工人)

  • 通常,在一个Flink集群中有多个TaskManager在运行。
  • 每个TaskManager提供一定数量的槽槽的数量限制了TaskManager可以执行的任务数量
    • 在TaskManager启动之后,TaskManager将它的槽注册到ResourceManager
    • 当JobManager请求槽的时候,根据ResourceManager的指示,TaskManager向JobManager提供一个或多个槽
    • 然后JobManager可以将任务分配到槽中,让TaskManager执行这些任务。
    • 在执行期间,TaskManager与运行相同应用但是不同任务的其他TaskManager交换数据
3.1.1.4 Dispatcher

与用户直接对话

Dispatcher提供一个REST接口让用户提交要执行的应用。

  • 应用提交执行后,它将启动JobManager,并将应用交给它来执行。
  • Dispatcher还运行一个web仪表盘来提供关于作业执行的信息。
3.1.1.5 整体架构图

3.1.2 应用部署

Flink应用程序可以以两种不同的模式来部署。

3.1.2.1 框架模式

在这种模式下,Flink应用程序打包到一个JAR文件中,并由客户端提交给一个正在运行的服务。该服务可以是Flink Dispatcher、Flink JobManager或YARN的ResourceManager。

  • 如果应用程序被提交到JobManager,它将立即开始执行应用程序
  • 如果应用程序被提交给Dispatcher或YARN ResourceManager,它将启动JobManager移交应用程序,然后JobManager将开始执行应用程序。
3.1.2.2 库模式

在这种模式下,Flink应用程序绑定在一个应用程序特定的容器镜像中,比如Docker镜像

  • 该镜像还包括运行JobManager和ResourceManager的代码。
  • 当容器从镜像启动时,它会自动启动ResourceManager和JobManager,并执行绑定的应用程序。
  • 第二个独立于应用程序的镜像用于部署TaskManager容器
    • 从这个镜像启动的容器会自动启动TaskManager,它连接到ResourceManager并注册它的槽。
    • 通常,外部资源管理器(如Kubernetes)负责启动镜像,并负责在发生故障时重新启动容器。

第一种模式比较传统,第二种模式常用于微服务中。

3.1.3 任务执行

TaskManager可以同时执行多个任务

这些任务可以

  • 属于同一算子(数据并行)、
  • 不同算子(任务并行)的子任务
  • 甚至是来自不同应用程序的子任务(应用并行)。

TaskManager提供固定数量的处理槽控制它能够并发执行的任务的数量一个处理槽可以执行应用程序某个算子一个并行任务。下图是一个TaskManager、处理槽、任务以及算子关系的例子。

左侧是一个JobGraph(应用程序的非并行表示,逻辑图)。

  • 它由5个算子组成。

  • 算子A和C是数据源,算子E是数据汇。

右侧是一个ExecutionGraph物理图

  • 算子C和E的并行度为2。其他算子的并行度为4。
  • 由于最大算子并行度是4个,应用程序至少需要4个可用的处理槽来执行。
  • 给定两个各有两个处理槽的Taskmanager,就满足了这个需求。
  • JobManager将JobGraph扩展为一个ExecutionGraph,并将任务分配给四个可用插槽。
  • 并行度为4的算子各自有4个并行任务,这些任务 被分配给每个槽。
  • 运算符C和E的各自有两个并行任务,分别被分配到槽1.1和2.1以及槽1.2和2.2。
  • 多个不同算子任务 分配到同一个插槽的优点是这些任务可以在同一个进程中高效地交换数据,而不需要访问网络。

每个TaskManager是一个JVM,而每个Slot是JVM中的一个线程。TaskManager在同一个JVM进程中以多线程方式执行它的任务。线程比单独的进程更轻量,通信成本更低,但不会严格地将任务彼此隔离。因此,一个行为不正常的任务可以杀死整个TaskManager进程和运行在它上面的所有任务。

3.1.4 高可用性设置

流式应用程序通常设计为24x7运行。因此,即使内部进程失败,也不能停止运行。

而要想从失败中恢复

  1. 系统首先需要重新启动失败的进程
  2. 其次,重新启动应用程序并恢复其状态。

本小节主要学习如何重新启动失败的进程。

3.1.4.1 TaskManager故障

下面举例说明TaskManager故障应该如何处理

  • 假设我们的应用程序要以最大并行度为8来执行,那么四个TaskManager(每个TaskManager提供两个插槽)可以满足我们对并行度的需求。

  • 如果其中一个TaskManager发生故障,可用插槽的数量将减少到6个。

  • 在这种情况下,JobManager将请求ResourceManager提供更多处理槽。

  • 如果请求失败,JobManager会按照一定的时间间隔连续地重启应用。直到重启成功(有足够多的空闲插槽就能重启成功)。

3.1.4.2 JobManager故障

比TaskManager失败更具挑战性的问题是JobManager失败。

  • JobManager控制流应用程序的执行,并保存有关其执行的元数据,例如指向已完成检查点的指针。

  • 如果负责的JobManager进程失败,流应用程序将无法继续处理。

  • 这使得JobManager成为Flink中的应用程序的一个单点失效组件(也就是如果这个组件失效,那么整个系统失效)。

为了克服这个问题,Flink支持一种高可用模式,该模式可以在原始JobManager失效时将应用的管理权和应用的元数据 迁移到另一个JobManager。

Flink的高可用模式 基于 ZooKeeper

  • 它是一个分布式系统,来提供分布式协调共识服务

  • Flink使用ZooKeeper进行领袖选举,并将其作为一个高可用性和持久的数据存储

  • 高可用性模式下操作时,JobManagerJobGraph和所有必需的元数据(如应用程序的JAR文件)写入远程持久存储系统

  • 此外,JobManager将一个指向存储位置的指针 写入ZooKeeper的数据存储中。

  • 在应用程序执行期间,JobManager接收各个任务检查点的状态句柄(存储位置)。当检查点完成后,JobManager将状态写入远程存储,并将指向此远程存储位置的指针写入ZooKeeper

  • 因此,从JobManager故障中恢复所需的所有数据都存储在远程存储中,而ZooKeeper持有指向存储位置的指针

  • 图3-3说明了这种设计。

当JobManager失败时,接管它工作的新JobManager执行以下步骤:

  1. 从ZooKeeper请求存储位置然后从远程存储中获取JobGraph、JAR文件和应用程序最后一个检查点的存储位置。
  2. 它向ResourceManager请求处理槽继续执行应用程序
  3. 它将重新启动应用程序,并将其所有任务的状态重置检查点中的状态值

最后还有一个问题,当TaskManager或者JobManager失效时,谁会触发它们的重启

  • 在容器环境(如Kubernetes)中作为库部署运行应用程序时,失败的JobManager或TaskManager容器通常由容器编排服务自动重新启动。
  • 在YARN或Mesos上运行时,Flink的其余进程将触发JobManager或TaskManager进程的重新启动。

3.2 Flink中的数据传输

在运行过程中,应用的任务不断地交换数据TaskManager 负责将数据从发送任务发送到接收任务。TaskManager的网络组件在发送记录之前在缓冲区中收集记录,就是说,记录不是一个一个发送的,而是先缓存到缓冲区中然后一批一批发送。这种技术是有效使用网络资源和实现高吞吐量的基础。

每个TaskManager都有一个 网络缓冲池(默认大小为32 KB)用于发送和接收数据。

  • 如果发送方任务接收方任务不同的TaskManager进程中运行,则它们通过网络通信
  • 每对TaskManager维护一个永久的TCP连接来交换数据。
  • 使用shuffle连接模式时,每个发送方任务都需要能够向每个接收方任务发送数据。TaskManager需要为每个接收任务提供一个专用的网络缓冲区,此任务对应的发送方会向该缓冲区发送数据。

图3-4显示了这个架构。

  • 在shuffle连接模式下,由于接收端的并行度为4,所以每个发送端都需要4个网络缓冲区来向接收端任务发送数据
  • 由于发送端的并行度也是4,所以每个接收端也都需要4个网络缓冲区来接受发送端发送的数据
  • 同一个TaskManager中的缓存区共用同一条网络连接
  • 在shuffle模式或者broadcast模式下,需要的缓冲区的大小将是并行度的平方级
  • Flink的网络缓冲区的默认配置对于中小型的设置是足够的。

发送方任务接收方任务同一个TaskManager进程中运行时

  1. 发送方任务将传出的记录序列化到缓冲区中,并在缓冲区填满后将其放入队列中。
  2. 接收任务从队列中获取缓冲区,并对传入的记录进行反序列化。
  3. 因此,在同一TaskManager上运行的任务之间的数据传输不会导致网络通信。

3.2.1 基于信用值的流量控制

通过网络连接发送单条记录很低效,并且造成很大的开销。缓冲充分利用网络连接的带宽的关键。在流处理上下文中,缓冲的一个缺点增加了延迟,因为记录是在缓冲区中收集的,而不是立即发送的

Flink实现了一个基于信用值的流控制机制,其工作原理如下。

  1. 接收任务发送任务 授予一定的信用值,也就是告诉发送端为了接收其数据,我为你保留的缓冲区的大小
  2. 一旦发送方收到信用值通知,就会在信用值允许范围内尽可能多的传输缓冲数据,并会附带积压量大小(已经填满准备传输的网络缓冲数目)
  3. 接收方使用预留的缓冲来处理发送的数据,同时依据各发送端的积压量信息计算所有发送方在下一轮的信用值分别是多少。

基于信用值的好处

  • 基于信用的流控制减少了延迟,因为一旦接收方有足够的资源接受数据,发送方就可以发送数据。
  • 此外,在数据分布不均的情况下,它是一种有效的分配网络资源的机制,因为信用是根据发送方的积压的大小授予的。
  • 因此,基于信用的流控制是Flink实现高吞吐低延迟重要一环

3.2.2 任务链接

Flink提供了一种被称为任务链接的优化技术,它可以减少特定条件下本地通信的开销

  • 为了满足任务链接的要求,被链接的所有算子必须配置相同的并行性,并通过本地转发通道进行连接
  • 图3-5所示的操作管道满足这些要求。它由三个算子组成,它们都被配置为任务并行度为2,并与本地转发连接连接。

图3-6描述了如何在任务链接模式下执行管道。

  • 多个算子函数被融合到单个任务中,由单个线程执行。
  • 通过一个简单的方法调用,一个函数产生的记录被单独地移交给下一个函数。
  • 因此,在函数之间传递记录基本上没有序列化开销没有通信开销

Flink在默认情况下会开启任务链接,但是也可以通过配置关闭这个功能

3.3 事件时间处理

正如上一节所述,事件时间语义会生成可重复且一致性的结果,这是许多流应用的刚性需求。下面,我们将描述Flink如何在内部实现和处理事件时间戳和水位线,以支持具有事件时间语义的流应用。

3.3.1 时间戳

Flink事件时间流应用处理的所有记录都必须带时间戳。时间戳将记录与特定的时间点关联起来,**通常是记录所表示的事件发生的时间点。**此外,在现实环境中,时间戳乱序几乎不可避免。

当Flink以事件时间模式处理数据流时,它会根据记录的事件时间戳来触发基于时间的算子操作。

  • 例如,时间窗口操作符根据相关的时间戳将记录分配给窗口。
  • Flink将时间戳编码为8字节长的Long值,并将它们作为元数据附加到记录中
  • 然后内置算子或者用户自定义的算子解析这个Long值就可以获得事件时间。

3.3.2 水位线

水位线用于标注事件时间应用程序中每个任务当前的事件时间。

  • 基于时间的操作符使用这段时间来触发相关的计算计算并推动这个流进行。
  • 例如,基于时间窗口的任务会在水位线超过窗口边界的时候触发计算并且发出结果

在Flink中,水位线被实现为一种带时间戳的特殊记录。如图3-8所示,水位线像常规记录一样在数据流中移动。

水位线有两个基本特性:

  1. 水位线必须是单调递增的,以确保任务的事件时间时钟前进的,而不是向后的。
  2. 水位线与记录的时间戳存在关系。一个时间戳为T的水位线表示:所有后续记录的时间戳都应该大于T。

第二个属性用于处理数据流中时间戳乱序的记录,例如图3-8中具有时间戳2和5的记录。

  • 基于时间的算子任务可能会处理带有无序时间戳的记录,每个任务都会维护一个自己的事件时钟,并通过时间戳来更新这个时钟。
  • 任务有可能接收到违反水位线属性且时间戳 小于先前接收的水位线记录,该记录所属的计算可能已经完成。这样的记录称为迟到记录

水位线的一个意义是,它们允许应用控制结果完整性延迟

3.3.3 水位线传播和事件时间

在本节中,我们将讨论算子如何处理水位线。

  • Flink将水位线实现为算子任务 接收发出特殊记录
  • 任务内部的时间服务会维护一些计时器(Timer),任务可以在计时器服务上注册计时器,以便将来在特定的时间点执行计算,这些计时器依靠收到的水位线来激活。
  • 例如,窗口操作符为每个活动窗口注册一个计时器,当事件时间超过窗口的结束时间时,计时器将清除窗口的状态。

当一个任务收到水位线时,会发生以下操作:

  1. 任务根据水位线的时间戳 更新内部事件时间时钟
  2. 任务的时间服务根据更新后的时钟来执行那些超时计时器的回调。对于每个过期的计时器,任务将调用一个回调函数,该函数可以执行计算并发出记录。
  3. 任务根据更新后的时钟向下游任务发送水位线。

考虑到任务并行,我们将详细介绍一个任务如何将水位线发送到多个下游任务,以及它从多个上游任务获取水位线之后如何推动事件时间时钟前进。具体的方式如下

  1. 任务每个输入分区 维护 分区水位线
  2. 当它从一个分区接收到水位线时,它相应的分区水位线 更新为接收值和当前值的最大值。
  3. 随后,任务将其内部事件时间时钟 更新为所有分区水印的最小值。
  4. 如果事件时间时钟前进,任务处理所有触发的计时器,最后通过所有连接的输出分区 发出更新后的水位线,向所有下游任务广播它的新事件时间。

下图举了一个有4个输入分区和3个输出分区的任务在接受到水位线之后是如何更新它的分区水位线和事件时间时钟的。

Flink的水位线传播算法确保算子任务所发出带时间戳的记录水位线一定会对齐

  • 然而,它依赖于这样一个事实,即所有的分区都不断地提供自增的水位线。
  • 一旦一个分区不推进它的水位线,或者变成完全空闲而不再发送任何记录和水位线,任务的事件时间时钟将不会推进,进而导致计时器不会触发。
  • 因此,如果一个任务没有定期从所有输入任务接收到新的水位线,那么任务处理延迟状态大小显著增加

对于具有两个输入流且水位线差距很大的算子,也会出现类似的效果。具有两个输入流的任务的事件时间时钟将受制于较慢的流,通常较快的流的记录或中间结果将处于缓冲状态,直到事件时间时钟允许处理它们。

3.3.4 时间戳分配和水位线生成

下面介绍时间戳和水位线是如何产生的。

时间戳和水位线通常是在流应用接收数据流时 分配和生成的。Flink DataStream应用可以通过三种方式完成该工作

  1. 在数据源完成:当一个流被读入到一个应用中时。数据源算子将产生带有时间戳的记录流。水位线可以作为特殊记录在任何时间点发出。如果数据源暂时不再发出水位线了,可以将自己声明为空闲,Flink会在后续算子计算水位线时将那些来自空闲数据源的流分区排除在外。
  2. 周期性分配器(Periodic Assigner):这个Assigner可以从每个记录中提取一个时间戳,并定期查询当前的水位线。提取到的时间戳被分配给相应的记录,所查询的水印被加入到流中。
  3. 定点分配器(Punctuated Assigner):它可以用于根据特殊输入记录来生成水位线

3.4 状态管理

大多数流应用有状态的。许多算子不断读取和更新某种状态。不管是内置状态还是用户自定义状态,Flink的处理方式都是一样的。

在本节中,我们将讨论

  1. Flink支持的不同类型的状态。
  2. 状态后端如何存储和维护状态
  3. 有状态应用程序如何通过进行状态再分配来实现扩缩容

通常,需要任务去维护并用于计算结果的数据都属于任务的状态。图3-10显示了任务与其状态之间的典型交互。

  • 任务接收一些输入数据。
  • 在处理数据时,任务可以读取和更新其状态,
  • 并根据其输入数据和状态计算其结果。

然而,高效可靠的状态管理更具挑战性。这包括处理非常大的状态(可能超过内存),并确保在发生故障时不会丢失任何状态。所有与状态一致性、故障处理、高效存储和访问相关的问题都由Flink处理,以便开发人员能够将重点放在应用程序的逻辑上。

在Flink中,状态总是与一个特定的算子相关联。为了让Flink的运行时知道算子有哪些状态,操作符需要对其状态进行注册。根据作用域的不同,有两种类型的状态:算子状态和键值分区状态

3.4.1 算子状态

算子状态的作用域为算子的单个任务。这意味着由同一并行任务之内的记录都可以访问同一状态。算子状态不能被其他任务访问。如下图

Flink为算子状态提供了三类原语

  • 列表状态:将状态表示为一个条目列表
  • 联合列表状态:同样将状态表示为一个条目列表。但是,在出现故障或从保存点启动应用程序时,它的恢复方式与常规列表状态不同。
  • 广播状态:专门为哪些需要保证算子的每个任务状态都相同的场景而设计

3.4.2 键值分区状态

键值分区状态是根据算子输入记录中定义的键来维护和访问的。Flink为每个键维护一个状态实例该状态实例总是位于那个处理对应键值记录的任务上。当任务处理一个记录时,它自动将状态访问范围限制到当前记录的键。因此,具有相同键值分区的所有记录都访问相同的状态。图3-12显示了任务如何与键值分区状态交互。

键值分区状态是一个在算子的所有并行任务上进行分区的分布式键值映射。键值分区状态原语如下

  • 单值状态:为每个键存储一个任意类型的值。该值可以是一个任意复杂的数据结构。
  • 列表状态:为每个键储存一个列表。列表条目可以是任意类型。
  • 映射状态:为每个键存储键值映射。映射的键和值可以是任意类型。

3.4.3 状态后端

为了确保快速的状态访问,每个并行任务都在本地维护其状态。至于状态的具体存储、访问和维护,则一个称为状态后端的可拔插组件来完成

状态后端负责两件事:

  1. 本地状态管理
  2. 将状态以检查点的形式写入远程存储

对于本地状态管理,Flink提供两种实现

  • 第一种状态后端,将状态作为存储在JVM堆内存数据结构中的对象进行管理。
  • 第二种状态后端,序列化状态对象并将它们放入RocksDB中,这种方式是基于硬盘的。
  • 虽然第一种实现提供非常快的访问速度,但它受到内存空间大小的限制。访问RocksDB会比较慢,但是空间大。

状态检查点很重要,因为Flink是一个分布式系统,状态只能在本地维护。TaskManager进程可能在任何时间点失败。因此,它的存储必须被认为是易失的。状态后端负责将任务的状态检查点指向远程和持久存储。用于检查点的远程存储可以是分布式文件系统或数据库系统。状态后端在状态检查点的方式上有所不同。例如,RocksDB状态后端支持增量检查点,这可以显著减少非常大的状态的检查点开销。

3.4.4 有状态的算子的扩缩容

流应用的一个基本需求是根据输入速率的增加或减少而调整算子的并行性。有状态算子,调整并行度比较难。因为我们需要把状态重新分组,分配到与之前数量不等的并行任务上。

3.4.4.1 带有键值分区状态的算子扩缩容

带有键值分区状态的算子可以通过将键重新划分来进行任务的扩缩容。但是,为了提高效率,Flink不会以键为单位来进行划分。相反,Flink以键组作为单位来重新分配,每个键组里面包含了多个键。

3.4.4.2 带有算子列表状态的算子扩缩容

带有算子列表状态的算子在扩缩容时会对列表中的条目进行重新分配。理论上来说,所有并行任务的列表项会被统一收集起来,并再均匀重新分配。如果列表项的数量少于算子的新并行度,一些任务将以空状态开始。图3-14显示了操作符列表状态的重新分配。

3.4.4.3 带有算子联合状态的算子扩缩容

带有算子联合状态的算子会在扩缩容时状态列表中的全部条目 广播到全部任务中。然后,任务自己来选择使用哪些项和丢弃哪些项。如图3-15显示。

3.4.4.4 带有算子广播状态的算子扩缩容

带有算子广播状态的算子在扩缩容时会把状态拷贝到全部新任务上。这样做是因为广播状态要确保所有任务具有相同的状态。在缩容的情况下,直接简单地停掉多余的任务即可。如图3-16显示。

3.5 检查点、保存点、状态恢复

Flink是一个分布式的数据处理系统,且任务在本地维护它们的状态,Flink必须确保这种状态不会丢失,并且在发生故障时保持一致。

在本节中,我们将介绍Flink的检查点故障恢复机制,看一下它们是如何提供精确一次的状态一致性保障。此外,我们还讨论了Flink独特的保存点(savepoint)功能,它就像一把瑞士军刀,解决了运行流式应用过程中的诸多难题。

3.5.1 一致性检查点

有状态流应用程序的一致检查点是在所有任务都处理完等量的原始输出后对全部任务状态进行的一个拷贝。我们可以通过一个朴素算法来对应用建立一致性检查点的过程进行解释。朴素算法的步骤为:

  1. 暂停接收所有输入流。
  2. 等待所有流入系统的数据完全处理,即所有任务已经处理完所有的输入数据。
  3. 将所有任务的状态复制到远程持久存储,生成检查点。当所有任务拷贝完成后,检查点就完成了
  4. 恢复接收所有输入流

下图展示了一个一致性检查点的例子,这个算法读取数据,然后对奇数和偶数分别求和

3.5.2 从一致性检查点中恢复

在流应用执行期间,Flink周期性为应用程序生成检查点。一旦发生故障,**Flink会使用最新的检查点将应用状态恢复到某个一致性的点并重启应用。**图3-18显示了恢复过程。

应用程序恢复分为三个步骤:

  1. 重启整个应用程序。
  2. 将所有状态重置为最新的检查点。
  3. 恢复所有任务的处理

假设所有算子都将它们的状态写入检查点并从中恢复,并且所有输入流的消费位置都能重置到检查点生成那一刻,那么这种检查点和恢复机制可以为整个应用提供精确一次一致性保障。输入流是否可以重置,取决于它的具体实现以及所消费外部系统是否提供相关接口。例如,像Apache Kafka这样的事件日志可以从之前的某个偏移读取记录。相反,如果是从socket消费而来则无法重置,因为socket一旦消耗了数据就会丢弃数据。

我们必须指出,Flink的检查点和恢复机制只能重置流应用内部的状态。根据应用所采用的数据汇算子,在恢复期间某些结果记录可能被多次发送到下游系统,例如事件日志、文件系统或数据库。对于某些存储系统,Flink提供的数据汇可以保证了精确一次输出。

3.5.3 Flink检查点算法

Flink基于Chandy-Lamport的分布式快照算法来实现检查点。该算法并不会暂停整个应用程序,在部分任务持久化状态的过程中,其他任务可以继续执行。

Flink的检查点算法使用一种称为检查点分隔符的特殊类型的记录,它与水位线类似。检查点分隔符携带一个检查点ID来标识它所属的检查点,分隔符从逻辑上将流分割为两个部分。由检查点之前的记录 引起的所有状态修改都包含在分隔符对应的检查点中,而由屏障之后的记录引起的所有修改不包含在分隔符对应的检查点中。

下面我们通过一个简单的例子来解释这个算法

我们使用一个简单的流应用程序示例逐步解释该算法。应用程序由两个数据源任务组成,每个数据源任务消耗一个不断增长的数字流。数据源任务的分别输出奇数分区和偶数分区。每个分区都由一个任务处理,该任务计算所有接收到的数字的总和,并将更新后的总和发送给下游数据汇。该应用程序如图3-19所示。

JobManager通过向每个数据源任务 发送一个新的带有检查点编号的消息启动检查点生成流程,如图3-20所示。

当数据源任务接收到检查点消息时,

  1. 暂停处理数据流,并利用状态后端 生成本地状态的检查点并发送到远程存储
  2. 把该检查点分隔符广播至所有下游任务
  3. 状态后端会在检查点保存好之后通知TaskManager,TaskManager会给JobManager发送确认消息
  4. 在发出了分隔符之后,数据源将恢复正常的工作状态。
  5. 如下图所示

数据源发出的检查点分隔符被广播给下游任务。当下游任务接收到新的检查点分隔符时,将继续等待来自所有其他上游任务的分隔符到达检查点。在等待期间,它继续处理那些尚未提供分隔符的上游任务的记录,而那些提供了分隔符的上游任务的记录会被缓存,等待稍后处理。等待所有检查点到达的过程称为检查点对齐,如图3-22所示。

一旦一个任务从它的所有上游任务收到分隔符,它就会让状态后端生成一个检查点,并将检查点分隔符广播给它的所有下游任务,如图3-23所示。

发出检查点分隔符后,任务就开始处理缓冲的记录。在处理完所有缓冲记录之后,任务会继续处理其输入流。图3-24显示了此时的应用程序。

最后,检查点分隔符到达数据汇。当数据汇接收到分割符时,会先进行对齐操作,然后将自身状态写入检查点,并向JobManager确认接收到该分隔符**。一旦应用的所有任务都发送了检查点确认,JobManager就会将应用程序的检查点记录为已完成。**图3-25显示了检查点算法的最后一步。如前所述,已完成的检查点可用于从故障中恢复应用。

3.5.4 检查点对性能的影响

Flink的检查点算法流应用产生一致的分布式检查点,而不会停止整个应用。但是,它会增加应用的处理延迟。Flink实现了一些调整,可以在某些条件下减轻性能影响。

任务在将其状态写入检查点的过程中,将被阻塞。一种好的方法是先将检查点写入本地,然后任务继续执行它的常规处理,另一个进程负责将检查点传到远端存储。

此外,还可以在分隔符对齐的过程中不缓存那些已经收到分隔符所对应分区的记录,而是直接处理。但这会让一致性保证从精确一次降低到至少一次

3.5.5 保存点

Flink最有价值和最独特的功能之一是保存点。原则上,保存点的生成算法与检查点生成算法一样,因此可以把保存点看作是带有一些额外元数据的检查点。Flink不会自动生成保存点,而是需要用户显式的调用来生成保存点。

3.5.5.1 保存点的使用

给定一个应用和一个兼容的保存点,我们可以从该保存点启动应用。这将把应用的状态初始化为保存点的状态,并从获取保存点的位置运行应用。

保存点可以用在很多情况

  • 可以从保存点启动一个不同但兼容的应用程序。这意味着可以修复一些小bug之后从保存点重启
  • 可以使用不同的并行度启动原应用
  • 可以在不同的集群上启动原应用
  • 可以使用保存点暂停应用程序并在稍后恢复它。这样就可以为其他高优先级的应用腾出集群资源
  • 可以用保存点来完成归档操作
3.5.5.2 从保存点启动应用

在本节中,我们将描述Flink在从保存点启动时如何去初始化应用状态。

一个典型的应用程序包含多个状态,它们分布在不同算子的不同任务上。

下图显示了一个具有三个算子的应用程序,每个算子各运行两个任务。其中一个算子(OP-1)有一个算子状态(OS-1),另一个算子(OP-2)有两个键值分区状态(KS-1和KS-2)。当生成保存点时,所有任务的状态都会被复制到一个持久化存储位置上。

保存点中状态副本会按照算子标识符和状态名称进行组织。该算子标识符和状态名需要能够将保存点的状态数据映射到应用启动后的状态上。当从保存点启动应用程序时,Flink将保存点数据重新分发给相应算子的任务。

如果应用发生了修改,只有那些算子标识符和状态名称没变的状态副本才能被成功还原。默认情况下,Flink会分配唯一的算子标识符。但是,算子的标识符是基于其前面算子的标识符生成的。这样,假如上游的算子标识符发生了变化,那么下游的算子也会变化。因此,我们强烈建议为操作符手动分配唯一标识符,而不依赖于Flink的默认赋值。