当前位置: 首页 > news >正文

【业务场景实战】我等你10秒

今天是个特殊的日子啊。9月1日开学的日子,其实我30号就到学校了,刚来一个新的学校还挺不适应的,这几天都摆烂了/(ㄒoㄒ)/~~。明天就要正式上课了,这新学校课还挺多。

不扯了,下面进入今天的正题!今天讲讲MQ中的延时队列

一、使用场景

延时队列在我们日常生活中也是比较常见的

比如说:下单,在你点击下单前,如果没有及时付款,订单会为你保留10分钟,十分钟之后如果还没付款,订单就会消失了。比较常见的就是淘宝、订火车票这种场景。

还有预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。那这种如何用延迟队列来实现呢?下面来讲讲

二、延迟实现

RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:

  1. 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。

  2. 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。

也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。

安装一个插件即可:https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录及:plugins文件下。

我这里安装的RabbitMQ是3.6.5版本,属于比较老的版本了。我找了很久适合我这个版本的延迟插件,结果最后还是失败了,它没有ez文件,只能自己去压缩,试了很久,就是不行,所以这里只能使用第二种方式来实现延迟队列了。

1、实现思路

延迟队列,实际就是让消息在队列中停留一段时间之后再做处理,延迟插件可以直接设置消息的延迟时间。但现在我只能使用死信队列加消息过期来实现这个效果。

我给消息设置一个过期时间TTL,然后设置签收方式为手动,这样消息就会一直留在队列中不被处理,直到过期,被死信队列处理,间接的达到一个延迟的效果。

既然设置消息延迟,那这个延迟时间最好还是手动设置更为灵活,需要延迟多长时间,可以通过参数传递的方式进行设置

2、代码实现

1)引入依赖

<!--springboot整合rabbitmq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.2</version></dependency>

2)导入配置

rabbitmq:host: ENC(UJr2n8We05y2ijsbZy51aG7YOT+CmD5An9N90lSaauWruF+FKEz8RqDsrzxWE4fs)port: 5672username: guestpassword: guest

3)mq配置类

创建延迟队列和延迟交换机

创建死信队列和死信交换机

@Component
public class RabbitmqConfig {//创建延迟交换器@Bean("delayExchange")public DirectExchange delayExchange(){return new DirectExchange(DELAY_EXCHANGE_NAME);}//创建延迟队列A@Bean("delayQueueA")public Queue delayQueueA(){Map<String,Object> args=new HashMap<>();args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();}//创建死信队列A@Bean("deadLetterQueueA")public Queue deadLetterQueueA() {return new Queue(DEAD_LETTER_QUEUE_A_NAME);}//创建死信交换器@Bean("deadExchange")public DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}//延迟队列A绑定交换器@Beanpublic Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue,@Qualifier("delayExchange")DirectExchange delayExchange){return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);}//死信队列A绑定交换器@Beanpublic Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue,@Qualifier("deadExchange")DirectExchange deadExchange){return BindingBuilder.bind(queue).to(deadExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);}
}

4)定义常量接口

public interface MQContant {String  DELAY_EXCHANGE_NAME="delay_exchange";String  DEAD_LETTER_EXCHANGE="deadletter_exchange";String  DEAD_LETTER_QUEUE_A_NAME="deadqueueA";String  DEAD_LETTER_QUEUE_A_ROUTING_KEY="moon";String  DELAY_QUEUE_ROUTING_A_NAME="yupi";String  DELAY_QUEUE_A_NAME="delayqueueA";
}

4)延迟消息生产者

需要思考一下,对于延迟队列,我们需要实现一个什么效果

发送消息,延迟一段时间之后输出

首先发送消息,然后为了灵活的设置延迟,消息过期时间我们需要手动进行定义

这样才能达到我们需要的效果

使用setExpiration方法将消息过期时间进行手动设置,如果引入的延迟插件则可以使用setDelayed方法直接设置消息延迟时间

延迟时间单位为毫秒ms

