在flink中,Flink是使用流重放和检查点来实现容错的。本篇文章我们介绍下Flink的检查点。 Flink的检查点是标记每一个输入流中的特定点以及每个算子的对应状态。检查点的间隔设置为多少,是在执行期间的容错开销与恢复时间(需要重放的记录数量)之间进行权衡的一种结果。
Flink能够使用检查点恢复状态到流中的某个位置,从而向应用程序提供与无故障执行时一样的语义。
Flink的检查点机制会和持久化存储进行交互,读/写“流”与“状态”,一般需要满足以下几点要求。
1、一个能够回放一段时间内数据的持久化数据源,如持久化消息队列(Kafka,Rabbitmq等) 2、存放状态的持久化存储,通常为分布式文件系统(hdfs,s3,nfs,ceph等)
开启配置检查点的属性信息
序号 | 检查点属性 | 说明 |
1 | 精确一次(Exactly-Once)和至少一次(At-Least-Once) | 可以通过向enableCheckPointing()方法中传入一个模式来选择使用保证等级中的哪一种。对于大多数应用来说,精确一次是比较好的选择,至少一次可能与某些延迟超低的应用关联较大。 |
2 | 检查点超时 | 如果检查点执行的时间超过了改配置的阈值,则还在进行中的检查点操作就会被抛弃 |
3 | 检查点之间的最小时间 | 该熟悉定义在检查点之间需要多久的时间,以确保流应用在检查点之间有足够的进展。如果将值设置为5000,则无论检查点持续时间与间隔是多久,在前一个检查点完成时的至少5s后才开始下一个检查点。使用检查点之间的最小时间来配置应用比检查点间隔容易很多,因为检查点之间的最小时间在检查点的执行时间超过平均值时不会受到影响。 |
4 | 并发检查点的数 | 在默认情况下,如果上一个检查点未完(失败或成功),则系统不会出发另一个检查点,这样可以确保拓扑不会在检查点上花费太多时间,从而不影响正常的处理流程。单允许多个检查点并行进行,对于有确定的处理延迟,但是仍然想进行频繁的检查点去最小化故障后重试的管道来说,是有意义的。 此选项不能和检查点之间的最小时间同时使用 |
5 | 外部化检查点 | 可以将周期存储检查点配置到外部系统中。将他们的元数据写入持久化存储上,并且在作业失败时不会被自动删除。在这种方式下,如果作业失败,则会有一个现有的检查点去恢复 |
6 | 在检查点出错时使任务失败或者继续进行任务 | 此选择决定了在任务的检查点检查的过程中发生错误时,是否使任务失败。默认会使任务失败,也可以禁用该选项,这个任务会简单把检查点错误信息报告给检查点协调员并继续运行。 |
7 | 优先从检查点恢复 | 此属性确定作业是否在最新的检查点回退,即使有更近的保存点可用,也可以潜在的减少恢复时间。 |
最后附上代码,设置检查点的一些配置信息。
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置每秒钟开始一次检查点 environment.enableCheckpointing(1000); //将模式设置为精确一次 environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置两次检查点之间的时间间隔是0.5秒 environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //设置检查点必须在1分钟内完成,如果没完成,则抛弃掉 environment.getCheckpointConfig().setCheckpointTimeout(60000); //设置同一个时间只允许一个检查点在工作 environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //开启在作业终止后仍然保留外部检查点 environment.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //允许在有更近保存点时间退到检查点 environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
还没有评论,来说两句吧...