当前位置: 首页 > news >正文

Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录

  • 1. 异步发送
  • 2. 同步发送

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordproducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producerproducer.close();}
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}//关闭producerproducer.close();}
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【整体介绍】HTML和JS编写多用户VR应用程序的框架
  • node.js的安装及学习(node/nvm/npm的区别)
  • 【闲聊】-Perl的基础语法
  • Python3 学习——基础语法2
  • pyinstall 打包基于PyQt5和PaddleOCR的项目为.exe
  • phpinfo
  • ubuntu24.04 linux bcm94352hmb 无线网卡(带蓝牙功能)无法连接蓝牙设备的解决办法
  • LINUX多进程相关
  • Hadoop3:HDFS存储优化之小文件归档
  • XML 编辑器:功能、选择与使用技巧
  • 北斗高精度服务:无人驾驶领域的创新引擎
  • 基于opencv的图片加水印实现方案
  • ubuntu 通讯学习笔记
  • GESP CCF C++ 三级认证真题 2024年6月
  • 常用的设计模式有哪些
  • 【跃迁之路】【699天】程序员高效学习方法论探索系列(实验阶段456-2019.1.19)...
  • Android路由框架AnnoRouter:使用Java接口来定义路由跳转
  • DOM的那些事
  • java正则表式的使用
  • Java知识点总结(JavaIO-打印流)
  • oldjun 检测网站的经验
  • opencv python Meanshift 和 Camshift
  • ⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)
  • 大快搜索数据爬虫技术实例安装教学篇
  • 简单数学运算程序(不定期更新)
  • 如何优雅地使用 Sublime Text
  • 入口文件开始,分析Vue源码实现
  • 想写好前端,先练好内功
  • Play Store发现SimBad恶意软件,1.5亿Android用户成受害者 ...
  • ​​​​​​​STM32通过SPI硬件读写W25Q64
  • ​人工智能之父图灵诞辰纪念日,一起来看最受读者欢迎的AI技术好书
  • # Swust 12th acm 邀请赛# [ A ] A+B problem [题解]
  • # 职场生活之道:善于团结
  • #1015 : KMP算法
  • #define与typedef区别
  • #etcd#安装时出错
  • #stm32整理(一)flash读写
  • (1)Jupyter Notebook 下载及安装
  • (20)目标检测算法之YOLOv5计算预选框、详解anchor计算
  • (C++哈希表01)
  • (DenseNet)Densely Connected Convolutional Networks--Gao Huang
  • (二) Windows 下 Sublime Text 3 安装离线插件 Anaconda
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (二十五)admin-boot项目之集成消息队列Rabbitmq
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • .bat批处理(八):各种形式的变量%0、%i、%%i、var、%var%、!var!的含义和区别
  • .net core + vue 搭建前后端分离的框架
  • .NET Core WebAPI中使用Log4net 日志级别分类并记录到数据库
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .NET Framework Client Profile - a Subset of the .NET Framework Redistribution
  • .net 简单实现MD5
  • .NET 将多个程序集合并成单一程序集的 4+3 种方法
  • .net遍历html中全部的中文,ASP.NET中遍历页面的所有button控件
  • .NET导入Excel数据
  • @angular/cli项目构建--http(2)