当前位置: 首页 > news >正文

SpringBoot集成kafka-监听器手动确认接收消息(主要为了保证业务完成后再确认接收)

SpringBoot集成kafka-监听器手动确认接收消息

  • 1、说明
  • 2、示例
    • 2.1、application.yml
    • 2.2、消费者
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、测试

1、说明

kafak中默认情况下是自动确认消息接收的,也就是说先启动消费者监听程序,再启动生产者发送消息,此时消费者监听到生产者发送的消息后,程序会自动确认接收成功,偏移量会自动下移,此时再启动消费者,偏移量会从新的位置读取数据,如果本次出现异常,业务没有处理完成,那么下次启动消费者是读取不到本次的消息的,所以可以采用手动确认的配置,确保本次消费者接收到了消息,并且业务正常处理完毕了,给kafak手动反馈接收成功。

在这里插入图片描述

2、示例

在这里插入图片描述

2.1、application.yml

在这里插入图片描述

2.2、消费者

package com.power.consumer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {@KafkaListener(topics = {"${kafka.topic.name}"},groupId="${kafka.consumer.group}")public void onEvent4(String userJson,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record,Acknowledgment ack){try {User user =JSONUtils.toBean(userJson,User.class);System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());int a = 10/0;//业务确认完成,给kafka服务器反馈确认ack.acknowledge();//手动确认消息,就是告诉kafka服务器,该消息我已经接收到了,默认情况下是自动确认//手动确认后,下次启动消费者,偏移量会从新的位置开始;没有手动确认,下次启动消费者,偏移量还是从老位置开始}catch (Exception e){e.printStackTrace();}}}

2.3、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent2(){User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("helloTopic",userJson);}}

2.4、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent2(){eventProducer.sendEvent2();}}

2.5、测试

先启动消费者监听程序
再启动生产者发送消息
程序再业务中出现了异常:

在这里插入图片描述
再次启动消费者程序,因为再上次启动时出现了异常,也没有进行手动确认接收,所以本地启动消费者后依然可以读取到上次未完成业务时接收到的数据
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 隔离操作系统与进程
  • 【源码】IMX6uLL与QT的串口通信
  • 【C++类和对象】类和对象的介绍、this指针以及体会面向对象编程
  • 代码随想录算法训练营第29天 贪心算法 part03| 题目:134. 加油站 、 135. 分发糖果 、860.柠檬水找零 、 406.根据身高重建队列
  • MAC安装miniconda提示“文本编码Unicode(UTF-8)不适用”解决方案
  • uni-app小程序当前页面刷新怎么实现
  • 基于Spring Boot的文字识别系统
  • HarmonyOS开发移动应用:调用百度翻译开放平台的App Id和密钥
  • vue项目中解决el-table数据过多导致页面卡顿问题
  • ZooKeeper 实战(六) - 分布式ID实现方案
  • vue前端获取电脑本机的mac和ip地址
  • Flask+LayUI开发手记(四):弹出层实现增删改查功能
  • (十八)Flink CEP 详解
  • Spring Boot 项目打包及在宝塔面板上部署的简易指南
  • 从C向C++28——设计模式
  • 《Javascript数据结构和算法》笔记-「字典和散列表」
  • 【翻译】Mashape是如何管理15000个API和微服务的(三)
  • Angular 2 DI - IoC DI - 1
  • interface和setter,getter
  • Java反射-动态类加载和重新加载
  • java小心机(3)| 浅析finalize()
  • Js基础知识(一) - 变量
  • Just for fun——迅速写完快速排序
  • nginx 负载服务器优化
  • SQL 难点解决:记录的引用
  • Vue UI框架库开发介绍
  • webpack入门学习手记(二)
  • Yeoman_Bower_Grunt
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 个人博客开发系列:评论功能之GitHub账号OAuth授权
  • 技术攻略】php设计模式(一):简介及创建型模式
  • 解决jsp引用其他项目时出现的 cannot be resolved to a type错误
  • 开源中国专访:Chameleon原理首发,其它跨多端统一框架都是假的?
  • 两列自适应布局方案整理
  • 前端_面试
  • 前言-如何学习区块链
  • 使用 @font-face
  • 手写双向链表LinkedList的几个常用功能
  • 通过npm或yarn自动生成vue组件
  • 小而合理的前端理论:rscss和rsjs
  • 一些css基础学习笔记
  • elasticsearch-head插件安装
  • ​iOS实时查看App运行日志
  • ​secrets --- 生成管理密码的安全随机数​
  • ​直流电和交流电有什么区别为什么这个时候又要变成直流电呢?交流转换到直流(整流器)直流变交流(逆变器)​
  • # Swust 12th acm 邀请赛# [ A ] A+B problem [题解]
  • #NOIP 2014# day.1 T2 联合权值
  • #我与Java虚拟机的故事#连载17:我的Java技术水平有了一个本质的提升
  • ( 用例图)定义了系统的功能需求,它是从系统的外部看系统功能,并不描述系统内部对功能的具体实现
  • (2015)JS ES6 必知的十个 特性
  • (C语言)输入自定义个数的整数,打印出最大值和最小值
  • (delphi11最新学习资料) Object Pascal 学习笔记---第13章第6节 (嵌套的Finally代码块)
  • (leetcode学习)236. 二叉树的最近公共祖先
  • (Matalb分类预测)GA-BP遗传算法优化BP神经网络的多维分类预测
  • (Matalb回归预测)PSO-BP粒子群算法优化BP神经网络的多维回归预测