当前位置: 首页 > news >正文

SpringBoot集成kafka-生产者发送消息

springboot集成kafka发送消息

  • 1、kafkaTemplate.send()方法
    • 1.1、springboot集成kafka发送消息Message对象消息
    • 1.2、springboot集成kafka发送ProducerRecord对象消息
    • 1.3、springboot集成kafka发送指定分区消息
  • 2、kafkaTemplate.sendDefault()方法
  • 3、kafkaTemplate.send(...)和kafkaTemplate.sendDefault(...)的区别

在这里插入图片描述在这里插入图片描述

1、kafkaTemplate.send()方法

1.1、springboot集成kafka发送消息Message对象消息

生产者代码

package com.power.producer;import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}
}

2、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testMessage(){eventProducer.sendMessage();}@Testvoid testMessage(){eventProducer.sendMessage();}
}

1.2、springboot集成kafka发送ProducerRecord对象消息

生产者

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}public void sendProducerRecord(){//Headers里面放一些信息(信息是key-value键值对),到时候消费组接收到消息后,可以拿到这个Headers里面放的信息Headers headers = new RecordHeaders();headers.add("phone","17676767676".getBytes(StandardCharsets.UTF_8));headers.add("orderId","OD1456467576467".getBytes(StandardCharsets.UTF_8));//String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headersProducerRecord<String,String> record =new ProducerRecord("test-topic-02", 0, System.currentTimeMillis(), "key1", "value", headers);kafkaTemplate.send(record);}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testProducerRecord(){eventProducer.sendProducerRecord();}}

1.3、springboot集成kafka发送指定分区消息

生产者:(方法sendEvent4)

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent4(){//String topic, Integer partition, Long timestamp, K key, V datakafkaTemplate.send("test-topic-02",0,System.currentTimeMillis(),"k2","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent4(){eventProducer.sendEvent4();}
}

2、kafkaTemplate.sendDefault()方法

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendDefault(){//Integer partition, Long timestamp, K key, V datakafkaTemplate.sendDefault(0,System.currentTimeMillis(),"k3","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendDefault(){eventProducer.sendDefault();}
}

配置文件:设置默认topic
不设置topic运行生产者会报找不到topic的错。
在这里插入图片描述

3、kafkaTemplate.send(…)和kafkaTemplate.sendDefault(…)的区别

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【前端面试】浏览器原理解读
  • Scratch深潜:解锁递归与分治算法的编程之门
  • 【补充篇】AUTOSAR多核OS介绍(下)
  • JavaScript基础知识(六)
  • Python案例 | Kriging预测钢筋混凝土梁长期挠度
  • C++ 设计模式(1. 单例模式)
  • 数据库E-R 图
  • Total Commander 右键卡死问题,百度云冲突
  • Yolov10网络详解与实战(附数据集)
  • 软件测试-测试分类
  • Linux云计算 |【第二阶段】SECURITY-DAY4
  • Big Data for AI实践:面向AI大模型开发和应用的大规模数据处理套件
  • 超声波模块HC_SR04(hal库)
  • 【图像超分】论文精读:AdaBM: On-the-Fly Adaptive Bit Mapping for Image Super-Resolution
  • 向量数据库中的PQ(Procduct Quantization)
  • @angular/forms 源码解析之双向绑定
  • [deviceone开发]-do_Webview的基本示例
  • [笔记] php常见简单功能及函数
  • 【跃迁之路】【641天】程序员高效学习方法论探索系列(实验阶段398-2018.11.14)...
  • 10个确保微服务与容器安全的最佳实践
  • extract-text-webpack-plugin用法
  • GitUp, 你不可错过的秀外慧中的git工具
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • Shadow DOM 内部构造及如何构建独立组件
  • vue:响应原理
  • 初识MongoDB分片
  • 给新手的新浪微博 SDK 集成教程【一】
  • 关于List、List?、ListObject的区别
  • 跨域
  • 使用Maven插件构建SpringBoot项目,生成Docker镜像push到DockerHub上
  • 通过获取异步加载JS文件进度实现一个canvas环形loading图
  • 线上 python http server profile 实践
  • 智能网联汽车信息安全
  • 函数计算新功能-----支持C#函数
  • ​linux启动进程的方式
  • ​二进制运算符:(与运算)、|(或运算)、~(取反运算)、^(异或运算)、位移运算符​
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • ​一些不规范的GTID使用场景
  • # 消息中间件 RocketMQ 高级功能和源码分析(七)
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • (17)Hive ——MR任务的map与reduce个数由什么决定?
  • (3)(3.5) 遥测无线电区域条例
  • (C语言)输入自定义个数的整数,打印出最大值和最小值
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (ZT)出版业改革:该死的死,该生的生
  • (定时器/计数器)中断系统(详解与使用)
  • (二) Windows 下 Sublime Text 3 安装离线插件 Anaconda
  • (二)PySpark3:SparkSQL编程
  • (附源码)ssm高校志愿者服务系统 毕业设计 011648
  • (附源码)基于ssm的模具配件账单管理系统 毕业设计 081848
  • (离散数学)逻辑连接词
  • (七)glDrawArry绘制
  • (图文详解)小程序AppID申请以及在Hbuilderx中运行
  • (转)创业的注意事项