当前位置: 首页 > news >正文

详解RocketMQ中的consumer

上述就是MQ中有关Consumer的类图,下面来介绍一下每个类

 

 1.MQAdmin:底层类,上篇博客已经提过,就不再此重提

 2.MQConsumer:Consumer公共的接口,常用的方法如下

 

 如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间

   void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

 

 3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法

 

4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

 

 在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下

 

 PushConsumer:通过注册监听的方式来消费信息

 

[java]  view plain  copy
 
 print?
  1. <span style="font-family:Comic Sans MS;font-size:18px;">/**      
  2.  * @FileName: Consumer.java    
  3.  * @Package:com.test    
  4.  * @Description: TODO   
  5.  * @author: LUCKY     
  6.  * @date:2015年12月28日 下午2:43:23    
  7.  * @version V1.0      
  8.  */  
  9. package com.test;  
  10.   
  11. import java.util.List;  
  12.   
  13. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  14. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  15. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  16. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  17. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  18. import com.alibaba.rocketmq.common.message.Message;  
  19. import com.alibaba.rocketmq.common.message.MessageExt;  
  20.   
  21. /** 
  22.  * @ClassName: Consumer 
  23.  * @Description: 模拟消费者 
  24.  * @author: LUCKY 
  25.  * @date:2015年12月28日 下午2:43:23 
  26.  */  
  27. public class ConsumerTest {  
  28.   
  29.     public static void main(String[] args) {  
  30.         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");  
  31.         consumer.setNamesrvAddr("100.66.154.81:9876");  
  32.         try {  
  33.               
  34.             // 订阅PushTopic下Tag为push的消息,都订阅消息  
  35.             consumer.subscribe("PushTopic", "push");  
  36.               
  37.             // 程序第一次启动从消息队列头获取数据  
  38.             consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  39.             //可以修改每次消费消息的数量,默认设置是每次消费一条  
  40.             // consumer.setConsumeMessageBatchMaxSize(10);  
  41.   
  42.             //注册消费的监听  
  43.             consumer.registerMessageListener(new MessageListenerConcurrently() {  
  44.                //在此监听中消费信息,并返回消费的状态信息  
  45.                 public ConsumeConcurrentlyStatus consumeMessage(  
  46.                         List<MessageExt> msgs,  
  47.                         ConsumeConcurrentlyContext context) {  
  48.                       
  49.                     // msgs中只收集同一个topic,同一个tag,并且key相同的message  
  50.                     // 会把不同的消息分别放置到不同的队列中  
  51.                     for(Message msg:msgs){  
  52.               
  53.                         System.out.println(new String(msg.getBody()));  
  54.                     }     
  55.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  56.                 }  
  57.             });  
  58.   
  59.             consumer.start();  
  60.             Thread.sleep(5000);  
  61.             //5秒后挂载消费端消费  
  62.             consumer.suspend();  
  63.               
  64.         } catch (Exception e) {  
  65.             e.printStackTrace();  
  66.         }  
  67.     }  
  68. }  
  69. </span>  

 

 

 PullConsumer:通过拉去的方式来消费消息

 

[java]  view plain  copy
 
 print?
  1. <span style="font-family:Comic Sans MS;font-size:18px;">/**      
  2.  * @FileName: Consumer.java    
  3.  * @Package:com.test    
  4.  * @Description: TODO   
  5.  * @author: LUCKY     
  6.  * @date:2015年12月28日 下午2:43:23    
  7.  * @version V1.0      
  8.  */  
  9. package com.test;  
  10.   
  11. import java.util.Set;  
  12.   
  13. import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;  
  14. import com.alibaba.rocketmq.client.consumer.MessageQueueListener;  
  15. import com.alibaba.rocketmq.common.message.MessageQueue;  
  16.   
  17. /** 
  18.  * @ClassName: Consumer 
  19.  * @Description: 模拟消费者 
  20.  * @author: LUCKY 
  21.  * @date:2015年12月28日 下午2:43:23 
  22.  */  
  23. public class ConsumerPullTest {  
  24.   
  25.     public static void main(String[] args) {  
  26.         DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();  
  27.         consumer.setNamesrvAddr("100.66.154.81:9876");  
  28.        consumer.setConsumerGroup("broker");  
  29.         try {  
  30.             consumer.start();  
  31.         Set<MessageQueue> messageQueues=  consumer.fetchSubscribeMessageQueues("PushTopic");        
  32.   
  33.         for(MessageQueue messageQueue:messageQueues){  
  34.           
  35.             System.out.println(messageQueue.getTopic());  
  36.         }  
  37.           
  38.           
  39.         //消息队列的监听  
  40.         consumer.registerMessageQueueListener("", new MessageQueueListener() {  
  41.               
  42.             @Override  
  43.             //消息队列有改变,就会触发  
  44.             public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,  
  45.                     Set<MessageQueue> mqDivided) {  
  46.                 // TODO Auto-generated method stub  
  47.                   
  48.             }  
  49.         });  
  50.               
  51.       
  52.         } catch (Exception e) {  
  53.             e.printStackTrace();  
  54.         }  
  55.     }  
  56. }  
  57. </span>  


