当前位置: 首页 > news >正文

kafka 消费者线程安全问题详细探讨

内容概要

图片

主要内容

常见错误案例

下面这段代码大概逻辑

  • 初始化时 实例化KafkaConsumer, 开启线程拉取消息并且处理

  • 资源释放回调 停止线程、调用kafkaConsumer.close进行资源释放

表面上没有问题,但实际上可能出现线程安全问题,因为poll 和 close 两个操作可能同时执行,因此存在线程安全问题, 如何修改,读者自己思考下。

    @PostConstructpublic void consumer(){kafkaConsumer = new KafkaConsumer(getConfig());kafkaConsumer.subscribe(Arrays.asList("test_partition_num"));new Thread(new Runnable() {@Overridepublic void run() {while(running){ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));records.forEach(record->{System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());});}}}).start();}@PreDestroypublic void close(){running = false;if(kafkaConsumer != null){kafkaConsumer.close();}}

消费者非线程安全代码解读

kafka生成者是线程安全的,但消费者是非线程安全的。KafkaConsumer

  • 相关操作前

    • 调用acquire()方法,校验线程安全问题,如果发现其他线程也在操作,则直接抛出异常。

  • 操作完成后

    • 调用release()清除痕迹

acquire()相对于加锁,release()相当于释放锁。

参看poll 方法实现,一目了然。

    private void acquire() {long threadId = Thread.currentThread().getId();if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");} else {this.refcount.incrementAndGet();}}private void release() {if (this.refcount.decrementAndGet() == 0) {this.currentThread.set(-1L);}}

图片

poll源码

如何实现消费者多线程消费消息呢

思路1

每次实例化一个 KafkaConsumer

这种方式实现简单,但每次都需要建立TCP 链接


思路2

相关操作方法 加上  synchronized,获取使用Lock 加锁保证线程安全

这种方式性能较差

思路3

拉取消息使用一个线程, 消息处理使用多线程

因为通常拉取消息比较快,消息处理比较耗时,由于消息处理不涉及KafkaConsumer 相关API 操作,因此不存在线程安全问题。这种方式建议消息位移设置自动提交,否则编程复杂度较高。

示例代码

ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));executorService.execute(()->{//处理消息records.forEach(record->{System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());});
});

旁敲侧击 举一反三

面试题回顾 Dubbo 线程模型

通常我们线程分为两类

  • IO 线程:负责网络通信的读写操作,接收和发送请求与响应。

  • 业务线程:处理具体的业务逻辑,避免因业务处理耗时过长而阻塞 IO 线程。


Dubbo 线程模型有几种你还记得否?该如何选择?

  • AllDispatcher:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。

  • DirectDispatcher:所有消息都不派发到线程池,全部在 IO 线程上直接执行。

  • MessageOnlyDispatcher:只有请求和响应消息派发到线程池,其它连接断开、心跳等消息直接在 IO 线程上执行。

  • ExecutionDispatcher:只把请求消息派发到线程池,响应和其它连接、断开、心跳等消息直接在 IO 线程上执行。

其实选择的依据 业务处理的快慢,如果业务处理很快则建议让业务处理逻辑放到 IO线程中执行,这样避免线程上下文切换影响性能。反之则处理逻辑需要放到具体的业务线程中执行。

一般来说业务执行需要查询数据库,绝大数场景建议使用默认的 AllDispatcher 

是不是又和我一起温故知新了,加油吧 少年 !!!

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 中台架构下的数据仓库与非结构化数据整合
  • 详解前驱图与PV操作
  • R语言中的shiny框架
  • 《AI设计类工具系列之一——FigJam AI》
  • 邀请功能的实现分析
  • 初识C语言(三)
  • 嵌入式开发中学习C++的用处?
  • 拼图缺口形状检测系统源码分享
  • 解锁电商新视界:京东商品详情API——您的深度商品信息探索利器
  • Javax Validation 自定义注解校验(身份证号校验)
  • 线程池的执行流程和配置参数总结
  • np.array_fancy_indexing花式索引
  • Vue.js入门
  • 如何使用ssm实现基于BS的库存管理软件设计与实现+vue
  • AI中医香方仪丨OPENAIGC开发者大赛企业组AI创作力奖
  • JS中 map, filter, some, every, forEach, for in, for of 用法总结
  • Druid 在有赞的实践
  • dva中组件的懒加载
  • git 常用命令
  • JavaScript 一些 DOM 的知识点
  • Linux学习笔记6-使用fdisk进行磁盘管理
  • nodejs调试方法
  • PaddlePaddle-GitHub的正确打开姿势
  • react 代码优化(一) ——事件处理
  • Travix是如何部署应用程序到Kubernetes上的
  • Vim Clutch | 面向脚踏板编程……
  • 阿里研究院入选中国企业智库系统影响力榜
  • 给github项目添加CI badge
  • 如何胜任知名企业的商业数据分析师?
  • 适配iPhoneX、iPhoneXs、iPhoneXs Max、iPhoneXr 屏幕尺寸及安全区域
  • 我们雇佣了一只大猴子...
  • ​1:1公有云能力整体输出,腾讯云“七剑”下云端
  • ​Distil-Whisper:比Whisper快6倍,体积小50%的语音识别模型
  • #Linux(Source Insight安装及工程建立)
  • #每天一道面试题# 什么是MySQL的回表查询
  • #我与Java虚拟机的故事#连载07:我放弃了对JVM的进一步学习
  • (04)odoo视图操作
  • (2024,RWKV-5/6,RNN,矩阵值注意力状态,数据依赖线性插值,LoRA,多语言分词器)Eagle 和 Finch
  • (3)选择元素——(14)接触DOM元素(Accessing DOM elements)
  • (Redis使用系列) SpringBoot 中对应2.0.x版本的Redis配置 一
  • (二刷)代码随想录第16天|104.二叉树的最大深度 559.n叉树的最大深度● 111.二叉树的最小深度● 222.完全二叉树的节点个数
  • (附源码)spring boot基于小程序酒店疫情系统 毕业设计 091931
  • (附源码)ssm考生评分系统 毕业设计 071114
  • (官网安装) 基于CentOS 7安装MangoDB和MangoDB Shell
  • (七)理解angular中的module和injector,即依赖注入
  • (三)模仿学习-Action数据的模仿
  • (十一)JAVA springboot ssm b2b2c多用户商城系统源码:服务网关Zuul高级篇
  • (游戏设计草稿) 《外卖员模拟器》 (3D 科幻 角色扮演 开放世界 AI VR)
  • (轉貼)《OOD启思录》:61条面向对象设计的经验原则 (OO)
  • .net 4.0发布后不能正常显示图片问题
  • .NET C# 配置 Options
  • .NET6使用MiniExcel根据数据源横向导出头部标题及数据
  • .Net中间语言BeforeFieldInit
  • /etc/fstab 只读无法修改的解决办法
  • :O)修改linux硬件时间