当前位置: 首页 > news >正文

RabbitMq消息队列

文章目录

    • 前言
    • 一、RabbitMq核心概念
    • 二、RabbitMq应用场景
    • 三、RabbitMq微服务落地
    • 四、RabbitMq核心功能落地
    • 五、RabbitMq交换机落地
    • 六、RabbitMq-RPC回调落地

前言

  • MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等…
  • 相关对象介绍
    • ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。
      • Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
      • ConnectionFactory为Connection的制造工厂。
      • Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

一、RabbitMq核心概念

  • 概念
    简单的说就是消息队列;
    • 消息队列

      • 消息
        消息是对数据库的数据实现增删改查的操作;如图:
        在这里插入图片描述

      • 队列
        队列是一端进数据,一端出数据;如图:
        在这里插入图片描述

      消息队列就是一端进消息,一端出消息;如图:
      在这里插入图片描述

二、RabbitMq应用场景

  • 应用场景
    在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

三、RabbitMq微服务落地

  • 条件
    • Demo项目
    • RabbitMq
      下载地址:
      https://download.csdn.net/download/Fu_Shi_rong/86399985
    • RabbitMq.Client
  • 步骤
    • 运行RabbitMq命令

          #安装插件
          rabbitmq-plugins enable rabbitmq_management
          #进入rabbitmq_server-3.9.13\sbin目录中
          rabbitmq-server.bat
      

      运行结果如下:
      在这里插入图片描述

      浏览器输入地址:http://localhost:15672,用户名:guest;密码:guest;运行结果如图:
      在这里插入图片描述

    • Demo 项目代码

      • nuget 安装包 [生产者与消费者都必须安装]

          RabbitMQ.Client
        
      • 生产者

              //建立连接
              var factory = new ConnectionFactory() {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost="/"
              };
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",//队列名称
                  false,//消息的持久化   第一种方式
                  false,
                  false,
                  null);
              //消息数据
              Stutent stu = new Stutent()
              {
                  name = "小刚",
                  sex = "男"
              };
              string json = JsonConvert.SerializeObject(stu);
              var body = Encoding.UTF8.GetBytes(json);
              //发送消息
              var properties = channel.CreateBasicProperties();
              properties.Persistent = true;//第二种方式设置消息持久化
              channel.BasicPublish(
                  "",   //交换机名称
                  "testQueue",//routingKey  :指定队列名称
                  properties,
                  body);
              Console.WriteLine("--------------------消息已发送!---------------------");
        

        运行结果如下:
        在这里插入图片描述
        在这里插入图片描述

      • 消费者 [手动确认+消息持久化机制]

              #1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。
              #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()]
              Console.WriteLine("-------------消息开始消费---------------");
              //建立连接
              var factory = new ConnectionFactory()
              {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost = "/"
              };
              //建立连接
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",//队列名称
                  true,//消息的持久化
                  false,
                  false,
                  null);
              var consumer = new EventingBasicConsumer(channel);
              consumer.Received += (model, ea) => {
                  Console.WriteLine($"{model}");
                  var bytes = ea.Body;
                  var result = Encoding.UTF8.GetString(bytes.ToArray());
                  //配置手动确认
                  channel.BasicAck(ea.DeliveryTag,true);
                  Console.WriteLine($"{result}");
              };
              //Qos质量检测
              channel.BasicQos(0,1,false);
              channel.BasicConsume(
              "testQueue",//队列名称
              false, //消息手动确认
              consumer);
              Console.WriteLine("-------------消息消费结束 ---------------");   
        

        运行代码如下:
        在这里插入图片描述

