当前位置: 首页 > news >正文

系列十(实战)、发送 接收批量消息(Java操作RocketMQ)

一、发送 & 接收批量消息

1.1、概述

        批量消息是指RocketMQ可以把一组消息集合一次性发送,这一组消息会被当做一个消息供消费者消费。

1.2、Demo05MQTestApp 

/*** @Author : 一叶浮萍归大海* @Date: 2023/12/25 11:48* @Description: 发送 & 接收批量消息*/
@Slf4j
public class Demo05MQTestApp {/*** 发送批量消息*/@Testpublic void demo5Producer() throws Exception {// 1、创建一个生产者DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");// 2、连接NameServerproducer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、启动producer.start();// 4、创建批量消息List<Message> messages = Arrays.asList(new Message("batch-topic", "我是一组消息中的A消息".getBytes(StandardCharsets.UTF_8)),new Message("batch-topic", "我是一组消息中的B消息".getBytes(StandardCharsets.UTF_8)),new Message("batch-topic", "我是一组消息中的C消息".getBytes(StandardCharsets.UTF_8)));// 5、发送消息producer.send(messages);log.info("【demo5Producer】发送消息成功!");// 6、关闭生产者producer.shutdown();}/*** 接收批量消息(Push方式)*/@Testpublic void demo5PushConsumer() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("batch-topic", "*");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 业务逻辑log.info("我是消费者【demo5PushConsumer】");for (MessageExt msg : msgs) {log.info("我是消费者【demo5PushConsumer】,我收到的消息是:{},当前时间:{}", StrUtil.utf8Str(msg.getBody()),LocalDateTimeUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"));}/*** 返回值:消费消息成功与否*      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队*      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo5PushConsumer】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}}

1.3、测试

        先后运行demo5PushConsumer、demo5Producer,观察控制台日志输出信息。

相关文章:

  • 图像处理-周期噪声
  • 云计算:OpenStack 配置二层物理网卡为三层桥的接口
  • 文件监控-IT安全管理软件
  • 鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之线性布局容器Row组件
  • OpenHarmony城市技术论坛武汉站:探索大模型时代的终端操作系统创新
  • 二叉树的非递归遍历|前中后序遍历
  • android studio导入module
  • 解决:Vue2项目兼容IE,页面出现白屏
  • 单集群400TB,OceanBase稳定支撑快手核心业务场景
  • Django信号机制源码分析(观察者模式)
  • docker学习(二十一、network使用示例container、自定义)
  • 【自然语言处理】【大模型】 ΨPO:一个理解人类偏好学习的统一理论框架
  • Flink1.17实战教程(第二篇:DataStream API)
  • 云原生机器学习平台cube-studio开源项目及代码简要介绍
  • Python 网络编程之搭建简易服务器和客户端
  • [ JavaScript ] 数据结构与算法 —— 链表
  • __proto__ 和 prototype的关系
  • 【108天】Java——《Head First Java》笔记(第1-4章)
  • 2019.2.20 c++ 知识梳理
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • Hibernate【inverse和cascade属性】知识要点
  • JS+CSS实现数字滚动
  • Laravel 中的一个后期静态绑定
  • Laravel深入学习6 - 应用体系结构:解耦事件处理器
  • LeetCode541. Reverse String II -- 按步长反转字符串
  • macOS 中 shell 创建文件夹及文件并 VS Code 打开
  • Mysql5.6主从复制
  • Python 基础起步 (十) 什么叫函数?
  • Python爬虫--- 1.3 BS4库的解析器
  • SOFAMosn配置模型
  • 多线程事务回滚
  • 前端每日实战:70# 视频演示如何用纯 CSS 创作一只徘徊的果冻怪兽
  • 如何用vue打造一个移动端音乐播放器
  • 收藏好这篇,别再只说“数据劫持”了
  • 提醒我喝水chrome插件开发指南
  • 写代码的正确姿势
  • 原生js练习题---第五课
  • 在weex里面使用chart图表
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • 基于django的视频点播网站开发-step3-注册登录功能 ...
  • ​2021半年盘点,不想你错过的重磅新书
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • # 日期待t_最值得等的SUV奥迪Q9:空间比MPV还大,或搭4.0T,香
  • $.ajax()方法详解
  • (C语言)strcpy与strcpy详解,与模拟实现
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (PHP)设置修改 Apache 文件根目录 (Document Root)(转帖)
  • (笔试题)合法字符串
  • (二)Eureka服务搭建,服务注册,服务发现
  • (分布式缓存)Redis分片集群
  • (附源码)计算机毕业设计SSM教师教学质量评价系统
  • (力扣记录)235. 二叉搜索树的最近公共祖先
  • (一)u-boot-nand.bin的下载
  • (一)UDP基本编程步骤
  • (转)C#调用WebService 基础