当前位置: 首页 > news >正文

RabbitMQ 发布订阅

 

互联网公司对消息队列是深度使用者,因此需要我们了解消息队列的方方面面,良好的设计及深入的理解,更有利于我们对消息队列的规划。

当前我们使用消息队列中发现一些问题

1、实际上是异步无返回远程调用,由发布者定义队列,消费者订阅已定义的队列。

2、并没有体现解耦设计,而且开发人员间依然要像单体项目开发那样针对同一个功能不断沟通交互,提高了开发时间以及成本。

3、没有消息版本的实现,导致发布者服务和消费者服务必须一起更新。如果没有保持一致可能导致批量的合法消息被丢到死信队列,甚至可能要启动旧服务将旧版本的消息消费掉才能更新服务。并且开发人员要进入发布流程指导各服务的发布顺序。

4、订阅者的对象定义在“集群”,但现实中的确存在“节点”的订阅需求,如节点的配置更新、本地缓存的刷新等。

快速、高效的使用消息队列,不需要过多的沟通成本,是我们不断的追求。因此发布订阅映入我们眼帘,公司内部称为分布式事件。

RabbitMQ的发布订阅能解决我们的什么问题呢?

  1. 不需要发送端和接收端同时发布。相比较普通的队列方式,如果发送端发布,而消费端未发布,那么就会有大量的消息积压。
  2. 实现一次发布,多次处理。

RabbitMQ发布订阅模式:像使用邮箱一样,不需要发送端和接收端共同发布。

发布订阅基础概念:

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:

  1. fanout:所有bind到此exchange的queue都可以接收消息
  2. direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
  3. topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
  4. headers:通过headers 来决定把消息发给哪些queue(这个很少用)

fanout订阅发布模式(广播模式)

direct订阅发布模式(广播模式)

 

topic定义发布模式(广播模式)

实现发布订阅模式下消息的发布订阅主要有以下几个步骤:

  1. 创建消息会话IMQSession(前提连接IMQConnection已经建立好了)
  2. 声明要订阅的消息主题
  3. 声明消息主题订阅者
  4. 声明消息发送者
  5. 发送消息
  6. 主题订阅者接收消息
  7. 关闭主题订阅者
  8. 关闭会话

生产者发送广播是实时的,消费者需要提前等待生产者发生消息,这个又叫订阅发布,收音机模式,就像只有收音机打开了才能听到锁定的FM频道,但是如果在节目开始一段时间,再打开收音机的话,之前的节目就收听不到了。即订阅之前的消息都是收不到的。

发布订阅相关的执行命令:

rabbitmqctl list_exchanges  列出所有exchange

临时队列:

    我们需要每次连接至mq的时候使用一个新队列,使用完了就销毁,这里可使用临时队列:

result = channel.queue_declare()

   然后就可以通过result.method.queue获取临时队列名称,提供给消费者使用。 另外消费者用完后需要销毁,可添加一个exclusive选项:result = channel.queue_declare(exclusive=True)  代表该队列是排他性队列。

绑定:Binding

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

查看系统所有的绑定命令

rabbitmqctl list_binding

发布订阅相关的概念主要包括exchange、binding、routingkey、及queue。我们只需要按照步骤,使用合适的交换器类型,即可实现发布订阅的一对多消息处理。

 

相关文章:

  • json对象 按key排序
  • 蜂鸟运单系统架构及实现
  • .NET Core实战项目之CMS 第十二章 开发篇-Dapper封装CURD及仓储代码生成器实现
  • PythonR量化 金融之路
  • 第二十章:异步和文件I/O.(二十二)
  • 2018年度总结
  • 第二十章:异步和文件I/O.(二十三)
  • 使用.Net Core+IView+Vue集成上传图片功能
  • 三分钟教你同步 Visual Studio Code 设置
  • 《快速阅读术》
  • 容器中 Java 应用程序的内存和 CPU 如何分配?看这一篇就够了!
  • 北斗三号系列核心芯片
  • Hive日期函数笔记
  • 21-Python与设计模式--备忘录模式
  • mysql-connector-java 6版本的jdbc连接问题
  • [rust! #004] [译] Rust 的内置 Traits, 使用场景, 方式, 和原因
  • “大数据应用场景”之隔壁老王(连载四)
  • Consul Config 使用Git做版本控制的实现
  • Linux各目录及每个目录的详细介绍
  • MySQL的数据类型
  • rabbitmq延迟消息示例
  • SegmentFault 2015 Top Rank
  • 大型网站性能监测、分析与优化常见问题QA
  • 每个JavaScript开发人员应阅读的书【1】 - JavaScript: The Good Parts
  • 学习笔记TF060:图像语音结合,看图说话
  • 异步
  • 异常机制详解
  • 原生 js 实现移动端 Touch 滑动反弹
  • 中文输入法与React文本输入框的问题与解决方案
  • 阿里云服务器如何修改远程端口?
  • 策略 : 一文教你成为人工智能(AI)领域专家
  • #13 yum、编译安装与sed命令的使用
  • #HarmonyOS:软件安装window和mac预览Hello World
  • ( 10 )MySQL中的外键
  • (26)4.7 字符函数和字符串函数
  • (3)llvm ir转换过程
  • (8)Linux使用C语言读取proc/stat等cpu使用数据
  • (ctrl.obj) : error LNK2038: 检测到“RuntimeLibrary”的不匹配项: 值“MDd_DynamicDebug”不匹配值“
  • (java)关于Thread的挂起和恢复
  • (python)数据结构---字典
  • (Pytorch框架)神经网络输出维度调试,做出我们自己的网络来!!(详细教程~)
  • (附源码)php新闻发布平台 毕业设计 141646
  • (更新)A股上市公司华证ESG评级得分稳健性校验ESG得分年均值中位数(2009-2023年.12)
  • (十一)手动添加用户和文件的特殊权限
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • (转) Android中ViewStub组件使用
  • (转)Oracle 9i 数据库设计指引全集(1)
  • .gitignore文件_Git:.gitignore
  • .net 4.0 A potentially dangerous Request.Form value was detected from the client 的解决方案
  • .NET CF命令行调试器MDbg入门(一)
  • .NET 跨平台图形库 SkiaSharp 基础应用
  • .NET/C# 获取一个正在运行的进程的命令行参数
  • .NET和.COM和.CN域名区别
  • .net与java建立WebService再互相调用
  • @ 代码随想录算法训练营第8周(C语言)|Day53(动态规划)