当前位置: 首页 > news >正文

Flink Checkpoint expired before completing解决方法

在Flink消费Kafka日志的时候出现了这样的一则报错,

JobManager报错如下:

2024-03-07 15:21:12,500 [Checkpoint Timer] WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 181 for job 835243b848a64f2fae918faf23c5392c. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2438) [flink-dist-1.17-vvr-8.0.7-2-SNAPSHOT.jar:1.17-vvr-8.0.7-2-SNAPSHOT]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372]at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_372]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_372]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadP

TaskManager报错如下:

2024-03-07 15:22:50,882 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 4673 for job 894b4dd298704c6b91bfaa2c06aa40b4. (2 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2259) [flink-dist-1.15-vvr-6.0.6-SNAPSHOT.jar:1.15-vvr-6.0.6-SNAPSHOT]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_102]at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_102]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186) [?:1.8.0_102]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299) [?:1.8.0_102]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) [?:1.8.0_102]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) [?:1.8.0_102]at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
2024-03-07 15:22:50,886 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.

这个出错的原因主要是:CheckPoint超时导致的。
要解决这个问题:首先要从CheckPoint的原理说起:

  1. JobManager在定时做CheckPoint操作的时候,CheckpointCoordinator会周期性的向所有source算子发送barrier
  2. 当Source算子接收到Barrier的时候,就会启动CheckPoint处理。有同步和异步两个过程,把当前的状态写入到持久化存储中,最后向CheckPointCoordinator报告快照制作情况,同事向下游算子广播该barrier,恢复数据处理
  3. 每个算子按照步骤3不断制作快照并且向下广播,直到最后的barrier传递到sink算子,Sink算子向CheckpointCoordinator报告快照完成,交给CheckpointCoordinator处理
  4. 当CheckpointCoordinator收到所有的算子报告之后,认为该周期的快照制作成功,否则,如果在规定的时间内没有收到算子的报告,超时了,则认为本周期快照制作失败。

现在回到我们的报错,这个明显是算子的Checkpoint时间超时了,点开CheckPoint节点一看,Checkpoint一直报错
CheckPoint报错
刚好我们设置的Checkpoint超时是6秒,如下:

env.getCheckpointConfig().setCheckpointTimeout(6000L)

为此本次报错主要解决方法有几种:
1.关闭Checkpoint,但是这个有风险
主要有2种方法:
第一种:直接关闭

env.getCheckpointConfig().disableCheckpointing()

第二种:加大CheckPoint出错容忍告警次数

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

2.延长Checkpoint的超时时间并且把Checkpoint的间隔时间调长一点

env.getCheckpointConfig().setCheckpointTimeout(60*1000L);//CK 1分钟延迟
env.enableCheckpointing(60*60*1000L); // CK间隔1小时做一次

3.这种方法如果不行,说明是CK过于复杂,需要较多的资源和时间,这个时候,可以考虑修改CK逻辑,使其尽量简便,也可以使用RockDB加快CK的速度

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • R 语言学习教程,从入门到精通,R 数据框(14)
  • 使用html+css+js实现完整的登录注册页面
  • Python酷库之旅-第三方库Pandas(082)
  • 数据集的简单制作和使用
  • TS中什么是泛型
  • MySQL与PostgreSQL语法区别
  • 小山菌_代码随想录算法训练营第六十二天|dijkstra(堆优化版)精讲 、Bellman_ford 算法精讲
  • 重新连接 到 时出错 Microsoft Windows Network:本地设备名已在使用中
  • Qt:线程
  • LeetCode 热题100-2
  • Unity引擎加密方案解析
  • Linux装ifort环境
  • el-select选项框样式修改
  • 【cv::triangulatePoints】其中的投射矩阵P(3x4)是怎么得到的?(内外参数K[R|t]到最终矩阵的变换过程)
  • 【ESP32】在原理图中为什么要在VCC处加几个电容
  • 【跃迁之路】【699天】程序员高效学习方法论探索系列(实验阶段456-2019.1.19)...
  • 【跃迁之路】【735天】程序员高效学习方法论探索系列(实验阶段492-2019.2.25)...
  • 0x05 Python数据分析,Anaconda八斩刀
  • cookie和session
  • Git的一些常用操作
  • JS字符串转数字方法总结
  • Linux gpio口使用方法
  • nginx(二):进阶配置介绍--rewrite用法,压缩,https虚拟主机等
  • php的插入排序,通过双层for循环
  • Python学习之路16-使用API
  • Spring核心 Bean的高级装配
  • vue脚手架vue-cli
  • webgl (原生)基础入门指南【一】
  • Xmanager 远程桌面 CentOS 7
  • 程序员该如何有效的找工作?
  • 初识 webpack
  • 诡异!React stopPropagation失灵
  • 技术攻略】php设计模式(一):简介及创建型模式
  • 力扣(LeetCode)21
  • 前端面试之CSS3新特性
  • 融云开发漫谈:你是否了解Go语言并发编程的第一要义?
  • 听说你叫Java(二)–Servlet请求
  • 通过几道题目学习二叉搜索树
  • 网页视频流m3u8/ts视频下载
  • 以太坊客户端Geth命令参数详解
  • 怎么把视频里的音乐提取出来
  • 容器镜像
  • ​1:1公有云能力整体输出,腾讯云“七剑”下云端
  • # include “ “ 和 # include < >两者的区别
  • #{}和${}的区别是什么 -- java面试
  • (20)目标检测算法之YOLOv5计算预选框、详解anchor计算
  • (C语言)输入一个序列,判断是否为奇偶交叉数
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (回溯) LeetCode 131. 分割回文串
  • (亲测成功)在centos7.5上安装kvm,通过VNC远程连接并创建多台ubuntu虚拟机(ubuntu server版本)...
  • .NET 5.0正式发布,有什么功能特性(翻译)
  • .NET Core 通过 Ef Core 操作 Mysql
  • .net core 微服务_.NET Core 3.0中用 Code-First 方式创建 gRPC 服务与客户端
  • .NET core 自定义过滤器 Filter 实现webapi RestFul 统一接口数据返回格式
  • .NET+WPF 桌面快速启动工具 GeekDesk