当前位置: 首页 > news >正文

kafak推送消息。

1、引入依赖

maven

<dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka</artifactId>  <version>2.8.0</version>  
</dependency>

gradle

dependencies {compile "org.springframework.kafka:spring-kafka
}

2、添加配置

在application.properties或application.yml中配置Kafka

spring:  kafka:  bootstrap-servers: localhost:9092 # Kafka集群的地址,格式为host:port,多个地址用逗号分隔  consumer:  group-id: myGroup # 消费者群组ID,用于标识一组消费者实例  auto-offset-reset: earliest # 当Kafka中没有初始偏移量或当前偏移量不再存在时(例如,数据被删除),从何处开始读取:earliest表示从最早的记录开始,latest表示从最新的记录开始  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 键的反序列化器,用于将字节转换为Java对象  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化器,用于将字节转换为Java对象  producer:  key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键的序列化器,用于将Java对象转换为字节  value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化器,用于将Java对象转换为字节

3、代码

1、方法

异步调用

import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.support.SendResult;  
import org.springframework.util.concurrent.ListenableFuture;  
import org.springframework.util.concurrent.ListenableFutureCallback;  public class KafkaMessageSender {  @Autowired  private KafkaTemplate<String, String> kafkaTemplate;  public void sendMessage(String topic, String message) {  // 同步发送消息并等待响应  ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);  try {  SendResult<String, String> result = future.get(); // 阻塞等待发送结果  System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]");  } catch (Exception e) {  e.printStackTrace();  }  }  public void sendMessageWithCallback(String topic, String message) {  // 异步发送消息并注册回调  ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);  future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {  @Override  public void onSuccess(SendResult<String, String> result) {  System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]");  }  @Override  public void onFailure(Throwable ex) {  System.err.println("Failed to send message=[" + message + "] with topic=[" + topic + "]: " + ex.getMessage());  }  });  }  // 其他方法...  
}

调用方法

public class SomeService {  @Autowired  private KafkaMessageSender kafkaMessageSender;  public void someMethod() {  String topic = "some-topic";  String message = "Hello, Kafka!";  kafkaMessageSender.sendMessageWithCallback(topic, message);  }  
}

同步调用

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final KafkaTemplate<String, String> kafkaTemplate;public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {try {kafkaTemplate.send(topic, message).get();System.out.println("Message sent successfully.");} catch (InterruptedException | ExecutionException e) {System.err.println("Failed to send message: " + e.getMessage());Thread.currentThread().interrupt();}}
}
2、send

1、发送简单的消息

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

2、发送带有键的消息

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);

3、发送带有分区号的消息
``java
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message);

4、发送带有时间戳的消息
```java
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message, timestamp);

5、发送带有自定义头信息的消息

Map<String, Object> headers = new HashMap<>();  
headers.put("myHeaderKey", "myHeaderValue");  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message, timestamp, headers);

6、使用ProducerRecord发送消息

ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);

7、使用Message发送消息(Spring Kafka 2.2及更高版本)

Message<String> message = MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, topic).build();  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 将Nginx注册为Windows服务
  • uniapp布局
  • react 列表页面中管理接口请求的参数
  • 代码随想录冲冲冲 Day34 动态规划Part2
  • k8s集群环境搭建(一主二从--kubeadm安装)
  • 人工智能再次进化 善用AI提升营运效率
  • 小哥进小区难进门难,此现状如何破局?
  • javaee、ssm(maven)、springboot(maven)项目目录结构以及编译后文件目录存放路径
  • anomaly detection
  • FreeRTOS线程数据传递---消息队列
  • Git如何安装和配置
  • 详解PASCAL VOC数据集及基于Python和PyTorch的下载、解析及可视化【目标检测+类别分割】
  • QT5.14.2编译有界面的DLL供C#Winform程序调用步骤
  • C#编程语言及.NET 平台快速入门指南
  • 高级java每日一道面试题-2024年9月02日-基础篇-什么是脏读、不可重复读和幻读?
  • [iOS]Core Data浅析一 -- 启用Core Data
  • AHK 中 = 和 == 等比较运算符的用法
  • CSS魔法堂:Absolute Positioning就这个样
  • HashMap剖析之内部结构
  • httpie使用详解
  • iOS | NSProxy
  • java多线程
  • JS学习笔记——闭包
  • leetcode386. Lexicographical Numbers
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • PHP 使用 Swoole - TaskWorker 实现异步操作 Mysql
  • SAP云平台运行环境Cloud Foundry和Neo的区别
  • SpiderData 2019年2月16日 DApp数据排行榜
  • 安装python包到指定虚拟环境
  • 动手做个聊天室,前端工程师百无聊赖的人生
  • 反思总结然后整装待发
  • 后端_MYSQL
  • 前端性能优化——回流与重绘
  • 区块链分支循环
  • 区块链技术特点之去中心化特性
  • 做一名精致的JavaScripter 01:JavaScript简介
  • ​520就是要宠粉,你的心头书我买单
  • ​数据结构之初始二叉树(3)
  • #NOIP 2014#Day.2 T3 解方程
  • (4)STL算法之比较
  • (day 2)JavaScript学习笔记(基础之变量、常量和注释)
  • (LeetCode C++)盛最多水的容器
  • (php伪随机数生成)[GWCTF 2019]枯燥的抽奖
  • (二)PySpark3:SparkSQL编程
  • (附源码)ssm基于jsp的在线点餐系统 毕业设计 111016
  • (实战)静默dbca安装创建数据库 --参数说明+举例
  • (学习日记)2024.01.19
  • (转)Windows2003安全设置/维护
  • (转)一些感悟
  • .NET 8.0 发布到 IIS
  • .Net Core与存储过程(一)
  • .Net Memory Profiler的使用举例
  • .NET Remoting学习笔记(三)信道
  • .NET 中什么样的类是可使用 await 异步等待的?
  • @Mapper作用