当前位置: 首页 > news >正文

【Node.js】初识 RabbitMQ

概述

MQ 顾名思义,是消息队列。

RabbitMQ 是一个消息队列系统,用于实现异步通信。基于 AMQP。AMQP(高级消息队列协议) 实现了对于消息的排序,点对点通讯,和发布订阅,保持可靠性、保证安全性。

在 Node.js 的微服务架构中,RabbitMQ 可以作为服务之间的消息传递中介,帮助解耦系统组件并提升系统的扩展性和可靠性。支持主流操作系统和多种语言。

通过 RabbitMQ,你可以让一个服务发送消息到队列,另一个服务从队列中消费消息。这种机制使服务之间可以异步通信,特别适合那些不需要立即响应的场景,例如订单处理、日志记录、通知推送等。

下面通过一个电商平台的例子讲解如何使用 Node.js 和 RabbitMQ 来实现异步的订单处理。

RabbitMQ 概念

  • Producer(生产者):发送消息到 RabbitMQ 的服务(到交换器中)。
  • Consumer(消费者):从 RabbitMQ 队列中接收和处理消息的服务。
  • Message(消息):在RabbitMQ中,消息是传递的基本单元。它由消息体和可选的属性组成。
  • Queue(队列):消息存储的地方,生产者把消息发送到队列,消费者从队列中接收消息。
  • Exchange(交换器):RabbitMQ 用于接收生产者的消息,并根据某种规则将其路由到一个或多个队列。
  • Routing Key(路由键):用于帮助交换器将消息路由到正确的队列。
    在这里插入图片描述

上图来源:小满zs

RabbitMQ 的安装

erlang 的安装

Rabbit MQ的依赖环境 erlang ,MQ是基于这个语言开发的。

官网下载erlang:erlang

在这里插入图片描述

然后一直选择默认即可。

安装完成之后,配置一个环境变量。

在这里插入图片描述

在这里插入图片描述

power shell 测试:

在这里插入图片描述

RabbitMQ 的安装

RabbitMQ官网

在这里插入图片描述

然后一直默认即可。

之后还是需要配置下环境变量。

还是类似的操作:

在这里插入图片描述

安装 RabbitMQ 的可视化面板

直接在终端 安装MQ插件拥有可视化面板:

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

启动 MQ ,MQ 默认使用端口为 5672:

rabbitmq-server.bat start

在这里插入图片描述

这里有部分同学可能启动失败,是因为在安装 RabbitMQ 的时候,最后一步默认安装后启动 RabbitMQ 被自动勾选了,所以我们需要打开服务,手动停止 RabbitMQ 服务,然后使用 MQ 的插件手动启动 服务就可以了。

在这里插入图片描述

启动好后该终端不要关闭,访问 http://localhost:15672/#/ 账号密码都是 guest:

在这里插入图片描述

就进入了 MQ 的可视化界面。

Node.js 使用 RabbitMQ

这边它的官网也是由很多语言的使用教程 JavaScript使用 MQ:

在这里插入图片描述

安装必要的依赖

在 Node.js 中使用 RabbitMQ,通常会用到 amqplib 这个库来与 RabbitMQ 进行交互。

npm install amqplib express

express 帮助我们快速启动服务。

RabbitMQ 示例:异步订单处理

我们以订单服务为例来说明如何通过 RabbitMQ 实现异步订单处理。假设系统中有两个服务:

  • 订单服务(Order Service):负责接收用户订单,并将订单消息发送到 RabbitMQ 队列。
  • 支付服务(Payment Service):从 RabbitMQ 队列中接收订单信息并处理支付。
订单服务(生产者)

订单服务接收到用户创建订单的请求后,会将订单信息发送到 RabbitMQ 的队列中,而不需要立即处理支付流程。

