当前位置: 首页 > news >正文

RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

0. 引言

rocketmq支持两种消费模式:pull和push,在实际开发中这两种模式分别是如何实现的呢,在spring框架和springboot框架中集成有什么差异?今天我们一起来探究这两个问题。

1. java client实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>

1.1 Push消息消费

rocketmq的push消费是通过pull模式为基础来进行模拟的,就是通过监听器,不断的pull来实现,因此其实现重点就是实现监听器

rocektmq的监听器支持2种:

  • MessageListenerConcurrently 拉取到新消息之后就提交到线程池去消费
  • MessageListenerOrderly 通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据,以此保证顺序消费

这里虽然还有MessageListener类型,实际上是上述两种的父类,该方法也被弃用了
在这里插入图片描述
所以push模式的的重点就是实现MessageListenerConcurrently监听器,其内部只有一个consumeMessage方法
在这里插入图片描述
那么实现的重点就是consumeMessage方法,这里我们睡眠了10s,用于模拟该监听器运行10s

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_test", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

当然,如上的形式只能用于我们单元测试使用,集成在生产中时肯定不能这样用,我们需要将其注册为bean形式,并在项目启动时进行调用,让其注册为监听器

@Component
public class Consumer1PushListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}@PostConstructpublic void init(){DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 注册监听器consumer.registerMessageListener(this);try{// 设置topicconsumer.subscribe("topic_test", "*");// 启动示例consumer.start();}catch (Exception e){e.printStackTrace();System.out.println("rocketmq 消费者启动失败");}}
}

我们启动项目,发送一条消息,会发现消费者可以实时消费

在这里插入图片描述
消息模式如何调整?
rocektmq 有集群模式和广播模式两种消息模式,如果需要调整的话,通过消费者的setMessageModel方法即可调整

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

1.2 Pull消息消费

pull模式的实现更加简单,直接查看pull消费者类DefaultMQPullConsumer,其下有pull方法
在这里插入图片描述
官方给出的示例代码如下:

public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();try {MessageQueue mq = new MessageQueue();mq.setQueueId(0);mq.setTopic("topic_test");mq.setBrokerName("Broker");long offset = 26;PullResult pullResult = consumer.pull(mq, "*", offset, 32);if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {System.out.printf("%s%n", pullResult.getMsgFoundList());consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

但是截止目前,该类已经被弃用了
在这里插入图片描述
更加推荐的是用DefaultLitePullConsumer类实现,其下的poll方法可以帮助我们更加方便的实现消息消费,这里需要注意,两个类,一个是pull,一个是poll,pull实际上是需要指定偏移量的,而poll则自动帮我们更新了偏移量

public static void main(String[] args) throws MQClientException {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group2");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("topic_test", "*");consumer.start();try {List<MessageExt> messageList = consumer.poll(3000);for (MessageExt message : messageList) {System.out.println("pull消费:"+new String(message.getBody()));}} catch (Exception e) {e.printStackTrace();}consumer.shutdown();}

发送几条消息,运行测试
在这里插入图片描述
生产中使用时,大家可以把DefaultLitePullConsumer定义为bean, 以此减少每次资源创建的消耗,具体方式可参考上述push模式的实现代码

1.3 顺序消息消费

rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently)和顺序消费(MessageListenerOrderly

并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理

我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly监听器

public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_order", "*");// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {byte[] body = list.get(0).getBody();System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例consumer.start();Thread.sleep(10000);}

2. springboot实现消息消费

1、添加依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>

2、修改配置项

rocketmq:name-server: localhost:9876producer:group: group_test # 生产者分组,事务消息会使用send-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

2.1 push消息消费

通过实现RocketMQListener<T>接口,其中T是泛型,及消息内容的数据类型,可以是String, JSONObject,也可以是自定义数据结构类型

将监听器声明为bean,并实现onMessage方法即可

@Component
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "group_test")
public class MessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("消费消息:" + s);}
}

注解中的messageModel属性可以用来设置消息模式,默认为集群模式

在这里插入图片描述

2.2 pull消息消费

添加消费者配置

rocketmq:name-server: localhost:9876consumer:group: "group_test"topic: "topic_test"

通过receive方法实现消费

