当前位置: 首页 > news >正文

kakfa发版丢消息事件分析

背景

其他部门同事反馈在项目发版/重启(kill -15)的那段时间,经常会出现导致 C 端业务出现问题,从而产生资损

一听资损,赶紧应答下来,了解了下具体情况,然后立马去排查了

问题分析

结合同事的描述以及对业务的了解,很快就定位到是 kafka 消息丢失导致 C 端业务出现问题

业务当前消费架构图


从上图可以了解到几个点会导致目前这个场景消息丢失

  1. kafka 一秒一次的位移提交
  2. Queue 队列没消费完任务
  3. work 线程池从 Queue 中拉取的任务没消费完(每次拉取一个)

问题所在:因C端业务特性,非准实时的消息是没有意义的(分钟级),所以kafka的自动提交位移实际上是符合业务需求,三点结合起来看问题应该是出在:在发版时 消费单线程 依旧在拉取消息写入 Queue,并且后续的 线程池也没有将 Queue中的任务给处理完

消费架构改造

  1. 改造消费流程
  2. 启动时增加JVM关闭钩子,在关闭前将 isRunning 修改为fale,从而停止 消费单线程 继续拉取kafka消息
  3. 优雅关闭 work线程池

// shutdown() 与 shutdownNow()这里也给到一段shutdown测试代码
ThreadPoolExecutor executorService =new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
AtomicInteger integer = new AtomicInteger();
for (int i = 0; i < 100; i++) {executorService.execute(() -> {try {System.out.println(new Date() + "=====>" + integer.incrementAndGet());Thread.sleep(1000L);} catch (Exception e) {e.printStackTrace();}});
}Thread.sleep(5000L);
executorService.shutdown();
// executorService.shutdownNow();
System.out.println("线程池已触发shutdown");

随之而来的另一个问题,若在JVM关闭钩子中对 work线程池 操作shutdown,在任务中是有使用到Spring容器中的bean,若bean销毁了,那么work线程池中的任务都无法再执行成功(具体销毁优先级细则可自行百度,这里不做延伸)。
基于这个问题,回想到之前常用的一个注解 @PostConstruct 的一个孪生兄弟 @PreDestroy,这是在Java规范JSR-250引入的注解,定义了对象的创建和销毁工作,那么Spring必然对它有做支持,测试代码如下

ThreadPoolExecutor executorService =new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());@PostConstruct
public void postConstruct(){AtomicInteger integer = new AtomicInteger();for (int i = 0; i < 100; i++) {executorService.execute(() -> {try {System.out.println(new Date() + "=====>" + integer.incrementAndGet());Thread.sleep(1000L);} catch (Exception e) {e.printStackTrace();}});}
}@PreDestroy
public void preDestroy(){executorService.shutdown();
}// 增加一个测试关闭的接口
@GetMapping("/shutdown")
public void shutdown() {System.exit(0);
}

测试结果依旧失败,看日志打印是正在处理线程池中已被接收的任务时挂掉的(这不科学,上面shutdown()测试案例结果明明会等待所有任务结束以后再结束),心里一群 草姓的马 飘过-_-

转念一想:其实这样也对,若一个池任务过多导致一直无法kill掉进程,这种行为也不对…那有没有什么补偿机制可以用,emm,山重水复疑无路,柳暗花明又一村哇,Doug Lea大神名不虚传,早就为我们考虑好了

// 贴出改动方法
@PreDestroy
public void preDestroy(){executorService.shutdown();try {if(executorService.awaitTermination(5, TimeUnit.SECONDS)){System.out.println("任务执行完毕结束");} else {System.out.println("time out 结束");}} catch (InterruptedException e) {System.out.println("Interrupted while waiting for executor");Thread.currentThread().interrupt();executorService.shutdownNow();}
}

嘿嘿,这么一改顺眼多了,线程池在shutdown后再至多等待N秒(若无任务则直接返回true),业务可以根据特性去决定此值配置


但是这么写多麻烦,那么多重要的线程池各个都要在这里写,那Spring如何实现线程池的优雅停的呢?想到Spring的生命周期中的 销毁回调,实现 DisposableBean 即可,那看看ThreadPoolTaskExecutor,其父类ExecutorConfigurationSupport在处理销毁时,会判定其 waitForTasksToCompleteOnShutdown 参数是否为true来决定是否要调用shutdown(),并且根据其 awaitTerminationSeconds 参数来决定是否需要调用 ExecutorService.awaitTermination 去等待线程池处理一定时间

