当前位置: 首页 > news >正文

midwayjs 框架使用 rabbitmq 消息延迟

插件rabbitmq_delayed_message_exchange是RabbitMQ官方提供的一种用于实现延迟消息的解决方案。该插件将交换机类型扩展至x-delayed-message,这种类型的交换机能够将消息暂时挂起,直到设定的延迟时间到达,才将消息投递到绑定的队列中。这一特性使得RabbitMQ能够轻松处理延迟消息的场景,无需额外的业务逻辑来定时检查和触发消息的投递。

插件需要在服务端安装并开启后使用。

消息发送:生产者向一个x-delayed-message类型的交换机发送消息,同时在消息属性中设置x-delay头,表示消息应延迟的时间(单位:毫秒)。
延迟处理:交换机接收到消息后,不会立即投递给队列,而是将其挂起,等待设定的延迟时间。在此期间,消息处于未投递状态。
消息投递:一旦达到延迟时间,交换机会将消息投递给与之绑定的队列。此时,消息的行为就像普通消息一样,可以被消费者消费。
消息消费:消费者从队列中拉取消息,执行相应的业务逻辑。

1、生产者:在service文件夹下建立rabbitmq.service.ts文件,通过调用sendDelayOrderToExchange方法发送消息,x-delay 设置延时时间  单位ms

import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config, Inject } from '@midwayjs/decorator';
import * as amqp from 'amqp-connection-manager';
import { ChannelWrapper, AmqpConnectionManager } from 'amqp-connection-manager';import * as dayjs from 'dayjs';const OPTIONS = { durable: true, autoDelete: true }; // 队列opts
const EXCHANGE_CHARGE_DELAY = 'exchange.charge.delay'; // 延时订单
const QUEUE_CHARGE_DELAY = 'queue.charege.delay';@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {private connection: AmqpConnectionManager;private channelWrapper: ChannelWrapper;@Config('rabbitmq')mqConfig;@Inject()logger;@Init()async connect() {// 创建连接,你可以把配置放在 Config 中,然后注入进来this.connection = await amqp.connect(this.mqConfig);// 创建 channelthis.channelWrapper = await this.connection.createChannel({json: true,setup: function (channel) {return Promise.all([// 延时Exchangechannel.assertExchange(EXCHANGE_CHARGE_DELAY, 'x-delayed-message', {durable: true,autoDelete: true,arguments: {'x-delayed-type': 'direct',},}),channel.assertQueue(QUEUE_CHARGE_DELAY, OPTIONS),// 队列channel.bindQueue(QUEUE_CHARGE_DELAY, EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER'),// 绑定交换机]);},});}// 发送预约订单public async sendDelayOrderToExchange(message: string) {this.logger.info(`发送延时订单:${message}  当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);await this.channelWrapper.publish(EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER', message, {headers: { 'x-delay': 10 * 1000 },// 延时时间 单位毫秒});}@Destroy()async close() {await this.channelWrapper.close();await this.connection.close();}
}

2、消费者:在consumer文件夹下新建mq.consumer.ts,通过监听延时队列接受消息

import { Consumer, MSListenerType, RabbitMQListener, Inject } from '@midwayjs/decorator';
import { ConsumeMessage } from 'amqplib';
import { Context } from '@midwayjs/rabbitmq';import * as dayjs from 'dayjs';const QUEUE_CHARGE_DELAY = 'queue.charege.delay';@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {@Inject()ctx: Context;@Inject()logger;@RabbitMQListener(QUEUE_CHARGE_DELAY, {durable: true,autoDelete: true,})async delayOrder(msg: ConsumeMessage) {if (msg && msg.content) {const id = msg.content.toString('utf-8');this.logger.info(`预约订单号:${id} 当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);}}
}

在configuration.ts文件中调用测试

import { Configuration, App } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
.....
.....
.....
export class ContainerLifeCycle {@App()app: koa.Application;@Inject()rabbitmqService: RabbitmqService;async onReady() {await this.rabbitmqService.sendDelayOrderToExchange('123456789');}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • ES 根据条件删除文档
  • 【Python入门】第5节 数据容器
  • 分布式云扩展 AI 边缘算力,助力用户智能化创新
  • [Linux#47][网络] 网络协议 | TCP/IP模型 | 以太网通信
  • Apache RocketMQ 中文社区全新升级丨阿里云云原生 7 月产品月报
  • Xor Sigma Problem
  • CSS系列之浮动清除clear(三)
  • 数据库mysql集群主从、高可用MGR、MHA技术详解
  • Go 语言生产服务故障案例精析
  • 黑马JavaWeb开发笔记09——ElementUI代码引入教程、Element常用组件使用(Table, Pagination, Dialog, Form)
  • python爬虫源码:selenium+browsermobproxy实现浏览器请求抓取
  • 7- 排序算法
  • 如何本地搭建Whisper语音识别模型
  • netty之ChannelOption
  • 数据库入门: 从 0 到 1 理解数据管理
  • hexo+github搭建个人博客
  • Computed property XXX was assigned to but it has no setter
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • Mac转Windows的拯救指南
  • Markdown 语法简单说明
  • miaov-React 最佳入门
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • Node + FFmpeg 实现Canvas动画导出视频
  • spring security oauth2 password授权模式
  • 大快搜索数据爬虫技术实例安装教学篇
  • 记录一下第一次使用npm
  • 聊聊flink的BlobWriter
  • 你不可错过的前端面试题(一)
  • 使用Gradle第一次构建Java程序
  • 微信开放平台全网发布【失败】的几点排查方法
  • 我从编程教室毕业
  • 智能网联汽车信息安全
  • 自定义函数
  • 资深实践篇 | 基于Kubernetes 1.61的Kubernetes Scheduler 调度详解 ...
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • #我与虚拟机的故事#连载20:周志明虚拟机第 3 版:到底值不值得买?
  • (07)Hive——窗口函数详解
  • (1)(1.13) SiK无线电高级配置(五)
  • (1)Map集合 (2)异常机制 (3)File类 (4)I/O流
  • (4)(4.6) Triducer
  • (AngularJS)Angular 控制器之间通信初探
  • (JS基础)String 类型
  • (M)unity2D敌人的创建、人物属性设置,遇敌掉血
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (附源码)ssm考生评分系统 毕业设计 071114
  • (规划)24届春招和25届暑假实习路线准备规划
  • (生成器)yield与(迭代器)generator
  • (一)基于IDEA的JAVA基础1
  • (转) ns2/nam与nam实现相关的文件
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • ./indexer: error while loading shared libraries: libmysqlclient.so.18: cannot open shared object fil
  • .【机器学习】隐马尔可夫模型(Hidden Markov Model,HMM)
  • .Net 垃圾回收机制原理(二)
  • .net 生成二级域名