当前位置: 首页 > news >正文

使用消息队列、rocketMq实现通信

1背景

springboot框架,

2需求

后端需要调用一个类似于api这种作用的小工具,获得工具的返回值,后端再根据客户端的返回值进行更新数据操作

3讨论

1工具开发者使用的是python,将工具封装起来,暴露成web接口供后端调用
2方式一能满足需求了,但是考虑到后续这样的工具还有很多,方便工具的管理和集成,考虑使用消息队列的方式进行通信-》使用rocketmq
3既然使用rocketmq,那就要考虑同步还是异步处理消息
比较一下 RocketMQ 中的同步发送和异步发送
同步发送(Synchronous Send):
使用 rocketMQTemplate.syncSend
异步发送(Asynchronous Send):
使用 rocketMQTemplate.asyncSend
主要区别
(1)执行流程:
同步:调用线程会阻塞,直到收到服务器的响应或超时。
异步:调用后立即返回,不等待服务器响应。消息发送结果通过回调函数通知
(2)响应时间:
同步:总体响应时间较长,包含网络传输和服务器处理时间。
异步:立即返回,响应时间很短。
(3)可靠性保证:
同步:可以立即知道消息是否发送成功。
异步:无法立即知道发送结果,需要在回调中处理。
(4)异常处理:
同步:可以直接在调用代码中进行异常处理。
异步:异常需要在回调函数中处理。
(5)资源利用:
同步:可能造成线程阻塞,影响系统吞吐量。
异步:更有效地利用系统资源,特别是在高并发场景下。
(6)使用场景:
同步:适用于对可靠性要求高,且可以接受一定延迟的场景
异步:适用于高吞吐量、对延迟敏感的场景。

如果你需要立即知道消息是否成功发送,并且可以接受一些延迟,使用同步发送。
如果你需要高吞吐量,并且可以在回调中处理发送结果,使用异步发送。

4理解

public class MessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendAsync(String topic, String message) {System.out.println("1. 开始异步发送");rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("4. 异步发送成功回调: " + sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {System.out.println("4. 异步发送异常回调: " + throwable.getMessage());}});System.out.println("2. 异步发送方法调用完成");}public void demonstrateSendMethods() {String topic = "TestTopic";String message = "Hello, RocketMQ!";System.out.println("开始演示异步发送");sendAsync(topic, message);System.out.println("3. 主方法继续执行");// 为了等待异步操作完成,这里添加一个简单的延迟try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}
}

输出为:

开始演示异步发送
1. 开始异步发送
2. 异步发送方法调用完成在这里插入代码片
3. 主方法继续执行
4. 异步发送成功回调: [消息ID]

方法调用立即返回(步骤 1 和 2)
主线程继续执行(步骤 3)
回调方法(onSuccess 或 onException)在稍后异步执行(步骤 4)

需要注意的是,步骤 4 的执行时间是不确定的。它可能在步骤 3 之前或之后发生,取决于网络条件和服务器响应速度。在实际应用中,你可能需要使用更复杂的机制(如 CompletableFuture 或消息队列)来处理异步结果。

