当前位置: 首页 > news >正文

一篇内容带你了解Rabbitmq

Rabbitmq

  • 1.引言
    • 1.1什么是rabbitmq
    • 1.2消息队列的基本概念
  • 2.RabbitMQ 基础
    • 2.1安装
    • 2.2"holle world"示例
  • 3.RabbitMQ 高级特性
    • 3.1Exchange 类型
      • direct
      • topic
      • fanout
    • 3.2消息确认机制
    • 3.3消息优先级
    • 3.4死信队列(DLQ)
  • 4.参考资料

1.引言

1.1什么是rabbitmq

RabbitMQ 是一个消息代理,它可以接受并转发消息。RabbitMQ集接受,存储转发为一体。TODO…

1.2消息队列的基本概念

可以把消息队列看为一个消息容器,我们可以按需生产消息向容器中发送或者从容器中消费消息,尽管这听起来想“一个人”完成的操作,但实际生产和消费的角色并不相同,而是两个程序,并且这两个程序往往不在同一机器上。

生产消息的一方我们通常称为“生产者”:
在这里插入图片描述

消费消息的一方通常称为“消费者”:
在这里插入图片描述

2.RabbitMQ 基础

2.1安装

我们将使用 Pika 1.0.0,它是 RabbitMQ 团队推荐的 Python 客户端。 要安装它,您可以使用 pip 软件包管理工具:

python -m pip install pika --upgrade

2.2"holle world"示例

我们将写两个简单的代码,一个作为生产者发送“holle world”字符串,一个作为消费者从队列接受字符串,并在屏幕上打印出来。

首先先来编写我们的生产者”sending.py

#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

我们这里是连接的本地的服务,如果想连接其他机器上的服务只需要更改“localhost”即可。
有了生产者(后续统一用P表示)之后我们就需要有一个队列来作为和消费者(后续统一用C表示)之间通信的媒介。

channel.queue_declare(queue='hello')

如果P向一个不存在的队列发送消息,那么该消息就会被丢弃。

需要注意的是Rabbitmq不会直接向队列发送消息,而是将消息发送给exchange(交换机),由exchange来通过绑定的规则向一个或多个队列发送消息。这部分的内容在3.1会有更多介绍。
在这里插入图片描述

我们现在只需要知道我们需要向默认的exchange发送消息即可,默认的exchange用“”字符串来表示。

