当前位置: 首页 > news >正文

SpringCloud Alibaba】(十三)学习 RocketMQ 消息队列

目录

  • 1、MQ 使用场景与选型对比
    • 1.1、MQ 的使用场景
    • 1.2、引入 MQ 后的注意事项
    • 1.3、MQ 选型对比
  • 2、下载、安装 RocketMQ 及 RocketMQ 控制台
    • 2.1、下载安装 RocketMQ
    • 2.2、测试 RocketMQ 环境
    • 2.3、RocketMQ 控制台【图形化管理控制台】
      • 2.3.1、下载、安装
      • 2.3.2、验证 RocketMQ 控制台
  • 3、RocketMQ 快速入门
    • 3.1、导入 RocketMQ 依赖
    • 3.2、编写生产者代码
    • 3.3、编写消费者代码
    • 3.4、测试消息的生产与消费
  • 4、集成 RocketMQ
    • 4.1、用户微服务集成 RocketMQ
    • 4.2、订单微服务整合 RocketMQ
    • 4.3、测试集成的 RocketMQ

1、MQ 使用场景与选型对比

1.1、MQ 的使用场景

MQ 的英文全称是 Message Queue,翻译成中文就是消息队列,队列实现了先进先出(FIFO)的消息模型。通过消息队列,我们可以实现多个进程之间的通信,例如,可以实现多个微服务之间的消息通信。MQ 的最简模型就是生产者生产消息,将消息发送到 MQ,消息消费者订阅 MQ,消费消息

在这里插入图片描述

MQ的使用场景通常包含:异步解耦、流量削峰

1.2、引入 MQ 后的注意事项

引入MQ最大的优点就是异步解耦和流量削峰,但是引入 MQ 后也有很多需要注意的事项和问题,主要包括:系统的整体可用性降低、系统的复杂度变高、引入了消息一致性的问题

  • 系统的整体可用性降低:在对一个系统进行架构设计时,引入的外部依赖越多,系统的稳定性和可用性就会降低。系统中引入了MQ,部分业务就会出现强依赖MQ的现象,此时,如果MQ宕机,则部分业务就会变得不可用。所以,引入MQ时,我们就要考虑如何实现MQ的高可用。
  • 系统的复杂度变高:引入 MQ 后,会使之前的同步接口调用变成通过 MQ 的异步调用,在实际的开发过程中,异步调用会比同步调用复杂的多。并且异步调用出现问题后,重现问题,定位问题和解决问题都会比同步调用复杂的多。并且引入 MQ 后,还要考虑如何保证消息的顺序等问题
  • 消息一致性问题 :引入 MQ 后,不得不考虑的一个问题就是消息的一致性问题。这期间就要考虑如何保证消息不丢失,消息幂等和消息数据处理的幂等性问题

1.3、MQ 选型对比

目前,在行业内使用的比较多的 MQ 包含 RabbitMQ、Kafka 和 RocketMQ。这里,我将三者的对比简单整理了个表格,如下所示:

消息中间件(MQ)优点缺点使用场景
RabbitMQ功能全面、消息的可靠性比较高吞吐量低,消息大量积累会影响性能,使用的开发语言是 erlang,不好定制功能规模不大的场景
Kafka吞吐量最高,性能最好,集群模式下高可用功能上比较单一,会丢失部分数据日志分析,大数据场景
RocketMQ吞吐量高,性能高,可用性高,功能全面。使用 Java 语言开发,容易定制功能开源版不如阿里云上版文档比较简单几乎支持所有场景,包含大数据场景和业务场景

2、下载、安装 RocketMQ 及 RocketMQ 控制台

2.1、下载安装 RocketMQ

Apache RocketMQ开发者指南

Windows 部署 RocketMQ

RocketMQ 下载、安装过程:

  1. 下载、解压。下载地址:二进制版本 4.9.2 官方下载

