当前位置: 首页 > news >正文

RabbitMQ 系列教程

一、RabbitMQ 部署及配置详解(集群部署)

二、RabbitMQ 部署及配置详解 (单机)

三、RabbitMQ 详解及实例(含错误信息处理)

四、RabbitMq死信队列及其处理方案

五、RabbitMQ Java开发教程—官方原版 

六、RabbitMQ Java开发教程(二)—官方原版

七、RabbitMQ Java开发教程(三)—官方原版_

 

 

一、RabbitMQ 核心概念

1. 生产者和消费者

Producer: 消息的生产者,用于发布消息;

Consumer: 消息的消费者,用于从队列中获取消息.消费者只需关注队列即可,不需要关注交换机和路由键。消费者可以通过basicConsume(订阅模式可以从队列中一直持续的自动的接收消息)或者basicGet(先订阅消息,然后获取单条消息,再然后取消订阅。也就是说basicGet一次只能获取一条消息,如果还想再获取下一条还要再次调用basicGet)来从队列中获取消息。

2. RabbitMQ broker 服务器

RabbitMQ broker: 官方定义"RabbitMQ isn’t a food truck, it’s a delivery service",指明RabbitMQ是一种传输服务。

3. ExChange 交换机

Exchange: 生产者会将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去。ExchangeType决定了Exchange路由消息的行为,在RabbitMQ中,ExchangeType有direct、Fanout、Topic和Header 4种。Exchange 类似于数据通信网络中的交换机,提供消息路由策略。

在RabbitMQ 中,Producer 不是通过信道直接将消息发送给 Queue,而是先发送给 ExChange. 一个 ExChange 可以和多个 Queue 进行绑定,Producer 在传递消息的时候,会传递一个 ROUTING_KEY, ExChange 会根据这个 ROUTING_KEY 按照特定的路由算法,将消息路由给指定的 Message Queue。与 Queue 一样, ExChange 也可设置为持久化,临时或者自动删除。

4. Binding 绑定

所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来,所以Binding不是一个概念,而是一种操作。RabbitMQ中通过绑定,以路由键作为桥梁将Exchange与Queue关联起来()Exchange—>Routing Key—>Queue,这样RabbitMQ就知道如何正确地将消息路由到指定的队列了,通过queueBind()方法将Exchange、Routing Key、Queue绑定起来.ExChange 和 Queue 的绑定可以是多对多的关系。

5. Binding Key 绑定键

Binding Key: 它表示的是Exchange与Message Queue是通过binding key进行绑定联系的,这个关系是固定的。初始化的时候,我们就会建立该队列。

6. Routing Key 路由键

Routing Key: 它是一个String值,用于定义路由规则。生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。在队列绑定的时候需要指定路由键,在生产者发布消息的时候需要指定路由键,当消息的路由键和队列绑定的路由键匹配时,消息就会发送到该队列。

7. Message Queue 消息队列

用于存储消息的容器,可以看成一个有序的数组,生产者生产的消息会发送到交换机中,最终交换机将消息存储到某个或某些队列中。队列可被消费者订阅,消费者从订阅的队列中获取消息。

Message Queue: 消息队列,我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取.

消息队列提供了 FIFO 的处理机制,具有缓存消息的能力.在RabbitMQ 中,队列消息可以设置为持久化,临时或者自动删除.

设置为持久化的队列,Queue 中的消息会在 Server 本地硬盘存储一份,防止系统 Crash,数据丢失;

设置为临时的队列,Queue 中的数据在系统重启之后就会丢失;

设置为自动删除的队列,当没有用户连接到 Server,队列中的数据会被自动删除。

8. Virtual Host 虚拟主机

每一个RabbitMQ服务器都能创建多个虚拟消息服务器,我们称之为虚拟主机.每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的交换机、队列、绑定等,拥有自己的权限机制.vhost相对于RabbitMQ就像虚拟机之于物理机一样.他们通过在各个实例间提供逻辑上的分离,允许不同的应用程序安全保密的运行数据,这很有用,它既能将同一个Rabbit的众多客户区分开来,又可以避免队列和交换器的命名冲突.RabbitMQ提供了开箱即用的默认的虚拟主机“/”,如果不需要多个vhost可以直接使用这个默认的vhost,通过使用缺省的guest用户名和guest密码来访问默认的vhost.

vhost之间是相互独立的,这避免了各种命名的冲突,就像App中的沙盒的概念一样,每个沙盒是相互独立的,且只能访问自己的沙盒,以保证非法访问别的沙盒带来的安全隐患.

