当前位置: 首页 > news >正文

SpingBoot集成kafka-发送读取消息示例

SpingBoot集成kafka开发

    • kafka的几个常见概念
  • 1、springboot和kafka对应版本(重要)
  • 2、创建springboot项目,引入kafka依赖
    • 2.1、生产者EventProducer
    • 2.2、消费者EventConsumer
    • 2.3、启动生产者的方法SpringBoot01KafkaBaseApplication
    • 2.4、application.yml
    • 2.5、pom.xml
    • 2.6、启动springboot项目的启动类(Application)报错
  • 3、springboot集成kafka读取最早的消息
    • 3.1、如何设置消费者auto-offset-reset: earliest
    • 3.2、设置消费者auto-offset-reset: earliest后存在的问题
      • 3.2.1、修改消费组ID
      • 3.2.2、手动重置偏移量
          • 3.2.2.1、手动将偏移量设置为最早
          • 3.2.2.2、手动将偏移量设置为最新

kafka的几个常见概念

在这里插入图片描述

1、springboot和kafka对应版本(重要)

https://spring.io/projects/spring-kafka

在这里插入图片描述

在这里插入图片描述

2、创建springboot项目,引入kafka依赖

在这里插入图片描述

2.1、生产者EventProducer

package com.power.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent(){kafkaTemplate.send("hello-topic","hello kafka");}
}

2.2、消费者EventConsumer

package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"hello-topic"},groupId="hello-group")public void onEvent(String event){System.out.println("读取到的事件:"+event);}
}

2.3、启动生产者的方法SpringBoot01KafkaBaseApplication

执行一次该方法,会调用一次生产者发送一次消息。
即每执行一次,会调用EventProducer类下的sendEvent方法一次。

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid test01(){eventProducer.sendEvent();}
}

2.4、application.yml

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的服务器ip>:9092#配置生产者(有24个配置)#producer:#配置消费者(有24个配置)#consumer:

2.5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.2</version><relativePath /></parent><groupId>org.powernode</groupId><artifactId>spring-boot-01-kafka-base</artifactId><version>0.0.1-SNAPSHOT</version><name>kafkaSpringBootProject</name><description>kafka project for Spring Boot</description><properties><java.version>8</java.version></properties><repositories><repository><id>central</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><layout>default</layout><!-- 是否开启发布版构件下载 --><releases><enabled>true</enabled></releases><!-- 是否开启快照版构件下载 --><snapshots><enabled>false</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>2.8.0</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

2.6、启动springboot项目的启动类(Application)报错

项目启动类

package com.power;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);System.out.println("启动成功--------------------------");}
}

启动服务后发现报错:
在这里插入图片描述

修改server.properties配置文件:
在这里插入图片描述修改前:

在这里插入图片描述
修改后:
在这里插入图片描述

3、springboot集成kafka读取最早的消息

已经被消费者读取/消费的消息,无法被新启动的消费组消息的,那么新启动的消费组该如何读取最早的消息呢,可以通过设置消费者auto-offset-reset: earliest去实现。
在这里插入图片描述

3.1、如何设置消费者auto-offset-reset: earliest

在这里插入图片描述

1、修改application.yml
在这里插入图片描述

3.2、设置消费者auto-offset-reset: earliest后存在的问题

在这里插入图片描述

3.2.1、修改消费组ID

原消费组ID
在这里插入图片描述
修改后的消费组ID
在这里插入图片描述4、新的消费组ID成功读取到之前的消息
在这里插入图片描述

3.2.2、手动重置偏移量

3.2.2.1、手动将偏移量设置为最早
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute

来到kafka安装目录下:

在这里插入图片描述执行如下命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-earliest --execute

执行后报错

在这里插入图片描述
需要先停掉服务,在去手动重置偏移量,此时重置偏移量成功,偏移量为0

3.2.2.2、手动将偏移量设置为最新
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-latest --execute

设置成功,此时偏移量已为最新:
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • JS Blob与ArrayBuffer:深入解析二者关系及应用场景
  • 2024.8.26 Python,最大子数和与动态规划,最小路径和,分割回文串,字典序排数,最长重复子数组(动态规划)
  • Python中csv文件的操作2
  • 3DsMax将两个模型的UV展到一个UV上面
  • 启动kafka
  • 网安新声 | 网易云音乐崩了:网络安全如何守护在线体验
  • 操作系统线程分离
  • 数学建模学习(128):使用Python结合CILOS与熵法的多准则决策权重确定
  • 浏览器发送HTTP请求的过程
  • ABC 368 G - Add and Multiply Queries
  • [Day 63] 區塊鏈與人工智能的聯動應用:理論、技術與實踐
  • PyTorch踩坑记录1
  • SQLserver中的DATEADD使用、avg使用、Round使用
  • iOS profiles文件过期如何更新
  • Linux环境下使用Git把代码上传到云端
  • 【跃迁之路】【669天】程序员高效学习方法论探索系列(实验阶段426-2018.12.13)...
  • 230. Kth Smallest Element in a BST
  • C++入门教程(10):for 语句
  • flutter的key在widget list的作用以及必要性
  • JavaScript设计模式系列一:工厂模式
  • JS 面试题总结
  • MD5加密原理解析及OC版原理实现
  • Redis提升并发能力 | 从0开始构建SpringCloud微服务(2)
  • 初识 webpack
  • 构造函数(constructor)与原型链(prototype)关系
  • 基于阿里云移动推送的移动应用推送模式最佳实践
  • 聊聊flink的TableFactory
  • 小李飞刀:SQL题目刷起来!
  • raise 与 raise ... from 的区别
  • ​用户画像从0到100的构建思路
  • #数学建模# 线性规划问题的Matlab求解
  • (2)STM32单片机上位机
  • (delphi11最新学习资料) Object Pascal 学习笔记---第2章第五节(日期和时间)
  • (vue)页面文件上传获取:action地址
  • (web自动化测试+python)1
  • (附源码)ssm高校社团管理系统 毕业设计 234162
  • (回溯) LeetCode 78. 子集
  • (论文阅读22/100)Learning a Deep Compact Image Representation for Visual Tracking
  • (三分钟)速览传统边缘检测算子
  • (转)http-server应用
  • (转)使用VMware vSphere标准交换机设置网络连接
  • (转载)从 Java 代码到 Java 堆
  • .NET C# 操作Neo4j图数据库
  • .NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划
  • .net 逐行读取大文本文件_如何使用 Java 灵活读取 Excel 内容 ?
  • @GetMapping和@RequestMapping的区别
  • [Android]Android开发入门之HelloWorld
  • [C# WPF] DataGrid选中行或选中单元格的背景和字体颜色修改
  • [C#]winform部署yolov9的onnx模型
  • [C/C++]数据结构 循环队列
  • [C++] vector对比list deque的引出
  • [C++]AVL树怎么转
  • [Deepin] 简单使用 RustDesk 实现远程访问Deepin
  • [HJ56 完全数计算]
  • [JavaScript]_[初级]_[不使用JQuery原生Ajax提交表单文件并监听进度]