四、RabbitMq核心功能落地

  • 消息确认 ACK
    • 场景
      当rabbitMq发送消息到服务器,服务器宕机了,导致消息丢失。要使用RabbitMq的消息应答机制。
    • 生产者代码
              //建立连接
              var factory = new ConnectionFactory() {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost="/"
              };
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",//队列名称
                  false,//消息的持久化
                  false,
                  false,
                  null);
              //消息数据
              Stutent stu = new Stutent()
              {
                  name = "小刚",
                  sex = "男"
              };
              string json = JsonConvert.SerializeObject(stu);
              var body = Encoding.UTF8.GetBytes(json);
              //发送消息
              var properties = channel.CreateBasicProperties();
              properties.Persistent = true;//设置消息持久化
              channel.BasicPublish(
                  "", 
                  "testQueue",
                  properties,
                  body);
              Console.WriteLine("--------------------消息已发送!---------------------");
      
    • 消费者代码
      • 代码:
          channel.BasicConsume(
                               "testQueue",
                               true,  //备注:true:自动消息确认[推荐]  false[消息堆积,启动应用服务后会出现重复消费的现象]
                               consumer
                               );
        
      • 示例代码:
              #1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。
              #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()]
              Console.WriteLine("-------------消息开始消费---------------");
              //建立连接
              var factory = new ConnectionFactory()
              {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost = "/"
              };
              //建立连接
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",//队列名称
                  true,//消息的持久化
                  false,
                  false,
                  null);
              var consumer = new EventingBasicConsumer(channel);
              consumer.Received += (model, ea) => {
                  Console.WriteLine($"{model}");
                  var bytes = ea.Body;
                  var result = Encoding.UTF8.GetString(bytes.ToArray());
                  Console.WriteLine($"{result}");
              };
              channel.BasicConsume(
                                    "testQueue",
                                     true,  //备注:true:自动消息确认  
                                     consumer);
              Console.WriteLine("-------------消息消费结束 ---------------");   
        
  • 消息持久化
    • 场景
      生产者发送消息到RabbitMq服务器,RabbitMq服务器突然宕机了,导致消费者无法消费;这就需要用到消息持久化机制,将消息存储到磁盘中,启动RabbitMq服务器后再重新加载消息,让消费者重新消费。
    • 生产者代码
              //建立连接
              var factory = new ConnectionFactory() {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost="/"
              };
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",//队列名称
                  false,//消息的持久化
                  false,
                  false,
                  null);
              //消息数据
              Stutent stu = new Stutent()
              {
                  name = "小刚",
                  sex = "男"
              };
              string json = JsonConvert.SerializeObject(stu);
              var body = Encoding.UTF8.GetBytes(json);
              //发送消息
              var properties = channel.CreateBasicProperties();
              properties.Persistent = true;//设置消息持久化
              channel.BasicPublish(
                  "", 
                  "testQueue",
                  properties,
                  body);
              Console.WriteLine("--------------------消息已发送!---------------------");
      
    • 消费者代码
      • 代码
         //定义队列 
              channel.QueueDeclare(
                  "testQueue",
                  true,//消息的持久化  ture:消息持久化;false:不用消息持久化
                  false,
                  false,
                  null);
        
      • 示例代码
              #1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。
              #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()]
              Console.WriteLine("-------------消息开始消费---------------");
              //建立连接
              var factory = new ConnectionFactory()
              {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost = "/"
              };
              //建立连接
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",
                  true,//消息的持久化
                  false,
                  false,
                  null);
              var consumer = new EventingBasicConsumer(channel);
              consumer.Received += (model, ea) => {
                  Console.WriteLine($"{model}");
                  var bytes = ea.Body;
                  var result = Encoding.UTF8.GetString(bytes.ToArray());
                  Console.WriteLine($"{result}");
              };
              channel.BasicConsume(
                                    "testQueue",
                                     true,  //备注:true:自动消息确认  
                                     consumer);
              Console.WriteLine("-------------消息消费结束 ---------------");   
        
  • 手动确认 [推荐]
    • 场景
      当RabbitMq给应用服务发送了消息,应用服务也收到了消息,同时应用服务讲确认消息给RabbitMq期间,执行的业务逻辑代码失败了;
    • 生产者代码
              //建立连接
              var factory = new ConnectionFactory() {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost="/"
              };
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",//队列名称
                  false,//消息的持久化
                  false,
                  false,
                  null);
              //消息数据
              Stutent stu = new Stutent()
              {
                  name = "小刚",
                  sex = "男"
              };
              string json = JsonConvert.SerializeObject(stu);
              var body = Encoding.UTF8.GetBytes(json);
              //发送消息
              var properties = channel.CreateBasicProperties();
              properties.Persistent = true;//设置消息持久化
              channel.BasicPublish(
                  "", 
                  "testQueue",
                  properties,
                  body);
              Console.WriteLine("--------------------消息已发送!---------------------");
      
    • 消费者代码
      • 代码
            consumer.Received += (model, ea) => {
                  .....
                  //添加手动确认配置
                  channel.BasicAck(ea.DeliveryTag,true);
              };
              channel.BasicConsume(
                                    "testQueue",
                                     false, //手动确认
                                     consumer);
        
      • 示例代码
              #1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。
              #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()]
              Console.WriteLine("-------------消息开始消费---------------");
              //建立连接
              var factory = new ConnectionFactory()
              {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost = "/"
              };
              //建立连接
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",
                  true,//消息的持久化
                  false,
                  false,
                  null);
              var consumer = new EventingBasicConsumer(channel);
              consumer.Received += (model, ea) => {
                  Console.WriteLine($"{model}");
                  var bytes = ea.Body;
                  var result = Encoding.UTF8.GetString(bytes.ToArray());
                  Console.WriteLine($"{result}");
                  //配置手动确认
                  channel.BasicAck(ea.DeliveryTag,true);
              };
              channel.BasicConsume(
                                    "testQueue",
                                     false, //手动确认
                                     consumer);
              Console.WriteLine("-------------消息消费结束 ---------------");   
        
  • QOS质量检测
    消息分发到实例之前,查看当前实例是否处理完消息,如果没有处理完则分发给其他的实例。
    • 场景
      消息堆积:应用服务来不及处理,导致的堆积。
    • 解决方案
      使用QOS质量检测。
      使用应用集群的方式来解决。
    • 实现
      • 生产者代码
              //建立连接
              var factory = new ConnectionFactory() {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost="/"
              };
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",//队列名称
                  false,//消息的持久化   第一种方式
                  false,
                  false,
                  null);
              //消息数据
              Stutent stu = new Stutent()
              {
                  name = "小刚",
                  sex = "男"
              };
              string json = JsonConvert.SerializeObject(stu);
              var body = Encoding.UTF8.GetBytes(json);
              //发送消息
              var properties = channel.CreateBasicProperties();
              properties.Persistent = true;//第二种方式设置消息持久化
              channel.BasicPublish(
                  "", 
                  "testQueue",
                  properties,
                  body);
              Console.WriteLine("--------------------消息已发送!---------------------");
        
      • 消费者代码
        • 代码
              //Qos质量检测
              channel.BasicQos(0,1,false);
          
        • 示例代码
              #1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。
              #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()]
              Console.WriteLine("-------------消息开始消费---------------");
              //建立连接
              var factory = new ConnectionFactory()
              {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost = "/"
              };
              //建立连接
              using var connect = factory.CreateConnection();
              var channel = connect.CreateModel();
              //定义队列 
              channel.QueueDeclare(
                  "testQueue",
                  true,//消息的持久化
                  false,
                  false,
                  null);
              var consumer = new EventingBasicConsumer(channel);
              consumer.Received += (model, ea) => {
                  Console.WriteLine($"{model}");
                  var bytes = ea.Body;
                  var result = Encoding.UTF8.GetString(bytes.ToArray());
                  Console.WriteLine($"{result}");
                  //配置手动确认
                  channel.BasicAck(ea.DeliveryTag,true);
              };
              //Qos质量检测
              channel.BasicQos(0,1,false);
              channel.BasicConsume(
                                    "testQueue",
                                     false, //手动确认
                                     consumer);
              Console.WriteLine("-------------消息消费结束 ---------------");         
          

