【业务场景实战】我等你10秒
今天是个特殊的日子啊。9月1日开学的日子,其实我30号就到学校了,刚来一个新的学校还挺不适应的,这几天都摆烂了/(ㄒoㄒ)/~~。明天就要正式上课了,这新学校课还挺多。
不扯了,下面进入今天的正题!今天讲讲MQ中的延时队列
一、使用场景
延时队列在我们日常生活中也是比较常见的
比如说:下单,在你点击下单前,如果没有及时付款,订单会为你保留10分钟,十分钟之后如果还没付款,订单就会消失了。比较常见的就是淘宝、订火车票这种场景。
还有预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。那这种如何用延迟队列来实现呢?下面来讲讲
二、延迟实现
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
-
通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
-
在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。
也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
安装一个插件即可:
https://www.rabbitmq.com/community-plugins.html
,下载rabbitmq_delayed_message_exchange
插件,然后解压放置到RabbitMQ的插件目录
及:plugins
文件下。我这里安装的RabbitMQ是3.6.5版本,属于比较老的版本了。我找了很久适合我这个版本的延迟插件,结果最后还是失败了,它没有ez文件,只能自己去压缩,试了很久,就是不行,所以这里只能使用第二种方式来实现延迟队列了。
1、实现思路
延迟队列,实际就是让消息在队列中停留一段时间之后再做处理,延迟插件可以直接设置消息的延迟时间。但现在我只能使用死信队列加消息过期来实现这个效果。
我给消息设置一个过期时间TTL,然后设置签收方式为手动,这样消息就会一直留在队列中不被处理,直到过期,被死信队列处理,间接的达到一个延迟的效果。
既然设置消息延迟,那这个延迟时间最好还是手动设置更为灵活,需要延迟多长时间,可以通过参数传递的方式进行设置
2、代码实现
1)引入依赖
<!--springboot整合rabbitmq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.2</version></dependency>
2)导入配置
rabbitmq:host: ENC(UJr2n8We05y2ijsbZy51aG7YOT+CmD5An9N90lSaauWruF+FKEz8RqDsrzxWE4fs)port: 5672username: guestpassword: guest
3)mq配置类
创建延迟队列和延迟交换机
创建死信队列和死信交换机
@Component
public class RabbitmqConfig {//创建延迟交换器@Bean("delayExchange")public DirectExchange delayExchange(){return new DirectExchange(DELAY_EXCHANGE_NAME);}//创建延迟队列A@Bean("delayQueueA")public Queue delayQueueA(){Map<String,Object> args=new HashMap<>();args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();}//创建死信队列A@Bean("deadLetterQueueA")public Queue deadLetterQueueA() {return new Queue(DEAD_LETTER_QUEUE_A_NAME);}//创建死信交换器@Bean("deadExchange")public DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}//延迟队列A绑定交换器@Beanpublic Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue,@Qualifier("delayExchange")DirectExchange delayExchange){return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);}//死信队列A绑定交换器@Beanpublic Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue,@Qualifier("deadExchange")DirectExchange deadExchange){return BindingBuilder.bind(queue).to(deadExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);}
}
4)定义常量接口
public interface MQContant {String DELAY_EXCHANGE_NAME="delay_exchange";String DEAD_LETTER_EXCHANGE="deadletter_exchange";String DEAD_LETTER_QUEUE_A_NAME="deadqueueA";String DEAD_LETTER_QUEUE_A_ROUTING_KEY="moon";String DELAY_QUEUE_ROUTING_A_NAME="yupi";String DELAY_QUEUE_A_NAME="delayqueueA";
}
4)延迟消息生产者
需要思考一下,对于延迟队列,我们需要实现一个什么效果
发送消息,延迟一段时间之后输出
首先发送消息,然后为了灵活的设置延迟,消息过期时间我们需要手动进行定义
这样才能达到我们需要的效果
使用setExpiration
方法将消息过期时间进行手动设置,如果引入的延迟插件则可以使用setDelayed
方法直接设置消息延迟时间
延迟时间单位为毫秒ms
@Service
public class DelayedMessageProducer {@Resourceprivate AmqpTemplate amqpTemplate;public void sendDelayedMessage(String message, String delayTime) {amqpTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_A_NAME, message, messagePostProcessor -> {//设置消息的过期时间,单位是msmessagePostProcessor.getMessageProperties().setExpiration(delayTime);return messagePostProcessor;});}
}
5)延迟队列消费者
使用该注解指定程序要监听的队列,,并设置消息的确认机制为手动
@RabbitListener(queues= DEAD_LETTER_QUEUE_A_NAME,ackMode="MANUAL")
在mq中,每条消息都会被分配一个唯一投递标签,用于标识该消息在通道中的投递状态和顺序,使用该注解可以从消息头中获取该投递标签,并将其赋值给deliveryTag参数
前面我设置了消息过期时间,它会作为一个标识被放在头部投递标签中
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag
表示消费者确认了具有指定deliveryTag的消息,并且不进行批量确认,只确认指定的deliveryTag对应的消息。
channel.basicAck(deliveryTag,false);
@Component@Slf4jpublic class QueueConsumer {/*** 死信队列消息处理* @param message* @param channel* @throws IOException*///使用该注解指定程序要监听的队列,,并设置消息的确认机制为手动@RabbitListener(queues= DEAD_LETTER_QUEUE_A_NAME,ackMode="MANUAL")//在mq中,每条消息都会被分配一个唯一投递标签,用于标识该消息在通道中的投递状态和顺序,使用该注解可以从消息头中获取该投递标签,并将其赋值给deliveryTag参数,public void receiveA(Message message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException{String msg = new String(message.getBody());log.info("当前时间:{},死信队列A收到消息:{}", new Date(), msg);//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);channel.basicAck(deliveryTag,false);}}
6)发送请求
@Resourceprivate DelayedMessageProducer delayedMessageProducer;/*** RabbitMQ延迟队列模拟发送消息* @param msg* @param delayTimes*/@GetMapping("/send")public void sendMessage(String msg, String delayTimes){delayedMessageProducer.sendDelayedMessage(msg,delayTimes);}
这里我模拟发送两次,一次延迟时间为10s,一次为20s
结果:
从结果可以看到,我们的消息都延迟了一段指定的时间后才被处理,实现完成!
喜欢的兄弟,欢迎点赞、收藏、关注,我们下期再见!