// order-service.js 生产者
const express = require('express');
const amqp = require('amqplib/callback_api');
const app = express();
app.use(express.json());let channel = null;// 连接 RabbitMQ 并创建一个频道
amqp.connect('amqp://localhost:5672', (error, connection) => {if (error) {throw error;}connection.createChannel((err, ch) => {if (err) {throw err;}channel = ch;// 创建队列const queue = 'orderQueue';channel.assertQueue(queue, { durable: false });});
});// 订单服务 API
app.post('/order', (req, res) => {const order = { userId: req.body.userId, productId: req.body.productId };// 将订单发送到 RabbitMQ 队列channel.sendToQueue('orderQueue', Buffer.from(JSON.stringify(order)), {persistent: true // 持久化消息(避免服务器宕机) 底层原理是 存储在磁盘});console.log("Order sent to queue:", order);res.status(201).json({ message: 'Order placed successfully!' });
});// 启动订单服务
app.listen(3001, () => {console.log('Order service is running on port 3001');
});

在这里,订单服务通过 sendToQueue() 方法将订单信息发送到名为 orderQueue 的队列中。

支付服务(消费者)

支付服务从 orderQueue 中获取订单,并处理订单支付。

// payment-service.js 消费者
const amqp = require('amqplib/callback_api');// 连接 RabbitMQ 并创建一个频道
amqp.connect('amqp://localhost:5672', (error, connection) => {if (error) {throw error;}connection.createChannel((err, channel) => {if (err) {throw err;}const queue = 'orderQueue';// 确保队列存在channel.assertQueue(queue, {durable: false // 队列和交换机的持久化});// 从队列中消费消息console.log('Waiting for messages in %s. To exit press CTRL+C', queue);channel.consume(queue, (msg) => {const order = JSON.parse(msg.content.toString());console.log('Received order:', order);// 假设支付处理逻辑setTimeout(() => {console.log('Payment processed for order:', order);channel.ack(msg); // 确认消息处理完成}, 1000);}, {noAck: false // 确保消息处理完成后才确认});});
});

支付服务通过 consume() 方法从 orderQueue 中获取订单消息,并处理支付逻辑。每次支付处理完成后,支付服务会调用 ack() 方法确认消息已经成功处理。

开启一个 http 请求,进行测试:

POST http://localhost:3001/order HTTP/1.1
Content-Type: application/json{"userId": 1,"productId": 1
}

响应:

{"message": "Order placed successfully!"
}

在这里插入图片描述
在这里插入图片描述

RabbitMQ 的工作流程

  1. 用户通过订单服务创建订单。
  2. 订单服务将订单信息发送到 RabbitMQ 队列中。
  3. 支付服务从队列中接收订单消息,并异步处理订单支付。

这个流程使订单服务和支付服务解耦。订单服务只需要把消息发送到队列,支付服务则在后台处理支付逻辑。这样的架构具有以下优势:

  • 异步处理:订单服务不需要等待支付完成,响应用户请求变得更加快速。
  • 解耦:订单服务和支付服务可以独立扩展和维护,互不依赖。
  • 负载均衡:可以轻松地扩展多个支付服务实例,从同一个队列中消费消息,提升系统的处理能力。
  • 而且服务之间使用不同的语言也是完全可以的。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 短文写作竞赛系统小程序的设计
  • VSCode拉取远程项目
  • python爬虫基础:了解html
  • 国密起步7:BouncyCastle使用SM4自定义格式加解密C#版
  • PyTorch详解-可视化模块
  • Redis 在 Spring Boot 项目中的实际应用及问题解决
  • OpenCV和Tesseract OCR识别复杂验证码喽~~
  • Wine容器内程序执行sh脚本问题研究
  • 「数组」堆排序 / 大根堆优化(C++)
  • 短信验证码倒计时 (直接复制即可使用) vue3
  • C# List定义和常用方法
  • 接口测试到底测试什么?
  • uniapp 如何自定义导航栏并自适应机型
  • 【 Kubernetes 风云录 】- Cert证书更新
  • YoloV10改进策略:上采样改进|动态上采样|轻量高效,即插即用(适用于分类、分割、检测等多种场景)
  • 【comparator, comparable】小总结
  • iOS 颜色设置看我就够了
  • Java面向对象及其三大特征
  • Meteor的表单提交:Form
  • Mocha测试初探
  • Odoo domain写法及运用
  • python 装饰器(一)
  • Redis学习笔记 - pipline(流水线、管道)
  • 多线程事务回滚
  • 关于for循环的简单归纳
  • 机器学习中为什么要做归一化normalization
  • 两列自适应布局方案整理
  • 聊聊springcloud的EurekaClientAutoConfiguration
  • 腾讯优测优分享 | 你是否体验过Android手机插入耳机后仍外放的尴尬?
  • 为什么要用IPython/Jupyter?
  • 我有几个粽子,和一个故事
  • 小而合理的前端理论:rscss和rsjs
  • 一些基于React、Vue、Node.js、MongoDB技术栈的实践项目
  • UI设计初学者应该如何入门?
  • 好程序员web前端教程分享CSS不同元素margin的计算 ...
  • 正则表达式-基础知识Review
  • ​软考-高级-系统架构设计师教程(清华第2版)【第20章 系统架构设计师论文写作要点(P717~728)-思维导图】​
  • #pragma pack(1)
  • #基础#使用Jupyter进行Notebook的转换 .ipynb文件导出为.md文件
  • (14)Hive调优——合并小文件
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (PySpark)RDD实验实战——求商品销量排行
  • (STM32笔记)九、RCC时钟树与时钟 第一部分
  • (TOJ2804)Even? Odd?
  • (zz)子曾经曰过:先有司,赦小过,举贤才
  • (草履虫都可以看懂的)PyQt子窗口向主窗口传递参数,主窗口接收子窗口信号、参数。
  • (十七)Flink 容错机制
  • (译)计算距离、方位和更多经纬度之间的点
  • (源码分析)springsecurity认证授权
  • (转)菜鸟学数据库(三)——存储过程
  • (转载)从 Java 代码到 Java 堆
  • .config、Kconfig、***_defconfig之间的关系和工作原理
  • .net 8 发布了,试下微软最近强推的MAUI
  • .net CHARTING图表控件下载地址
  • .NET Framework、.NET Core 、 .NET 5、.NET 6和.NET 7 和.NET8 简介及区别