一般在应用中都会采用push的方法来自动的消费信息

相关文章:

  • PHP中使用XMLReader读取xml文档内容
  • 学习Android路上的一些感慨和总结,慢慢来,比较快!
  • UVA 11212 Editing a Book
  • tomcat报错:java.net.SocketException: Permission denied[http-nio-80]
  • 入手阿里云新服务器的部署NODE
  • C#组件系列——又一款Excel处理神器Spire.XLS,你值得拥有
  • 运行时添加log4j2的appender
  • win产品密钥大搜集
  • PowerShell查询AD域内长期没有登录的计算机对象
  • 取distinct数据同时还取其他字段
  • RHCS+Conga+GFS+cLVM共享存储的高可用性web集群
  • 【20160924】GOCVHelper综述
  • Maven 自定义 archetype
  • 谈谈一些有趣的CSS题目(六)-- 全兼容的多列均匀布局问题
  • 7.12 Java-based container configuration (基于java的容器配置)
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • 2017年终总结、随想
  • 345-反转字符串中的元音字母
  • Consul Config 使用Git做版本控制的实现
  • echarts花样作死的坑
  • JDK9: 集成 Jshell 和 Maven 项目.
  • Spring思维导图,让Spring不再难懂(mvc篇)
  • ⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)
  • vagrant 添加本地 box 安装 laravel homestead
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 使用 5W1H 写出高可读的 Git Commit Message
  • 试着探索高并发下的系统架构面貌
  • 王永庆:技术创新改变教育未来
  • 一加3T解锁OEM、刷入TWRP、第三方ROM以及ROOT
  • 职业生涯 一个六年开发经验的女程序员的心声。
  • nb
  • ​Linux·i2c驱动架构​
  • # Python csv、xlsx、json、二进制(MP3) 文件读写基本使用
  • ###51单片机学习(2)-----如何通过C语言运用延时函数设计LED流水灯
  • (2/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (Redis使用系列) Springboot 实现Redis 同数据源动态切换db 八
  • (solr系列:一)使用tomcat部署solr服务
  • (附源码)ssm基于jsp的在线点餐系统 毕业设计 111016
  • (九)One-Wire总线-DS18B20
  • (论文阅读40-45)图像描述1
  • (没学懂,待填坑)【动态规划】数位动态规划
  • (三)终结任务
  • (一)使用Mybatis实现在student数据库中插入一个学生信息
  • (原创) cocos2dx使用Curl连接网络(客户端)
  • ***linux下安装xampp,XAMPP目录结构(阿里云安装xampp)
  • *1 计算机基础和操作系统基础及几大协议
  • *p=a是把a的值赋给p,p=a是把a的地址赋给p。
  • .NET(C#、VB)APP开发——Smobiler平台控件介绍:Bluetooth组件
  • .netcore 如何获取系统中所有session_ASP.NET Core如何解决分布式Session一致性问题
  • .NET微信公众号开发-2.0创建自定义菜单
  • .pub是什么文件_Rust 模块和文件 - 「译」
  • .vimrc php,修改home目录下的.vimrc文件,vim配置php高亮显示
  • @ConditionalOnProperty注解使用说明
  • @JSONField或@JsonProperty注解使用