当前位置: 首页 > news >正文

RabbitMQ(二)队列与消息的持久化

当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要做其它的事情,且会消耗很长的时间,在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者宕掉了,这时候就要使用消息接收确认机制,可以让其它的消费者再次执行刚才宕掉的消费者没有完成的事情。另外,在默认情况下,我们创建的消息队列以及存放在队列里面的消息,都是非持久化的,也就是说当RabbitMQ宕掉了或者是重启了,创建的消息队列以及消息都不会保存,为了解决这种情况,保证消息传输的可靠性,我们可以使用RabbitMQ提供的消息队列的持久化机制。

 

 

 生产者:

 1  import  com.rabbitmq.client.ConnectionFactory;
 2  import  com.rabbitmq.client.Connection;
 3  import  com.rabbitmq.client.Channel;
 4  import  com.rabbitmq.client.MessageProperties;
 5  public   class  ClientSend1 {
 6       public   static   final  String queue_name = " my_queue " ;
 7       public   static   final   boolean  durable = true // 消息队列持久化
 8       public   static   void  main(String[] args)
 9       throws  java.io.IOException{
10          ConnectionFactory factory = new  ConnectionFactory();  // 创建连接工厂
11          factory.setHost( " localhost " );
12          factory.setVirtualHost( " my_mq " );
13          factory.setUsername( " zhxia " );
14          factory.setPassword( " 123456 " );
15          Connection connection = factory.newConnection();  // 创建连接
16          Channel channel = connection.createChannel(); // 创建信道
17           channel.queueDeclare(queue_name, durable,  false false null );  // 声明消息队列,且为可持久化的
18          String message = " Hello world " + Math.random();
19           // 将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
20           channel.basicPublish( "" , queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
21          System.out.println( " Send message: " + message);
22          channel.close();
23          connection.close();
24      }
25 
26  }

 

说明:

行17 和行20 需要同时设置,也就是将队列设置为持久化之后,还需要将发送的消息也要设置为持久化才能保证队列和消息一直存在

 消费者:

 1  import  com.rabbitmq.client.ConnectionFactory;
 2  import  com.rabbitmq.client.Connection;
 3  import  com.rabbitmq.client.Channel;
 4  import  com.rabbitmq.client.QueueingConsumer;
 5  public   class  ClientReceive1 {
 6       public   static   final  String queue_name = " my_queue " ;
 7       public   static   final   boolean  autoAck = false ;
 8       public   static   final   boolean  durable = true ;
 9       public   static   void  main(String[] args)
10       throws  java.io.IOException,java.lang.InterruptedException{
11          ConnectionFactory factory = new  ConnectionFactory();
12          factory.setHost( " localhost " );
13          factory.setVirtualHost( " my_mq " );
14          factory.setUsername( " zhxia " );
15          factory.setPassword( " 123456 " );
16          Connection connection = factory.newConnection();
17          Channel channel = connection.createChannel();
18          channel.queueDeclare(queue_name, durable,  false false null );
19          System.out.println( " Wait for message " );
20           channel.basicQos( 1 );  // 消息分发处理
21          QueueingConsumer consumer = new  QueueingConsumer(channel);
22          channel.basicConsume(queue_name, autoAck, consumer);
23           while ( true ){
24              Thread.sleep( 500 );
25              QueueingConsumer.Delivery deliver = consumer.nextDelivery();
26              String message = new  String(deliver.getBody());
27              System.out.println( " Message received: " + message);
28              channel.basicAck(deliver.getEnvelope().getDeliveryTag(),  false );
29          }
30      }
31  }

 说明:

行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示:

 

 

 

转载于:https://www.cnblogs.com/xiazh/archive/2011/04/29/2004859.html

相关文章:

  • 【敏捷开发实践】宣言与原则
  • Linux下有趣的命令
  • 让dedecms(织梦)的list标签支持weight排序
  • 翻译 tiobe发布2011年5月开发语言排名
  • 研磨设计模式笔记1
  • [zt]提问的艺术
  • Dell Latitude E6400找不到Broadcom USH及Contacted SmartCard驱动
  • mongodb内存映射存储引擎
  • poj2407
  • (十一)手动添加用户和文件的特殊权限
  • 日常使用频率很高的英语口语集锦
  • 时间问题导致Citrix License无法导入
  • js实现无限级树形导航列表
  • Android中文合集 最终版
  • AutoVBA利用Hacth对象填充图元对象
  • 「前端」从UglifyJSPlugin强制开启css压缩探究webpack插件运行机制
  • Angular 2 DI - IoC DI - 1
  • javascript面向对象之创建对象
  • Spring思维导图,让Spring不再难懂(mvc篇)
  • 山寨一个 Promise
  • 网络应用优化——时延与带宽
  • C# - 为值类型重定义相等性
  • 回归生活:清理微信公众号
  • 容器镜像
  • # Apache SeaTunnel 究竟是什么?
  • #Linux(权限管理)
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (Matalb分类预测)GA-BP遗传算法优化BP神经网络的多维分类预测
  • (阿里巴巴 dubbo,有数据库,可执行 )dubbo zookeeper spring demo
  • (附源码)spring boot网络空间安全实验教学示范中心网站 毕业设计 111454
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (四)linux文件内容查看
  • (转)如何上传第三方jar包至Maven私服让maven项目可以使用第三方jar包
  • .bat批处理(六):替换字符串中匹配的子串
  • .libPaths()设置包加载目录
  • .NET Core 通过 Ef Core 操作 Mysql
  • /etc/fstab 只读无法修改的解决办法
  • @SentinelResource详解
  • [acwing周赛复盘] 第 94 场周赛20230311
  • [C#] 如何调用Python脚本程序
  • [c++] C++多态(虚函数和虚继承)
  • [Docker]十二.Docker consul集群搭建、微服务部署,Consul集群+Swarm集群部署微服务实战
  • [HNOI2010]BUS 公交线路
  • [LeetCode周赛复盘] 第 310 场周赛20220911
  • [Linux]history 显示命令的运行时间
  • [Linux]文件基础-如何管理文件
  • [PHP] 算法-字符串的左循环的PHP实现
  • [pyqt5]PyQt5窗体背景图片拉伸填充
  • [pytorch入门] 2. tensorboard
  • [ruby on rails]rack-cors, rack-attack
  • [Servlet 3]会话管理、进阶API、监听过滤器
  • [UI5 常用控件] 01.Text
  • [Windows][Linux]字体相关
  • [河北银行 2022 CTF]
  • [开发语言][c++][python]:C++与Python中的赋值、浅拷贝与深拷贝