当前位置: 首页 > news >正文

flink的异常concurrent.TimeoutException: Heartbeat of TaskManager with id的解决

背景

在使用flink进行集成测试时,我们会使用MiniClusterWithClientResource类,但是当我们断点导致在某个方法执行的时间比较长时,会有错误发生,那么该如何解决这个错误呢?

处理concurrent.TimeoutException: Heartbeat of TaskManager with id错误

其实关键的配置是heartbeat.timeout,这个错误是JobManager抛出的,意思是和某个TaskManager的心跳中断超过了指定的时间,我们把这个参数配置到MiniClusterWithClientResource类中就可以了,代码如下所示:

public class FlinkIntegrationTest {public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {{put("heartbeat.timeout", "300000");}});@ClassRulepublic static MiniClusterWithClientResource flinkCluster =new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));}@Testpublic void testStateFlatMap1() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));}// create a testing sinkprivate static class CollectSink implements SinkFunction<String> {// must be staticpublic static final List<String> values = Collections.synchronizedList(new ArrayList<>());@Overridepublic void invoke(String value, Context context) throws Exception {values.add(value);}}}

相关文章:

  • 河南省第五届“金盾信安杯”网络与数据安全大赛实操技能赛 部分wp(自己的一些思路和解析 )(主misc crypto )
  • 【华为OD】B\C卷真题 100%通过:字符串统计 C/C++实现
  • 记录一次因内存不足而导致hiveserver2和namenode进程宕机的排查
  • 千云物流 - 使用k8s负载均衡openelb
  • 【Spring源码】Spring Event事件
  • 如何给echarts的legend设置不同的样式和位置 legend分组显示
  • 备考雅思记录
  • u8g2图形库——丝滑菜单制作
  • Linux系统常用指令大全(图文详解)
  • 发布鸿蒙的第一个java应用
  • 什么是索引?索引的作用是什么?
  • app小程序定制的重点|软件定制开发|网站搭建
  • 你了解Postman 变量吗?
  • DeepWalk代码实战-维基百科词条图嵌入可视化
  • pcie-2-rj45速度优化
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • 2017届校招提前批面试回顾
  • CentOS7 安装JDK
  • hadoop集群管理系统搭建规划说明
  • httpie使用详解
  • Java编程基础24——递归练习
  • Java超时控制的实现
  • JAVA多线程机制解析-volatilesynchronized
  • Magento 1.x 中文订单打印乱码
  • 阿里云ubuntu14.04 Nginx反向代理Nodejs
  • 动手做个聊天室,前端工程师百无聊赖的人生
  • 回顾2016
  • 记录一下第一次使用npm
  • 开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
  • 软件开发学习的5大技巧,你知道吗?
  • 微信开源mars源码分析1—上层samples分析
  • 原生JS动态加载JS、CSS文件及代码脚本
  • 你学不懂C语言,是因为不懂编写C程序的7个步骤 ...
  • #QT(智能家居界面-界面切换)
  • (11)MATLAB PCA+SVM 人脸识别
  • (17)Hive ——MR任务的map与reduce个数由什么决定?
  • (done) NLP “bag-of-words“ 方法 (带有二元分类和多元分类两个例子)词袋模型、BoW
  • (八)Spring源码解析:Spring MVC
  • (超简单)使用vuepress搭建自己的博客并部署到github pages上
  • (一)认识微服务
  • (原+转)Ubuntu16.04软件中心闪退及wifi消失
  • (转) SpringBoot:使用spring-boot-devtools进行热部署以及不生效的问题解决
  • *Algs4-1.5.25随机网格的倍率测试-(未读懂题)
  • .bat批处理(六):替换字符串中匹配的子串
  • .dwp和.webpart的区别
  • .NET Conf 2023 回顾 – 庆祝社区、创新和 .NET 8 的发布
  • .net core 3.0 linux,.NET Core 3.0 的新增功能
  • .Net Remoting(分离服务程序实现) - Part.3
  • .NET建议使用的大小写命名原则
  • .NET企业级应用架构设计系列之结尾篇
  • /run/containerd/containerd.sock connect: connection refused
  • @AliasFor注解
  • @RequestParam详解
  • [.net] 如何在mail的加入正文显示图片
  • []新浪博客如何插入代码(其他博客应该也可以)