五、RabbitMq交换机落地

  • 概念
    交换机就是将客户端的消息发送到队列中。
  • 分类
    • 扇形交换机 [性能较高]
      • 使用场景
        发送一个消息队列,多个消费者必须去消费[订阅/发布];如图:
        在这里插入图片描述

      • 实现

        • 生产者代码
          • 代码
                var channel = connect.CreateModel(); 
                //定义交换机
                //参数1:交换机自定义名称
                //参数2:类型 fanout 扇形交换机
                channel.ExchangeDeclare("testQueue",ExchangeType.Fanout);  
            
          • 示例代码
              //建立连接
              var factory = new ConnectionFactory() {
                  HostName = "127.0.0.1",
                  Port = 5672,
                  UserName = "guest",
                  Password = "guest",
                  VirtualHost="/"
              };
              using var connect = factory.CreateConnection(); 
              var channel = connect.CreateModel();
              //定义扇形交换机
              channel.ExchangeDeclare("testQueue",ExchangeType.Fanout);  
              //消息数据
              Stutent stu = new Stutent()
              {
                  name = "小刚",
                  sex = "男"
              };
              string json = JsonConvert.SerializeObject(stu);
              var body = Encoding.UTF8.GetBytes(json);
              //发送消息
              var properties = channel.CreateBasicProperties();
              properties.Persistent = true;//第二种方式设置消息持久化
              channel.BasicPublish(
                  "testQueue", //交换机名称
                  "",
                  properties,
                  body);
              Console.WriteLine("--------------------消息已发送!---------------------");
            
        • 消费者代码
          • 代码
              var channel = connect.CreateModel(); 
              //1 定义扇形交换机
              channel.ExchangeDeclare("testQueue",ExchangeType.Fanout);  
              //2 定义随机队列
              var queueName = channel.QueueDeclare.QueueName;
              //3 队列与交换机绑定
              channel.QueueBind(queueName,"testQueue","");
            
          • 示例代码
                  //多个消费者消费需要写同样的代码去消费消息
                  Console.WriteLine("-------------消息开始消费---------------");
                  //建立连接
                  var factory = new ConnectionFactory()
                  {
                      HostName = "127.0.0.1",
                      Port = 5672,
                      UserName = "guest",
                      Password = "guest",
                      VirtualHost = "/"
                  };
                  //建立连接
                  using var connect = factory.CreateConnection();
                  var channel = connect.CreateModel(); 
                  //1 定义扇形交换机
                  channel.ExchangeDeclare("testQueue",ExchangeType.Fanout);  
                  //2 定义随机名称队列
                  var queueName = channel.QueueDeclare.QueueName;
                  //3 队列与交换机绑定
                  channel.QueueBind(queueName,"testQueue","");
                  
                  var consumer = new EventingBasicConsumer(channel);
                  consumer.Received += (model, ea) => {
                      Console.WriteLine($"{model}");
                      var bytes = ea.Body;
                      var result = Encoding.UTF8.GetString(bytes.ToArray());
                      //配置手动确认
                      channel.BasicAck(ea.DeliveryTag,true);
                      Console.WriteLine($"{result}");
                  };
                  //Qos质量检测
                  channel.BasicQos(0,1,false);
                  channel.BasicConsume(
                  queueName,//随机队列名称
                  false, //消息手动确认
                  consumer);
                  Console.WriteLine("-------------消息消费结束 ---------------");   
            
    • 直接交换机
      • 使用场景
        比如:购买完商品后,有的用户希望系统给自己发送短信,有的用户希望系统给自己发送邮件。
      • 实现
        使用 routingKey,路由key。
        • 生产者代码
          • 代码
                //发送消息的使用必须指定下routingKey路由名称;
                //注意:生产者与消费者名称必须是一致才可以;
                channel.BasicPublish(
                      "testQueue", //交换机名称
                      "[自定义指定名称]",//routingKey 指定路由名称,生产者与消费者名称必须是一致才可以
                      properties,
                      body);
            
          • 示例代码
                 //建立连接
                  var factory = new ConnectionFactory() {
                      HostName = "127.0.0.1",
                      Port = 5672,
                      UserName = "guest",
                      Password = "guest",
                      VirtualHost="/"
                  };
                  using var connect = factory.CreateConnection(); 
                  var channel = connect.CreateModel();
                  //定义扇形交换机
                  channel.ExchangeDeclare("testQueue",ExchangeType.Direct);  
                  //消息数据
                  Stutent stu = new Stutent()
                  {
                      name = "小刚",
                      sex = "男"
                  };
                  string json = JsonConvert.SerializeObject(stu);
                  var body = Encoding.UTF8.GetBytes(json);
                  //发送消息
                  var properties = channel.CreateBasicProperties();
                  properties.Persistent = true;//第二种方式设置消息持久化
                  channel.BasicPublish(
                      "testQueue", //交换机名称
                      "[自定义指定名称]",//routingKey 指定路由名称,生产者与消费者名称必须是一致才可以
                      properties,
                      body);
                  Console.WriteLine("--------------------消息已发送!---------------------");
            
        • 消费者代码
          • 代码
                  var channel = connect.CreateModel(); 
                  //1 定义扇形交换机
                  channel.ExchangeDeclare("交换机名称",ExchangeType.Direct);  
                  //2 定义随机名称队列
                  var queueName = channel.QueueDeclare.QueueName;
                   //3 队列与交换机绑定
                  channel.QueueBind(queueName,"testQueue","路由名称[指定队列名称]");
            
          • 示例代码
                  Console.WriteLine("-------------消息开始消费---------------");
                  //建立连接
                  var factory = new ConnectionFactory()
                  {
                      HostName = "127.0.0.1",
                      Port = 5672,
                      UserName = "guest",
                      Password = "guest",
                      VirtualHost = "/"
                  };
                  //建立连接
                  using var connect = factory.CreateConnection();
                  var channel = connect.CreateModel(); 
                  //1 定义扇形交换机
                  channel.ExchangeDeclare("testQueue",ExchangeType.Fanout);  
                  //2 定义随机名称队列
                  var queueName = channel.QueueDeclare.QueueName;
                  //3 队列与交换机绑定
                  channel.QueueBind(queueName,"testQueue","[自定义指定名称]");
                  
                  var consumer = new EventingBasicConsumer(channel);
                  consumer.Received += (model, ea) => {
                      Console.WriteLine($"{model}");
                      var bytes = ea.Body;
                      var result = Encoding.UTF8.GetString(bytes.ToArray());
                      //配置手动确认
                      channel.BasicAck(ea.DeliveryTag,true);
                      Console.WriteLine($"{result}");
                  };
                  //Qos质量检测
                  channel.BasicQos(0,1,false);
                  channel.BasicConsume(
                  queueName,//随机队列名称
                  false, //消息手动确认
                  consumer);
                  Console.WriteLine("-------------消息消费结束 ---------------");   
            
    • 主题交换机
      项目中用的主题交换机比较多。既可以1对多,也可以多对1;
      • 实现
        • 生产者代码
          • 代码
                //定义主题交换机
                channel.ExchangeDeclare("[交换机名称]",ExchangeType.topic);  
                .......
                //指定RoutingKey名称
               channel.BasicPublish(
                      "[交换机名称]", //交换机名称
                      "[routingKey 自定义指定名称]",//routingKey 指定路由名称,生产者前缀与消费者前缀名称必须是一致才可以[消费者用的通配符的方式:比如:生产者:sms.xxx   生产者:sms.*]
                      properties,
                      body);
            
          • 示例代码 [多个生产者]
              //建立连接
                  var factory = new ConnectionFactory() {
                      HostName = "127.0.0.1",
                      Port = 5672,
                      UserName = "guest",
                      Password = "guest",
                      VirtualHost="/"
                  };
                  using var connect = factory.CreateConnection(); 
                  var channel = connect.CreateModel();
                  //定义扇形交换机
                  channel.ExchangeDeclare("test_topic",ExchangeType.topic);  
                  //消息数据
                  Stutent stu = new Stutent()
                  {
                      name = "小刚",
                      sex = "男"
                  };
                  string json = JsonConvert.SerializeObject(stu);
                  var body = Encoding.UTF8.GetBytes(json);
                  //发送消息
                  var properties = channel.CreateBasicProperties();
                  properties.Persistent = true;//第二种方式设置消息持久化
                  channel.BasicPublish(
                      "test_topic", //交换机名称
                      "sms.Queue",//routingKey 指定路由名称,生产者前缀与消费者前缀名称必须是一致才可以[消费者用的通配符的方式:比如:生产者:sms.xxx   生产者:sms.*]
                      properties,
                      body);
                  Console.WriteLine("--------------------消息已发送!---------------------");     
            
        • 消费者代码
          • 代码
                   //1 定义主题交换机
                  channel.ExchangeDeclare("[交换机名称]",ExchangeType.Topic);  
                  //2 定义随机名称队列
                  var queueName = channel.QueueDeclare.QueueName;
                  //3 队列与交换机绑定
                  // * :只能匹配一级。
                  // # :可以匹配多级。
                  channel.QueueBind(queueName,"[交换机名称]","[自定义生产者RoutingKey名称前缀].[*|#]"); 
            
          • 示例代码
                  Console.WriteLine("-------------消息开始消费---------------");
                  //建立连接
                  var factory = new ConnectionFactory()
                  {
                      HostName = "127.0.0.1",
                      Port = 5672,
                      UserName = "guest",
                      Password = "guest",
                      VirtualHost = "/"
                  };
                  //建立连接
                  using var connect = factory.CreateConnection();
                  var channel = connect.CreateModel(); 
                  //1 定义主题交换机
                  channel.ExchangeDeclare("test_topic",ExchangeType.Topic);  
                  //2 定义随机名称队列
                  var queueName = channel.QueueDeclare.QueueName;
                  //3 队列与交换机绑定
                  // * :只能匹配一级。
                  // # :可以匹配多级。
                  channel.QueueBind(queueName,"test_topic","sms.*"); 
                  
                  var consumer = new EventingBasicConsumer(channel);
                  consumer.Received += (model, ea) => {
                      Console.WriteLine($"{model}");
                      var bytes = ea.Body;
                      var result = Encoding.UTF8.GetString(bytes.ToArray());
                      //配置手动确认
                      channel.BasicAck(ea.DeliveryTag,true);
                      Console.WriteLine($"{result}");
                  };
                  //Qos质量检测
                  channel.BasicQos(0,1,false);
                  channel.BasicConsume(
                  queueName,//随机队列名称
                  false, //消息手动确认
                  consumer);
                  Console.WriteLine("-------------消息消费结束 ---------------");               
            

