当前位置: 首页 > news >正文

【RabbitMQ】概述

前言

Rabbit是一个公司名称,MQ(Message Queue)是消息队列的意思。所以RabbitMQ指的就是Rabbit企业下的一个消息队列产品。

RabbitMQ是一个实现了AMQP协议的消息队列服务,是当前主流的消息中间件之一。

AMQP(Advanced Message Queuing Protocol),高级消息队列协议。

AMQP协议是一个通用的应用层协议,用来提供统一消息服务的协议,为面向消息的中间件设计。AMQP协议定义了一套确定的消息交换功能,包括交换器、队列等,这些组件共同工作,使得生产者能够将消息发送到交换器,然后由队列接收并等待消费者接收。基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件或开发语言等的限制。

RabbitMQ是遵从AMQP协议的。换句话说,RabbitMQ就是AMQP协议的Erlang实现(RabbitMQ也支持STOMP、MQTT2等协议)。AMQP的模型结构和RabbitMQ的模型结构是一样的。

 在互联网架构中,会经常使用MQ来作为消息通信服务。

什么是MQ

MQ,即Message Queue,可以学到组件部分的同学一定对这个东西不陌生,至少对Queue不陌生。在数据结构中,就肯定学习了队列。谈到队列,最简单的无非就是它先进先出的特性。对于MQ来说,实质就是一个队列,只不过是一个功能更强大的队列而已。在消息队列中,可以存放文本字符串、JSON等简单内容,也可以内嵌对象这样的复杂内容。

MQ多用于分布式系统之间进行通信(作用)。

系统之间的调方式一般有两种:

同步通信,直接调用对方的服务,数据从一段发出后立即到达另一端

异步通信,数据从一端出发之后,现进入一个容器进行临时存储,当到达某种条件之后,再由这个容器发送给另一端。容器的一个具体实现就是消息队列。

 MQ的作用

MQ主要的工作就是收发消息,在不同的应用场景下可以展现不同的作用。

异步解耦

在业务流程中,一些操作可能非常耗时,但并不需求及时返回结果。这时可以借助MQ把这些操作异步化。比如,用户完成注册后发送短信通知或邮件通知,可以作为异步任务处理,而不必等待这些操作完成后才告知用户注册成功。

流量削峰

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是最大的浪费。使用MQ能够使关键组件支撑突发访问压力,不会因为突发流量而崩溃。比如秒杀或促销活动,可以使用MQ来控制流量,将请求排队,然后系统根据自己的处理能力逐渐处理这些请求。

异步通信

在很多时候应用不需要立即处理消息,消息队列提供了异步处理机制。允许应用把一些消息放在消息队列中,但是并不立即处理它,在需要的时候慢慢处理。

消息分发

当多个系统需要对同一数据做出相应时,可以使用MQ进行消息分发。比如支付成功后,支付系统可以向MQ发送消息,其他系统订阅消息,而无需轮询数据库。

延迟通知

在需要在特定时间后发送通知的场景中,可以使用MQ的延迟消息功能。比如在电子商务平台中,如果用户下单后一定时间内未支付,可以使用延迟队列在超时后自动取消订单。

不同MQ的区别

目前业界有很多的MQ产品,例如RabbitMQ、Kafka、RocketMQ以及ActiveMQ等,当然直接使用Redis中的消息队列的也有。这些消息队列,各有侧重,没有好坏之分,只有适合不适合。在实际选型时,往往需要结合自身需求以及MQ产品的特性,综合考虑。

Kafka

Kafka一开始的目的就是用于日志收集和传输,追求高吞吐量,性能卓越,单机吞吐可以达到十万级,在日志领域比较成熟,功能较为简单,主要支持简单的MQ功能。如果有日志采集需求,首选肯定是Kafka。

RabbitMQ

采用Erlang语言开发,MQ功能比较完善,且支持所有的主流语言,开源提供的界面也非常友好,性能较好,吞吐量能达到万级,社区活跃度也比较高。主要适合中小型企业,数据量没那么大,且并发没那么高的场景。

RocketMQ