二、RabbitMQ部署

rabbitmq部署有三种模式:单机模式,普通集群模式,镜像集群模式

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现

1、安装erlang环境

由于rabbitmq是基于erlang语言开发的,所以必须先安装erlang。

wget http://erlang.org/download/otp_src_20.0.tar.gz
tar -zxvf otp_src_20.0.tar.gz
cd otp_src_20.0
./configure --prefix=/usr/local/erlang --without-javac
make
make install

加入环境变量

vim /etc/profile
在最后添加:
export ERLANG_HOME=/usr/local/erlang
export PATH=$PATH:/usr/local/erlang/bin

运行以下命令,使环境变量立即生效

source /etc/profile

验证erl是否成功安装

erl

2、单机模式安装

安装 rabbitmq

可以去官网手动下载(Downloading and Installing RabbitMQ — RabbitMQ)也可以通过wget命令下载

cd /opt
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.6/rabbitmq-server-3.11.6.tar.xz
xz -d rabbitmq-server-generic-unix-3.11.6.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.10.tar
cp -r /opt/rabbitmq_server-3.6.10 /usr/local/rabbitmq
注:该方式安装的RabbitMQ是没有配置文件的,如需要配置文件,需手动进行配置,文件置于自己Rabbitmq安装路径下的 /etc/rabbitmq/rabbitmq.conf 即可,再管理页面或者日志中都可以查看到路径位置
/usr/local/rabbitmq/sbin/rabbitmq-server -detached
echo "/usr/local/rabbitmq/sbin/rabbitmq-server -detached" >> /etc/rc.local      #添加开机自启动。

开启 web 管理界面

/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management

RabbitMQ默认的账号用户名和密码都是guest

rabbitmqctl delete_user guest

创建RabbitMQ管理员用户

#创建一个新用户
rabbitmqctl add_user <用户名> <密码>
#将创建的新用户设置为管理员
rabbitmqctl set_user_tags <用户名> administrator
#赋予新创建的用户所有权限
rabbitmqctl set_permissions -p / <用户名> ".*" ".*" ".*"

在本地主机中,使用浏览器访问Linux实例的公网IP:15672

显示如下页面,说明RabbitMQ安装成功。

3、普通集群模式

在多台机器上启动多个rabbitmq实例,每个机器启动一个。

但是你创建的queue,只会放在一个rabbtimq实例上,但是每个实例都同步queue的元数据(存放含queue数据的真正实例位置)。消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。

三、配置文件详解

所有​ ​受支持的RabbitMQ版本​​​ 的主配置文件都使用​ ​类似ini的sysctl配置文件格式​​。该文件通常命名为Rabbitmq.conf。

rabbitmq.conf

新样式格式(sysctl或类似ini的格式)

​主配置文件​​。应该用于大多数设置。对于人类来说,阅读和机器(部署工具)的生成更容易。并非所有设置都可以这种格式表示。

advanced.config

经典(Erlang术语)

不能以新样式配置格式表示的有限数量的设置,例如​ ​LDAP查询​​。仅在必要时使用。

rabbitmq-env.conf

环境变量对

用于在一处设置与RabbitMQ相关的​ ​环境变量​​。

 

 四、开发实例

 1、引入依赖,pom文件:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、基本配置文件:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*rabbitMq基本配置*/
@Configuration
public class RabbitMqConfig {public final static String productExchange = "productExchange ";private String productQueue = "productQueue";public final static String ERROR_EXCHANGE = "le.error.exchange";private String topicExchangeName = "";@Beanpublic FanoutExchange productExchange() {return new FanoutExchange(productExchange);}public TopicExchange topicExchange() {return new TopicExchange(topicExchangeName);}@Beanpublic DirectExchange errorExchang() {return new DirectExchange(ERROR_EXCHANGE, true, false);}@Beanpublic Queue productQueue() {return new Queue(productQueue, true);}@Beanpublic Binding productBinding(Queue productQueue, FanoutExchange productExchange) {return BindingBuilder.bind(productQueue).to(productExchange);}
}

3、生产者:

@Component
public class ProductProducer {Logger logger = LoggerFactory.getLogger(ProductProducer.class);@AutowiredRabbitTemplate rabbitTemplate;@AutowiredGson gson;public void push(ProfitEntity profitEntity) {String json = gson.toJson("");logger.info("###消息发送者: json=" + json); rabbitTemplate.convertAndSend(RabbitmqConfig.Product_EXCHANGE, null, json);}}

