当前位置: 首页 > news >正文

.Net CoreRabbitMQ消息存储可靠机制

前言

上篇讨论过消息投递和消息消费过程中如何确保可靠传输,也提及到消息到达RabbitMQ中到被消费前也需要可靠的留存,可因许多的不确定因素会影响着消息的存在与否。

消息中转点

生产者发送消息到RabbitMQ中,如果交换机根据自身类型和RoutingKey能够匹配到队列,则存入相关队列,但当匹配不到队列时,遇到两种情况而使得消息走向不同的方向,消息可能会丢失或是发回给生产者,这取决于生产者对消息的配置。

0517938d45650d3fc889d9a1c6956054.png

  • 生产者设置了Mandatory且为true,则消息回退给生产者。

  • 当生产者为设置Mandatory或是设置为false时,为了避免消息丢失,可以由交换机路由给备份交换机负责去搞定存储。
    f838a5af64a07aed468a4f47c261ab59.png

Mandatory

生产者发送消息时,可以设置一个参数mandatory,来决定消息到达RabbitMQ后,如果出现交换机根据自身类型及RoutingKey找不到合适的队列情况下,消息的一个走向。

  • 当mandatory为true时,消息则返回给生产者。
    bcaacd3333485fcc4e94f60824eaa6c3.png

  • 当mandatory为false时,消息则被丢弃。

生产者代码

当在BasicPublish方法参数中设置mandatory为true且队列暂不声明时,仅有一个交换机,消息将会被返回。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "rabbitmqdemo@test",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "mandatory_publishsubscribe_exchange";
        channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
        while (true)
        {
            Console.WriteLine("消息内容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: true, basicProperties: null, body: body);
            Console.WriteLine("消息内容发送完毕:" + message);
        }
    }
}

生产者发送消息,交换机收到消息但无对应队列,消息被返回。
86dbe2b9f84913634db67f68e37eeeb5.png

为了直观的知道消息返回到了生产者,我们可以增加一个监听器,来监听返回的消息。

监听回退消息

当mandatory设置为true,消息回退时可以监听消息

channel.BasicReturn += new EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>((sender, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine($"收到回退消息:{message}");
});

生产者发送消息,因无匹配队列,消息被返回,可以直观的看到返回的消息。
b4413ba55ba048df217ed41fab0b5a36.png

备份交换机

当mandatory设置为false时,消息被丢失了,这种情况可不太好。可以使用备份交换机来存储原要被丢弃的消息,当需要这些消息的时候,还能拿到这些消息。实际上备份交换机没有什么特殊,和主交换机是一样的只是充当备份的角色。
298d774ebf27c6b37e9e8a265d06f47a.png

生产者代码

  1. 在创建主交换机的时候,给定参数argument,设置该主交换机的备份交换机,指定备份交换机名称。

  2. 然后声明备份交换机并绑定一个队列,用于存储被丢弃的消息。

  3. 发送消息时mandatory参数设置为false。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "rabbitmqdemo@test",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "aedemo_publishsubscribe_exchange";
        var alternateExchangeName = "aedemo_ae_publishsubscribe_exchange";
        var arguments = new Dictionary<string, object>
        {
            { "alternate-exchange", alternateExchangeName }
        };
        channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", arguments: arguments);
        channel.ExchangeDeclare(exchange: alternateExchangeName, type: "fanout");

        var alternateExchangeQueueName = alternateExchangeName + "_worker";
        channel.QueueDeclare(queue: alternateExchangeQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        channel.QueueBind(queue: alternateExchangeQueueName, exchange: alternateExchangeName, routingKey: "");

        while (true)
        {
            Console.WriteLine("消息内容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: false, basicProperties: null, body: body);
            Console.WriteLine("消息内容发送完毕:" + message);
        }
    }
}

启动程序,可以从Web面板中看到主交换机和备份交换机都创建完毕,并且主交换机打上了有AE的标记。
2e984ffda13b7c39156c7aefb5ebb533.png

生产者发送消息,经主交换机匹配但无合适队列后,转发给备份交换机,路由到其队列存储。
b1eedabd72666eea2d3b638a72fcee16.png

注:推荐使用Fanout类型的交换机,如果其他比如Direct,当主交换机转发到备份交换机,在进行匹配时候,如果消息给定的RoutingKey没有匹配到相应的队列,消息则会被丢失,这样一来,最初的预想就出现偏差了。

持久化

当RabbitMQ在异常情况下,比如系统宕机、重启、关闭等,可能会导致数据丢失,可靠性降低。针对这种情况,RabbitMQ提供了持久化机制,将消息本身和元数据(队列、交换机、绑定信息)都保存到磁盘中。具体分为三类持久化

  • 交换机持久化

  • 队列持久化

  • 消息持久化

交换机持久化

当RabbitMQ遇到异常情况(如服务重启)后,如果没有设置交换机持久化,那么交换机相关数据则会被丢失,生产者再发送消息到指定交换机时就失败了。

服务重启异常

1、在Web中新建一个交换机,指定非持久化模式。
55bfd6b65bbeed59d2f2a426ae4f0532.png

2、新建一个队列,指定非持久化模式。
f7687baa8a56d322c15567df212691da.png

3、设置交换机和队列的绑定关系。
73971a2b35d1a7f0b49a5a02ebff5025.png

