当前位置: 首页 > news >正文

Kafka无消息丢失配置

Kafka无消息丢失配置

Kafka到底会不会丢数据(data loss)? 网上各种说法都有,在回答这个问题之前, 我们要明确“责任边界”。所谓责任边界就是要确定消息在生产和消费的完整流程中是由谁来负责,确保它不会丢失。这样即使真的出现了消息丢失,也能明确是责任主体,有针对性地进行改进和调整。

个人认为,关于责任的划定,官方其实已经给出了很明确的答案:

Once a published message is committed it will not be lost as long as one broker that replicates the partition to which this message was written remains "alive".

倘若我们完全理解这句话,那么“是否丢失消息”的问题自可迎刃而解。这句话有两个关键要点:

  1. committed: Kafka只对已提交的消息做出交付保证(delivery guarantee),没有成功提交的消息Kafka不对其做出任何承诺

  2. alive:只要有一个保存了该条消息的broker还活着(alive)就不会丢失消息

Kafka如何定义一个broker是否存活(alive)呢? 很简单,也是两个条件:

  1. 节点进程必须存活,且一直维持与zookeeper的会话

  2. 如果是follower节点,它与leader节点相差的消息数不能过大,即不能远远落后于leader节点的进度。如果按照Kafka的术语来说,就是这个follower节点必须是一个ISR(in-sync replica,即与leader保持同步的副本节点)

当然,我个人绝对相信,因为一些默认的配置和尚未发现的bug等原因,上面Kafka所做的保证也不一定百分之百能够实现,但大多数情况下通过本文的配置是可以帮助你做到无消息丢失的。

okay,闲言少叙,直接上配置了。下面的参数配置及Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。我会在该列表之后对列表中的每一项进行讨论,有兴趣的同学可以看下后面的分析。

  • block.on.buffer.full = true

  • acks = all

  • retries = MAX_VALUE

  • max.in.flight.requests.per.connection = 1

  • 使用KafkaProducer.send(record, callback)

  • 如果仅仅是要消息无丢失,使用带callback的send方法;如果还要保证无乱序问题,那么发送失败时一定要在callback逻辑中立即关闭producer:close(0)

  • unclean.leader.election.enable=false

  • replication.factor = 3

  • min.insync.replicas = 2

  • replication.factor > min.insync.replicas

  • enable.auto.commit=false

  • 使用手动提交位移,消息处理完成之后再提交位移

给出列表之后,我们从两个方面来探讨一下数据为什么会丢失:

1. Producer端

本文讨论的是Kafka 0.9版本之后的producer——Kafka0.9正式使用java版producer替换了老版的scala producer。

新版本默认使用异步发送机制,所以KafkaProducer.send仅仅是把这条消息放入一个缓存中(即RecordAccumulator,本质上使用了队列来缓存记录),同时后台的Sender IO线程会不断扫描该缓存区,将满足条件的消息封装到某个batch中然后发送出去。显然,这个过程中就有一个数据丢失的窗口:若IO线程发送之前client端挂掉了,累积在accumulator中的数据的确有可能会丢失。但显然,这不在Kafka做出保证的责任边界内,毕竟消息没有提交成功,尚未被Kafka接管。不过上面列表中的一些参数配置仍然可以帮助你避免这种情况下的数据丢失。

Producer的另一个问题是消息的乱序问题。假设客户端代码依次执行下面的语句将两条消息发到相同的分区

producer.send(record1);
producer.send(record2);

如果此时由于某些原因(比如瞬时的网络抖动)导致record1没有成功发送,同时Kafka又配置了重试机制和max.in.flight.requests.per.connection大于1(默认值是5,本来就是大于1的),那么重试record1成功后,record1在分区中就在record2之后,从而造成消息的乱序。很多某些要求强顺序保证的场景是不允许出现这种情况的。

鉴于producer的这两个问题,我们应该如何规避呢??对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送有可能丢失数据, 我改成同步发送总可以吧?比如这样:

producer.send(record).get();

