当前位置: 首页 > news >正文

Apache Kafka 事务详解

Apache Kafka 事务详解

Apache Kafka 是一个分布式流处理平台,主要用于实时数据的传输和处理。在现代的数据密集型应用中,事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持,包括其基本概念、架构、使用方法以及相关代码示例和运行效果。

1. Kafka 事务简介

Kafka 的事务性支持在 0.11.0 版本中引入,目的是提供跨多个 topic 和 partition 的原子消息写入能力。这意味着事务消息要么全部写入成功,要么全部失败,从而确保数据的一致性和完整性。

Kafka 的事务特性主要用于以下场景:

  • 确保多个 topic 和 partition 的消息一致性
  • 实现端到端的 Exactly Once 语义(EOS)
  • 防止消息丢失或重复消费

2. Kafka 事务架构

Kafka 事务涉及三个主要组件:

  • 生产者(Producer):负责发送事务性消息。
  • 消费者(Consumer):负责消费事务性消息。
  • Kafka Broker:负责管理事务状态,确保事务的一致性。

在 Kafka 中,每个事务都有一个唯一的 Transactional ID,用于标识事务的生命周期。事务的状态通过 Broker 中的事务协调器(Transaction Coordinator)进行管理。

3. Kafka 事务使用方法

3.1 配置生产者

要使用 Kafka 事务性支持,首先需要配置生产者。下面是一个配置事务性生产者的示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class TransactionalProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key1", "value1")).get();producer.send(new ProducerRecord<>("my-topic", "key2", "value2")).get();producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();throw e;} catch (KafkaException e) {producer.abortTransaction();}producer.close();}
}
3.2 配置消费者

为了正确消费事务性消息,需要配置隔离级别(isolation.level)为“读已提交(read_committed)”:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.Collections;
import java.util.Properties;public class TransactionalConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

4. 事务运行效果

4.1 生产者运行效果

当事务性生产者运行时,如果事务成功提交,我们可以看到以下输出:

Successfully sent message: key1, value1
Successfully sent message: key2, value2

如果事务失败并被回滚,我们将不会看到任何消息发送成功的日志。

4.2 消费者运行效果

事务性消费者只会读取已提交的事务消息。例如,如果我们发送了两条消息,但只提交了一条,那么消费者只会读取已提交的那条消息。

offset = 0, key = key1, value = value1

未提交的消息将不会被读取,从而确保数据的一致性。

5. 总结

Kafka 的事务性支持提供了一种确保消息一致性和完整性的方法,尤其适用于需要跨多个 topic 和 partition 进行原子写入的场景。通过配置事务性生产者和消费者,我们可以实现端到端的 Exactly Once 语义,防止消息丢失或重复消费。希望本文能帮助你更好地理解和使用 Kafka 的事务特性。

参考文献

  • Apache Kafka Documentation
  • Confluent Kafka Transactions

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 数据结构之《二叉树》(中)
  • Python爬虫核心面试题2
  • C#用Socket实现TCP客户端
  • 哦吼,新模型?文生图领域的新模型FLUX.1(附模型下载网盘地址和详细使用方法)
  • onceperrequestfilter 不生效问题
  • Centos 安装 Gitlab
  • 数据库文件管理
  • hcip作业1
  • apex正则表达式匹配富文本字段内容,如何只匹配文本而忽略富文本符号
  • Astro 实现TodoList网页应用案例
  • 【机器学习基础】Scikit-learn主要用法
  • 【问题处理】海康视频websocket代理问题(websocket在业务系统https协议下调用海康ws协议)
  • 想提交BCSP小高组T4的同学请看这里~
  • 链接器找不到一些ACADO库中的函数定义,导致未定义引用的错误。
  • Leetcode 3143. 正方形中的最多点数(二分、数组字符串、位运算集合)
  • 《Javascript高级程序设计 (第三版)》第五章 引用类型
  • 3.7、@ResponseBody 和 @RestController
  • Angular数据绑定机制
  • C# 免费离线人脸识别 2.0 Demo
  • const let
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • IP路由与转发
  • learning koa2.x
  • LeetCode18.四数之和 JavaScript
  • Python连接Oracle
  • react-native 安卓真机环境搭建
  • vue-loader 源码解析系列之 selector
  • Vue组件定义
  • 不用申请服务号就可以开发微信支付/支付宝/QQ钱包支付!附:直接可用的代码+demo...
  • 仿天猫超市收藏抛物线动画工具库
  • 工作踩坑系列——https访问遇到“已阻止载入混合活动内容”
  • 关于使用markdown的方法(引自CSDN教程)
  • 基于Volley网络库实现加载多种网络图片(包括GIF动态图片、圆形图片、普通图片)...
  • 开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
  • 开源中国专访:Chameleon原理首发,其它跨多端统一框架都是假的?
  • 聊聊springcloud的EurekaClientAutoConfiguration
  • 什么软件可以剪辑音乐?
  • 实现简单的正则表达式引擎
  • 使用Gradle第一次构建Java程序
  • 世界编程语言排行榜2008年06月(ActionScript 挺进20强)
  • 掌握面试——弹出框的实现(一道题中包含布局/js设计模式)
  • 终端用户监控:真实用户监控还是模拟监控?
  • “十年磨一剑”--有赞的HBase平台实践和应用之路 ...
  • 智能情侣枕Pillow Talk,倾听彼此的心跳
  • ​比特币大跌的 2 个原因
  • # 飞书APP集成平台-数字化落地
  • #pragma 指令
  • #QT(QCharts绘制曲线)
  • (c语言版)滑动窗口 给定一个字符串,只包含字母和数字,按要求找出字符串中的最长(连续)子串的长度
  • (待修改)PyG安装步骤
  • (二十一)devops持续集成开发——使用jenkins的Docker Pipeline插件完成docker项目的pipeline流水线发布
  • (附源码)python旅游推荐系统 毕业设计 250623
  • (十三)MipMap
  • (新)网络工程师考点串讲与真题详解
  • (一)Docker基本介绍