采用Java语言开发,由阿里巴巴开源,后捐赠给Apache。在可用性、可靠性以及稳定性方面都非常出色,吞吐量达到十万级,在阿里巴巴集团内部广泛使用。但是支持的客户端语言不高,产品较新文档较少,且社区活跃度一般。适合大规模分布式系统,可靠性要求高,且并发大的场景,比如互联网金融。

核心概念

  •  Producer:生产者,是RabbitMQ Server的客户端,向RabbitMQ发送消息。
  • Consumer:消费者,是RabbitMQ Server的客户端,从RabbitMQ接收消息。
  • Broker:其实就是RabbitMQ Server,主要是接消息和发消息。
  • Virtual Host:虚拟主机。这是一个虚拟概念,他为消息队列提供了一种逻辑上的隔离机制。对于RabbitMQ而言,一个Broker上可以存在多个虚拟主机。当多个用户使用同一个Broker提供的服务时,可以划分出多个虚拟主机,每个用户在自己的虚拟主机创建交换机和队列。
  • Exchange:交换机。消息到达Broker的第一站,交换机负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或者多个队列中。交换机起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息。
  • Queue:队列,用于存储消息。队列和消费者之间的关系是多对多的。
  • Connection:连接,是客户端和Broker之间的一个TCP连接,这个连接是建立消息传递的基础,它负责客户端和服务器之间的所有数据和控制信息。
  • Channel:通道,信道。Channel是在Connection之上的一个抽象层。在RabbitMQ中,一个TCP连接可以有多个信道,每个信道都是独立的虚拟连接,消息的收发都是基于信道的。通道的主要作用是将信息的读写操作复用到同一个TCP连接上,这样就可以减少建立和关闭连接的过程,提高性能。

工作流程

  1. 生产消息:生产者生产了一条消息。
  2. 创建连接:生产者连接到Broker,建立一个连接,开启一个信道。
  3. 声明交换机、队列以及绑定规则:生产者声明一个队列,用来存放消息;声明一个交换机,用来路由消息;制定一个绑定规则,使得交换机把消息能路由给队列。
  4. 发送消息:生产者发送消息到Broker。
  5. 消息存储:Broker接收到消息,并根据路由规则存入相应的队列中。如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者。
  6. 消费消息:消费者监听队列,当消息到达时,从队列中获取消息。处理后,向Broker发送消息确认。
  7. 删除消息:消息被确认后,RabbitMQ会把消息从队列中删除。

入门案例

引入依赖

<!--rabbitmq依赖-->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

生产者代码编写

// 入门案例的生产者代码编写import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("43.138.108.125"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机,此处使用的是RabbitMQ内部的交换机,因此不需要声明// TODO 声明队列/*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)* queue:队列名称* durable:可持久化,当mq接收到消息之后,是否要持久化到硬盘中(防止因宕机等一系列原因导致的信息丢失)* exclusive:是否独占,该队列在消费者消费时,是否只能存在一个消费者* autoDelete:是否自动删除,该队列没有消费者时,是否将其删除*/channel.queueDeclare("entry", true, false, false, null);// TODO 发送消息/*** basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)* exchange:交换机名称,此处使用的是内置交换机,因此不需要给出名称* routingKey:交换机和队列的绑定关系,由于使用的是内置交换机,所以该属性要和队列名保持一致* props:属性配置* body:消息内容*/String message = "hello entry";channel.basicPublish("", "entry", null, message.getBytes());// TODO 资源释放channel.close();connection.close();}}

当启动该程序之后,就会发现在开源界面的队列中新增了一个entry队列,并且在队列中存在一条消息。

消费者代码编写

// 入门案例的消费者代码编写import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("43.138.108.125"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机,此处使用的是RabbitMQ内部的交换机,因此不需要声明/* TODO 声明队列,此处声明是可以省略的,不过不建议TODO 因为在实际开发者,无法确定生产者和消费者哪个先启动,如果消费者先启动TODO 消费则就会去监听该队列,但是却发现队列不存在,就会产生报错 *//*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)* queue:队列名称* durable:可持久化,当mq接收到消息之后,是否要持久化到硬盘中(防止因宕机等一系列原因导致的信息丢失)* exclusive:是否独占,该队列在消费者消费时,是否只能存在一个消费者* autoDelete:是否自动删除,该队列没有消费者时,是否将其删除*/channel.queueDeclare("entry", true, false, false, null);// TODO 消费消息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* basicConsume(String queue, Consumer callback)* queue:队列名称* autoAck:是否自动确认* callback 接收到消息之后,执行的逻辑*/DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 在这个方法中,我们可以定义如何处理接收到的消息* @param consumerTag 消费者标签,通常是消费者在订阅队列时指定的* @param envelope 包含消息的封包消息,如队列名称、交换机等* @param properties 一些配置信息* @param body 消息的具体内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息");System.out.println(new String(body));}};channel.basicConsume("entry", true, consumer);// TODO 释放资源channel.close();connection.close();}}