六、RabbitMq-RPC回调落地

  • 场景
    1 以添加商品为例,添加完商品再去查询当前的商品,怎么去验证当前的商品添加成功呢?
    2 分布式异步事务使用 rpc回调
  • 目的
    为了生产者发送消息到队列,消费者消费队列后并创建商品成功后,通知要查询商品的服务,商品创建成功,可以查询。
  • 实现
    • 生产者代码
      • 关键代码
                 //定义队列
                 string replyQueueName = channel.QueueDeclare().QueueName;  
                 var properties = channel.CreateBasicProperties();
                 var correlationId = Guid.NewGuid().ToString();
                 properties.CorrelationId = correlationId;
                 properties.ReplyTo = replyQueueName;
                 ....... 
                 //消息回调
                  var consumer = new  EventingBasicConsumer(channel);
                  channel.Received += (model,ea)=>{
                      Console.WriteLine($"{model}");
                      var bytes = ea.Body;
                      var result = Encoding.UTF8.GetString(bytes.ToArray());
                      if (ea.BasicProperties.CorrelationId==correlationId)
                      {
                           Console.WriteLine($"回调成功:{result}");
                      }
                  };
        
      • 示例代码
              //建立连接
                  var factory = new ConnectionFactory() {
                      HostName = "127.0.0.1",
                      Port = 5672,
                      UserName = "guest",
                      Password = "guest",
                      VirtualHost="/"
                  };
                  var connect = factory.CreateConnection(); 
                  var channel = connect.CreateModel();
                  //定义交换机
                 string replyQueueName = channel.QueueDeclare().QueueName;  
                 var properties = channel.CreateBasicProperties();
                 var correlationId = Guid.NewGuid().ToString();
                 properties.CorrelationId = correlationId;
                 properties.ReplyTo = replyQueueName;
                  //消息数据
                  Stutent stu = new Stutent()
                  {
                      name = "小刚",
                      sex = "男"
                  };
                  string json = JsonConvert.SerializeObject(stu);
                  var body = Encoding.UTF8.GetBytes(json); 
                  properties.Persistent = true;//第二种方式设置消息持久化
                  channel.BasicPublish(
                      "", //交换机名称
                      "rpc_queue", 
                      properties,
                      body);
                  //消息回调
                  var consumer = new  EventingBasicConsumer(channel);
                  channel.Received += (model,ea)=>{
                      Console.WriteLine($"{model}");
                      var bytes = ea.Body;
                      var result = Encoding.UTF8.GetString(bytes.ToArray());
                      if (ea.BasicProperties.CorrelationId==correlationId)
                      {
                           Console.WriteLine($"回调成功:{result}");
                      }
                  };
                  Console.WriteLine("--------------------消息已发送!---------------------");             
        
    • 消费者代码
      • 示例代码
                  Console.WriteLine("-------------消息开始消费---------------");
                  //建立连接
                  var factory = new ConnectionFactory()
                  {
                      HostName = "127.0.0.1",
                      Port = 5672,
                      UserName = "guest",
                      Password = "guest",
                      VirtualHost = "/"
                  };
                  //建立连接
                  using var connect = factory.CreateConnection();
                  var channel = connect.CreateModel(); 
                  channel.QueueDeclare(
                  "rpc_queue",//队列名称
                  true,//消息的持久化
                  false,
                  false,
                  null);
                  
                  var consumer = new EventingBasicConsumer(channel);
                  consumer.Received += (model, ea) => {
                      Console.WriteLine($"{model}");
                      var bytes = ea.Body;
                      var props = ea.BasicProperties;
                      var replyProps =  channel.CreateBasicProperties();
                      replyProps.CorrelationId = props.CorrelationId;
                      try
                      {
                          
                          var result = Encoding.UTF8.GetString(bytes.ToArray());
                          Console.WriteLine($"{result}");
                      }
                      catch(Exception ex)
                      {
                           
                      } 
                      finally
                      { 
                           var responssMeaaage = Encodinig.UTF8.GetBytes("回调成功");
                           channel.BasicPublish(
                          "", //交换机名称
                          props.ReplyTo, 
                          replyProps,
                          responssMeaaage); 
                      }
                  };
                  //Qos质量检测
                  channel.BasicQos(0,1,false);
                  channel.BasicConsume(
                  "rpc_queue",//队列名称
                  true, 
                  consumer);
                  Console.WriteLine("-------------消息消费结束 ---------------");             
        