这样当然是可以的,但是性能会很差,不建议这样使用。因此特意总结了一份配置列表。个人认为该配置清单应该能够比较好地规避producer端数据丢失情况的发生:(特此说明一下,软件配置的很多决策都是trade-off,下面的配置也不例外:应用了这些配置,你可能会发现你的producer/consumer 吞吐量会下降,这是正常的,因为你换取了更高的数据安全性)

  • block.on.buffer.full = true  尽管该参数在0.9.0.0已经被标记为“deprecated”,但鉴于它的含义非常直观,所以这里还是显式设置它为true,使得producer将一直等待缓冲区直至其变为可用。否则如果producer生产速度过快耗尽了缓冲区,producer将抛出异常

  • acks=all  很好理解,所有follower都响应了才认为消息提交成功,即"committed"

  • retries = MAX 无限重试,直到你意识到出现了问题:)

  • max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序

  • 使用KafkaProducer.send(record, callback)而不是send(record)方法   自定义回调逻辑处理消息发送失败

  • callback逻辑中最好显式关闭producer:close(0) 注意:设置此参数是为了避免消息乱序

  • unclean.leader.election.enable=false   关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失

  • replication.factor >= 3   这个完全是个人建议了,参考了Hadoop及业界通用的三备份原则

  • min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用

  • 保证replication.factor > min.insync.replicas  如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1即可

2. Consumer端

consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,现给出两点建议:

  • enable.auto.commit=false  关闭自动提交位移

  • 在消息被完整处理之后再手动提交位移

okay,总结一下,本文给出了Kafka关于交付保证的基本定义以及无消息丢失配置。这只是一个best practice,具体的使用还要结合各自的业务特点进行展开,有针对性地进行设置。

 

 

 

版权声明:本文版权由 木秀林网所有,转载请保留链接: Kafka无消息丢失配置

转载于:https://www.cnblogs.com/felixzh/p/8027584.html

相关文章:

  • 人体的数学美思考
  • winfrom 水晶报表制作
  • 洛谷 P1454 圣诞夜的极光
  • 关于手势处理
  • ASP.NET Web API 使用Swagger生成在线帮助测试文档,支持多个GET
  • Centos运行Mysql因为内存不足进程被杀
  • BZOJ3529 [Sdoi2014]数表 【莫比乌斯反演】
  • JS 详解 Cookie、 LocalStorage 与 SessionStorage
  • 进程和线程(5)-分布式进程
  • LeetCode-13-roman-to-integer
  • 荣品i.mx6q飞思卡尔工业级核心板开发板高稳定性
  • SoapUI使用中遇到的问题及解决办法
  • 【软工项目组】第十三次会议(样式设计2.0)
  • QGhappy小组第三次作业第十次会议完成情况
  • corethink功能模块探索开发(二)让这个模块可安装
  • 10个确保微服务与容器安全的最佳实践
  • CentOS学习笔记 - 12. Nginx搭建Centos7.5远程repo
  • ES6系列(二)变量的解构赋值
  • JavaScript中的对象个人分享
  • JWT究竟是什么呢?
  • leetcode378. Kth Smallest Element in a Sorted Matrix
  • nodejs调试方法
  • vue:响应原理
  • vue脚手架vue-cli
  • 聚簇索引和非聚簇索引
  • 前端性能优化--懒加载和预加载
  • 使用 @font-face
  • 好程序员大数据教程Hadoop全分布安装(非HA)
  • 智能情侣枕Pillow Talk,倾听彼此的心跳
  • # Maven错误Error executing Maven
  • (Oracle)SQL优化技巧(一):分页查询
  • (二)WCF的Binding模型
  • (二)换源+apt-get基础配置+搜狗拼音
  • (非本人原创)史记·柴静列传(r4笔记第65天)
  • (附源码)springboot 房产中介系统 毕业设计 312341
  • (蓝桥杯每日一题)love
  • .htaccess配置重写url引擎
  • .Net IE10 _doPostBack 未定义
  • .NET Micro Framework 4.2 beta 源码探析
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
  • .net6Api后台+uniapp导出Excel
  • .NET程序员迈向卓越的必由之路
  • .net开源工作流引擎ccflow表单数据返回值Pop分组模式和表格模式对比
  • .NET序列化 serializable,反序列化
  • @Autowired注解的实现原理
  • @ModelAttribute使用详解
  • [ C++ ] STL_vector -- 迭代器失效问题
  • [2015][note]基于薄向列液晶层的可调谐THz fishnet超材料快速开关——
  • [Android Pro] AndroidX重构和映射
  • [CQOI 2011]动态逆序对
  • [DAU-FI Net开源 | Dual Attention UNet+特征融合+Sobel和Canny等算子解决语义分割痛点]
  • [IE编程] 多页面基于IE内核浏览器的代码示例
  • [LeetCode]—Add Binary 两个字符串二进制相加
  • [LuoguP1141]01迷宫
  • [one_demo_9]判断数组是否递增