修改配置文件 conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=D:/dev/rocketmq-4.9.2/data/dataDir
# commitLog路径
storePathCommitLog=D:/dev/rocketmq-4.9.2/data/dataDir/commitlog
# 消息队列存储路径
storePathConsumeQueue=D:/dev/rocketmq-4.9.2/data/dataDir/consumequeue
# 消息索引存储路径
storePathIndex=D:/dev/rocketmq-4.9.2/data/dataDir/index
# checkpoint文件路径
storeCheckpoint=D:/dev/rocketmq-4.9.2/data/dataDir/checkpoint
# abort文件存储路径
abortFile=D:/dev/rocketmq-4.9.2/data/dataDir/abort
  1. 配置 ROCKETMQ 环境变量:否则,启动报错

    • 先添加一个环境变量 ROCKETMQ_HOME
      在这里插入图片描述
    • 在 Path 中进行添加

在这里插入图片描述

  1. 内存分配设置【可选】

①:编辑 server 启动文件:bin/runserver.cmd

set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

在这里插入图片描述

②:编辑 broker 启动文件:bin/runbroker.cmd

set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m -Xmn128m"

在这里插入图片描述

  1. 修改日志存储默认路径:conf/logback_broker.xmlconf/logback_namesrv.xmlconf/logback_tools.xml

在这里插入图片描述

  1. 启动 NameServer

命令行执行:

mqnamesrv.cmd

打印出如下信息,说明 RocketMQ 的 NameServer 启动成功了:

在这里插入图片描述

  1. 启动 Broker

命令行执行:

mqbroker.cmd -n localhost:9876

打印出如下信息,说明 RocketMQ 的 Broker 服务启动成功了:

在这里插入图片描述

2.2、测试 RocketMQ 环境

RocketMQ 内置了大量的测试案例,并且这些测试案例可以通过 RocketMQ 的 bin 目录下的 tools.cmd 命令进行测试

1、启动生产者程序向 RocketMQ 发送消息

重新打开 cmd 命令行,进入 RocketMQ 的 bin 目录,在命令行输入如下命令调用 RocketMQ 自带的生产者程序向 RocketMQ 发送消息:

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer

可以看到,执行完上述两条命令后,生产者程序开始向 RocketMQ 发送消息:

在这里插入图片描述

2、启动消费者程序消费 RocketMQ 中的消息

重新打开 cmd 命令行,进入 RocketMQ 的 bin 目录,在命令行输入如下命令调用 RocketMQ 自带的消费者程序消费 RocketMQ 中的消息

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

可以看到,执行完上述两条命令后,消费者程序开始消费 RocketMQ 中的消息:

在这里插入图片描述

2.3、RocketMQ 控制台【图形化管理控制台】

2.3.1、下载、安装

RocketMQ 控制台本质上是一个 SpringBoot 程序,启动后默认监听的端口是 8080。RocketMQ 的新版控制台已经从 RocketMQ 的 rocketmq-externals 项目中分离出来了。也就是说,新版的 RocketMQ 控制台已经从 https://github.com/apache/rocketmq-externals 链接所示的项目中分离出来,新版控制台的链接地址为:https://github.com/apache/rocketmq-dashboard

1、下载 RocketMQ 控制台源码
2、修改配置:src/main/resources/application.properties

  • 端口:7000
  • namesrvAddr:localhost:9876
  • dataPath:D:/dev/rocket-mq-master/data

在这里插入图片描述

3、打开 cmd 命令行,进入 RocketMQ 控制台源码的根目录,输入如下 Maven 命令开始编 RocketMQ 控制台的源码:

mvn clean install -Dmaven.test.skip=true

4、编译完成后,会在 RocketMQ 控制台源码的根目录下生成 target 目录,进入 target 目录,可以看到生成了 rocketmq-dashboard-1.0.1-SNAPSHOT.jar 文件,如下所示

在这里插入图片描述

5、重新打开 cmd 命令行,进入 rocketmq-dashboard-1.0.0.jar 文件所在的目录,在命令行直接输入如下命令启动 RocketMQ 控制台程序

java -jar rocketmq-console-ng-1.0.0.jar

2.3.2、验证 RocketMQ 控制台

在浏览器中输入 http://localhost:7000 后,出现如下画面说明 RocketMQ 启动成功【可切换语言】:

在这里插入图片描述

选择【Topic】菜单想后可以看到目前 RocketMQ 中存在一个名称为 TopicTest 的主题:

在这里插入图片描述

