当前位置: 首页 > news >正文

19.延迟队列优化

问题

前面所讲的延迟队列有一个不足之处,比如现在有一个需求需要延迟半个小时的消息,那么就只有添加一个新的队列。那就意味着,每新增一个不同时间需求,就会新创建一个队列。

解决方案

应该讲消息的时间不要跟队列绑定,应该交给消息的生产者,由发送消息来指定延迟时间。这样就可以定义个通用的队列。

方案图

代码

配置类

package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL队列,配置文件代码*/
@Configuration
public class TtlQueueConfig {//普通交换机public static final String X_EXCHANGE = "X";//死信交换机public static final String Y_HEAD_LETTER_EXCHANGE = "Y";//普通队列public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列public static final String DEAD_LETTER_QUEUE = "QD";/*** 声明普通交换机X,bean的别名xExchange* @return*/@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}/*** 声明死信交换机Y,bean的别名yExchange* @return*/@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_HEAD_LETTER_EXCHANGE);}/*** 声明普通队列QA* @return*/@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange", Y_HEAD_LETTER_EXCHANGE);//声明死信的routingeyarguments.put("x-dead-letter-routing-key", "YD");//设置消息过期时间ttl为10sarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}/*** 声明普通队列QB* @return*/@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange", Y_HEAD_LETTER_EXCHANGE);//声明死信的routingKeyarguments.put("x-dead-letter-routing-key", "YD");//设置消息过期时间ttl为40sarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}/*** 声明普通队列QC* @return*/@Bean("queueC")public Queue queueC() {Map<String, Object> arguments = new HashMap<>(2);//设置死信交换机arguments.put("x-dead-letter-exchange", Y_HEAD_LETTER_EXCHANGE);//设置死信的routingKeyarguments.put("x-dead-letter-routing-key", "YD");//这里不要设置TTL时间return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}/*** 声明死信队列QD* @return*/@Bean("queueD")public Queue queueD() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}/*** 将队列QA绑定到交换机X上,指定routingKey为XA* @param queueA* @param xExchange* @return*/@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}/*** 将队列QB绑定到交换机X上,指定routingKey为XB* @param queueB* @param xExchange* @return*/@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}/*** 将队列QC绑定到交换机X上,指定routingKey为XC* @param queueC* @param xExchange* @return*/@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}/*** 将队列QD绑定到交换机Y上,指定routingKey为YD* @param queueD* @param yExchange* @return*/@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

生产者

package com.xkj.org.controller;import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 消息生产者*/
@Slf4j
@RestController
@RequestMapping("/ttl")
@Api(tags = "消息生产者", description = "消息生产者控制器")
public class MessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation("消息发送测试")@GetMapping("/sendMsg/{msg}")public void sendMsg(@ApiParam(value = "发送的消息内容", required = true) @PathVariable("msg") String message) {log.info("当前时间{},发送一条消息给两个队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XA", "ttl=10s的消息:" + message);rabbitTemplate.convertAndSend("X", "XB", "ttl=40s的消息:" + message);}@ApiOperation("发送带过期时间的消息")@GetMapping("/sendExpiredMsg/{msg}/{ttl}")public void sendMsgExpired(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message,@ApiParam(value = "ttl时间", required = true)@PathVariable("ttl") String ttlTime) {log.info("当前时间{},发送一条消息给队列QC:{},ttl={}", new Date().toString(), message, ttlTime);rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttlTime);return msg;});}}

消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 消费者*/
@Slf4j
@Componentpublic class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(), msg);}
}

问题

改造代码后,发送两条消息,一条ttl为2s,另一条ttl为10s。ttl为2s的消息也要等到10s后才会收到。 

原因

因为队列是先进先出的,消息需要排队,第一条消息10s才会发出去,第二条消息2s发出去,但是由于10s的消息没有发出去,2s的消息就只有等待。

解决方案

使用rabbitmq插件来解决,请看后续博文。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 高性能响应式UI部件DevExtreme v24.1.4全新发布
  • TCP程序设计
  • Linux基础操作(下)
  • 基于Flink SQL CDC的实时数据同步
  • wire和reg的区别
  • 使用eclipse在新建的java项目中编辑xml文件时Unhandled event loop exception No more handles
  • 力扣 二分查找
  • Android Studio run App 不更新代码
  • 谷粒商城实战笔记-63-商品服务-API-品牌管理-OSS获取服务端签名
  • GO发票真伪批量查验方法、数电票查验接口
  • 系统移植(七)u-boot移植 ④ trusted版本
  • Flume安装部署
  • 先用先发!小样本故障诊断新思路!Transformer-SVM组合模型多特征分类预测/故障诊断(Matlab)
  • Unity横板动作游戏 -为什么我又开始学习Unity,而不是Godot。
  • SteerLM_ Attribute Conditioned SFT as an (User-Steerable) Alternative to RLHF
  • Angular 2 DI - IoC DI - 1
  • JavaScript 是如何工作的:WebRTC 和对等网络的机制!
  • JavaScript工作原理(五):深入了解WebSockets,HTTP/2和SSE,以及如何选择
  • Java知识点总结(JavaIO-打印流)
  • JS 面试题总结
  • MySQL-事务管理(基础)
  • python学习笔记 - ThreadLocal
  • 阿里研究院入选中国企业智库系统影响力榜
  • 初识 webpack
  • 扫描识别控件Dynamic Web TWAIN v12.2发布,改进SSL证书
  • 深入浏览器事件循环的本质
  • 线上 python http server profile 实践
  • 国内唯一,阿里云入选全球区块链云服务报告,领先AWS、Google ...
  • ​LeetCode解法汇总2696. 删除子串后的字符串最小长度
  • ​Redis 实现计数器和限速器的
  • # 职场生活之道:善于团结
  • #1015 : KMP算法
  • #周末课堂# 【Linux + JVM + Mysql高级性能优化班】(火热报名中~~~)
  • (0)Nginx 功能特性
  • (6)STL算法之转换
  • (C++哈希表01)
  • (function(){})()的分步解析
  • (HAL)STM32F103C6T8——软件模拟I2C驱动0.96寸OLED屏幕
  • (TipsTricks)用客户端模板精简JavaScript代码
  • (顶刊)一个基于分类代理模型的超多目标优化算法
  • (附源码)springboot 个人网页的网站 毕业设计031623
  • (附源码)基于SSM多源异构数据关联技术构建智能校园-计算机毕设 64366
  • (深入.Net平台的软件系统分层开发).第一章.上机练习.20170424
  • (原創) 如何優化ThinkPad X61開機速度? (NB) (ThinkPad) (X61) (OS) (Windows)
  • (转载)(官方)UE4--图像编程----着色器开发
  • *setTimeout实现text输入在用户停顿时才调用事件!*
  • .Net CF下精确的计时器
  • .NET Core Web APi类库如何内嵌运行?
  • .Net Core 笔试1
  • .NET Core 发展历程和版本迭代
  • .Net实现SCrypt Hash加密
  • @value 静态变量_Python彻底搞懂:变量、对象、赋值、引用、拷贝
  • [ACM独立出版]2024年虚拟现实、图像和信号处理国际学术会议(ICVISP 2024)
  • [Bada开发]初步入口函数介绍
  • [BZOJ] 1001: [BeiJing2006]狼抓兔子