当前位置: 首页 > news >正文

Kafka Producer之事务性

文章目录

  • 1. 跨会话幂等性失效
  • 2. 开启事务
  • 3. 事务流程原理

事务性可以防止跨会话幂等性失效,同时也可以保证单个生产者的指定数据,要么全部成功要么全部失败,不限分区。不可以多个生产者共用相同的事务ID。

1. 跨会话幂等性失效

幂等性开启后,broker会对每个分区记录生产者状态,并且生产者具有PID,消息被标记为PID加上序列号,数据重复和有序都是在其基础之上运作的。

生产者重启等因素会导致PID变化,导致幂等性短暂失效。

2. 开启事务

因为事务是基于幂等性的,所以幂等性的配置都要有。

package org.dragon.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaProducerTransactionTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//配置acks等级config.put(ProducerConfig.ACKS_CONFIG, "-1");config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);config.put(ProducerConfig.RETRIES_CONFIG, 5);// 把buffer改小一点,让测试数据组成更多batchconfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);// 事务IDconfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);//初始化事务producer.initTransactions();try {// 开启事务producer.beginTransaction();for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2","" + i,"我是你爹" + i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}// 提交事务producer.commitTransaction();}catch(Exception e) {// 中止事务producer.abortTransaction();e.printStackTrace();}finally{//关闭producerproducer.close();}}
}

3. 事务流程原理

在这里插入图片描述

  1. 查找联系事务管理器

  2. 根据设置的TRANSACTIONAL_ID_CONFIG计算PID,计算方式为哈希值%分区数量

  3. 初始化事务

  4. 将涉及到的分区信息发送给事务管理器,方便事务管理器管理和监控这些分区的事务状态。

  5. 生成数据,发送数据到对应Broker

  6. 对应Broker把分区信息发送给事务管理器,为了确认哪些分区确实已经收到了事务中的消息

  7. 对应Broker返回ACKS

  8. 生产者发起结束事务的请求

  9. 修改事务状态为准备提交

  10. 事务管理器将事务标记为成功或者失败,并通知对应broker。

  11. 修改事务状态为已提交

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • VSCode STM32嵌入式开发插件记录
  • 计算机毕业设计hadoop+spark+hive物流大数据分析平台 仓储数据分析 物流预测系统 物流信息爬虫 物流大数据 机器学习 深度学习 知识图谱 大数据
  • 软件物料清单科普 | SBOM对开源管理的意义
  • Apache POI-Excel入门与实战
  • 数据仓库中的数据治理流程
  • 什么是离线语音识别芯片?与在线语音识别的区别
  • nfs和samba
  • 服务器上使用Docker部署sonarQube,并集成到Jenkins实现自动化。
  • 网站验证:确保网络安全与信任的重要步骤
  • C2W3.Assignment.Language Models: Auto-Complete.Part1
  • 一个简单好用安全的开源交互审计系统,支持SSH,Telnet,Kubernetes协议(带私活)
  • 智慧隧道可视化:安全与效率的智能保障
  • SpringMVC实现文件上传
  • stm32 io输入中断
  • 系统架构设计师教程 第4章 信息安全技术基础知识-4.1 信息安全基础知识-解读
  • CoolViewPager:即刻刷新,自定义边缘效果颜色,双向自动循环,内置垂直切换效果,想要的都在这里...
  • CSS选择器——伪元素选择器之处理父元素高度及外边距溢出
  • FastReport在线报表设计器工作原理
  • Git 使用集
  • iOS 系统授权开发
  • JS变量作用域
  • js操作时间(持续更新)
  • k8s如何管理Pod
  • Unix命令
  • 阿里云购买磁盘后挂载
  • 动态规划入门(以爬楼梯为例)
  • 解析带emoji和链接的聊天系统消息
  • 盘点那些不知名却常用的 Git 操作
  • 前端面试题总结
  • 深度解析利用ES6进行Promise封装总结
  • 我与Jetbrains的这些年
  • “十年磨一剑”--有赞的HBase平台实践和应用之路 ...
  • 《天龙八部3D》Unity技术方案揭秘
  • 3月27日云栖精选夜读 | 从 “城市大脑”实践,瞭望未来城市源起 ...
  • 长三角G60科创走廊智能驾驶产业联盟揭牌成立,近80家企业助力智能驾驶行业发展 ...
  • ​iOS实时查看App运行日志
  • ​数据结构之初始二叉树(3)
  • #我与Java虚拟机的故事#连载13:有这本书就够了
  • (3)(3.5) 遥测无线电区域条例
  • (52)只出现一次的数字III
  • (C语言)逆序输出字符串
  • (Redis使用系列) Springboot 使用Redis+Session实现Session共享 ,简单的单点登录 五
  • (回溯) LeetCode 40. 组合总和II
  • (论文阅读40-45)图像描述1
  • (三分钟)速览传统边缘检测算子
  • (实测可用)(3)Git的使用——RT Thread Stdio添加的软件包,github与gitee冲突造成无法上传文件到gitee
  • (四)docker:为mysql和java jar运行环境创建同一网络,容器互联
  • (转)jQuery 基础
  • (转)负载均衡,回话保持,cookie
  • (转载)(官方)UE4--图像编程----着色器开发
  • .NET 8 编写 LiteDB vs SQLite 数据库 CRUD 接口性能测试(准备篇)
  • .NET Framework与.NET Framework SDK有什么不同?
  • .NET 的静态构造函数是否线程安全?答案是肯定的!
  • .net访问oracle数据库性能问题
  • [ C++ ] STL---stack与queue