当前位置: 首页 > news >正文

SpringBoot集成kafka-获取生产者发送的消息(阻塞式和非阻塞式获取)

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

说明

CompletableFuture对象需要的SpringBoot版本为3.X.X以上,需要的kafka依赖版本为3.X.X以上,需要的jdk版本17以上。

1、阻塞式(等待式)获取生产者发送的消息

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void getResult(){//Integer partition, Long timestamp, K key, @Nullable V dataCompletableFuture<SendResult<String, String>> result =kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello-kafka");//怎么拿结果,通过ListenableFuture类拿结果try {//1、阻塞式等待拿结果SendResult<String, String> sendResult = result.get();if(null!=sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());}System.out.println("producerRecord:"+sendResult.getProducerRecord());} catch (Exception e) {e.printStackTrace();}}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid getResult(){eventProducer.getResult();}
}

测试结果:
在这里插入图片描述

消息发送成功:default-topic-0@1
2024-08-22 22:18:51.344  INFO 8976 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hello-group-1, groupId=hello-group] Adding newly assigned partitions: hello-topic-0
producerRecord:ProducerRecord(topic=default-topic, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=k3, value=hello-kafka, timestamp=1724336330821)

2、非阻塞式(非等待式)获取生产者发送的消息

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void getResult2(){//Integer partition, Long timestamp, K key, @Nullable V dataCompletableFuture<SendResult<String, String>> result =kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello-kafka");//怎么拿结果,通过CompletableFuture类拿结果try {//2、非阻塞式等待拿结果result.thenAccept((sendResult)->{if(null!=sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());}System.out.println("producerRecord:"+sendResult.getProducerRecord());}).exceptionally((e)->{e.printStackTrace();//做消息发送失败的处理System.out.println("消息发送失败");return null;});} catch (Exception e) {e.printStackTrace();}}}

测试类:

@Test
void getResult2(){eventProducer.getResult2();
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 1111111111
  • 微服务:网关路由和登录校验
  • 计算机视觉与视觉大模型对板书检测效果对比
  • 上线eleme项目
  • 怎么整合spring security和JWT
  • 【Unity3D小技巧】Unity3D中实现FPS数值显示功能实现
  • CSS 的了解text-rendering属性
  • 大模型学习笔记 - LLM 之 LLaMA系列(待更新)
  • 缺失ffmpeg.dll要用什么修复方法?快速恢复丢失的ffmpeg.dll文件
  • C++基础面试题 | C和C++的区别?
  • 【小趴菜前端学习日记3】
  • 【速览】计算机网络(更新中)
  • 使用VRoid Studio二次元建模,创建专属于自己的二次元卡通人物模型,创建完全免费开源且属于自己VRM模型
  • css 宫格样式内容上下结构
  • 井盖异动传感器:为城市安全加码
  • 《Javascript数据结构和算法》笔记-「字典和散列表」
  • 4. 路由到控制器 - Laravel从零开始教程
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • QQ浏览器x5内核的兼容性问题
  • SegmentFault 技术周刊 Vol.27 - Git 学习宝典:程序员走江湖必备
  • Swift 中的尾递归和蹦床
  • Theano - 导数
  • 构建二叉树进行数值数组的去重及优化
  • 机器学习 vs. 深度学习
  • 基于MaxCompute打造轻盈的人人车移动端数据平台
  • 记一次用 NodeJs 实现模拟登录的思路
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 前端每日实战 2018 年 7 月份项目汇总(共 29 个项目)
  • 详解移动APP与web APP的区别
  • 曜石科技宣布获得千万级天使轮投资,全方面布局电竞产业链 ...
  • ​补​充​经​纬​恒​润​一​面​
  • ​批处理文件中的errorlevel用法
  • #php的pecl工具#
  • (14)目标检测_SSD训练代码基于pytorch搭建代码
  • (14)学习笔记:动手深度学习(Pytorch神经网络基础)
  • (5)STL算法之复制
  • (Redis使用系列) Springboot 使用redis实现接口Api限流 十
  • (附源码)ssm高校运动会管理系统 毕业设计 020419
  • (机器学习-深度学习快速入门)第一章第一节:Python环境和数据分析
  • (离散数学)逻辑连接词
  • (七)理解angular中的module和injector,即依赖注入
  • (三)mysql_MYSQL(三)
  • (三十)Flask之wtforms库【剖析源码上篇】
  • (四)linux文件内容查看
  • (四)鸿鹄云架构一服务注册中心
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (原)记一次CentOS7 磁盘空间大小异常的解决过程
  • (转)Linux NTP配置详解 (Network Time Protocol)
  • (转)创业的注意事项
  • (自用)交互协议设计——protobuf序列化
  • **PHP二维数组遍历时同时赋值
  • **PHP分步表单提交思路(分页表单提交)
  • .Net Core与存储过程(一)
  • .Net IOC框架入门之一 Unity
  • .net 后台导出excel ,word