当前位置: 首页 > news >正文

延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求

实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消费,完成类似“延时”的效果。

下面的示例代码演示了如何在Spring Boot中使用RabbitMQ设置一个订单,然后在15分钟后自动取消。

1. 添加 RabbitMQ 依赖:

在你的pom.xml中加入Spring Boot对RabbitMQ的Starter:

xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置队列、交换器、绑定和容器

我们创建一个配置类,定义一个正常的队列和一个死信队列,以及相应的交换机和队列的绑定:

java

@Configuration
public class RabbitMQConfig {/* 正常队列配置 */@Beanpublic Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_exchange");  // 队列消息过期后发送的交换器args.put("x-dead-letter-routing-key", "dead"); // 队列消息过期后发送的路由键return new Queue("order_queue", true, false, false, args);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");}/* 死信队列配置 */@Beanpublic Queue deadQueue() {return new Queue("dead_queue");}@Beanpublic DirectExchange deadExchange() {return new DirectExchange("dead_exchange");}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}
}

3. 发送延时消息

创建订单时,我们发送一个延时消息到队列:

java

@Autowired
private RabbitTemplate rabbitTemplate;public void createOrder(String orderId) {//订单创建业务...rabbitTemplate.convertAndSend("order_exchange", "order", orderId, message -> {message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); // 15分钟return message;});
}

4. 消费死信队列中的消息

然后,我们需要消费死信队列中的消息,进行订单取消的操作:

java

@RabbitListener(queues = "dead_queue")
public void processDeadLetter(String orderId) {// 订单取消业务...
}
  1. 在RabbitMQ中设置消息的过期时间
    RabbitMQ允许你在两个级别上设置消息的过期时间:队列级别和消息级别。一旦消息过期,它将从队列中删除。
  • 队列级别:你可以在声明队列时通过"x-message-ttl"参数设置过期时间(以毫秒为单位)。这会影响队列中的所有消息。

java

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 设置60s过期时间
Channel channel = ...;
channel.queueDeclare("my_queue", false, false, false, args); 
  • 消息级别:你也可以在发布消息时单独为每条消息设置过期时间。如果队列级别的TTL和消息级别的TTL都被设置了,那么较小的那个值会被应用。

java

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build(); // 设置60s过期时间
Channel channel = ...;
channel.basicPublish("my_exchange", "my_routing_key", props, messageBodyBytes);

5、RabbitMQ中的死信队列如何工作?

“死信队列”用于接收不可路由的消息,或者由于一些原因不能正确处理的消息。这样能防止原始队列堵塞,也可以进一步处理这些消息。

当以下情况发生时,消息会被投递到死信队列:

  • 消息被拒绝 (basic.reject / basic.nack),并且requeue=false。
  • 在队列中排队的时间过长(超过设置的TTL)。
  • 队列长度溢出(超过设置的最大长度)。

在声明队列时,可以通过"x-dead-letter-exchange"和"x-dead-letter-routing-key"来设置死信交换器和路由键。

 6、如何确保消费者在消费消息时不会发生重复消费的情况?

确保消费者在消费消息时不发生重复消费,一般可以通过以下方式实现:

  • 消息确认机制:RabbitMQ提供了消息确认机制ACK,消费者处理完消息后,返回一个ACK信号,告诉RabbitMQ可以将该消息从队列中删除。如果在处理消息的过程中消费者出现异常(如断开连接),RabbitMQ将不会删除队列中的消息,并且会再次尝试发送该消息给消费者。
  • 幂等操作:在消费者端,确保消息处理逻辑是幂等的,也就是说,无论消息被处理一次还是多次,结果都是相同的。例如,对于一个扣款操作,无论这个操作执行多少次,账户的余额都应该只被扣除一次。
  • 分布式锁:在处理消息之前,消费者首先尝试获取一个分布式锁。只有获取到锁的消费者才能处理消息。处理完毕后释放锁。避免了同一消息被多个消费者处理的情况。

7、设置RabbitMQ中消息的优先级
在RabbitMQ中,对消息的优先级的支持是通过队列来实现的。在声明队列的时候,可以通过x-max-priority参数来指定队列支持的最大优先级。然后在发布消息的时候,可以通过basicPropertiespriority字段来指定消息的优先级。如下:

