当前位置: 首页 > news >正文

SpringBoot Kafka消费者 多kafka配置

一、配置文件

xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000

二、KafkaConfig

package com.xxxxxx.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${xxxxxx.kafka.consumer.poll-timeout}")private Integer pollTimeout;@Value("${xxxxxx.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${xxxxxx.kafka.consumer.auto-commit}")private String autoCommit;@Value("${xxxxxx.kafka.consumer.offset-reset}")private String offsetReset;@Value("${xxxxxx.kafka.consumer.records}")private Integer records;@Value("${xxxxxx.kafka.consumer.session-timeout}")private Integer sessionTimeout;@Value("${xxxxxx.kafka.consumer.poll-interval}")private Integer pollInterval;@Value("${xxxxxx.kafka.consumer.request-timeout}")private Integer requestTimeout;@Bean(name = "ixxxxxxKafkaListenerContainerFactory")public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//并发数量factory.setConcurrency(3);//设置在消费者中等待记录的最大阻塞时间。factory.getContainerProperties().setPollTimeout(pollTimeout);//ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map consumerConfigs() {Map props = new HashMap<>();//Kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//消费者组,只要group.id相同,就属于同一个消费者组//props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交offset,默认为true,设置为falseprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//key反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);//value反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);//一次消费信息条数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);//earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//session超时时间props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);//客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);return props;}
}

三、消费者

@KafkaListener(containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",id = "itsId",idIsGroup = false,groupId = "itsGroupId",topics = "itsTopic")public void consumerUser(@Payload String data,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment ack,Consumer<?, ?> consumer){try{}catch (Exception e){}ack.acknowledge();}

相关文章:

  • 【星海出品】flask(一)demo
  • 【Nginx39】Nginx学习:upstream服务器组模块
  • 教给孩子们如何认真听讲
  • windowCPU虚拟化已禁用解决方案
  • AIX5.3安装weblogic10.3
  • 已解决:rm: 无法删除“/opt/module/zookeeper-3.4.10/zkData/zookeeper_server.pid“: 权限不够
  • 【23真题】简单!原题很多!211!
  • IEC104 工具和代码库
  • 【使用教程】在Ubuntu下PMM60系列一体化伺服电机通过PDO跑循环同步位置模式详解
  • Android 12.0 内置MTK平台音乐播放器
  • pytorch搭建squeezenet网络的整套工程(升级版)
  • ​​​​​​​​​​​​​​汽车网络信息安全分析方法论
  • STM32_project:led_beep
  • RFID智慧物流设计解决方案
  • 剖析WPF模板机制的内部实现
  • 实现windows 窗体的自己画,网上摘抄的,学习了
  • 【知识碎片】第三方登录弹窗效果
  • 〔开发系列〕一次关于小程序开发的深度总结
  • 2017届校招提前批面试回顾
  • ES6核心特性
  • Github访问慢解决办法
  • go append函数以及写入
  • JAVA多线程机制解析-volatilesynchronized
  • JS变量作用域
  • js学习笔记
  • LeetCode541. Reverse String II -- 按步长反转字符串
  • Netty+SpringBoot+FastDFS+Html5实现聊天App(六)
  • niucms就是以城市为分割单位,在上面 小区/乡村/同城论坛+58+团购
  • orm2 中文文档 3.1 模型属性
  • Redis在Web项目中的应用与实践
  • Spark in action on Kubernetes - Playground搭建与架构浅析
  • STAR法则
  • Vue ES6 Jade Scss Webpack Gulp
  • 新版博客前端前瞻
  • 优化 Vue 项目编译文件大小
  • 原创:新手布局福音!微信小程序使用flex的一些基础样式属性(一)
  • Play Store发现SimBad恶意软件,1.5亿Android用户成受害者 ...
  • ###C语言程序设计-----C语言学习(6)#
  • (1/2)敏捷实践指南 Agile Practice Guide ([美] Project Management institute 著)
  • (11)MATLAB PCA+SVM 人脸识别
  • (C语言)二分查找 超详细
  • (C语言版)链表(三)——实现双向链表创建、删除、插入、释放内存等简单操作...
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (多级缓存)缓存同步
  • (附程序)AD采集中的10种经典软件滤波程序优缺点分析
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (四) Graphivz 颜色选择
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (原創) 系統分析和系統設計有什麼差別? (OO)
  • .bat批处理(四):路径相关%cd%和%~dp0的区别
  • .form文件_SSM框架文件上传篇
  • .mkp勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .net mvc actionresult 返回字符串_.NET架构师知识普及
  • .NET/C# 使窗口永不激活(No Activate 永不获得焦点)
  • .NET与 java通用的3DES加密解密方法