当前位置: 首页 > news >正文

使用RabbitMQ死信交换机实现延迟消息

文章目录

    • 什么是死信交换机?
    • 死信交换机实现延迟消息的思路
    • 实现过程
      • 配置类
      • 消费者监听死信队列
      • 发送延迟消息
    • 注意事项
    • 总结

在开发过程中,我们常常会遇到需要延迟处理某些消息的场景,例如订单的支付超时处理、短信的定时发送等。本文将介绍如何使用RabbitMQ的死信交换机(Dead Letter Exchange,DLX)来实现延迟消息的处理。

什么是死信交换机?

死信交换机是一种特殊的交换机,用于处理不能被正常消费的消息。当消息在队列中出现以下几种情况时,会被转发到死信交换机:

  1. 消息被拒绝(Basic.Reject或Basic.Nack)并且requeue参数设置为false。
  2. 消息在队列中的存活时间超过了TTL(Time To Live)。
  3. 队列的最大长度已满,导致消息被丢弃。

通过配置死信交换机,我们可以将这些“死信”转发到一个特殊的队列,从而进行后续处理。

死信交换机实现延迟消息的思路

利用消息的TTL属性和死信交换机,我们可以实现延迟消息的处理。具体步骤如下:

  1. 创建一个普通交换机和队列,队列绑定一个死信交换机。
  2. 发送消息到普通队列,并设置消息的TTL。
  3. 消息过期后,会转发到死信交换机,死信交换机再将消息路由到实际处理的队列中。

实现过程

配置类

首先,我们需要配置普通交换机、队列和绑定关系:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class NormalConfiguration {@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange("normal.direct").build();}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();}@Beanpublic Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");}
}

在上述配置中,我们创建了一个普通交换机normal.direct,以及一个普通队列normal.queue,并将队列绑定到了死信交换机dlx.direct

消费者监听死信队列

接下来,我们需要定义一个消费者来监听死信队列,并处理延迟后的消息:

import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;@Component
public class DlxMessageListener {private static final Logger log = LoggerFactory.getLogger(DlxMessageListener.class);@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlx.queue", durable = "true"),exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),key = {"hi"}))public void listenDlxQueue1(String message) {log.info("消费者监听到dlx.queue消息:{}", message);}
}

发送延迟消息

最后,我们编写一个测试方法来发送延迟消息:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class DelayMessageTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendDelayMessage() {rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", message -> {message.getMessageProperties().setExpiration("10000");return message;});}
}

在上述测试方法中,我们向普通队列发送了一条消息,并设置其TTL为10000毫秒(即10秒)。当消息在普通队列中存活超过10秒后,会被转发到死信交换机,然后由消费者监听并处理。

注意事项

需要注意的是,RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后,不一定会立即被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

总结

通过以上配置和代码,我们实现了使用RabbitMQ死信交换机来处理延迟消息。利用死信交换机的机制,我们可以方便地实现各种复杂的消息处理场景,提高系统的灵活性和可靠性。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • MySQL Galera Cluster 部署与介绍
  • 天津教育杂志天津教育杂志社天津教育编辑部2024年第24期目录
  • 【C++】函数的调用
  • 【RISC-V设计-05】- RISC-V处理器设计K0A之GPR
  • 【论文阅读】MobileNetV4 - Universal Models for the Mobile Ecosystem
  • 【原创】java+swing+mysql学生管理系统设计与实现
  • CentOS linux安装nginx
  • 【区块链+社会公益】长安大学城梦想小镇居民公益积分项目 | FISCO BCOS应用案例
  • Docker进阶:Docker容器图形化显示(Ubuntu22系统下运行带图形界面的 Ubuntu20 Docker容器)
  • 【微信小程序开发】——奶茶点餐小程序的制作(一)
  • 口碑最好的洗地机排名?洗地机十大排名公开揭晓!
  • MySQL多表查询练习(53题)
  • 网络协议四 物理层,数据链路层
  • 有序转化数组(LeetCode)
  • 项目比赛项目负责人的识人与用人之道
  • 【翻译】babel对TC39装饰器草案的实现
  • 4个实用的微服务测试策略
  • Facebook AccountKit 接入的坑点
  • iOS编译提示和导航提示
  • PHP 程序员也能做的 Java 开发 30分钟使用 netty 轻松打造一个高性能 websocket 服务...
  • PHP 使用 Swoole - TaskWorker 实现异步操作 Mysql
  • React的组件模式
  • Spark VS Hadoop:两大大数据分析系统深度解读
  • 初识 beanstalkd
  • 从0搭建SpringBoot的HelloWorld -- Java版本
  • 从零搭建Koa2 Server
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 和 || 运算
  • 基于阿里云移动推送的移动应用推送模式最佳实践
  • 排序算法之--选择排序
  • 如何选择开源的机器学习框架?
  • 微信小程序:实现悬浮返回和分享按钮
  • 小程序、APP Store 需要的 SSL 证书是个什么东西?
  • 小程序滚动组件,左边导航栏与右边内容联动效果实现
  • 新版博客前端前瞻
  • 直播平台建设千万不要忘记流媒体服务器的存在 ...
  • ​MPV,汽车产品里一个特殊品类的进化过程
  • #if和#ifdef区别
  • #Linux(Source Insight安装及工程建立)
  • #我与Java虚拟机的故事#连载05:Java虚拟机的修炼之道
  • (06)金属布线——为半导体注入生命的连接
  • (152)时序收敛--->(02)时序收敛二
  • (7)svelte 教程: Props(属性)
  • (9)STL算法之逆转旋转
  • (SERIES12)DM性能优化
  • (板子)A* astar算法,AcWing第k短路+八数码 带注释
  • (编程语言界的丐帮 C#).NET MD5 HASH 哈希 加密 与JAVA 互通
  • (不用互三)AI绘画工具应该如何选择
  • (二)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (四)TensorRT | 基于 GPU 端的 Python 推理
  • (学习日记)2024.03.12:UCOSIII第十四节:时基列表
  • (转)Windows2003安全设置/维护
  • .chm格式文件如何阅读
  • .halo勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .Mobi域名介绍