java

// 声明队列时设置最大优先级
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my_queue", false, false, false, args);// 发布消息时设置优先级
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", "my_queue", props, "Hello world".getBytes());

注意,优先级较高的消息将会优先被消费,但是并不保证完全按照优先级顺序消费。

8、RabbitMQ中如何处理消费者异常断开连接的情况?
当RabbitMQ检测到消费者(如一个TCP连接)异常断开,例如因为消费者主机崩溃或因为网络问题,它将关闭该消费者的连接,并将消费者未确认的任何消息重新放入队列。如果你希望一个消息在消费者断开连接时不被再次放入队列,你可以设置该消费者的autoAck参数为true(也就是无需显示确认)。但一般情况下,我们推荐消费者在正确处理消息后发送一个确认应答(basicAck)。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 从人机界面设计黄金三法则视角看 ChatGPT 的界面设计的“好”与“坏”
  • 提高网站安全性,漏洞扫描能带来什么帮助
  • 如何在Bash中连接字符串变量
  • Golang中的上下文-context包的简介及使用
  • nodejs应用程序以守护进程daemon的方式启动,容器化部署的时候一直部署出错,导致无法成功启动程序。
  • redis的简单操作
  • 【第十二篇】使用BurpSuite实现CSRF(实战案例)
  • docker compose部署项目—踩坑记录
  • vue2 利用网络代理axios实现开发环境前端跨域
  • 文字识别 Optical Character Recognition,OCR CTC STN
  • 【C/C++】C语言实现单链表
  • HarmonyOS实战开发DLP-如何实现一个安全类App。
  • 无参数绕过RCE
  • 掌握JWT安全
  • Git 术语及中英文对照
  • 深入了解以太坊
  • 77. Combinations
  • CAP 一致性协议及应用解析
  • JavaScript DOM 10 - 滚动
  • java取消线程实例
  • mysql常用命令汇总
  • OSS Web直传 (文件图片)
  • python学习笔记 - ThreadLocal
  • Transformer-XL: Unleashing the Potential of Attention Models
  • Vue ES6 Jade Scss Webpack Gulp
  • 动态魔术使用DBMS_SQL
  • 每天10道Java面试题,跟我走,offer有!
  • 扑朔迷离的属性和特性【彻底弄清】
  • 区块链将重新定义世界
  • 如何实现 font-size 的响应式
  • 微信小程序上拉加载:onReachBottom详解+设置触发距离
  • 学习笔记:对象,原型和继承(1)
  • 远离DoS攻击 Windows Server 2016发布DNS政策
  • ​​​​​​​​​​​​​​汽车网络信息安全分析方法论
  • ​数据结构之初始二叉树(3)
  • ​探讨元宇宙和VR虚拟现实之间的区别​
  • ######## golang各章节终篇索引 ########
  • #if和#ifdef区别
  • #常见电池型号介绍 常见电池尺寸是多少【详解】
  • $(selector).each()和$.each()的区别
  • $分析了六十多年间100万字的政府工作报告,我看到了这样的变迁
  • (09)Hive——CTE 公共表达式
  • (173)FPGA约束:单周期时序分析或默认时序分析
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (附源码)ssm航空客运订票系统 毕业设计 141612
  • (附源码)ssm经济信息门户网站 毕业设计 141634
  • (经验分享)作为一名普通本科计算机专业学生,我大学四年到底走了多少弯路
  • (论文阅读32/100)Flowing convnets for human pose estimation in videos
  • (一)插入排序
  • (转载)虚幻引擎3--【UnrealScript教程】章节一:20.location和rotation
  • **Java有哪些悲观锁的实现_乐观锁、悲观锁、Redis分布式锁和Zookeeper分布式锁的实现以及流程原理...
  • *算法训练(leetcode)第四十五天 | 101. 孤岛的总面积、102. 沉没孤岛、103. 水流问题、104. 建造最大岛屿
  • .Net Core 中间件验签
  • .Net Core/.Net6/.Net8 ,启动配置/Program.cs 配置
  • .NET使用存储过程实现对数据库的增删改查