运行flink任务的时候提示:Could not perform checkpoint 1 for operator Source

提问者:帅平 问题分类:大数据
运行flink任务的时候报错了,错误信息提示如下:
java.lang.Exception: Could not perform checkpoint 1 for operator Source: MySQL Source -> Sink: Writer -> Sink: Committer (1/1)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync(StreamTask.java:1150)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Thread.java:833)

这是什么原因?
1 个回答
习惯所有的虚假
习惯所有的虚假
这是因为在代码里面执行了enableCheckpointing,但是没有设置enableCheckpointing信息,所以给他添加上就可以了,示例如下:
//开启检查点
env.enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE);
//获取系统环境的检查点配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//指定检查点的存储位置
checkpointConfig.setCheckpointStorage("file:\\\C:\Users\Administrator\Downloads\tmp");
//checkpoint的超时时间: 默认10分钟
checkpointConfig.setCheckpointTimeout(60000);
//同时运行中的checkpoint的最大数量
checkpointConfig.setMaxConcurrentCheckpoints(1);
//最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
//取消作业时,checkpoint的数据 是否保留在外部系统,DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删),RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//允许 checkpoint 连续失败的次数,默认 0 表示 checkpoint 一失败,job 就挂掉
checkpointConfig.setTolerableCheckpointFailureNumber(10);
发布于:8个月前 (12-07) IP属地:未知
我来回答