@Service
public class DelayedMessageProducer {@Resourceprivate AmqpTemplate amqpTemplate;public void sendDelayedMessage(String message, String delayTime) {amqpTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_A_NAME, message, messagePostProcessor -> {//设置消息的过期时间,单位是msmessagePostProcessor.getMessageProperties().setExpiration(delayTime);return messagePostProcessor;});}
}

5)延迟队列消费者

使用该注解指定程序要监听的队列,,并设置消息的确认机制为手动

@RabbitListener(queues= DEAD_LETTER_QUEUE_A_NAME,ackMode="MANUAL")

在mq中,每条消息都会被分配一个唯一投递标签,用于标识该消息在通道中的投递状态和顺序,使用该注解可以从消息头中获取该投递标签,并将其赋值给deliveryTag参数

前面我设置了消息过期时间,它会作为一个标识被放在头部投递标签中

@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag

表示消费者确认了具有指定deliveryTag的消息,并且不进行批量确认,只确认指定的deliveryTag对应的消息。

channel.basicAck(deliveryTag,false);
@Component@Slf4jpublic class QueueConsumer {/*** 死信队列消息处理* @param message* @param channel* @throws IOException*///使用该注解指定程序要监听的队列,,并设置消息的确认机制为手动@RabbitListener(queues= DEAD_LETTER_QUEUE_A_NAME,ackMode="MANUAL")//在mq中,每条消息都会被分配一个唯一投递标签,用于标识该消息在通道中的投递状态和顺序,使用该注解可以从消息头中获取该投递标签,并将其赋值给deliveryTag参数,public void receiveA(Message message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException{String msg = new String(message.getBody());log.info("当前时间:{},死信队列A收到消息:{}", new Date(), msg);//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);channel.basicAck(deliveryTag,false);}}

6)发送请求

@Resourceprivate DelayedMessageProducer delayedMessageProducer;​/*** RabbitMQ延迟队列模拟发送消息* @param msg* @param delayTimes*/@GetMapping("/send")public void sendMessage(String msg, String delayTimes){delayedMessageProducer.sendDelayedMessage(msg,delayTimes);}

这里我模拟发送两次,一次延迟时间为10s,一次为20s

结果:

从结果可以看到,我们的消息都延迟了一段指定的时间后才被处理,实现完成!

喜欢的兄弟,欢迎点赞、收藏、关注,我们下期再见!

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • [Leetcode 51][Hard]-n皇后问题-回溯
  • BeanFactory 和 FactoryBean 的区别
  • 基于yolov10的PCB检测算法研究
  • Leetcode Day18 堆
  • EventBus搭配LifeCycle可能更美味
  • 大模型笔记01--基于ollama和open-webui快速部署chatgpt
  • 51单片机-定时器介绍
  • 模型 冯/诺依曼思维模型
  • 《PCI Express体系结构导读》随记 —— 第II篇 第7章 PCIe总线的数据链路层与物理层(2)
  • 【Java开发】Maven安装配置详细教程
  • python模块06 mock-1基础用法
  • JavaWeb:实验一JSP运行环境安装及配置
  • 5.Redis 集群 主从复制 哨兵
  • Mybatis 是如何进行分页的?分页插件的原理是什么?
  • java构建工具-maven的复习笔记【适用于复习或者初步了解】
  • IDEA 插件开发入门教程
  • iOS帅气加载动画、通知视图、红包助手、引导页、导航栏、朋友圈、小游戏等效果源码...
  • MySQL常见的两种存储引擎:MyISAM与InnoDB的爱恨情仇
  • MySQL几个简单SQL的优化
  • MySQL主从复制读写分离及奇怪的问题
  • pdf文件如何在线转换为jpg图片
  • ubuntu 下nginx安装 并支持https协议
  • Vultr 教程目录
  • 湖南卫视:中国白领因网络偷菜成当代最寂寞的人?
  • 基于Android乐音识别(2)
  • 计算机在识别图像时“看到”了什么?
  • 记一次删除Git记录中的大文件的过程
  • 前端攻城师
  • 它承受着该等级不该有的简单, leetcode 564 寻找最近的回文数
  • 学习HTTP相关知识笔记
  • 在Unity中实现一个简单的消息管理器
  • 3月7日云栖精选夜读 | RSA 2019安全大会:企业资产管理成行业新风向标,云上安全占绝对优势 ...
  • 阿里云服务器购买完整流程
  • ​一、什么是射频识别?二、射频识别系统组成及工作原理三、射频识别系统分类四、RFID与物联网​
  • # 睡眠3秒_床上这样睡觉的人,睡眠质量多半不好
  • #我与Java虚拟机的故事#连载02:“小蓝”陪伴的日日夜夜
  • $.ajax中的eval及dataType
  • (20)docke容器
  • (9)目标检测_SSD的原理
  • (libusb) usb口自动刷新
  • (Pytorch框架)神经网络输出维度调试,做出我们自己的网络来!!(详细教程~)
  • (多级缓存)多级缓存
  • (十三)Flask之特殊装饰器详解
  • (新)网络工程师考点串讲与真题详解
  • (一)Docker基本介绍
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • .gitignore文件忽略的内容不生效问题解决
  • .NET的数据绑定
  • .NET面试题解析(11)-SQL语言基础及数据库基本原理
  • 。Net下Windows服务程序开发疑惑
  • @Conditional注解详解
  • [ 渗透工具篇 ] 一篇文章让你掌握神奇的shuize -- 信息收集自动化工具
  • []常用AT命令解释()
  • [2019红帽杯]Snake
  • [AIGC] 广度优先搜索(Breadth-First Search,BFS)详解