4、生产者前部分正常发送消息,中间经服务重启后,交换机、队列及绑定关系都被清除,生产继续发送消息,出现异常。
420919a29c7f0fd6362bcece18d502d4.png

持久化设置

在声明交换机时可以指定durable参数设置为true(Web面板中也可设置)。

channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, arguments: null);

RabbitMQ服务重启,生产者继续发送消息给交换机。
329233e17cc99427500974bce29d70ff.png

队列持久化

队列的持久化是队列声明时设置durable参数为true,如果队列不持久化,异常情况(如服务重启)后,队列元数据丢失,存储在内的消息也就丢失了。

服务重启异常

1、Web中创建一个交换机并设置为持久化模式。
6741d7ee297a04d0fc43f4a60a0f8320.png

2、创建一个队列并设置为非持久化模式
409063821df4bbf75ed40622cf191f8a.png

3、设置交换机和队列的绑定关系。
c26e4bff1b512261f67065dcf07168d6.png

4、生产者前部分正常发送消息,中间经服务重启后,队列及绑定关系被清除,生产继续发送消息,匹配队列失败,消息被回退给生产者。
faa9b755883d2c6dd84257a0ef53c5c7.png

持久化设置

在声明队列时可以指定durable参数设置为true(Web面板中也可设置)。

channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

RabbitMQ服务重启,生产者继续发送消息给交换机。
30b788944e92f9fe76f663a40082d92c.png

消息持久化

队列的持久化仅能保证其自身的数据不丢失,而其存储的消息却不能保证不会丢失。
186e51b76363235b65262897dcf833a6.png

持久化设置

需要对消息消息设置持久化,以确保消息本身不会因异常情况(如服务重启)而丢失。在发送消息时,可以设置消息的基础属性,来支持消息的持久化。

var basicProperties = channel.CreateBasicProperties();
basicProperties.DeliveryMode = 2;// 1非持久化 2持久化channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: true, basicProperties: basicProperties, body: body);

如此一来,当异常情况(如服务重启后),消息还是存在的。
46f9032f5834d8c41f5d1ce4c9d8fff6.png

注:消息持久化会影响性能,仅确保有价值的消息持久化,来权衡可靠与吞吐量。

2022-08-25,望技术有成后能回来看见自己的脚步

相关文章:

  • 2022年9月2号学习总结
  • Hive基本概念
  • 有向图的强连通分量
  • 新建SpringBoot Maven项目中pom常用依赖配置及常用的依赖的介绍
  • 想搞清楚API网关到底是什么?请看这篇
  • 【STM32F4系列】【HAL库】电机控制(转速和角度)(PID实战1)
  • 设定目标(1)- 为什么你每天感觉很忙却没什么拿得出手的业绩?
  • Java 进阶集合Set、Map(二)
  • 2022-Docker常用命令
  • Spring中事务传播特性(Propagation)
  • Matlab:Matlab编程语言应用之数学统计(柱状图、曲线分析等)的使用方法简介、案例实现之详细攻略
  • YOLOv7改进之二十五:引入Swin Transformer
  • 03 nginx 是如何自动推导文件的 content-type 的
  • Java 8 Stream 从入门到进阶——像SQL一样玩转集合
  • C++STL详解(五)mapset的使用及其模拟实现
  • C++11: atomic 头文件
  • iOS | NSProxy
  • Java 23种设计模式 之单例模式 7种实现方式
  • java中具有继承关系的类及其对象初始化顺序
  • JS笔记四:作用域、变量(函数)提升
  • laravel 用artisan创建自己的模板
  • PAT A1050
  • python3 使用 asyncio 代替线程
  • Python语法速览与机器学习开发环境搭建
  • Quartz实现数据同步 | 从0开始构建SpringCloud微服务(3)
  • socket.io+express实现聊天室的思考(三)
  • Stream流与Lambda表达式(三) 静态工厂类Collectors
  • windows-nginx-https-本地配置
  • 简单实现一个textarea自适应高度
  • 让你成为前端,后端或全栈开发程序员的进阶指南,一门学到老的技术
  • 算法---两个栈实现一个队列
  • 算法系列——算法入门之递归分而治之思想的实现
  • 腾讯优测优分享 | Android碎片化问题小结——关于闪光灯的那些事儿
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • ​软考-高级-信息系统项目管理师教程 第四版【第19章-配置与变更管理-思维导图】​
  • # 透过事物看本质的能力怎么培养?
  • #14vue3生成表单并跳转到外部地址的方式
  • #经典论文 异质山坡的物理模型 2 有效导水率
  • #周末课堂# 【Linux + JVM + Mysql高级性能优化班】(火热报名中~~~)
  • (1)(1.9) MSP (version 4.2)
  • (2)(2.10) LTM telemetry
  • (附源码)spring boot基于Java的电影院售票与管理系统毕业设计 011449
  • (附源码)计算机毕业设计高校学生选课系统
  • (九十四)函数和二维数组
  • (循环依赖问题)学习spring的第九天
  • (已解决)vue+element-ui实现个人中心,仿照原神
  • (转)Groupon前传:从10个月的失败作品修改,1个月找到成功
  • .net core使用ef 6
  • .net framework profiles /.net framework 配置
  • .NET 反射 Reflect
  • .NET开发不可不知、不可不用的辅助类(一)
  • /*在DataTable中更新、删除数据*/
  • /etc/fstab 只读无法修改的解决办法
  • /etc/skel 目录作用
  • @Responsebody与@RequestBody