当前位置: 首页 > news >正文

Spring Boot 项目中整合 RabbitMQ,使用死信队列(Dead Letter Exchange, DLX)实现延迟队列功能

在 Spring Boot 项目中整合 RabbitMQ 并使用死信队列(Dead Letter Exchange, DLX)实现延迟队列功能,可以通过配置 RabbitMQ 的交换机、队列和绑定关系来实现。以下是一个完整的示例,展示如何在 Spring Boot 项目中使用死信队列实现延迟队列。

1. 添加依赖

在 pom.xml 文件中添加 RabbitMQ 的依赖:

xml

复制

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

运行 HTML

2. 配置 RabbitMQ

在 application.yml 文件中配置 RabbitMQ 的连接信息:

yaml

复制

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

3. 创建 RabbitMQ 配置类

创建一个配置类,用于定义交换机、队列和绑定关系:

java

复制

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQConfig {public static final String DELAY_EXCHANGE_NAME = "delay.exchange";public static final String DELAY_QUEUE_NAME = "delay.queue";public static final String DLX_EXCHANGE_NAME = "dlx.exchange";public static final String DLX_QUEUE_NAME = "dlx.queue";// 创建死信交换机@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE_NAME);}// 创建死信队列@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE_NAME);}// 绑定死信队列到死信交换机@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("");}// 创建延迟交换机@Beanpublic DirectExchange delayExchange() {return new DirectExchange(DELAY_EXCHANGE_NAME);}// 创建延迟队列,并设置死信交换机@Beanpublic Queue delayQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);return new Queue(DELAY_QUEUE_NAME, false, false, false, args);}// 绑定延迟队列到延迟交换机@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("");}
}

4. 发送延迟消息

创建一个消息发送服务,用于发送延迟消息:

java

复制

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RabbitMQSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void sendDelayMessage(String message, long delayTime) {rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, "", message, msg -> {msg.getMessageProperties().setExpiration(String.valueOf(delayTime));return msg;});System.out.println(" [x] Sent '" + message + "' with delay " + delayTime + " ms");}
}

5. 消费延迟消息

创建一个消息消费者,用于消费延迟消息:

java

复制

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitMQReceiver {@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME)public void receiveDelayMessage(String message) {System.out.println(" [x] Received '" + message + "'");}
}

6. 测试延迟队列

创建一个测试类,用于测试延迟队列的功能:

java

复制

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class RabbitMQTest implements CommandLineRunner {@Autowiredprivate RabbitMQSender rabbitMQSender;@Overridepublic void run(String... args) throws Exception {String message = "Hello, RabbitMQ Delay Queue!";long delayTime = 5000; // 延迟 5 秒rabbitMQSender.sendDelayMessage(message, delayTime);}
}

7. 运行项目

启动 Spring Boot 项目,观察控制台输出,可以看到消息在延迟 5 秒后被消费。

总结

  • 配置 RabbitMQ:在 application.yml 文件中配置 RabbitMQ 的连接信息。

  • 创建 RabbitMQ 配置类:定义交换机、队列和绑定关系,设置死信交换机。

  • 发送延迟消息:通过 AmqpTemplate 发送带有 TTL 的消息。

  • 消费延迟消息:通过 @RabbitListener 注解消费死信队列中的消息。

通过这种方式,可以在 Spring Boot 项目中使用死信队列实现延迟队列功能。需要注意的是,消息的 TTL 是以毫秒为单位的,且消息的延迟时间不能超过 RabbitMQ 的最大消息大小限制。

上面代码没看懂的,看这里,以下是详细的流程:

  1. 发送延迟消息

    • 生产者将消息发送到延迟交换机(delay.exchange),并设置消息的 TTL。

    • 消息被路由到延迟队列(delay.queue)。

  2. 延迟队列处理

    • 延迟队列没有消费者,消息会在队列中等待,直到 TTL 到期。

    • TTL 到期后,消息被转发到死信交换机(dlx.exchange)。

  3. 死信队列处理

    • 死信交换机将消息路由到死信队列(dlx.queue)。

    • 死信队列的消费者消费这些消息。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【前端】ES6:Set与Map
  • PyCharm与Anaconda超详细安装配置教程
  • 【贪心算法】贪心算法一
  • nginx配置案例,文件服务器配置,浏览某个目录下所有文件,中文乱码,try_files解释,root和alias区别
  • 数据结构-3.1.栈的基本概念
  • 一文说清楚ETL与Kafka如何实现集成
  • SalescustomerController
  • 分享一款idea插件
  • day-56 字符串转换整数 (atoi)
  • Matplotlib在运维开发中的应用
  • Vue组件:模板引用ref属性的使用
  • 【python】【绘制小程序】动态爱心绘制
  • 如何利用 Visual Studio 和 AI 工具实现高效编程
  • SQLPlus执行成功但数据没有更新的原因及解决办法
  • CTFHUB 技能树 信息泄露 HG泄露 解密过程记录
  • [分享]iOS开发-关于在xcode中引用文件夹右边出现问号的解决办法
  • C# 免费离线人脸识别 2.0 Demo
  • Java知识点总结(JavaIO-打印流)
  • jQuery(一)
  • Linux CTF 逆向入门
  • Median of Two Sorted Arrays
  • React-flux杂记
  • sublime配置文件
  • 程序员该如何有效的找工作?
  • 回流、重绘及其优化
  • 猫头鹰的深夜翻译:JDK9 NotNullOrElse方法
  • 前端攻城师
  • 深度解析利用ES6进行Promise封装总结
  • JavaScript 新语法详解:Class 的私有属性与私有方法 ...
  • zabbix3.2监控linux磁盘IO
  • 关于Kubernetes Dashboard漏洞CVE-2018-18264的修复公告
  • ​Linux·i2c驱动架构​
  • ​软考-高级-系统架构设计师教程(清华第2版)【第20章 系统架构设计师论文写作要点(P717~728)-思维导图】​
  • #define
  • $var=htmlencode(“‘);alert(‘2“); 的个人理解
  • (2022 CVPR) Unbiased Teacher v2
  • (70min)字节暑假实习二面(已挂)
  • (C#)Windows Shell 外壳编程系列9 - QueryInfo 扩展提示
  • (javaweb)Http协议
  • (STM32笔记)九、RCC时钟树与时钟 第二部分
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • .NET 4 并行(多核)“.NET研究”编程系列之二 从Task开始
  • .NET CLR Hosting 简介
  • .Net IOC框架入门之一 Unity
  • .net 流——流的类型体系简单介绍
  • .net 使用ajax控件后如何调用前端脚本
  • .NET 依赖注入和配置系统
  • .NET(C#、VB)APP开发——Smobiler平台控件介绍:Bluetooth组件
  • .NET下的多线程编程—1-线程机制概述
  • ??Nginx实现会话保持_Nginx会话保持与Redis的结合_Nginx实现四层负载均衡
  • @autowired注解作用_Spring Boot进阶教程——注解大全(建议收藏!)
  • @property @synthesize @dynamic 及相关属性作用探究
  • []指针
  • [2544]最短路 (两种算法)(HDU)
  • [Android 13]Input系列--获取触摸窗口