相关文章:

  • 神经网络计算机的用途是,神经网络计算机的应用
  • visual studio快捷键
  • gif制作动图教你一键搞定,图片转gif和视频转gif怎么制作
  • 国产数据库百家争鸣,百花齐放有感
  • 电脑重装系统Win11edge浏览器看视频绿屏如何处理?
  • 【python】计算mel频率可能比你想象的要复杂一点
  • springboot+vue+elementui校园博客管理系统
  • 从QQ秀到VR穿搭,为什么服装搭配对虚拟化身如此重要
  • LeetCode每日一题——946. 验证栈序列
  • Mybatis的注解实现复杂映射开发
  • 【Games101 作业6 + 附加题】渲染兔子 BVH SAH 代码
  • 全球最强智算来了:12000000000000000000(别数了18个0)FLOPS!
  • 轻量级神经网络算法系列文章-ShuffleNet v2
  • 4-20mA 电压控制电流输出电路浅析
  • java+python+vue高校竞赛报名系统
  • hexo+github搭建个人博客
  • 5、React组件事件详解
  • Angular2开发踩坑系列-生产环境编译
  • classpath对获取配置文件的影响
  • create-react-app项目添加less配置
  • echarts花样作死的坑
  • FastReport在线报表设计器工作原理
  • Flex布局到底解决了什么问题
  • quasar-framework cnodejs社区
  • SpiderData 2019年2月23日 DApp数据排行榜
  • vue自定义指令实现v-tap插件
  • web标准化(下)
  • 动态规划入门(以爬楼梯为例)
  • 短视频宝贝=慢?阿里巴巴工程师这样秒开短视频
  • 试着探索高并发下的系统架构面貌
  • 手写双向链表LinkedList的几个常用功能
  • 正则学习笔记
  • 看到一个关于网页设计的文章分享过来!大家看看!
  • Java性能优化之JVM GC(垃圾回收机制)
  • 回归生活:清理微信公众号
  • 正则表达式-基础知识Review
  • # Swust 12th acm 邀请赛# [ A ] A+B problem [题解]
  • ### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTr
  • #{} 和 ${}区别
  • #常见电池型号介绍 常见电池尺寸是多少【详解】
  • #图像处理
  • #我与Java虚拟机的故事#连载03:面试过的百度,滴滴,快手都问了这些问题
  • (2)(2.10) LTM telemetry
  • (delphi11最新学习资料) Object Pascal 学习笔记---第2章第五节(日期和时间)
  • (react踩过的坑)Antd Select(设置了labelInValue)在FormItem中initialValue的问题
  • (保姆级教程)Mysql中索引、触发器、存储过程、存储函数的概念、作用,以及如何使用索引、存储过程,代码操作演示
  • (二)Linux——Linux常用指令
  • (附源码)ssm码农论坛 毕业设计 231126
  • (每日持续更新)jdk api之StringBufferInputStream基础、应用、实战
  • (四)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (一) springboot详细介绍
  • (转)Linq学习笔记
  • .Net Attribute详解(上)-Attribute本质以及一个简单示例
  • .net framework profiles /.net framework 配置
  • .net mvc 获取url中controller和action