当前位置: 首页 > news >正文

Kafka接收消息

文章目录

  • Acknowledgment
  • 读消息指定分区
  • 批量消费
  • 消息拦截

// 采用监听得方式接收 @Payload标记消息体内容.
@KafkaListener(topics = {"test"},groupId = "hello")
public void onEvent(@Payload String event,@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition){System.out.println("读取到了时间消息: " + event);
}

Acknowledgment

开启手动确认模式;

listener:ack-mode: manual
// 采用监听得方式接收 @Payload标记消息体内容.
@KafkaListener(topics = {"test"},groupId = "hello")
public void onEvent(@Payload String event,@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record,Acknowledgment ack){ack.acknowledge(); // 手动确认,告诉kafka服务器该消息我已经收到了. System.out.println("读取到了时间消息: " + event);
}

读消息指定分区

@KafkaListener(groupId = "hello",topicPartitions = {@TopicPartition(topic = "${kafka.topic.test}",partitions = {"0","1","2"}, // 0 1 2分区不限制偏移量partitionOffsets = { // 3 分区只读 3偏移量之后的; 4分区只读 4偏移量之后的@PartitionOffset(partition = "3",initialOffset = "3"),@PartitionOffset(partition = "4",initialOffset = "3")})}
)

批量消费

修改配置

kafka:bootstrap-servers: 192.168.225.128:9092listener:type: batch# 每次读取20条consumer:max-poll-records: 20

消费者端接收一个List即可

@KafkaListener(topics = {"hi"},groupId = "batchGroup2")
public void onEvent3(List<ConsumerRecord<String,String>> records){System.out.println(records.size());
}

消息拦截

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 《简历宝典》12 - 简历中“项目经历”,内功学习 - 下篇
  • 2024最新最全面的软件测试自动化面试题(含答案)
  • 网络爬虫Scrapy shell 的使用和介绍
  • STM32+TMC2209控制步进电机正反转。
  • WEB07Vue+Ajax
  • openharmony上传图片,并获取返回路径
  • Unable to obtain driver using Selenium Manager: Selenium Manager failed解决方案
  • 角点检测及MATLAB实现
  • AIGC笔记--基于Stable Diffusion实现图片的inpainting
  • 9.5 栅格图层符号化多波段彩色渲染
  • 网页数据抓取:融合BeautifulSoup和Scrapy的高级爬虫技术
  • Node学习-第六章-express中间件与RESful API接口规范(下)
  • live555 rtsp服务器实战之createNewStreamSource
  • 目标检测--X-anylabeling使用自己的模型自动标注
  • [C++]多态
  • JavaScript-如何实现克隆(clone)函数
  •  D - 粉碎叛乱F - 其他起义
  • JAVA_NIO系列——Channel和Buffer详解
  • Java编程基础24——递归练习
  • linux学习笔记
  • Odoo domain写法及运用
  • Selenium实战教程系列(二)---元素定位
  • underscore源码剖析之整体架构
  • use Google search engine
  • VirtualBox 安装过程中出现 Running VMs found 错误的解决过程
  • 阿里中间件开源组件:Sentinel 0.2.0正式发布
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 悄悄地说一个bug
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 一些css基础学习笔记
  • 数据库巡检项
  • ​ ​Redis(五)主从复制:主从模式介绍、配置、拓扑(一主一从结构、一主多从结构、树形主从结构)、原理(复制过程、​​​​​​​数据同步psync)、总结
  • ​DB-Engines 11月数据库排名:PostgreSQL坐稳同期涨幅榜冠军宝座
  • ​创新驱动,边缘计算领袖:亚马逊云科技海外服务器服务再进化
  • ( 10 )MySQL中的外键
  • (1/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (2.2w字)前端单元测试之Jest详解篇
  • (3)STL算法之搜索
  • (C语言)共用体union的用法举例
  • (done) 两个矩阵 “相似” 是什么意思?
  • (ibm)Java 语言的 XPath API
  • (pycharm)安装python库函数Matplotlib步骤
  • (备忘)Java Map 遍历
  • (附源码)计算机毕业设计ssm基于B_S的汽车售后服务管理系统
  • (附源码)小程序 交通违法举报系统 毕业设计 242045
  • (官网安装) 基于CentOS 7安装MangoDB和MangoDB Shell
  • (转)chrome浏览器收藏夹(书签)的导出与导入
  • (转)母版页和相对路径
  • (自适应手机端)响应式服装服饰外贸企业网站模板
  • (自用)learnOpenGL学习总结-高级OpenGL-抗锯齿
  • .gitignore
  • .NET CORE 第一节 创建基本的 asp.net core
  • .net core 6 集成和使用 mongodb
  • .net core Redis 使用有序集合实现延迟队列
  • .NET 使用 ILMerge 合并多个程序集,避免引入额外的依赖