点击 TopicTest 主题的状态按钮,如下所示:

在这里插入图片描述

可以看到,正确显示出了 TopicTest 主题的消息队列信息,说明 RocketMQ 控制台启动成功了

3、RocketMQ 快速入门

3.1、导入 RocketMQ 依赖

在用户微服务 shop-userpom.xml 中,添加 RocketMQ 相关的依赖,如下所示:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.2</version>
</dependency>

3.2、编写生产者代码

在用户微服务的 src/test/java 目录下新建 com.zzc.rocketmq.test包,在包下创建 RocketMQProducer 类,作为 RocketMQ 的生产者,代码如下所示:

public class RocketMQProducer {public static void main(String[] args) throws Exception {// 1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("producerGroup");// 2.指定Nameserver地址producer.setNamesrvAddr("127.0.0.1:9876");// 3.启动producerproducer.start();// 4.构建消息对象Message message = new Message("bingheTopic", "bingheTag", "Hello RocketMQ".getBytes());System.out.println("生产者发出的消息为:" + JSONObject.toJSONString(message));// 5.发送消息并接收结果SendResult sendResult = producer.send(message);System.out.println("生产者收到的发送结果信息为:" + JSONObject.toJSONString(sendResult));// 6.关闭生产者producer.shutdown();}
}

3.3、编写消费者代码

com.zzc.rocketmq.test 包下新建 RocketMQConsumer 类,作为 RocketMQ 的消费者,代码如下所示:

public class RocketMQConsumer {public static void main(String[] args) throws Exception {// 1.创建消息消费者 consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");// 2.指定Nameserver地址consumer.setNamesrvAddr("127.0.0.1:9876");// 3.订阅 testTopic主题consumer.subscribe("testTopic", "*");// 4.设置消息监听,当收到消息时 RocketMQ 会回调消息监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 打印消息消费者收到的RocketMQ消息System.out.println("消费者收到的消息为:" + list);// 返回消息消费成功的标识return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动消费者System.out.println("消费者启动成功");consumer.start();}
}

3.4、测试消息的生产与消费

1、为了便于观察,这里我们先启动消费者程序 RocketMQConsumer,启动 RocketMQConsumer后会在 IDEA 的控制台打印如下信息:

在这里插入图片描述

2、运行生产者程序 RocketMQProducer,运行后 RocketMQProducer 程序控制台会输出如下信息:

在这里插入图片描述

说明生产者程序 RocketMQProducer 成功将消息发送到 RocketMQ

3、接下来,再看下消费者程序 RocketMQConsumer 的控制台,如下所示:

在这里插入图片描述

说明生产者发送到 RocketMQ 的消息,被消费者成功消费到了

4、集成 RocketMQ

在项目中模拟一个用户成功下单后,为用户发送通知,通知用户下单成功的逻辑。

具体的流程:

下单成功后将订单的信息发送到 RocketMQ,然后用户微服务订阅 RocketMQ 的消息,接收到消息后进行打印

  • 用户微服务:消费者
  • 订单微服务:生产者

4.1、用户微服务集成 RocketMQ

1、在用户微服务 shop-user 导入了 RocketMQ 的依赖

2、在用户微服务 shop-userapplication.yml 文件中添加如下 RocketMQ 的配置

rocketmq:name-server: 127.0.0.1:9876

3、在用户微服务 shop-user 中创建 com.zzc.user.mq 包,在包下创建 RocketConsumeListener,实现 org.apache.rocketmq.spring.core.RocketMQListener 接口,具体代码如下所示:

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "user-group", topic = "order-topic")
public class RocketConsumeListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {log.info("用户微服务收到了订单信息:{}", JSONObject.toJSONString(order));}
}

@RocketMQMessageListener 注解,表示当前类是一个 RocketMQ 的消费者,在@RocketMQMessageListener 注解中配置了消费者组为 user-group,主题为 order-topic

4.2、订单微服务整合 RocketMQ

1、在订单微服务 shop-orderpom.xml 文件中添加 RocketMQ 的依赖

2、在订单微服务 shop-orderapplication.yml 文件中添加如下配置:

rocketmq:name-server: 127.0.0.1:9876producer:group: order-group

