当前位置: 首页 > news >正文

Kafka 生产者

目录

一、kafka生产者原理

二、kafka异步发送

配置kafka

创建对象,发送数据

带回调函数的异步发送

同步发送

 

三、kafka生产者分区

分区策略

指定分区:

 指定key:

什么都不指定:

自定义分区器

四、生产者提高吞吐量

五、数据的可靠性

ACK应答级别

数据完全可靠条件

可靠性总结

代码中配置ACK

五、数据重复

幂等性

生产者事务

六、数据有序、乱序

数据有序

数据乱序


 

一、kafka生产者原理

main线程中创建生产者对象,然后调用send方法来发送数据,根据生产环境的需求来判断是否要增加拦截器(一般建议不用),因为是跨节点通信,所以要对数据进行序列化,用kafka自带的序列化器处理,再由分区器来规定数据发送到哪个分区,先发送到了一个缓存队列当中,队列大小是32M,其中每个批次默认大小是16k,达到16k后就会进入sender线程发送到kafka集群,或者等待时间到了也会自动发送,每一个broker节点接受一个队列的消息,发送到分区后,分区最多缓存5个请求,如果达到5个请求都没有ack应答的话,那么接下来消息就会发送到别的分区上。

通过Selector将底层链路进行打通进行发送数据,集群收到后进行一个副本的同步,同步完成后进行ack应答。

 

应答成功后清理队列中的数据

 应答失败会进行retry重试

 

二、kafka异步发送

 

 创建Maven工程导入kafka客户端依赖

 导包

配置kafka

创建对象,发送数据

 

带回调函数的异步发送

 

同步发送

 

 

三、kafka生产者分区

分区策略

 

指定分区:

 

 指定key:

什么都不指定:

 

怎么把订单表的数据发送到kafka的指定分区中?

      key 上写表名,表名的hashcode值一定会发送到同一个分区(生产环境通常将表名作为key)

自定义分区器

 

 

 在生产者中关联自定义分区器

 在生产环境中可以过滤一些脏数据!

四、生产者提高吞吐量

压缩类型:

 

五、数据的可靠性

ACK应答级别

0的时候,数据发过来还没落盘就应答,结果leader挂了导致了数据丢失。

 

 1的时候,数据发送过来,leader落盘后就会应答,生产者收到ack应答认为信息已经发送成功,随后就会清除掉队列中的消息,但是此时follwer可能还没完成同步,这个时候leader挂掉,就会有一个follwer成为新的leader,可是生产者已经认为信息发送成功从队列中清除了消息,这就导致了数据的丢失。

 

-1(all):leader收到消息,并且所有follwer都完成消息同步后返回ack应答

follwer挂掉的话,等时间达到阈值还没向Leader发送通信请求或同步数据就会被踢出ISR,意味着这个follwer就不是有效副本了

 

 

 

 上面几种都有数据丢失的风险,如何真正保护数据的可靠性呢?

数据完全可靠条件

ack级别设置为-1,且至少1个leader+1个follwer,ISR里min.insync.replicas >= 2

 

可靠性总结

 

代码中配置ACK

 

五、数据重复

幂等性

 

 

生产者事务

 

 

 

六、数据有序、乱序

数据有序

数据乱序

 

相关文章:

  • Spring核心IOC的核心类解析
  • 【数据挖掘】恒生金融有限公司2023届秋招数据ETL工程师笔试题解析
  • 软件测试分类
  • (附源码)spring boot儿童教育管理系统 毕业设计 281442
  • var、let、const的区别
  • 机器学习(二十九):LightGBM 模型
  • node.js 使用教程-2.Gulp 打包构建入门与使用
  • UMLChina建模竞赛第3赛季第12轮:歌曲知识
  • springboot二手交易平台毕业设计源码290915
  • MySQL-触发器
  • 使用 Jest 对 Vuex 模块进行单元测试
  • 常用的芯片封装与PCB封装总结
  • 操作系统 —— 进程间的通信
  • 管理运筹学的一些知识点
  • 使用Spring事务管理和集成JUnit测试
  • 【Leetcode】101. 对称二叉树
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • Angular2开发踩坑系列-生产环境编译
  • git 常用命令
  • java2019面试题北京
  • Lsb图片隐写
  • Unix命令
  • Zsh 开发指南(第十四篇 文件读写)
  • 干货 | 以太坊Mist负责人教你建立无服务器应用
  • 和 || 运算
  • 猴子数据域名防封接口降低小说被封的风险
  • 记一次用 NodeJs 实现模拟登录的思路
  • 目录与文件属性:编写ls
  • 前端技术周刊 2019-01-14:客户端存储
  • 如何解决微信端直接跳WAP端
  • 如何用vue打造一个移动端音乐播放器
  • 我建了一个叫Hello World的项目
  • Python 之网络式编程
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • Spring Batch JSON 支持
  • $.ajax()参数及用法
  • (2)MFC+openGL单文档框架glFrame
  • (30)数组元素和与数字和的绝对差
  • (4)logging(日志模块)
  • (day 2)JavaScript学习笔记(基础之变量、常量和注释)
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (收藏)Git和Repo扫盲——如何取得Android源代码
  • (转)chrome浏览器收藏夹(书签)的导出与导入
  • (转)eclipse内存溢出设置 -Xms212m -Xmx804m -XX:PermSize=250M -XX:MaxPermSize=356m
  • .Net Attribute详解(上)-Attribute本质以及一个简单示例
  • .NET Core WebAPI中封装Swagger配置
  • .net core 实现redis分片_基于 Redis 的分布式任务调度框架 earth-frost
  • .NET/C# 的字符串暂存池
  • .NET简谈互操作(五:基础知识之Dynamic平台调用)
  • .NET与java的MVC模式(2):struts2核心工作流程与原理
  • .net中生成excel后调整宽度
  • .so文件(linux系统)
  • .vollhavhelp-V-XXXXXXXX勒索病毒的最新威胁:如何恢复您的数据?
  • @Async注解的坑,小心