channel.basic_publish(exchange='',         #交换机类型,这里采用默认的使用空字符串表示routing_key='hello', # 队列名body='Hello World!') #消息内容
print(" [x] Sent 'Hello World!'")connection.close()  #关闭连接

现在我们就可以向hello队列发送消息了,下面我们就来完成C的代码“receiving”。

同样的我们需要先和Rabbitmq取得连接

 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')

这里我们再一次创建了hello队列,不用担心调用多次queue_declare函数只会创建一个同名的队列,这样的好处就是当我们无法确定是sending.py在receiving.py之前运行的时候代码也不会报错,这时重复的创建队列就是一个好的方法,也推荐在实际应用中这样做。

接下来就是为receiving写一个回调函数(callback),该函数的作用就是在C取得队列中的消息之后,用过callback处理,我们这里比较简单就是打印接收的消息。

def callback(ch, method, properties, body):print(f" [x] Received {body}")

接下来就需要告诉这个callback函数从那个队列去接收消息。

channel.basic_consume(queue='hello',auto_ack=True,on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

对于auto_ack=True参数,其实是Rabbitmq的确认应答机制,Rabbitmq需要知道C取得的消息是否被正确的处理了,就是通过该参数来控制,后序在3.2会有更多的介绍。

整合起来如下:
sending.py

#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receiving

#!/usr/bin/env python
import pika, sys, osdef main():connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(f" [x] Received {body}")channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()if __name__ == '__main__':try:main()except KeyboardInterrupt:  #通过ctrl-c来中断程序print('Interrupted')try:sys.exit(0)except SystemExit:os._exit(0)

可以开启两个终端分别运行两个文件来查看效果。

3.RabbitMQ 高级特性

3.1Exchange 类型

通过前面的学习我们已经对RabbitMQ的基本工作流程有了一些认识,接下来就来学习更多高级内容。

前面在2.2小节也提到过,P不直接向队列发送消息,而是向exchange发送消息,exchange通过不同的绑定规则绑定到不同的队列中,不同的绑定规则就对应了不同类型的exchange,RabbitMQ一同有四种,分别是

direct,
topic,
headers,# 本文不做介绍,后续可能会更新
fanout,

接下来分别介绍这三种exchange的特性:

direct

只有设置的RoutingKey和BindingKey完全匹配的时候,消息队列才可以获取消息。
在这里插入图片描述
如图所示,info类型的消息只会发送到第一个队列中去,warn类型的消息只会发送到第二个队列中去…。
代码中
sending.py

#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs', exchange_type='direct')RoutingKey="info"
channel.basic_publish(exchange='direct_logs', routing_key=RoutingKey, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()

receiving.py

#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs', exchange_type='direct')#这里我们没有给出队列的具体名,而是通过空字符串和exclusive=True参数来生成一个唯一的队列。
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue #获取唯一队列名BindingKey="info"
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=BindingKey)

topic

topic和direct较为相似,不过它更较为复杂。它使用通配符来绑定特定的队列。

*:可以完全替代一个单词。
#:可以替代零个或多个单词。
在这里插入图片描述
如上图所示:
第一个队列将匹配任意三个单词,其中第二个必须为orange。
而第二个队列有两个通配符规则第一个是匹配任意一个以lazy开头的单词组合(或者lazy,因为#也可以为0)。
第二个是可以匹配三个单词组合,但是第三个必须为rabbit。

如果没有一个可以匹配到对应的队列中,那么该消息将被丢弃。

sending.py

#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs', exchange_type='topic')channel.basic_publish(exchange='topic_logs', routing_key="dag.orange.cat", body="hello world")
print(f" [x] Sent {routing_key}:{message}")
connection.close()

receiving.py

#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs', exchange_type='topic')result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key="*.orange.*")def callback(ch, method, properties, body):print(f" [x] {method.routing_key}:{body}")channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

fanout

fanout就很简单了,它会向它知道的所有队列广播发送消息。
在这里插入图片描述

sending.py

#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs', exchange_type='fanout')channel.basic_publish(exchange='logs', routing_key='', body="hello world")
print(f" [x] Sent {message}")
connection.close()

receiving.py

#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs', exchange_type='fanout')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue# 将队列和exchange进行绑定,多次调用queue_bind即可将多个不同的队列绑定到同一个exchange上,具体策略需根据需求而定
channel.queue_bind(exchange='logs', queue=queue_name)def callback(ch, method, properties, body):print(f" [x] {body}")channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

注意在receiving中我们并没有对队列名进行定义,而是使用空字符串和exclusive=True让Rabbitmq来生成唯一的一个队列。这一点我们前面也有提及。

3.2消息确认机制

这一节我们将继续学习在2.2中提高的消息确认机制也就是auto_ack=True参数。

在正式介绍前我们先要讨论一种场景,当我们有多个消费者并且这些消费者都从同一个队列中拿取消息,那么这时Rabbitmq就会有一个分配的问题,首先你可能想以轮询的方式每个C取得一个,那么就会引发这样的一个问题,其中一个C在处理某个耗时消息(比如I/O密集型)的时候,另外一个C一直能很快的处理完消息,这就会造成一些某个C的“压力”越来越大,而其他C却大部分时间都处于空闲的状态。
在这里插入图片描述
另外如果图中第一个C在处理1消息的时候,被意外终止了,那么该消息就会丢失,因为消息一旦从队列中取走就会被”标记“为删除,如果你不希望这样的事发生就需要通过auto_ack=False来开启任务的确认应答机制,这样在任务的意外中断时Rabbitmq就会将该消息分配给另外的C去处理。

上述解决了消息在被消费时意外丢失的问题,可是如果Rabbitmq如果意外中断了那么所有的消息就会全部丢失,为了解决这种问题我们还需要在创建队列的时候设置一个参数,那就是exclusive=True。该参数的意义在于将消息持久化写入磁盘中。

channel.queue_declare(queue='task_queue', durable=True)

除了设置auto_ack=False来解决耗时问题(即负载不均衡)外,Rabbitmq还提供另外两种机制。

#消息预取(Prefetch):
#RabbitMQ 提供了 basic.qos 方法,可以用来设置每个消费者在处理完前一个消息之前最多能接收多少个未确认的消息。这可以防止某个消费者一次性接收过多的消息,从而导致负载不均衡。
channel.basic_qos(prefetch_count=1)
#通过设置 prefetch_count=1,RabbitMQ 会确保每个消费者在处理完并确认前一个消息之前不会接收新的消息。这有助于更均匀地分配消息,避免某个消费者过载。
#手动消息确认(Manual Acknowledgment):
#使用手动消息确认机制,消费者在处理完消息后显式地向 RabbitMQ 发送确认。这可以确保消息在被成功处理之前不会被RabbitMQ 从队列中删除。
def callback(ch, method, properties, body):# 处理消息print(f"Received {body}")# 手动确认消息ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(queue='your_queue', on_message_callback=callback, auto_ack=False)

3.3消息优先级

如果你想某个消息应该较早的被C消费,那么就可以通过设置优先级来实现。

优先级范围:
RabbitMQ 支持的优先级范围是 0 到 255,其中 0 是最低优先级,255 是最高优先级。你可以在声明队列时指定最大优先级。

要实现优先级首先需要在创建队列的时候名声该队列为优先级队列,我们通过arguments参数来设置。
sending.py

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 声明一个优先级队列,最大优先级为 10
args = {'x-max-priority': 10}
channel.queue_declare(queue='priority_queue', arguments=args)# 发送消息
channel.basic_publish(exchange='',routing_key='priority_queue',body='Low priority message',properties=pika.BasicProperties(priority=1)  # 设置消息优先级为 1
)channel.basic_publish(exchange='',routing_key='priority_queue',body='High priority message',properties=pika.BasicProperties(priority=10)  # 设置消息优先级为 10
)print("Messages sent")
connection.close()

receiving.py
消费者代码不需要做任何特殊处理,RabbitMQ 会根据消息的优先级自动排序并分发消息。

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()def callback(ch, method, properties, body):print(f"Received {body}")# 消费消息
channel.basic_consume(queue='priority_queue', on_message_callback=callback, auto_ack=True)print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

3.4死信队列(DLQ)

RabbitMQ 的死信队列(Dead Letter Queue, DLQ)是一个用于处理无法被正常消费的消息的机制。
无法被正常消费的消息有以下几种情况。

1.消息被拒绝(Rejected):
消费者显式地拒绝消息,并且不要求重新入队(requeue)。
2.消息过期(TTL, Time-To-Live):
消息在队列中存活的时间超过了设置的 TTL。
3.队列长度限制(Max Length):
队列达到最大长度,新的消息无法入队。

以上几种类型的消息都会被放到私信队列中去。
在这里插入图片描述
要使用死信队列,你需要在声明队列时设置一些参数,包括 x-dead-letter-exchange 和(可选的)x-dead-letter-routing-key。
sending.py

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 声明死信交换机
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')# 声明死信队列
channel.queue_declare(queue='dlx_queue')# 将死信队列绑定到死信交换机
channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue', routing_key='dlx_routing_key')# 声明原始队列,并设置死信交换机和路由键
args = {'x-dead-letter-exchange': 'dlx_exchange','x-dead-letter-routing-key': 'dlx_routing_key'
}
channel.queue_declare(queue='original_queue', arguments=args)# 发送消息到原始队列
channel.basic_publish(exchange='',routing_key='original_queue',body='Test Message'
)
print("Message sent to original queue")
connection.close()

receiving.py

import pika
import threadingconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 消费原始队列中的消息并触发死信机制
def original_callback(ch, method, properties, body):print(f"Received from original queue: {body}")# 拒绝消息并不重新入队ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)channel.basic_consume(queue='original_queue', on_message_callback=original_callback, auto_ack=False)# 启动一个新的线程来消费死信队列中的消息
def consume_dlx():dlx_connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))dlx_channel = dlx_connection.channel()def dlx_callback(ch, method, properties, body):print(f"DLX Received: {body}")ch.basic_ack(delivery_tag=method.delivery_tag)dlx_channel.basic_consume(queue='dlx_queue', on_message_callback=dlx_callback, auto_ack=False)dlx_channel.start_consuming()dlx_thread = threading.Thread(target=consume_dlx)
dlx_thread.start()# 开始消费原始队列中的消息
channel.start_consuming()

4.参考资料

官网:Rabbitmq官网(需要科学上网)

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • [Leetcode 105][Medium] 从前序与中序遍历序列构造二叉树-递归
  • 接口如何设计
  • 2-76 基于matlab的加权平均融合算法
  • sheng的学习笔记-AI-半监督学习
  • Kubernetes中的Kube-proxy:服务发现与负载均衡的基石
  • Java—双列集合
  • 数据库管理-第234期 2024DTCC,一场数据库盛宴(20240826)
  • debian12 - systemctl 根据状态值判断服务启动成功的依据
  • 机器学习第五十三周周报 MAG
  • 云手机解决了TikTok哪些账号运营难题?
  • 将标准输入stdin转换成命令行参数——Unix中的xargs指令
  • 手机快充头哪个牌子好?倍思65W伸缩线充电器交出优秀答卷
  • SQL注入-SQL注入基础-SQL注入流程
  • uniapp 向左滑动进入下一题,向右滑动进入上一题功能实现
  • 告警中心消息转发系统PrometheusAlert
  • php的引用
  • css系列之关于字体的事
  • docker-consul
  • hadoop入门学习教程--DKHadoop完整安装步骤
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • java B2B2C 源码多租户电子商城系统-Kafka基本使用介绍
  • React as a UI Runtime(五、列表)
  • React+TypeScript入门
  • RedisSerializer之JdkSerializationRedisSerializer分析
  • SAP云平台里Global Account和Sub Account的关系
  • spring cloud gateway 源码解析(4)跨域问题处理
  • 让你成为前端,后端或全栈开发程序员的进阶指南,一门学到老的技术
  • #调用传感器数据_Flink使用函数之监控传感器温度上升提醒
  • (16)Reactor的测试——响应式Spring的道法术器
  • (4)Elastix图像配准:3D图像
  • (arch)linux 转换文件编码格式
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)
  • (动态规划)5. 最长回文子串 java解决
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (附源码)spring boot智能服药提醒app 毕业设计 102151
  • (附源码)计算机毕业设计高校学生选课系统
  • (力扣)1314.矩阵区域和
  • (七)glDrawArry绘制
  • (算法)前K大的和
  • (贪心) LeetCode 45. 跳跃游戏 II
  • (学习日记)2024.03.12:UCOSIII第十四节:时基列表
  • (一)C语言之入门:使用Visual Studio Community 2022运行hello world
  • (一)模式识别——基于SVM的道路分割实验(附资源)
  • (转)Sublime Text3配置Lua运行环境
  • (转载)跟我一起学习VIM - The Life Changing Editor
  • (轉貼) 2008 Altera 亞洲創新大賽 台灣學生成果傲視全球 [照片花絮] (SOC) (News)
  • .net 4.0 A potentially dangerous Request.Form value was detected from the client 的解决方案
  • .NET MVC、 WebAPI、 WebService【ws】、NVVM、WCF、Remoting
  • .NET 线程 Thread 进程 Process、线程池 pool、Invoke、begininvoke、异步回调
  • .Net(C#)常用转换byte转uint32、byte转float等
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地中转一个自定义的弱事件(可让任意 CLR 事件成为弱事件)
  • .net利用SQLBulkCopy进行数据库之间的大批量数据传递
  • .NET连接数据库方式
  • .xml 下拉列表_RecyclerView嵌套recyclerview实现二级下拉列表,包含自定义IOS对话框...
  • @KafkaListener注解详解(一)| 常用参数详解