当前位置: 首页 > news >正文

SpringBoot 集成 Kafka消息中间件,Docker安装Kafka环境

前述

提供kafka、zooker在docker环境下进行安装的示例,springBoot集成kafka实现producer-生产者和consumer-消费者(监听消费:single模式和batch模式)的功能实现

环境安装


# 拉取镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka# 运行zooker
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
#运行kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --env KAFKA_LOG_DIRS=/kafka/KafkaLog --volume /home/vagrant/kafka/localtime:/etc/localtime --volume /home/vagrant/kafka/log:/app/kafka/log wurstmeister/kafka:latest

SpringBoot集成Kafka消息中间件

pom依赖


<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><!-- kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency><!-- fastjson2 -->
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson2.version}</version>
</dependency><!--    hutool工具类    -->
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version>
</dependency>

配置

yaml文件配置

spring:kafka:# kafka地址bootstrap-servers: localhost:9092# 生产者配置producer:# 重试次数retries: 3#  批量提交batch-size: 16384# 缓存空间buffer-memory: 33554432# 消费者配置consumer:group-id: springboot-mq-kafka-demo# 手动提交enable-auto-commit: falseauto-offset-reset: latestproperties:# 超时时间session.timeout.ms: 60000# 监听listener:# 类型: single/batchtype: batchlog-container-config: false# 分区concurrency: 5# 手动提交ack-mode: MANUAL_IMMEDIATE
Config
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;import java.util.Objects;/*** kafka配置类** @author yunnuo* @since 1.0.0*/
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {private final KafkaProperties kafkaProperties;private static final Integer DEFAULT_PARTITION_NUM = 3;private static final String GROUP_ID = "springboot-mq-kafka-demo-batch-manual_immediate";@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());}/*** kafka 默认 ContainerFactory** @return {@link ConcurrentKafkaListenerContainerFactory}*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 提交模式ContainerProperties.AckMode ackMode = Objects.nonNull(kafkaProperties.getListener().getAckMode()) ? kafkaProperties.getListener().getAckMode() : ContainerProperties.AckMode.MANUAL_IMMEDIATE;factory.getContainerProperties().setAckMode(ackMode);// 分区Integer concurrency = kafkaProperties.getListener().getConcurrency();concurrency = (Objects.nonNull(concurrency) && concurrency > 0) ? concurrency : DEFAULT_PARTITION_NUM;factory.setConcurrency(concurrency);// 拉取类型:单个/批量KafkaProperties.Listener.Type type = kafkaProperties.getListener().getType();factory.setBatchListener(Objects.equals(type, KafkaProperties.Listener.Type.BATCH));return factory;}/*** 自定义 ContainerFactory** @return {@link ConcurrentKafkaListenerContainerFactory}*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> batchContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setGroupId(GROUP_ID);factory.setConcurrency(DEFAULT_PARTITION_NUM);factory.setBatchListener(true);factory.getContainerProperties().setPollTimeout(3000);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

生产者(producer)

下面示例是采用API方式进行调用发送kafka消息,进行模拟生产者

请求DTO

import lombok.Data;/*** kafka 发送 请求** @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>* @date 2023-12-27*/
@Data
public class KafkaPushDemoReq {private String topic;private String msg;}
API接口发送kafka

import com.ukayunnuo.core.Result;
import com.ukayunnuo.core.exception.ServiceException;
import com.ukayunnuo.domain.request.KafkaPushDemoReq;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;/*** kafka 测试 api 接口** @author yunnuo* @since 1.0.0*/
@RestController
@RequestMapping("/demo/kafka")
public class KafkaPushController {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** kafka 发送消息 (无结果)** @param req 请求参数* @return 结果*/@PostMapping("pushTest")public Result<Boolean> pushMsgTest(@RequestBody KafkaPushDemoReq req) {kafkaTemplate.send(req.getTopic(), req.getMsg());return Result.success(Boolean.TRUE);}}

消费者(consumer)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;/*** kafka 消费** @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>* @date 2023-12-27*/
@Slf4j
@Component
public class KafkaConsumerTest {/*** 使用默认配置的 ContainerFactory 进行监听 (single 模式)** @param record        消息* @param acknowledgment 提交器*/@KafkaListener(topics = {"default_container_factory-test.kafka.demo.default.single"})public void receiveMessageDefaultSingle(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {log.info("receiveMessageDefault-single record key:{}, value:{}", record.key(), record.value());acknowledgment.acknowledge();}/*** 使用默认配置的 ContainerFactory 进行监听 (batch 模式)** @param records        消息* @param acknowledgment 提交器*/@KafkaListener(topics = {"default_container_factory-test.kafka.demo.default.batch"})public void receiveMessageDefaultBatch(ConsumerRecords<String, String> records, Acknowledgment acknowledgment) {log.info("receiveMessageDefault-batch records size:{}", records.count());for (ConsumerRecord<String, String> record : records) {log.info("receiveMessageDefault-batch record key:{}, value:{}", record.key(), record.value());}acknowledgment.acknowledge();}/*** 使用自定义的 batchContainerFactory 进行监听** @param records        消息* @param acknowledgment 提交器*/@KafkaListener(topics = {"batch_container_factory-test.kafka.demo.batch"}, containerFactory = "batchContainerFactory")public void receiveMessage(ConsumerRecords<String, String> records, Acknowledgment acknowledgment) {log.info("receiveMessage records size:{}", records.count());for (ConsumerRecord<String, String> record : records) {log.info("receiveMessage record key:{}, value:{}", record.key(), record.value());}acknowledgment.acknowledge();}}

IDEA-HTTP请求示例

### 消息发送-single模式
POST http://localhost:8087/demo/kafka/pushTest
Content-Type: application/json{"topic": "default_container_factory-test.kafka.demo.default.single","msg": "test single send..."
}### 消息发送-batch模式
POST http://localhost:8087/demo/kafka/pushTest
Content-Type: application/json{"topic": "default_container_factory-test.kafka.demo.default.batch","msg": "test batch send..."
}### 消息发送-自定义
POST http://localhost:8087/demo/kafka/pushTest
Content-Type: application/json{"topic": "batch_container_factory-test.kafka.demo.batch","msg": "test custom batch send..."
}

相关文章:

  • Eureka相关面试题及答案
  • Jenkins 系列:Jenkins 安装(Windows、Mac、Centos)和简介
  • C++基础-文件读写操作详解
  • SpringBoot 请求参数
  • pycharm配置pyrcc5外部工具
  • 详解数组的轮转
  • 总结项目中oauth2模块的配置流程及实际业务oauth2认证记录(Spring Security)
  • ArcGIS Pro中Conda环境的Scripts文件解读
  • 在 Android 手机上从SD 卡恢复数据的 6 个有效应用程序
  • C#判断骨龄与生活年龄的比较
  • MySQL8 一键部署
  • 插入排序 InsertionSort
  • 多线程编程设计模式(单例,阻塞队列,定时器,线程池)
  • asp.net core 教程
  • flutter flutter pub cache clean和flutter clean区别
  • 《深入 React 技术栈》
  • 【笔记】你不知道的JS读书笔记——Promise
  • CSS实用技巧干货
  • gf框架之分页模块(五) - 自定义分页
  • open-falcon 开发笔记(一):从零开始搭建虚拟服务器和监测环境
  • Spark RDD学习: aggregate函数
  • 分布式任务队列Celery
  • 快速体验 Sentinel 集群限流功能,只需简单几步
  • 深度学习中的信息论知识详解
  • AI又要和人类“对打”,Deepmind宣布《星战Ⅱ》即将开始 ...
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • 宾利慕尚创始人典藏版国内首秀,2025年前实现全系车型电动化 | 2019上海车展 ...
  • 数据可视化之下发图实践
  • ​io --- 处理流的核心工具​
  • ​如何使用ArcGIS Pro制作渐变河流效果
  • (1)(1.11) SiK Radio v2(一)
  • (13)[Xamarin.Android] 不同分辨率下的图片使用概论
  • (32位汇编 五)mov/add/sub/and/or/xor/not
  • (PWM呼吸灯)合泰开发板HT66F2390-----点灯大师
  • (非本人原创)史记·柴静列传(r4笔记第65天)
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (十)c52学习之旅-定时器实验
  • (原創) 如何安裝Linux版本的Quartus II? (SOC) (Quartus II) (Linux) (RedHat) (VirtualBox)
  • (转)用.Net的File控件上传文件的解决方案
  • **python多态
  • .mat 文件的加载与创建 矩阵变图像? ∈ Matlab 使用笔记
  • .net操作Excel出错解决
  • @Repository 注解
  • @RestControllerAdvice异常统一处理类失效原因
  • [ vulhub漏洞复现篇 ] AppWeb认证绕过漏洞(CVE-2018-8715)
  • [.net]官方水晶报表的使用以演示下载
  • [2010-8-30]
  • [Angular 基础] - 指令(directives)
  • [c++] C++多态(虚函数和虚继承)
  • [Flutter]设置应用包名、名称、版本号、最低支持版本、Icon、启动页以及环境判断、平台判断和打包
  • [HOW TO]怎么在iPhone程序中实现可多选可搜索按字母排序的联系人选择器
  • [javaee基础] 常见的javaweb笔试选择题含答案
  • [JS] 常用正则表达式集(一)
  • [LaTex]arXiv投稿攻略——jpg/png转pdf
  • [leetcode] Balanced Binary Tree