4、消费者:

@Component
public class TransProductConsumer {Logger logger = LoggerFactory.getLogger(TransproductConsumer.class);@AutowiredRabbitTemplate rabbitTemplate;@AutowiredErrorProducer errorProducer;@AutowiredProductService productService;@RabbitHandler@RabbitListener(queues = RabbitmqConfig.productQueue )public void comsume(String json) {try {logger.info("##start##消费者:TransProductConsumer ,接收到消息:json=" + json);} catch (Exception e) {logger.error("计算消息队列出错 zf.Product.caculate.queue ", e);errorProducer.push(RabbitmqConfig.ERROR_PROFIT_CACULATE_QUEUE, json);}}
}

5、错误消息处理生产者:

@Component
public class ErrorProducer {Logger logger = LoggerFactory.getLogger(TransDetailProducer.class);@AutowiredRabbitTemplate rabbitTemplate;@AutowiredGson gson;public void push(String queueName, String errorJson) {logger.info("###异常消息发送者: queueName=" + queueName + " errorJson=" + errorJson);rabbitTemplate.convertAndSend(RabbitmqConfig.ERROR_EXCHANGE, queueName, errorJson);}public void push(String queueName, Object obj) {String errorJson = gson.toJson(obj);logger.info("###异常消息发送者: queueName=" + queueName + " errorJson=" + errorJson);rabbitTemplate.convertAndSend(RabbitmqConfig.ERROR_EXCHANGE, queueName, errorJson);}}

相关文章:

  • ChatGPT重磅升级 奢侈品VERTU推出双模型AI手机
  • Android 12 S 系统开机流程分析 - SecondStageMain(三)
  • 一封来自未来的offer
  • Vite探索:构建、启程、原理、CSS艺术与插件魔法
  • CSS常用示例100+ 【目录】
  • MyBatis 知识总结
  • 从Docker Hub获取镜像和创建容器
  • 绩效管理系统有哪些?
  • 新生儿奶瓣:原因、科普和注意事项
  • 线性代数-Python-05:矩阵的逆+LU分解
  • 【vue2】vue2 适配pc端,解决浏览器缩放问题,解决电脑显示设置缩放、分辨率问题
  • opencv车牌识别<一>
  • 时钟丢失监控机制
  • 动态规划、回溯搜索、分治算法、分支定界算法
  • 如何判断从本机上传到服务器的文件数据内容是一致的?用md5加密算法!
  • [js高手之路]搞清楚面向对象,必须要理解对象在创建过程中的内存表示
  • [LeetCode] Wiggle Sort
  • 《用数据讲故事》作者Cole N. Knaflic:消除一切无效的图表
  • emacs初体验
  • express.js的介绍及使用
  • Facebook AccountKit 接入的坑点
  • Java反射-动态类加载和重新加载
  • magento 货币换算
  • mysql外键的使用
  • NLPIR语义挖掘平台推动行业大数据应用服务
  • Python_网络编程
  • Shadow DOM 内部构造及如何构建独立组件
  • 诡异!React stopPropagation失灵
  • 欢迎参加第二届中国游戏开发者大会
  • 解析带emoji和链接的聊天系统消息
  • 免费小说阅读小程序
  • 那些年我们用过的显示性能指标
  • 实战:基于Spring Boot快速开发RESTful风格API接口
  • 用Canvas画一棵二叉树
  • CMake 入门1/5:基于阿里云 ECS搭建体验环境
  • 阿里云IoT边缘计算助力企业零改造实现远程运维 ...
  • 关于Kubernetes Dashboard漏洞CVE-2018-18264的修复公告
  • 直播平台建设千万不要忘记流媒体服务器的存在 ...
  • ​LeetCode解法汇总1410. HTML 实体解析器
  • ​学习一下,什么是预包装食品?​
  • # Apache SeaTunnel 究竟是什么?
  • # 数据结构
  • #pragma 指令
  • #预处理和函数的对比以及条件编译
  • (3)nginx 配置(nginx.conf)
  • (42)STM32——LCD显示屏实验笔记
  • (6)设计一个TimeMap
  • (8)Linux使用C语言读取proc/stat等cpu使用数据
  • (day 2)JavaScript学习笔记(基础之变量、常量和注释)
  • (八)c52学习之旅-中断实验
  • (翻译)terry crowley: 写给程序员
  • (理论篇)httpmoudle和httphandler一览
  • (十六)一篇文章学会Java的常用API
  • (中等) HDU 4370 0 or 1,建模+Dijkstra。
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite