当前位置: 首页 > news >正文

RabbitMq应用二

在应用一中,基本的消息队列使用已经完成了,在实际项目中,一定会出现各种各样的需求和问题,rabbitmq内置的很多强大机制和功能会帮助我们解决很多的问题,下面就一个一个的一起学习一下。

消息响应机制

应用一的列子,在消费者从指定队列获取消息的时候,把通知参数no_ack给设成true了,这样就不需要给rabbitMq服务发送已经处理完毕的通知,rabbitmq把消息发出去后,就会直接删除掉,不去管消费者是否处理成功,这样在实际项目中存在很大的风险,出现代码的健壮性很差的错误。所以一定要把no_ack参数设成false:

 channel.BasicConsume("newQueue", false, customer);

在接受逻辑全部处理成功后加上一句代码,通知rabbitmq,接到通知后才会删除

 var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //将消息二进制转回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知队列,已经处理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);

 

消息持久化

响应保证了消息不会被错误删除,假如rabbitmq挂了,所有消息全部会丢掉,rabbitmq一个广泛使用的机制就是可以持久化,做持久化要两步

1.队列持久化

 //队列是否持久化
                    bool durable = true;
channel.QueueDeclare("firstQueue",durable,false,false,null);

2.消息持久化,通过设置IBasicProperties.SetPersistent来做

 //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在并不会受服务器重启影响  

上面的持久化,大部分时候不会出现问题,但是假如在写入队列的时候rabbitmq挂了,还是不会持久上,这种情况,我们就要用到我们代码的逻辑来强制进行持久化了。。。。

负载均衡分发消息

如果有两个接收端消费者同时订阅一个队列,会出现不固定的分发流程,某个消费者可能会出现过多的消息流入造成压力,而另一个空闲的蛋疼。所以,如果能公平的接受消息,处理完一个,接受另一个,同时保证压力的均衡。代码在消费者端设置:

channel.BasicQos(0, 1, false);

上面是几个rabbitmq比较重要的机制,下面开始是rabbitmq的核心牛逼的东西路由

这里涉及2个概念:

1.exchange,这是交换机,也叫路由器,在消息生产者发送消息的时候,实际上不是直接发送到queue队列中,因为他不知道发送到哪个队列,他会先发送到路由器中exchange里,exchange再通过路由匹配把消息发送到匹配的队列当中。

 

2.routingKey这个是路由的匹配规则,当消息发送到exchange里后,会根据routingkey来匹配到底发送到哪个队列,如果没匹配到,则消息丢失

 

exchange的四种类型:

 

1.direct:按routingkey的名称匹配

 

2.fanout:广播,无需匹配routingkey消息会发送到所有队列

 

3.topic:这个是贪婪匹配,也是最灵活的匹配方式,有两种符号#,*.,......*号的意思是

 

 #符号的意思是比如a_#,可以匹配的队列可以是a_a,a_aa,a_aaaaaa,a_a_b.......多词

 

 *符号的意识是比如a_*,可以匹配的队列可以是a_a,a_b,a_c.......单词

 

这个是应用一中发送消息给队列的代码,

 

 

channel.BasicPublish("", "firstQueue", null, body);

 

 

通过查看这个方法的参数中可看到第一个参数是exchange路由,第二个是routingkey匹配规则,而发送的代码第一个参数是"",第二个参数是firstQueue,开始以为是队列实际并不是,原因是如果用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange。我们在创建一个queue的时候,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去,因为在第一个参数选择了默认的exchange,而我们申明的队列叫firstQueue,所以默认的,它在新建一个也叫firstQueue的routingKey,并绑定在默认的exchange上,导致了我们可以在第二个参数routingKey中写firstQueue,这样它就会找到定义的同名的queue,并把消息放进去。

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

基本概念已经差不多了,我是很擅长排版解释,往下是各种匹配规则的代码和运行情况,直接上代码:

1.路由类型direct,匹配规则rroutingKey相同,一个生产者,两个消费者,采用负载均衡方式分发:

生产者

 

 //创建链接工厂,设置目标,用户,密码
            var factory = new ConnectionFactory() { 
                HostName = "127.0.0.1",
                UserName = "feiyang",
                Password = "123456",
                AutomaticRecoveryEnabled = true, //自动重连 
                RequestedHeartbeat = UInt16.MaxValue//心跳超时时间
            };
            
            //开启当前服务设置的用户的链接
            using (var connection =  factory.CreateConnection())
            {
                //开启一个频道
                using (var channel = connection.CreateModel())
                {
                    //创建一个队列
                    //队列是否持久化
                    bool durable = true;
                    //已经存在的队列,不能再定义持久化
                   // channel.QueueDeclare("firstQueue",false,false,false,null);
                    //创建一个新的,持久的交换区  
                    channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //持久的队列, 没有排他性,与不自动删除 
                    channel.QueueDeclare("newQueue", durable, false, false, null);
                    // 绑定队列到交换区  
                    channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); 
                    //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在并不会受服务器重启影响  
                    byte[] body = null;
                    //消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化和然后转化为二进制数组。
                    for (int i = 0; i < 100; i++)
                    {
                        body = Encoding.UTF8.GetBytes("这是第-----"+i+"-----条消息");
                        //channel.BasicPublish("", "firstQueue", null, body);
                        channel.BasicPublish("NewExchange", "newRoutingKey", properties, body); 
                        Console.Write("成功发送第-----"+i+"-----条消息!");
                    }
                    Console.ReadKey();
                }
            }

 

消费者a:

 

 

static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";
            factory.UserName = "feiyang";
            factory.Password = "123456";
            using (var connection  = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //创建一个新的,持久的交换区  
                    channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //还是连接到哪个队列
                    channel.QueueDeclare("newQueue",true,false,false,null);
                    // 绑定队列到交换区  
                    channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); 
                    //定义消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //从指定队列获取消息,
                    //中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("newQueue", false, customer);
                    //由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
                    channel.BasicQos(0, 1, false);
                    //开始不断循环出队列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //将消息二进制转回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知队列,已经处理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }
                    //sw.Stop();
                    //Console.WriteLine("共用时" + sw.ElapsedTicks + "毫秒");
                    //Console.ReadKey();
                }
            }
        }

消费者b:

 var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";
            factory.UserName = "feiyang";
            factory.Password = "123456";
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //创建一个新的,持久的交换区  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);
                    //还是连接到哪个队列
                    channel.QueueDeclare("newQueue", true, false, false, null);
                    // 绑定队列到交换区  
                    //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey");
                    //定义消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //从指定队列获取消息,
                    //中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("newQueue", false, customer);
                    //由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
                    channel.BasicQos(0, 1, false);
                    //开始不断循环出队列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //将消息二进制转回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知队列,已经处理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }
                    //sw.Stop();
                    //Console.WriteLine("共用时" + sw.ElapsedTicks + "毫秒");
                    //Console.ReadKey();
                }
            }

运行结果

 

2.1个生产者,2个消费者,路由类型direct,匹配规则routingKey相同,匹配不同的队列,一次发送到2个队列各个消费者取出各自的队列消息。

生产者,创建一个交换区,创建一个队列,

 //创建一个队列
                    //队列是否持久化
                    bool durable = true;
                    //已经存在的队列,不能再定义持久化
                   // channel.QueueDeclare("firstQueue",false,false,false,null);
                    //创建一个新的,持久的交换区  
                    channel.ExchangeDeclare("queueExchange", ExchangeType.Direct, true, false, null);  
                    //持久的队列, 没有排他性,与不自动删除 
                    channel.QueueDeclare("queue_a", durable, false, false, null);
                    // 绑定队列到交换区  
                    channel.QueueBind("queue_a", "queueExchange", "queueRoutingKey"); 
                    //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在并不会受服务器重启影响  
                    byte[] body = null;
                    //消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化和然后转化为二进制数组。
                    for (int i = 0; i < 100; i++)
                    {
                        body = Encoding.UTF8.GetBytes("这是第-----"+i+"-----条消息");
                        //channel.BasicPublish("", "firstQueue", null, body);
                        channel.BasicPublish("queueExchange", "queueRoutingKey", properties, body); 
                        Console.Write("成功发送第-----"+i+"-----条消息!");
                    }

消费者a,创建一个新队列,绑定到和生产者同一个交换区,读取刚刚创建的新队列数据。

 //创建一个新的,持久的交换区  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //还是连接到哪个队列
                    channel.QueueDeclare("queue_a_b", true, false, false, null);
                    // 绑定队列到交换区  
                    channel.QueueBind("queue_a_b", "queueExchange", "queueRoutingKey"); 
                    //定义消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //从指定队列获取消息,
                    //中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue_a_b", false, customer);
                    //由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
                    channel.BasicQos(0, 1, false);
                    //开始不断循环出队列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //将消息二进制转回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知队列,已经处理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

消费b,直接读取生产者创建的queue_a队列消息

 //创建一个新的,持久的交换区  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);
                    //还是连接到哪个队列
                    //channel.QueueDeclare("newQueue", true, false, false, null);
                    // 绑定队列到交换区  
                    //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey");
                    //定义消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //从指定队列获取消息,
                    //中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue_a", false, customer);
                    //由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
                    channel.BasicQos(0, 1, false);
                    //开始不断循环出队列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //将消息二进制转回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知队列,已经处理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

运行结果

可以看见,通过路由匹配,一次发送消息,发送到匹配到的两个队列中,两个消费者各自读取各自的队列。

3.篇幅有限,再来一个路由类型为Topic的代码例子。

