RabbitMq消息队列
文章目录
- 前言
- 一、RabbitMq核心概念
- 二、RabbitMq应用场景
- 三、RabbitMq微服务落地
- 四、RabbitMq核心功能落地
- 五、RabbitMq交换机落地
- 六、RabbitMq-RPC回调落地
前言
- MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等…
- 相关对象介绍
- ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。
- Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
- ConnectionFactory为Connection的制造工厂。
- Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
- ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。
一、RabbitMq核心概念
- 概念
简单的说就是消息队列;-
消息队列
-
消息
消息是对数据库的数据实现增删改查的操作;如图:
-
队列
队列是一端进数据,一端出数据;如图:
消息队列就是一端进消息,一端出消息;如图:
-
-
二、RabbitMq应用场景
- 应用场景
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
三、RabbitMq微服务落地
- 条件
- Demo项目
- RabbitMq
下载地址:
https://download.csdn.net/download/Fu_Shi_rong/86399985 - RabbitMq.Client
- 步骤
-
运行RabbitMq命令
#安装插件 rabbitmq-plugins enable rabbitmq_management #进入rabbitmq_server-3.9.13\sbin目录中 rabbitmq-server.bat
运行结果如下:
浏览器输入地址:http://localhost:15672,用户名:guest;密码:guest;运行结果如图:
-
Demo 项目代码
-
nuget 安装包 [生产者与消费者都必须安装]
RabbitMQ.Client
-
生产者
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue",//队列名称 false,//消息的持久化 第一种方式 false, false, null); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//第二种方式设置消息持久化 channel.BasicPublish( "", //交换机名称 "testQueue",//routingKey :指定队列名称 properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
运行结果如下:
-
消费者 [手动确认+消息持久化机制]
#1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。 #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()] Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue",//队列名称 true,//消息的持久化 false, false, null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); //配置手动确认 channel.BasicAck(ea.DeliveryTag,true); Console.WriteLine($"{result}"); }; //Qos质量检测 channel.BasicQos(0,1,false); channel.BasicConsume( "testQueue",//队列名称 false, //消息手动确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
运行代码如下:
-
-
四、RabbitMq核心功能落地
- 消息确认 ACK
- 场景
当rabbitMq发送消息到服务器,服务器宕机了,导致消息丢失。要使用RabbitMq的消息应答机制。 - 生产者代码
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue",//队列名称 false,//消息的持久化 false, false, null); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//设置消息持久化 channel.BasicPublish( "", "testQueue", properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
- 消费者代码
- 代码:
channel.BasicConsume( "testQueue", true, //备注:true:自动消息确认[推荐] false[消息堆积,启动应用服务后会出现重复消费的现象] consumer );
- 示例代码:
#1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。 #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()] Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue",//队列名称 true,//消息的持久化 false, false, null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); Console.WriteLine($"{result}"); }; channel.BasicConsume( "testQueue", true, //备注:true:自动消息确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 代码:
- 场景
- 消息持久化
- 场景
生产者发送消息到RabbitMq服务器,RabbitMq服务器突然宕机了,导致消费者无法消费;这就需要用到消息持久化机制,将消息存储到磁盘中,启动RabbitMq服务器后再重新加载消息,让消费者重新消费。 - 生产者代码
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue",//队列名称 false,//消息的持久化 false, false, null); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//设置消息持久化 channel.BasicPublish( "", "testQueue", properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
- 消费者代码
- 代码
//定义队列 channel.QueueDeclare( "testQueue", true,//消息的持久化 ture:消息持久化;false:不用消息持久化 false, false, null);
- 示例代码
#1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。 #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()] Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue", true,//消息的持久化 false, false, null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); Console.WriteLine($"{result}"); }; channel.BasicConsume( "testQueue", true, //备注:true:自动消息确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 代码
- 场景
- 手动确认 [推荐]
- 场景
当RabbitMq给应用服务发送了消息,应用服务也收到了消息,同时应用服务讲确认消息给RabbitMq期间,执行的业务逻辑代码失败了; - 生产者代码
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue",//队列名称 false,//消息的持久化 false, false, null); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//设置消息持久化 channel.BasicPublish( "", "testQueue", properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
- 消费者代码
- 代码
consumer.Received += (model, ea) => { ..... //添加手动确认配置 channel.BasicAck(ea.DeliveryTag,true); }; channel.BasicConsume( "testQueue", false, //手动确认 consumer);
- 示例代码
#1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。 #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()] Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue", true,//消息的持久化 false, false, null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); Console.WriteLine($"{result}"); //配置手动确认 channel.BasicAck(ea.DeliveryTag,true); }; channel.BasicConsume( "testQueue", false, //手动确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 代码
- 场景
- QOS质量检测
消息分发到实例之前,查看当前实例是否处理完消息,如果没有处理完则分发给其他的实例。- 场景
消息堆积:应用服务来不及处理,导致的堆积。 - 解决方案
使用QOS质量检测。
使用应用集群的方式来解决。 - 实现
- 生产者代码
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue",//队列名称 false,//消息的持久化 第一种方式 false, false, null); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//第二种方式设置消息持久化 channel.BasicPublish( "", "testQueue", properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
- 消费者代码
- 代码
//Qos质量检测 channel.BasicQos(0,1,false);
- 示例代码
#1、如果是Web项目,可以使用类继承IHostService接口的方式实现,然后将消费的代码放到StartAsync函数中即可。 #2、然后再Startup类中的ConfigureServices函数中注册这个类 [services.AddHostService<类名>()] Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义队列 channel.QueueDeclare( "testQueue", true,//消息的持久化 false, false, null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); Console.WriteLine($"{result}"); //配置手动确认 channel.BasicAck(ea.DeliveryTag,true); }; //Qos质量检测 channel.BasicQos(0,1,false); channel.BasicConsume( "testQueue", false, //手动确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 代码
- 生产者代码
- 场景
五、RabbitMq交换机落地
- 概念
交换机就是将客户端的消息发送到队列中。 - 分类
- 扇形交换机 [性能较高]
-
使用场景
发送一个消息队列,多个消费者必须去消费[订阅/发布];如图:
-
实现
- 生产者代码
- 代码
var channel = connect.CreateModel(); //定义交换机 //参数1:交换机自定义名称 //参数2:类型 fanout 扇形交换机 channel.ExchangeDeclare("testQueue",ExchangeType.Fanout);
- 示例代码
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义扇形交换机 channel.ExchangeDeclare("testQueue",ExchangeType.Fanout); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//第二种方式设置消息持久化 channel.BasicPublish( "testQueue", //交换机名称 "", properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
- 代码
- 消费者代码
- 代码
var channel = connect.CreateModel(); //1 定义扇形交换机 channel.ExchangeDeclare("testQueue",ExchangeType.Fanout); //2 定义随机队列 var queueName = channel.QueueDeclare.QueueName; //3 队列与交换机绑定 channel.QueueBind(queueName,"testQueue","");
- 示例代码
//多个消费者消费需要写同样的代码去消费消息 Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //1 定义扇形交换机 channel.ExchangeDeclare("testQueue",ExchangeType.Fanout); //2 定义随机名称队列 var queueName = channel.QueueDeclare.QueueName; //3 队列与交换机绑定 channel.QueueBind(queueName,"testQueue",""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); //配置手动确认 channel.BasicAck(ea.DeliveryTag,true); Console.WriteLine($"{result}"); }; //Qos质量检测 channel.BasicQos(0,1,false); channel.BasicConsume( queueName,//随机队列名称 false, //消息手动确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 代码
- 生产者代码
-
- 直接交换机
- 使用场景
比如:购买完商品后,有的用户希望系统给自己发送短信,有的用户希望系统给自己发送邮件。 - 实现
使用 routingKey,路由key。- 生产者代码
- 代码
//发送消息的使用必须指定下routingKey路由名称; //注意:生产者与消费者名称必须是一致才可以; channel.BasicPublish( "testQueue", //交换机名称 "[自定义指定名称]",//routingKey 指定路由名称,生产者与消费者名称必须是一致才可以 properties, body);
- 示例代码
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义扇形交换机 channel.ExchangeDeclare("testQueue",ExchangeType.Direct); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//第二种方式设置消息持久化 channel.BasicPublish( "testQueue", //交换机名称 "[自定义指定名称]",//routingKey 指定路由名称,生产者与消费者名称必须是一致才可以 properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
- 代码
- 消费者代码
- 代码
var channel = connect.CreateModel(); //1 定义扇形交换机 channel.ExchangeDeclare("交换机名称",ExchangeType.Direct); //2 定义随机名称队列 var queueName = channel.QueueDeclare.QueueName; //3 队列与交换机绑定 channel.QueueBind(queueName,"testQueue","路由名称[指定队列名称]");
- 示例代码
Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //1 定义扇形交换机 channel.ExchangeDeclare("testQueue",ExchangeType.Fanout); //2 定义随机名称队列 var queueName = channel.QueueDeclare.QueueName; //3 队列与交换机绑定 channel.QueueBind(queueName,"testQueue","[自定义指定名称]"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); //配置手动确认 channel.BasicAck(ea.DeliveryTag,true); Console.WriteLine($"{result}"); }; //Qos质量检测 channel.BasicQos(0,1,false); channel.BasicConsume( queueName,//随机队列名称 false, //消息手动确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 代码
- 生产者代码
- 使用场景
- 主题交换机
项目中用的主题交换机比较多。既可以1对多,也可以多对1;- 实现
- 生产者代码
- 代码
//定义主题交换机 channel.ExchangeDeclare("[交换机名称]",ExchangeType.topic); ....... //指定RoutingKey名称 channel.BasicPublish( "[交换机名称]", //交换机名称 "[routingKey 自定义指定名称]",//routingKey 指定路由名称,生产者前缀与消费者前缀名称必须是一致才可以[消费者用的通配符的方式:比如:生产者:sms.xxx 生产者:sms.*] properties, body);
- 示例代码 [多个生产者]
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义扇形交换机 channel.ExchangeDeclare("test_topic",ExchangeType.topic); //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); //发送消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true;//第二种方式设置消息持久化 channel.BasicPublish( "test_topic", //交换机名称 "sms.Queue",//routingKey 指定路由名称,生产者前缀与消费者前缀名称必须是一致才可以[消费者用的通配符的方式:比如:生产者:sms.xxx 生产者:sms.*] properties, body); Console.WriteLine("--------------------消息已发送!---------------------");
- 代码
- 消费者代码
- 代码
//1 定义主题交换机 channel.ExchangeDeclare("[交换机名称]",ExchangeType.Topic); //2 定义随机名称队列 var queueName = channel.QueueDeclare.QueueName; //3 队列与交换机绑定 // * :只能匹配一级。 // # :可以匹配多级。 channel.QueueBind(queueName,"[交换机名称]","[自定义生产者RoutingKey名称前缀].[*|#]");
- 示例代码
Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //1 定义主题交换机 channel.ExchangeDeclare("test_topic",ExchangeType.Topic); //2 定义随机名称队列 var queueName = channel.QueueDeclare.QueueName; //3 队列与交换机绑定 // * :只能匹配一级。 // # :可以匹配多级。 channel.QueueBind(queueName,"test_topic","sms.*"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); //配置手动确认 channel.BasicAck(ea.DeliveryTag,true); Console.WriteLine($"{result}"); }; //Qos质量检测 channel.BasicQos(0,1,false); channel.BasicConsume( queueName,//随机队列名称 false, //消息手动确认 consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 代码
- 生产者代码
- 实现
- 扇形交换机 [性能较高]
六、RabbitMq-RPC回调落地
- 场景
1 以添加商品为例,添加完商品再去查询当前的商品,怎么去验证当前的商品添加成功呢?
2 分布式异步事务使用 rpc回调 - 目的
为了生产者发送消息到队列,消费者消费队列后并创建商品成功后,通知要查询商品的服务,商品创建成功,可以查询。 - 实现
- 生产者代码
- 关键代码
//定义队列 string replyQueueName = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); properties.CorrelationId = correlationId; properties.ReplyTo = replyQueueName; ....... //消息回调 var consumer = new EventingBasicConsumer(channel); channel.Received += (model,ea)=>{ Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); if (ea.BasicProperties.CorrelationId==correlationId) { Console.WriteLine($"回调成功:{result}"); } };
- 示例代码
//建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost="/" }; var connect = factory.CreateConnection(); var channel = connect.CreateModel(); //定义交换机 string replyQueueName = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); properties.CorrelationId = correlationId; properties.ReplyTo = replyQueueName; //消息数据 Stutent stu = new Stutent() { name = "小刚", sex = "男" }; string json = JsonConvert.SerializeObject(stu); var body = Encoding.UTF8.GetBytes(json); properties.Persistent = true;//第二种方式设置消息持久化 channel.BasicPublish( "", //交换机名称 "rpc_queue", properties, body); //消息回调 var consumer = new EventingBasicConsumer(channel); channel.Received += (model,ea)=>{ Console.WriteLine($"{model}"); var bytes = ea.Body; var result = Encoding.UTF8.GetString(bytes.ToArray()); if (ea.BasicProperties.CorrelationId==correlationId) { Console.WriteLine($"回调成功:{result}"); } }; Console.WriteLine("--------------------消息已发送!---------------------");
- 关键代码
- 消费者代码
- 示例代码
Console.WriteLine("-------------消息开始消费---------------"); //建立连接 var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/" }; //建立连接 using var connect = factory.CreateConnection(); var channel = connect.CreateModel(); channel.QueueDeclare( "rpc_queue",//队列名称 true,//消息的持久化 false, false, null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine($"{model}"); var bytes = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var result = Encoding.UTF8.GetString(bytes.ToArray()); Console.WriteLine($"{result}"); } catch(Exception ex) { } finally { var responssMeaaage = Encodinig.UTF8.GetBytes("回调成功"); channel.BasicPublish( "", //交换机名称 props.ReplyTo, replyProps, responssMeaaage); } }; //Qos质量检测 channel.BasicQos(0,1,false); channel.BasicConsume( "rpc_queue",//队列名称 true, consumer); Console.WriteLine("-------------消息消费结束 ---------------");
- 示例代码
- 生产者代码