当前位置: 首页 > news >正文

kafka如何保证消息不丢失

                Kafka发送消息是异步发送的,所以我们不知道消息是否发送成功,所以会可能造成消息丢失。而且Kafka架构是由生产者-服务器端-消费者三种组成部分构成的。要保证消息不丢失,那么主要有三种解决方法。

生产者(producer)端处理

生产者默认发送消息代码如下:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaMessageProducer {public static void main(String[] args) {// 配置Kafka生产者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);String topic = "test"; // Kafka主题try {// 发送消息到Kafkafor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);producer.send(record);System.out.println("Sent message: " + message);}} catch (Exception e) {e.printStackTrace();} finally {// 关闭Kafka生产者producer.close();}}
}

生产者端要保证消息发送成功,可以有两个方法:

1.把异步发送改成同步发送,这样producer就能实时知道消息的发送结果。

要将 Kafka 发送方法改为同步发送,可以使用 `send()` 方法的返回值Future<RecordMetadata>`, 并调用 `get()` 方法来等待发送完成。

以下是将 Kafka 发送方法改为同步发送的示例代码:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器// 创建 Kafka 生产者实例Producer<String, String> producer = new KafkaProducer<>(props);String topic = "test"; // Kafka 主题try {// 发送消息到 Kafkafor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);RecordMetadata metadata = producer.send(record).get(); // 同步发送并等待发送完成System.out.println("Sent message: " + message + ", offset: " + metadata.offset());}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} finally {// 关闭 Kafka 生产者producer.close();}}
}

在这个示例代码中,通过调用 send(record).get() 实现了同步发送,其中 get() 方法会阻塞当前线程,直到发送完成并返回消息的元数据。

2.添加异步回调函数来监听消息发送的结果,如果发送失败,可以在回调函数里重新发送。

要保持发送消息成功并添加回调函数,你可以在发送消息的时候指定一个回调函数作为参数。回调 函数将在消息发送完成后被调用,以便你可以在回调函数中处理发送结果。

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器// 创建 Kafka 生产者实例Producer<String, String> producer = new KafkaProducer<>(props);String topic = "test"; // Kafka 主题try {// 发送消息到 Kafkafor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);// 发送消息并指定回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Sent message: " + message + ", offset: " + metadata.offset());} else {// 这里重新发送消息producer.send(record);exception.printStackTrace();}}});}} finally {// 关闭 Kafka 生产者producer.close();}}
}

在这个示例代码中,我们使用了 send(record, callback) 方法来发送消息,并传递了一个实现了 Callback 接口的匿名内部类作为回调函数。当消息发送完成后,回调函数的 onCompletion() 方法会被调用。你可以根据 RecordMetadata 和 Exception 参数来处理发送结果。
另外producer还提供了一个重试参数,这个参数叫retries,如果因为网络问题或者Broker故障导致producer发送消息失败,那么producer会根据这个参数的值进行重试发送消息。

服务器端(Broker)端

Kafka Broker(服务器端)通过以下方式来确保生产者端消息发送的成功和不丢失:

1. 消息持久化(异步刷盘):Kafka Broker将接收到的消息持久化到磁盘上的日志文件中。这样即使在消息发送后发生故障,Broker能够恢复并确保消息不会丢失。(注意:持久化是由操作系统调度的,如果持久化之前系统崩溃了,那么就因为不能持久化导致数据丢失,但是Kafka没提供同步刷盘策略)

2. 复制与高可用性:Kafka支持分布式部署,可以将消息分布到多个Broker上形成一个Broker集群。在集群中,消息被复制到多个副本中,以提供冗余和高可用性。生产者发送消息时,它可以将消息发送到任何一个Broker,然后Broker将确保消息在集群中的所有副本中都被复制成功。

3. 消息提交确认:当生产者发送消息后,在收到Broker的确认响应之前,生产者会等待。如果消息成功写入并复制到了指定的副本中,Broker会发送确认响应给生产者。如果生产者在指定的时间内没有收到确认响应,它将会尝试重新发送消息,以确保消息不会丢失。

4. 可靠性设置(同步刷盘):生产者可以配置一些参数来提高消息发送的可靠性。例如,可以设置`acks`参数来指定需要收到多少个Broker的确认响应才认为消息发送成功。可以将`acks`设置为`"all"`,表示需要收到所有副本的确认响应才算发送成功。

总之,Kafka Broker通过持久化和复制机制,以及消息确认和可靠性设置,确保生产者端的消息发送成功且不丢失。同时,应注意及时处理可能的错误情况,并根据生产者端需求和场景合理配置相应的参数。

对于使用YAML文件进行Kafka配置的情况,你可以按照以下格式设置acks参数:

# Kafka生产者配置
producer:bootstrap.servers: your-kafka-server:9092acks: all        # 设置acks参数为"all"key.serializer: org.apache.kafka.common.serialization.StringSerializervalue.serializer: org.apache.kafka.common.serialization.StringSerializer

消费者(Consumer)处理

        Kafka Consumer 默认会确保消息的至少一次传递(at least once delivery)。这意味着当 Consumer 完成对一条消息的处理后,会向 Kafka 提交消息的偏移量(offset),告知 Kafka 这条消息已被成功处理。如果 Consumer 在处理消息时发生错误,可以通过回滚偏移量来重试处理之前的消息。

以下是一些确保消息消费成功的方法:

  •  使用自动提交偏移量(Auto Commit Offsets)
  • 手动提交偏移量(Manual Commit Offsets)
  • 设置消费者的最大重试次数:
  • 设置适当的消费者参数

尽管 Kafka 提供了可靠的消息传递机制,但仍然需要在消费者端实现适当的错误处理和重试逻辑,以处理可能发生的错误情况。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • GAT1399协议分析(7)--pycharm anaconde3 配置pyside2
  • 转让北京劳务分包地基基础施工资质条件和流程
  • Vue3 组合式 API:依赖注入(四)
  • bash、zsh、fish三种流行Unix shell的区别
  • Linux 进程控制
  • 为什么选择Python作为AI开发语言
  • Kimichat使用案例010:快速识别出图片中的表格保存到Excel
  • 重邮计算机网络803-(2)物理层
  • AI大模型在健康睡眠监测中的深度融合与实践案例
  • 天诚公租房、人才公寓NB-IOT人脸物联网智能门锁解决方案
  • 融云:应用出海新增长引擎,GPT-4o 后的 AI 创新与用户运营
  • 144、二叉树的前序递归遍历
  • 5.1 实体完整性
  • 2024-06-10 Unity 编辑器开发之编辑器拓展10 —— 其他常见工具类
  • VueRouter路由与Vuex状态管理
  • 【162天】黑马程序员27天视频学习笔记【Day02-上】
  • AWS实战 - 利用IAM对S3做访问控制
  • CAP 一致性协议及应用解析
  • CSS进阶篇--用CSS开启硬件加速来提高网站性能
  • CSS中外联样式表代表的含义
  • idea + plantuml 画流程图
  • JavaScript函数式编程(一)
  • Js基础知识(一) - 变量
  • oldjun 检测网站的经验
  • php中curl和soap方式请求服务超时问题
  • python3 使用 asyncio 代替线程
  • React-Native - 收藏集 - 掘金
  • spark本地环境的搭建到运行第一个spark程序
  • 初识 webpack
  • 观察者模式实现非直接耦合
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 面试遇到的一些题
  • 智能合约开发环境搭建及Hello World合约
  • ionic异常记录
  • PostgreSQL之连接数修改
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • 新海诚画集[秒速5センチメートル:樱花抄·春]
  • # 20155222 2016-2017-2 《Java程序设计》第5周学习总结
  • #NOIP 2014# day.1 T2 联合权值
  • #数学建模# 线性规划问题的Matlab求解
  • #我与Java虚拟机的故事#连载12:一本书带我深入Java领域
  • (八十八)VFL语言初步 - 实现布局
  • (附源码)springboot电竞专题网站 毕业设计 641314
  • (六) ES6 新特性 —— 迭代器(iterator)
  • (十)Flink Table API 和 SQL 基本概念
  • (原創) 是否该学PetShop将Model和BLL分开? (.NET) (N-Tier) (PetShop) (OO)
  • (转)http协议
  • (转载)hibernate缓存
  • *算法训练(leetcode)第三十九天 | 115. 不同的子序列、583. 两个字符串的删除操作、72. 编辑距离
  • ./configure,make,make install的作用(转)
  • .Net Core 中间件验签
  • .Net Memory Profiler的使用举例
  • .net 写了一个支持重试、熔断和超时策略的 HttpClient 实例池
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地定义和使用弱事件
  • /*在DataTable中更新、删除数据*/