3、修改 OrderServiceImpl

@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {// ...@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Override@Transactional(rollbackFor = Exception.class)public void saveOrder(OrderParamVo orderParamVo) {// ...log.info("库存扣减成功");// 发送消息rocketMQTemplate.convertAndSend("order-topic", order);}
}

4.3、测试集成的 RocketMQ

1、分别启动 Nacos,Sentinel,ZipKin 和 RocketMQ

2、分别启动用户微服务、商品微服务、订单微服务和网关服务

3、在浏览器中输入localhost:10001/server-order/order/submit_order?userId=1001&productId=1001&count=1

4、查看用户微服务 shop-user 的控制台,发现会输出订单的信息,如下所示:

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 003.精读《MapReduce: Simplified Data Processing on Large Clusters》
  • Npm install 原理
  • Dockerfile应用、私有仓库
  • 昇腾AI处理器的计算核心 - AI Core即DaVinci Core
  • 机器学习数学公式推导之高斯分布
  • 逆向工程核心原理 Chapter22 | 恶意键盘记录器
  • Jenkins安装使用详解,jenkins实现企业级CICD流程
  • 【解压即玩】PC极限竞速:地平线5 顶级豪华中文版 v1.656.386 全DLC 联机补丁810辆全车存档
  • HTML沙漏爱心
  • ansys apdl目标区域节点号提取,通过workbench设置节点集合
  • 安装python软件
  • UniApp 中页面跳转的方法及传值
  • 金融风控领域的15大顶级学术期刊
  • 2024年Java最新面试题总结(三年经验)
  • OSPF理论
  • “Material Design”设计规范在 ComponentOne For WinForm 的全新尝试!
  • 「面试题」如何实现一个圣杯布局?
  • OpenStack安装流程(juno版)- 添加网络服务(neutron)- controller节点
  • PHP那些事儿
  • Storybook 5.0正式发布:有史以来变化最大的版本\n
  • vue和cordova项目整合打包,并实现vue调用android的相机的demo
  • 从0到1:PostCSS 插件开发最佳实践
  • 关于使用markdown的方法(引自CSDN教程)
  • 基于游标的分页接口实现
  • 类orAPI - 收藏集 - 掘金
  • 前嗅ForeSpider中数据浏览界面介绍
  • 驱动程序原理
  • 使用 5W1H 写出高可读的 Git Commit Message
  • 吐槽Javascript系列二:数组中的splice和slice方法
  • 消息队列系列二(IOT中消息队列的应用)
  • 在electron中实现跨域请求,无需更改服务器端设置
  • ​DB-Engines 11月数据库排名:PostgreSQL坐稳同期涨幅榜冠军宝座
  • # 数论-逆元
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • #VERDI# 关于如何查看FSM状态机的方法
  • #图像处理
  • (Matalb时序预测)PSO-BP粒子群算法优化BP神经网络的多维时序回归预测
  • (MATLAB)第五章-矩阵运算
  • (Mirage系列之二)VMware Horizon Mirage的经典用户用例及真实案例分析
  • (篇九)MySQL常用内置函数
  • (三)SvelteKit教程:layout 文件
  • (转) SpringBoot:使用spring-boot-devtools进行热部署以及不生效的问题解决
  • (转)LINQ之路
  • .NET Reactor简单使用教程
  • @transactional 方法执行完再commit_当@Transactional遇到@CacheEvict,你的代码是不是有bug!...
  • [ CTF ] WriteUp- 2022年第三届“网鼎杯”网络安全大赛(白虎组)
  • [100天算法】-每个元音包含偶数次的最长子字符串(day 53)
  • [20160902]rm -rf的惨案.txt
  • [AI]文心一言出圈的同时,NLP处理下的ChatGPT-4.5最新资讯
  • [AIR] NativeExtension在IOS下的开发实例 --- IOS项目的创建 (一)
  • [Algorithm][动态规划][01背包问题][目标和][最后一块石头的重量Ⅱ]详细讲解
  • [Android Studio 权威教程]断点调试和高级调试
  • [Angular] 笔记 21:@ViewChild
  • [AutoSAR系列] 1.3 AutoSar 架构
  • [BZOJ2850]巧克力王国