当前位置: 首页 > news >正文

Kafka整合SpringBoot

前文 Kafka客户端详解

引入依赖

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><spring-boot.version>2.3.12.RELEASE</spring-boot.version><fastjson.version>2.0.51</fastjson.version><!--我服务器安装的kafka版本是3.4.0 所以最好和安装版本对应--><kafka.version>3.4.0</kafka.version>
</properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-boot.version}</version></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--kafka整合SpringBoot--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency>
</dependencies>



yml配置

spring:kafka:# 服务地址bootstrap-servers: 192.168.75.61:9092,192.168.75.62:9092,192.168.75.63:9092# 生产者相关配置producer:# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)acks: 1# 重试次数retries: 5# 默认批处理大小,ProducerBatch大小batch-size: 16384# 生产端缓冲区大小buffer-memory: 33554432# 发送消息的key - value 序列化类key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息发送 最大等待时长properties:linger:ms: 0# 消费端配置consumer:# 是否开启自动提交enable-auto-commit: true# 提交offset延时(接收到消息后多久提交offset)auto-commit-interval: 1000# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latest# 接收消息的key - value 反序列化类key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 默认的消费组IDgroup:id: defaultConsumerGroup# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)session:timeout:ms: 120000# 消费请求超时时间request:timeout:ms: 180000




消息生产者

package com.hs.kfk.boot;import com.alibaba.fastjson2.JSON;
import com.hs.kfk.serializer.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Description: kafka整合SpringBoot,消息生产者* @Author 胡尚* @Date: 2024/8/8 17:03*/
@RestController()
public class BootProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public void sendMessage(){User user = new User(1L, "hushang", 24);String message = JSON.toJSON(user).toString();kafkaTemplate.send("disTopic", "key", message);}
}



消息消费者

package com.hs.kfk.boot;import com.alibaba.fastjson2.JSON;
import com.hs.kfk.serializer.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Description: kafka整合SpringBoot 消息消费者* @Author 胡尚* @Date: 2024/8/8 17:09*/
@Component
public class BootConsumer {@KafkaListener(topics = {"disTopic"})public void consumerMessage(ConsumerRecord<String, String> record){int partition = record.partition();long offset = record.offset();String topic = record.topic();String key = record.key();String message = record.value();System.out.println("topic:" + topic + "\tpartition:" + partition + "\toffset: " + offset + "\tkey: " + key + "\tmessage: " + message);User user = JSON.parseObject(message, User.class);System.out.println(user);}
}



输出结果

topic:disTopic	partition:1	offset: 8	key: key	message: {"age":24,"uId":1,"username":"hushang"}
User{uId=1, username='hushang', age=24}
topic:disTopic	partition:1	offset: 9	key: key	message: {"age":24,"uId":1,"username":"hushang"}
User{uId=1, username='hushang', age=24}
topic:disTopic	partition:1	offset: 10	key: key	message: {"age":24,"uId":1,"username":"hushang"}
User{uId=1, username='hushang', age=24}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • HookNet- 用于病理全切片图像的多分辨率语义分割模型|顶刊精析·24-08-08
  • 9.1 迭装饰器的定义与使用:给你的 Python 代码加点“魔法”
  • 服务器启动jar包的时候报”no main manifest attribute“异常(快捷解决)
  • 数据加密-AES数据加密及C#实现
  • 为什么在网页编辑文字时键盘输入换行要停顿一下网页才显示
  • MaxKB:基于 LLM大语言模型的知识库问答系统实操
  • 部署服务器项目及发布
  • Spring统一处理请求响应与异常
  • QT 布局管理器之QHBoxLayout
  • C语言(16)指针(iv)
  • 当科幻照进现实:Figure 02机器人震撼发布!
  • 2024网络安全必会的基础知识
  • 【Linux】系列入门摘抄笔记-5-管理、创建、移动文件目录及文件搜索命令
  • 详解高性能中间件Iceoryx在ROS2中的使用
  • 【大模型从入门到精通13】openAI API 构建和评估大型语言模型(LLM)应用1
  • 【面试系列】之二:关于js原型
  • 2018天猫双11|这就是阿里云!不止有新技术,更有温暖的社会力量
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • Django 博客开发教程 8 - 博客文章详情页
  • GitUp, 你不可错过的秀外慧中的git工具
  • IDEA常用插件整理
  • IIS 10 PHP CGI 设置 PHP_INI_SCAN_DIR
  • Javascripit类型转换比较那点事儿,双等号(==)
  • javascript面向对象之创建对象
  • java概述
  • nodejs:开发并发布一个nodejs包
  • overflow: hidden IE7无效
  • 多线程事务回滚
  • 京东美团研发面经
  • 码农张的Bug人生 - 见面之礼
  • 微信开放平台全网发布【失败】的几点排查方法
  • scrapy中间件源码分析及常用中间件大全
  • 好程序员大数据教程Hadoop全分布安装(非HA)
  • ​DB-Engines 12月数据库排名: PostgreSQL有望获得「2020年度数据库」荣誉?
  • ​LeetCode解法汇总2808. 使循环数组所有元素相等的最少秒数
  • # 利刃出鞘_Tomcat 核心原理解析(二)
  • # 移动硬盘误操作制作为启动盘数据恢复问题
  • #include到底该写在哪
  • #NOIP 2014# day.1 T3 飞扬的小鸟 bird
  • $refs 、$nextTic、动态组件、name的使用
  • (152)时序收敛--->(02)时序收敛二
  • (17)Hive ——MR任务的map与reduce个数由什么决定?
  • (2024最新)CentOS 7上在线安装MySQL 5.7|喂饭级教程
  • (编程语言界的丐帮 C#).NET MD5 HASH 哈希 加密 与JAVA 互通
  • (第61天)多租户架构(CDB/PDB)
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (附源码)计算机毕业设计ssm电影分享网站
  • (四十一)大数据实战——spark的yarn模式生产环境部署
  • (循环依赖问题)学习spring的第九天
  • (一)Linux+Windows下安装ffmpeg
  • ******之网络***——物理***
  • .bat批处理(十一):替换字符串中包含百分号%的子串
  • .Net - 类的介绍
  • .net framwork4.6操作MySQL报错Character set ‘utf8mb3‘ is not supported 解决方法
  • .net web项目 调用webService