当前位置: 首页 > news >正文

SpringBoot用kafka.listener监听接受Kafka消息

1.创建kafka监听配置并进行注册

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** @author 35* @description kafka listen监听配置* @date 2024年04月24日 13:25*/
@Configuration
@EnableKafka
public class KafkaConfig {// kafka实例@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;// kafka AI 服务的Groupprivate String groupId = Constants.KAFKA_AI_SERVER_GROUP;@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 设置为可以手动消费factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}

2.使用示例

  @KafkaListener(topics = Constants.KAFKA_USER_TOPIC, groupId = Constants.KAFKA_SERVER_GROUP)public void syncUserByKafKa(String message, Acknowledgment ack) {try {// 调用具体的执行方法bb(message);// 提交kafka消费位移ack.acknowledge();} catch (Exception e) {log.error("失败:" + e.getMessage() + "消息:" + message);} finally {// 提交kafka消费位移ack.acknowledge();}}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 计算机图形系统发展的前世今生
  • 计算机毕业设计 基于SpringBoot的课程教学平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试
  • 【深度学习】【onnxruntime】C++调用onnx
  • 三维动画|创意无限,让品牌传播更精彩!
  • 2024年第十五届蓝桥杯青少组国赛撞期GESP认证、放弃那个?
  • C语言:刷题日志(3)
  • Golang | Leetcode Golang题解之第393题UTF-8编码验证
  • 【系统架构设计师-2015年】案例分析-答案及详解
  • 使用Azure+C#+visual studio开发图像目标检测系统
  • 《黑神话.悟空》与人工智能AI重塑经典与探索未来的交织
  • Android内存知识总结
  • JAVA毕业设计173—基于Java+Springboot+vue3的酒店民宿管理系统(源代码+数据库)
  • 浙大数据结构:03-树2 List Leaves
  • 数据库MySQL
  • 源码到class字节码的编译流程 字节码到内存的Java类加载流程
  • (三)从jvm层面了解线程的启动和停止
  • [ 一起学React系列 -- 8 ] React中的文件上传
  • 「译」Node.js Streams 基础
  • 【跃迁之路】【641天】程序员高效学习方法论探索系列(实验阶段398-2018.11.14)...
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • ES6--对象的扩展
  • golang 发送GET和POST示例
  • Java到底能干嘛?
  • Vue.js源码(2):初探List Rendering
  • 从零开始学习部署
  • 官方新出的 Kotlin 扩展库 KTX,到底帮你干了什么?
  • 类orAPI - 收藏集 - 掘金
  • 如何编写一个可升级的智能合约
  • 如何合理的规划jvm性能调优
  • 使用阿里云发布分布式网站,开发时候应该注意什么?
  • 我与Jetbrains的这些年
  • puppet连载22:define用法
  • ​Python 3 新特性:类型注解
  • ​人工智能之父图灵诞辰纪念日,一起来看最受读者欢迎的AI技术好书
  • #define
  • #前后端分离# 头条发布系统
  • #我与虚拟机的故事#连载20:周志明虚拟机第 3 版:到底值不值得买?
  • $.ajax()方法详解
  • (1综述)从零开始的嵌入式图像图像处理(PI+QT+OpenCV)实战演练
  • (7)摄像机和云台
  • (代码示例)使用setTimeout来延迟加载JS脚本文件
  • (动手学习深度学习)第13章 计算机视觉---图像增广与微调
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (十) 初识 Docker file
  • (四) Graphivz 颜色选择
  • (学习日记)2024.01.19
  • (一)、软硬件全开源智能手表,与手机互联,标配多表盘,功能丰富(ZSWatch-Zephyr)
  • (原創) 如何使用ISO C++讀寫BMP圖檔? (C/C++) (Image Processing)
  • ..thread“main“ com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.1
  • .net 8 发布了,试下微软最近强推的MAUI
  • .Net Core中Quartz的使用方法
  • .Net mvc总结
  • .NET NPOI导出Excel详解
  • .net 后台导出excel ,word
  • .net 使用ajax控件后如何调用前端脚本