SpringBoot 集成 kafka
SpringBoot 集成 kafka
第一步 通过 docker 启动 kafka
这个已经在前一篇文章详细描述了,这里就不在赘述了。没有看过的小伙伴可以去看这篇文章:docker启动kafka并挂载配置文件,并让外部环境连接kafka
第二步 创建 SpringBoot 工程,并引入 kafka 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.wanfeng</groupId><artifactId>kafka-01-base</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-01-base</name><description>kafka-01-base</description><properties><java.version>17</java.version></properties><dependencies><!--SpringBoot 起步依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--热部署插件--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!--lombok 插件--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--单元测试起步依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
第三步 编写 application.yml 文件
spring:application:# 应用名称name: kafka-01-basekafka:# kafka 连接地址bootstrap-servers: 自己的 IP:9092
第四步 编写生产者
package com.wanfeng.producer;import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;/*** 作者:晚枫* 时间:2024/9/1 8:57*/
@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;public void sendEvent() {// 参数一:kafka 主题名字// 参数二:需要发送的事件kafkaTemplate.send("hello", "喜欢欣宝");}}
第五步 编写消费者
package com.wanfeng.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** 作者:晚枫* 时间:2024/9/1 9:00*/
@Component
public class EventConsumer {// 采用监听的方式接收事件// topics 指定需要监听的主题// groupId 指定消费者组 id@KafkaListener(topics = "hello", groupId = "hello-group")public void onEvent(String event) {System.out.println("接收到的事件为:" + event);}}
第六步 编写测试方法
编写测试方法的目的就是让生产者发送事件
package com.wanfeng;import com.wanfeng.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class Kafka01BaseApplicationTests {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent() {eventProducer.sendEvent();}}
最后,先启动消费者对事件进行监听,然后再启动测试方法,让生产者发送事件,我们就可以在控制台看到事件被打印出来了!