生产者,由于已经创建了一个queueexChange类型为direct的交换区,不能更改类型,所以重新创建一个交换区

  //创建一个队列
                    //队列是否持久化
                    bool durable = true;
                    //已经存在的队列,不能再定义持久化
                   // channel.QueueDeclare("firstQueue",false,false,false,null);
                    //创建一个新的,持久的交换区  
                    channel.ExchangeDeclare("queueTopicExchange", ExchangeType.Topic, true, false, null);  
                    //持久的队列, 没有排他性,与不自动删除 
                    channel.QueueDeclare("queue.a", durable, false, false, null);
                    // 绑定队列到交换区  
                    channel.QueueBind("queue.a", "queueTopicExchange", "queue.#"); 
                    //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在并不会受服务器重启影响  
                    byte[] body = null;
                    //消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化和然后转化为二进制数组。
                    for (int i = 0; i < 100; i++)
                    {
                        body = Encoding.UTF8.GetBytes("这是第-----"+i+"-----条消息");
                        //channel.BasicPublish("", "firstQueue", null, body);
                        channel.BasicPublish("queueTopicExchange", "queue.#", properties, body); 
                        Console.Write("成功发送第-----"+i+"-----条消息!");
                    }

消费者a:

 //创建一个新的,持久的交换区  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //还是连接到哪个队列
                    channel.QueueDeclare("queue.a.b", true, false, false, null);
                    // 绑定队列到交换区  
                    channel.QueueBind("queue.a.b", "queueTopicExchange", "queue.#"); 
                    //定义消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //从指定队列获取消息,
                    //中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue.a.b", false, customer);
                    //由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
                    channel.BasicQos(0, 1, false);
                    //开始不断循环出队列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //将消息二进制转回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知队列,已经处理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

消费者b:

 //创建一个新的,持久的交换区  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);
                    //还是连接到哪个队列
                    //channel.QueueDeclare("newQueue", true, false, false, null);
                    // 绑定队列到交换区  
                    //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey");
                    //定义消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //从指定队列获取消息,
                    //中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue.a", false, customer);
                    //由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
                    channel.BasicQos(0, 1, false);
                    //开始不断循环出队列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //将消息二进制转回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知队列,已经处理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

运行结果:

代码例子就不一一写出来了,还有很多种情况,实际项目根据不同的需求灵活运用,有兴趣的可以自己搭配测试一下。

转载于:https://www.cnblogs.com/saltlight-wangchao/p/6248143.html

相关文章:

  • Oracle Data Guard的配置
  • SolrJ 复杂查询 高亮显示
  • Oracle 11g 启动EM报错的解决方法
  • python第二周数据类型 字符编码 文件处理
  • ORACLE CASE WHEN 及 SELECT CASE WHEN的用法
  • oracle数据库内存调整之增加内存
  • 实战|智能家居行业移动应用性能分析
  • html5--6-23 CSS3中的文字与字体
  • activity theme parent 属性浅析
  • Filter过滤器,Interceptor拦截器,ControllerAdvice,Aspect切片
  • php调用系统命令如何获得输出结果
  • 实现一个websocket服务器-实践篇
  • 分布式架构高可用架构篇_04_Keepalived+Nginx实现高可用Web负载均衡
  • ABP理论学习之缓存Caching
  • Linux GPIO控制方法
  • 【译】理解JavaScript:new 关键字
  • Brief introduction of how to 'Call, Apply and Bind'
  • Hibernate【inverse和cascade属性】知识要点
  • Java 最常见的 200+ 面试题:面试必备
  • JWT究竟是什么呢?
  • macOS 中 shell 创建文件夹及文件并 VS Code 打开
  • PAT A1050
  • Spring Cloud中负载均衡器概览
  • 读懂package.json -- 依赖管理
  • 干货 | 以太坊Mist负责人教你建立无服务器应用
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 利用DataURL技术在网页上显示图片
  • 算法-插入排序
  • 正则与JS中的正则
  • 大数据全解:定义、价值及挑战
  • 没有任何编程基础可以直接学习python语言吗?学会后能够做什么? ...
  • ​linux启动进程的方式
  • ​一文看懂数据清洗:缺失值、异常值和重复值的处理
  • ###项目技术发展史
  • #考研#计算机文化知识1(局域网及网络互联)
  • (八)c52学习之旅-中断实验
  • (笔试题)合法字符串
  • (多级缓存)多级缓存
  • (翻译)Entity Framework技巧系列之七 - Tip 26 – 28
  • (分布式缓存)Redis分片集群
  • (附源码)ssm本科教学合格评估管理系统 毕业设计 180916
  • (一)Spring Cloud 直击微服务作用、架构应用、hystrix降级
  • *Algs4-1.5.25随机网格的倍率测试-(未读懂题)
  • .net 程序发生了一个不可捕获的异常
  • .net 生成二级域名
  • .NET/C# 使用反射调用含 ref 或 out 参数的方法
  • .vimrc php,修改home目录下的.vimrc文件,vim配置php高亮显示
  • @Service注解让spring找到你的Service bean
  • [04]Web前端进阶—JS伪数组
  • [bug总结]: Feign调用GET请求找不到请求体实体类
  • [delphi]保证程序只运行一个实例
  • [leetcode 189][轮转数组]
  • [Oh My C++ Diary]operator++()和operator++(int)的区别
  • [OLEDB] 目前还找找不到处理下面错误的办法
  • [php] 数据结构算法(PHP描述) 快速排序 quick sort