 @GetMapping(value = "/poll")public void poll() {List<String> list = rocketMQTemplate.receive(String.class);for (String message : list) {System.out.println("poll消费:"+message);}}

2.3 顺序消息消费

与普通消息不同的是,要声明消费模式为顺序消费consumeMode= ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "group_order", consumeMode= ConsumeMode.ORDERLY)
public class MessageOrderListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("顺序消费消息:" + s);}
}

3. 总结

消息消费相对更加简单,实际上掌握一种之后,其他类型的也就能够举一反三了,本文也只是针对最常用的类型进行列举,还有更多参数的支持,需要大家在实际应用中探索。

本文演示源码见:https://gitee.com/wuhanxue/wu_study/tree/master/demo/rocketmq_demo

在这里插入图片描述

相关文章:

  • 热门开源项目
  • 【python】PyQt5初体验,窗口等组件开发技巧,面向对象方式开发流程实战
  • 报错:ZeroDivisionError_ division by zero
  • PHP框架之ThinkPHP框架
  • vscode插件开发之 - TestController
  • VUE3版本新特性
  • 05. Java多线程 join 方法
  • c++设计模式之一创建型模式
  • 1964springboot VUE 智慧社区可视化平台系统开发mysql数据库web结构java编程计算机网页源码maven项目
  • 问题解决:Problem exceeding maximum token in azure openai (with java)
  • 分布式光纤测温DTS使用的单模光纤与多模光纤有何区别?
  • Leetcode - 周赛401
  • 八大经典排序算法
  • kali中安装docker
  • SARscape——Lee滤波
  • 【RocksDB】TransactionDB源码分析
  • css的样式优先级
  • Docker 笔记(2):Dockerfile
  • express如何解决request entity too large问题
  • input实现文字超出省略号功能
  • Java 多线程编程之:notify 和 wait 用法
  • JAVA 学习IO流
  • JS数组方法汇总
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • Perseus-BERT——业内性能极致优化的BERT训练方案
  • Sass 快速入门教程
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • 代理模式
  • 实战|智能家居行业移动应用性能分析
  • 使用 Node.js 的 nodemailer 模块发送邮件(支持 QQ、163 等、支持附件)
  • 世界上最简单的无等待算法(getAndIncrement)
  • 走向全栈之MongoDB的使用
  • Play Store发现SimBad恶意软件,1.5亿Android用户成受害者 ...
  • 如何用纯 CSS 创作一个菱形 loader 动画
  • ​插件化DPI在商用WIFI中的价值
  • #VERDI# 关于如何查看FSM状态机的方法
  • #面试系列-腾讯后端一面
  • (20)目标检测算法之YOLOv5计算预选框、详解anchor计算
  • (30)数组元素和与数字和的绝对差
  • (第三期)书生大模型实战营——InternVL(冷笑话大师)部署微调实践
  • (六) ES6 新特性 —— 迭代器(iterator)
  • (十五)devops持续集成开发——jenkins流水线构建策略配置及触发器的使用
  • (转)ORM
  • .apk 成为历史!
  • .bat批处理(九):替换带有等号=的字符串的子串
  • .NET Framework 和 .NET Core 在默认情况下垃圾回收(GC)机制的不同(局部变量部分)
  • .NET 使用 ILRepack 合并多个程序集(替代 ILMerge),避免引入额外的依赖
  • .skip() 和 .only() 的使用
  • [ vulhub漏洞复现篇 ] ECShop 2.x / 3.x SQL注入/远程执行代码漏洞 xianzhi-2017-02-82239600
  • [ 攻防演练演示篇 ] 利用通达OA 文件上传漏洞上传webshell获取主机权限
  • [\u4e00-\u9fa5] //匹配中文字符
  • [240527] 谷歌 CEO 承认 AI 编造虚假信息问题难解(此文使用 @gemini 命令二次创作)| ICQ 停止运作
  • [ABP实战开源项目]---ABP实时服务-通知系统.发布模式
  • [Android] Binder 里的 Service 和 Interface 分别是什么
  • [CLR via C#]11. 事件