于是引入消息队列

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public class AsyncMessageHandler {private final RocketMQTemplate rocketMQTemplate;private final BlockingQueue<AsyncResult> resultQueue;private final Thread processThread;private volatile boolean running = true;public AsyncMessageHandler(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;this.resultQueue = new LinkedBlockingQueue<>();this.processThread = new Thread(this::processResults);this.processThread.start();}public void sendAsyncMessage(String topic, String message) {rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {resultQueue.offer(new AsyncResult(true, sendResult, null));}@Overridepublic void onException(Throwable throwable) {resultQueue.offer(new AsyncResult(false, null, throwable));}});}private void processResults() {while (running) {try {AsyncResult result = resultQueue.poll(1, TimeUnit.SECONDS);if (result != null) {if (result.success) {System.out.println("Message sent successfully: " + result.sendResult.getMsgId());// 处理成功发送的消息} else {System.out.println("Message send failed: " + result.throwable.getMessage());// 处理发送失败的消息,可能进行重试或记录日志}}} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}public void shutdown() {running = false;processThread.interrupt();}private static class AsyncResult {final boolean success;final SendResult sendResult;final Throwable throwable;AsyncResult(boolean success, SendResult sendResult, Throwable throwable) {this.success = success;this.sendResult = sendResult;this.throwable = throwable;}}
}

解析一下:
processResults 方法的执行机制:

1线程启动时机:
processResults 方法在一个单独的线程中运行。这个线程在 AsyncMessageHandler 构造函数中被创建和启动:
javaCopythis.processThread = new Thread(this::processResults);
this.processThread.start();

2持续执行:
processResults 方法包含一个 while 循环,只要 running 变量为 true,它就会持续执行。
执行条件:
在每次循环中,它尝试从队列中获取一个 AsyncResult 对象:

AsyncResult result = resultQueue.poll(1, TimeUnit.SECONDS);

3具体执行情况:
a. 当队列中有结果时:
如果队列中有 AsyncResult 对象,poll 方法会立即返回这个对象。
然后 processResults 会处理这个结果(打印消息ID或错误信息)。
b. 当队列为空时:
poll 方法会等待最多1秒钟。
如果1秒内没有新的结果加入队列,poll 返回 null。
循环继续,再次尝试获取结果。
c. 当线程被中断时:
如果在等待过程中线程被中断(比如调用 shutdown 方法时),会抛出 InterruptedException。
捕获这个异常后,线程会退出循环并结束。

4执行频率:

如果队列中持续有数据,processResults 会不断执行,处理每个结果。
如果队列经常为空,processResults 大部分时间会在等待新的结果。

5停止执行:
当调用 shutdown 方法时:
javaCopypublic void shutdown() {
running = false;
processThread.interrupt();
}
running 被设置为 false,使得 while 循环条件不再满足。
线程被中断,确保即使在 poll 等待中也能及时退出。

总结

processResults 方法在一个独立的线程中持续运行。
它会不断尝试从队列中获取并处理结果。
当有结果时立即处理,没有结果时等待。
这个方法会一直执行,直到 AsyncMessageHandler 被关闭。

这种设计确保了:
异步结果可以被及时处理,不会阻塞消息发送。
系统资源得到有效利用,因为在没有结果时线程会等待而不是持续空转。
结果处理的顺序性得到了保证。

结合应用场景:
后端发送异步消息到rocketmq,客户端作为消费者消费消息,并将结果放到一个消息队列中,后端会循环执行获取队列中的消息,根据从队列中获取到的消息进行更新数据库。

// pom.xml 依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>// application.properties
spring.application.name=rocketmq-demo
rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-producer-group// Message.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String id;private String content;
}// ResultMessage.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResultMessage {private String id;private String result;
}// AsyncMessageHandler.java
@Component
@Slf4j
public class AsyncMessageHandler {private final BlockingQueue<ResultMessage> resultQueue = new LinkedBlockingQueue<>();private final Thread processThread;private volatile boolean running = true;@Autowiredprivate BackendService backendService;public AsyncMessageHandler() {this.processThread = new Thread(this::processResults);this.processThread.start();}public void handleResult(ResultMessage resultMessage) {resultQueue.offer(resultMessage);}private void processResults() {while (running) {try {ResultMessage result = resultQueue.poll(1, TimeUnit.SECONDS);if (result != null) {backendService.updateData(result);}} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}@PreDestroypublic void shutdown() {running = false;processThread.interrupt();}
}// BackendService.java
@Service
@Slf4j
public class BackendService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate AsyncMessageHandler asyncMessageHandler;public void sendAsyncMessage(Message message) {rocketMQTemplate.asyncSend("TestTopic", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("Message sent successfully, msgId: {}", sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {log.error("Failed to send message", throwable);}});}public void updateData(ResultMessage resultMessage) {// 这里实现更新数据的逻辑log.info("Updating data with result: {}", resultMessage);// 例如:更新数据库}
}// ClientService.java
@Service
@Slf4j
public class ClientService {@Autowiredprivate AsyncMessageHandler asyncMessageHandler;@RocketMQMessageListener(topic = "TestTopic",consumerGroup = "my-consumer-group")@Componentpublic class MessageConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {log.info("Received message: {}", message);// 处理消息的逻辑String result = processMessage(message);ResultMessage resultMessage = new ResultMessage(message.getId(), result);asyncMessageHandler.handleResult(resultMessage);}}private String processMessage(Message message) {// 这里实现处理消息的逻辑return "Processed: " + message.getContent();}
}// BackendController.java
@RestController
@RequestMapping("/api")
public class BackendController {@Autowiredprivate BackendService backendService;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestBody Message message) {backendService.sendAsyncMessage(message);return ResponseEntity.ok("Message sent asynchronously");}
}// Application.java
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

这个代码的实现是基于后端与消费者都是用java实现的

进一步处理

被调用的工具不是用的java,而是使用python,就需要使用HTTP接口来实现客户端处理结果的回调。上述代码就需要进行修改,就相当于上述的 AsyncMessageHandler.java和ClientService.java需要在python实现,即客户端消费生产者的消息扔到消息队列中并主动调用后端根据消息更新数据库的操作,那么java这边就需要新增一个接口供python那边调用
java

// pom.xml 依赖 (保持不变)// application.properties
spring.application.name=rocketmq-demo
rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-producer-group// Message.java (保持不变)// ResultMessage.java (保持不变)// BackendService.java
@Service
@Slf4j
public class BackendService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendAsyncMessage(Message message) {rocketMQTemplate.asyncSend("TestTopic", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("Message sent successfully, msgId: {}", sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {log.error("Failed to send message", throwable);}});}public void updateData(ResultMessage resultMessage) {// 这里实现更新数据的逻辑log.info("Updating data with result: {}", resultMessage);// 例如:更新数据库}
}// BackendController.java
@RestController
@RequestMapping("/api")
public class BackendController {@Autowiredprivate BackendService backendService;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestBody Message message) {backendService.sendAsyncMessage(message);return ResponseEntity.ok("Message sent asynchronously");}@PostMapping("/result")public ResponseEntity<String> receiveResult(@RequestBody ResultMessage resultMessage) {backendService.updateData(resultMessage);return ResponseEntity.ok("Result received and processed");}
}// Application.java (保持不变)

python

import json
from rocketmq.client import PushConsumer
import requestsdef callback(msg):print(f"Received message: {msg.body}")# 解析消息message = json.loads(msg.body)# 处理消息result = process_message(message)# 创建结果消息result_message = {"id": message["id"],"result": result}# 发送结果给Java后端try:response = requests.post("http://localhost:8080/api/result", json=result_message)if response.status_code == 200:print("Result sent successfully")else:print(f"Failed to send result. Status code: {response.status_code}")except requests.RequestException as e:print(f"Error sending result: {e}")def process_message(message):# 这里实现处理消息的逻辑return f"Processed: {message['content']}"if __name__ == "__main__":consumer = PushConsumer("my-consumer-group")consumer.set_name_server_address("localhost:9876")consumer.subscribe("TestTopic", callback)print("Consumer started. Press Ctrl+C to exit.")consumer.start()try:consumer.join()except KeyboardInterrupt:consumer.shutdown()

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • OpenAI 发布 GPT-4o 模型安全评估报告:风险等级为“中等”|TodayAI
  • C++——红黑树(图片+动图详解)
  • TCP Window Full TCP Zero Window
  • 【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理
  • 【漏洞修复】Tomcat中间件漏洞
  • 强化学习之REINFORECE策略梯度算法——已CartPole环境为例
  • 高级web安全技术(第一篇)
  • 【ARM】v8架构programmer guide(4)_ARMv8的寄存器
  • Oracle(47)如何创建和使用集合?
  • Leetcode面试经典150题-236.二叉树的最低公共祖先
  • 保研考研机试攻略:第二章——入门经典(2)
  • LVS(Linux virual server)
  • 排序算法——插入排序
  • “华为杯”第十六届中国研究生数学建模竞赛-C题:视觉情报信息分析
  • rust pin_project的使用
  • CSS居中完全指南——构建CSS居中决策树
  • Gradle 5.0 正式版发布
  • Javascript弹出层-初探
  • JavaScript实现分页效果
  • JS正则表达式精简教程(JavaScript RegExp 对象)
  • MySQL QA
  • nodejs实现webservice问题总结
  • Odoo domain写法及运用
  • SAP云平台里Global Account和Sub Account的关系
  • Yeoman_Bower_Grunt
  • 基于 Babel 的 npm 包最小化设置
  • 使用 QuickBI 搭建酷炫可视化分析
  • 视频flv转mp4最快的几种方法(就是不用格式工厂)
  • 一些关于Rust在2019年的思考
  • 用Python写一份独特的元宵节祝福
  • nb
  • 基于django的视频点播网站开发-step3-注册登录功能 ...
  • ​如何在iOS手机上查看应用日志
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #AngularJS#$sce.trustAsResourceUrl
  • #FPGA(基础知识)
  • $LayoutParams cannot be cast to android.widget.RelativeLayout$LayoutParams
  • (3)选择元素——(17)练习(Exercises)
  • (苍穹外卖)day03菜品管理
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (算法设计与分析)第一章算法概述-习题
  • (学习日记)2024.02.29:UCOSIII第二节
  • (一)、python程序--模拟电脑鼠走迷宫
  • (一)Mocha源码阅读: 项目结构及命令行启动
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转载)CentOS查看系统信息|CentOS查看命令
  • .NET Framework .NET Core与 .NET 的区别
  • .net 怎么循环得到数组里的值_关于js数组
  • .NET/C# 使用 ConditionalWeakTable 附加字段(CLR 版本的附加属性,也可用用来当作弱引用字典 WeakDictionary)
  • .NET上SQLite的连接
  • .NET周刊【7月第4期 2024-07-28】
  • /var/lib/dpkg/lock 锁定问题
  • [20171101]rman to destination.txt
  • [20190416]完善shared latch测试脚本2.txt
  • [8] CUDA之向量点乘和矩阵乘法