在flink datastream api的应用程序开发中,我们必然会使用到检查点,也就是checkpoint,在flink datastream应用程序开发中,checkpoint是默认关闭的,在准备运行到生产环境的时候,我们是要求必须打开这个checkpoint。打开checkpoint的方法是:调用StreamExecutionEnvironment的enableCheckpointing(n)方法来启用chekpoint。这里的n指的是checkpoint的时间间隔,单位是毫秒。示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000);
这里我们设置的是1000,代表的就是每1秒执行下checkpoint检查。并且把状态记录下。
当然除了这个基础的开启checkpoint之外,还有一些其他的关于checkpoint的参数,下面列举下:
序号 | 参数 | 默认值 | 类型 | 说明 |
1 | state.backend.incremental | false | Boolean | 是否创建增量检查点,如果为true的话,那么每次仅存储与前一个检查点的差异,而不是存储完整的检查状态。 |
2 | state.backend.local-recovery | false | Boolean | 是否从本地恢复 |
3 | state.checkpoint-storage | none | String | 用于存储检查点,可以使filesystem或者jobmanager |
4 | state.checkpoints.dir | none | String | 存储点的数据存储路径 |
5 | state.checkpoints.num-retained | 1 | Integer | 要保留的检查点的最大数量 |
6 | state.savepoints.dir | none | String | 保存点的默认目录 |
7 | state.storage.fs.memory-threshold | 20kb | MemorySize | 数据文件的最小值,小于当前最小值的数据都会存储在根目录下,最大值是1MB |
8 | state.storage.fs.write-buffer-size | 4096 | Integer | 写入文件系统的检查点缓冲区的默认大小 |
9 | taskmanager.state.local.root-dirs | none | String | 用于存储本地恢复的根文件目录 |
关于checkpoint的话,建议设置全一点,下面给一张案例图:
备注:
1、为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 finish() 方法后等待下一次 checkpoint 成功后退出。 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。 极端情况下,如果 checkpoint 的周期被设置为 Long.MAX_VALUE,那么任务永远不会结束,因为下一次 checkpoint 不会进行。
还没有评论,来说两句吧...