那让我们来改造改造现在的work线程池,指定业务指定配置以后,交给spring去帮我们去做这些重复的销毁动作

写到最后

若使用Spring提供线程池,并指定以下两个参数即可实现线程池优雅停

  1. waitForTasksToCompleteOnShutdown 参数,在销毁时会帮我们调用一次线程池shutdown()
  2. awaitTerminationSeconds 参数,在调用shutdown以后可以等等一段时间,从而尽可能的将线程池中任务给执行完毕

ExecutorService.awaitTermination 虽好,可不要贪杯(滥用)哦,多个线程池都指定此参数并在销毁时都存在大量的任务,可能会导致 kill -15 的时间增加,从而出现一种 “kill不掉” 的现象

相关文章:

  • CRMEB Pro版高并发商城系统秒杀需要多大的服务器
  • [Linux内核驱动]模块参数
  • 【开源项目】智慧北京案例~超经典实景三维数字孪生智慧城市CIM/BIM数字孪生可视化项目——开源工程及源码!
  • redis之集群
  • Python web 开发 flask 实践
  • 汇凯金业:现货黄金投资平仓策略有哪些
  • 暴雨讲堂|通往AGI的必由之路—AI agent是什么?
  • 超详细的linux-conda环境安装教程
  • svm 超参数
  • maxwell源码编译安装部署
  • Docker 从安装到使用的完整指南
  • Python抓取高考网图片
  • Android屏幕旋转流程(1)
  • 代理网络基础设施 101:增强安全性、速度和可扩展性
  • 生成式人工智能时代的5大网络安全趋势
  • 【108天】Java——《Head First Java》笔记(第1-4章)
  • 【腾讯Bugly干货分享】从0到1打造直播 App
  • chrome扩展demo1-小时钟
  • CSS居中完全指南——构建CSS居中决策树
  • CSS相对定位
  • Dubbo 整合 Pinpoint 做分布式服务请求跟踪
  • in typeof instanceof ===这些运算符有什么作用
  • JS正则表达式精简教程(JavaScript RegExp 对象)
  • leetcode98. Validate Binary Search Tree
  • MySQL的数据类型
  • Vue 动态创建 component
  • Work@Alibaba 阿里巴巴的企业应用构建之路
  • XML已死 ?
  • 关于Android中设置闹钟的相对比较完善的解决方案
  • 力扣(LeetCode)357
  • 面试遇到的一些题
  • 如何实现 font-size 的响应式
  • 算法---两个栈实现一个队列
  • 再谈express与koa的对比
  • 怎么把视频里的音乐提取出来
  • 2017年360最后一道编程题
  • 教程:使用iPhone相机和openCV来完成3D重建(第一部分) ...
  • ​zookeeper集群配置与启动
  • ​马来语翻译中文去哪比较好?
  • (iPhone/iPad开发)在UIWebView中自定义菜单栏
  • (顶刊)一个基于分类代理模型的超多目标优化算法
  • (附源码)python旅游推荐系统 毕业设计 250623
  • (附源码)计算机毕业设计SSM疫情居家隔离服务系统
  • (三)Kafka离线安装 - ZooKeeper开机自启
  • (译) 理解 Elixir 中的宏 Macro, 第四部分:深入化
  • (转)setTimeout 和 setInterval 的区别
  • (转)编辑寄语:因为爱心,所以美丽
  • (转)使用VMware vSphere标准交换机设置网络连接
  • .h头文件 .lib动态链接库文件 .dll 动态链接库
  • .NET Core中Emit的使用
  • .Net Core中的内存缓存实现——Redis及MemoryCache(2个可选)方案的实现
  • .Net FrameWork总结
  • .net 重复调用webservice_Java RMI 远程调用详解,优劣势说明
  • .NET/C# 解压 Zip 文件时出现异常:System.IO.InvalidDataException: 找不到中央目录结尾记录。
  • .Net调用Java编写的WebServices返回值为Null的解决方法(SoapUI工具测试有返回值)