当启动该程序之后,就会看到entry队列的消息从1变成了0,并且在输出界面上出现了下述内容。

在本篇文章中,主要是对MQ整体进行了一个概述,并且写了一个简单的案例来表示RabbitMQ是如何使用的。在下篇文章中,将主要来介绍MQ的工作模式。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 骨传导耳机哪个品牌比较好?盘点五款闭眼入都不踩雷的优质骨传导耳机!
  • 大模型LLM之SpringAI:Web+AI(一)
  • UEFI学习笔记(七):UEFI_Spec_2_10 Protocols整理
  • 【滑动窗口-1004. 最大连续1的个数 III】
  • 基于Java+SpringBoot+Vue+MySQL的西安旅游管理系统网站
  • Windows--linux共享文件夹
  • SAP B1 学习笔记 - 易混淆字段名(持续更新中)
  • matlab数据批量保存为excel,文件名,行和列的名称设置
  • Redis面对数据量庞大处理方法
  • 基于SpringBoot的社团管理系统
  • Java实现邮箱发送功能详细步骤及注意事项?
  • 介绍 Apache Spark 的基本概念和在大数据分析中的应用。
  • Java设计模式—面向对象设计原则(二) --------> 里氏代换原则 LSP (完整详解,附有代码+案列)
  • leetcode-647. 回文子串
  • Linux相关概念和重要知识点(2)(用户、文件和目录、inode、权限)
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • [ JavaScript ] 数据结构与算法 —— 链表
  • 【407天】跃迁之路——程序员高效学习方法论探索系列(实验阶段164-2018.03.19)...
  • 0基础学习移动端适配
  • angular2开源库收集
  • JavaScript 基础知识 - 入门篇(一)
  • js递归,无限分级树形折叠菜单
  • 分布式事物理论与实践
  • 给github项目添加CI badge
  • 诡异!React stopPropagation失灵
  • 聊聊sentinel的DegradeSlot
  • 微服务入门【系列视频课程】
  • puppet连载22:define用法
  • TPG领衔财团投资轻奢珠宝品牌APM Monaco
  • ​Python 3 新特性:类型注解
  • ## 临床数据 两两比较 加显著性boxplot加显著性
  • $refs 、$nextTic、动态组件、name的使用
  • (Forward) Music Player: From UI Proposal to Code
  • (转)c++ std::pair 与 std::make
  • .desktop 桌面快捷_Linux桌面环境那么多,这几款优秀的任你选
  • .Family_物联网
  • .NET Core 项目指定SDK版本
  • .net 开发怎么实现前后端分离_前后端分离:分离式开发和一体式发布
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
  • .vollhavhelp-V-XXXXXXXX勒索病毒的最新威胁:如何恢复您的数据?
  • /tmp目录下出现system-private文件夹解决方法
  • ::
  • @cacheable 是否缓存成功_Spring Cache缓存注解
  • @data注解_SpringBoot 使用WebSocket打造在线聊天室(基于注解)
  • @ResponseBody
  • @vue-office/excel 解决移动端预览excel文件触发软键盘
  • @zabbix数据库历史与趋势数据占用优化(mysql存储查询)
  • [2008][note]腔内级联拉曼发射的,二极管泵浦多频调Q laser——
  • [2023年]-hadoop面试真题(一)
  • [AIGC] 解题神器:Python中常用的高级数据结构
  • [ai笔记3] ai春晚观后感-谈谈ai与艺术
  • [APIO2015]巴厘岛的雕塑
  • [Assignment] C++1
  • [C# 网络编程系列]专题六:UDP编程
  • [C#] 如何调用Python脚本程序