当前位置: 首页 > news >正文

RocketMQ源码分析 - 环境搭建

RocketMQ源码分析 - 环境搭建

    • 环境搭建
      • 源码拉取
      • 导入IDEA
      • 调试
        • 1) 启动NameServer
        • 2) 启动Broker
        • 3) 发送消息
        • 4) 消费消息

环境搭建

依赖工具

  • JDK:1.8+
  • Maven
  • Intellij IDEA

源码拉取

从官方仓库 https://github.com/apache/rocketmq clone或者download源码。
在这里插入图片描述
源码目录结构:

  • broker:broker模块(broker启动进程)
  • client:消息客户端,包含消息生产者、消息消费者相关类
  • common:公共包
  • dev:开发者信息(非源代码)
  • distribution:部署实例文件夹(非源代码)
  • example:RocketMQ例代码
  • filter:消息过滤相关基础类
  • filtersrv:消息过滤服务器实现相关类(Filter启动进程)
  • logappender:日志实现相关类
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • openmessageing:消息开放标准
  • remoting:远程通信模块,给予Netty
  • srcutil:服务工具类
  • store:消息存储实现相关类
  • style:checkstyle相关实现
  • test:测试相关类
  • tools:工具类,监控命令相关实现类

导入IDEA

在这里插入图片描述
执行安装

clean install -Dmaven.test.skip=true

在这里插入图片描述

。。。。。。
[INFO] 
[INFO] Apache RocketMQ 4.5.1 .............................. SUCCESS [ 24.872 s]
[INFO] rocketmq-logging 4.5.1 ............................. SUCCESS [  3.511 s]
[INFO] rocketmq-remoting 4.5.1 ............................ SUCCESS [  4.462 s]
[INFO] rocketmq-common 4.5.1 .............................. SUCCESS [  5.444 s]
[INFO] rocketmq-client 4.5.1 .............................. SUCCESS [  4.268 s]
[INFO] rocketmq-store 4.5.1 ............................... SUCCESS [  3.219 s]
[INFO] rocketmq-srvutil 4.5.1 ............................. SUCCESS [  1.431 s]
[INFO] rocketmq-filter 4.5.1 .............................. SUCCESS [  1.321 s]
[INFO] rocketmq-acl 4.5.1 ................................. SUCCESS [  1.082 s]
[INFO] rocketmq-broker 4.5.1 .............................. SUCCESS [  3.667 s]
[INFO] rocketmq-tools 4.5.1 ............................... SUCCESS [  2.806 s]
[INFO] rocketmq-namesrv 4.5.1 ............................. SUCCESS [  1.228 s]
[INFO] rocketmq-logappender 4.5.1 ......................... SUCCESS [  1.394 s]
[INFO] rocketmq-openmessaging 4.5.1 ....................... SUCCESS [  1.122 s]
[INFO] rocketmq-example 4.5.1 ............................. SUCCESS [  1.282 s]
[INFO] rocketmq-test 4.5.1 ................................ SUCCESS [  1.439 s]
[INFO] rocketmq-distribution 4.5.1 ........................ SUCCESS [  0.147 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:02 min
[INFO] Finished at: 2024-07-19T08:46:25+08:00
[INFO] Final Memory: 57M/913M
[INFO] ------------------------------------------------------------------------Process finished with exit code 0

调试

创建conf配置文件夹,从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
在这里插入图片描述

1) 启动NameServer
  • 展开namesrv模块,右键NamesrvStartup.java
    在这里插入图片描述

  • 配置ROCKETMO_HOME
    在这里插入图片描述
    在这里插入图片描述

  • 重新启动
    控制台打印结果

Connected to the target VM, address: '127.0.0.1:65350', transport: 'socket'
The Name Server boot success. serializeType=JSON

在这里插入图片描述

2) 启动Broker
  • broker.conf配置文件内容

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 开启客户端创建主题功能
autoCreateTopicEnable=true# 存储路径
storePathRootDir=D:\\work\\mq\\rocketmq-master\\dataDir
# commitLog路径
storePathCommitLog=D:\\work\\mq\\rocketmq-master\\dataDir\\commitLog
# 消息队列存储路径
storePathConsumeQueue=D:\\work\\mq\\rocketmq-master\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=D:\\work\\mq\\rocketmq-master\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=D:\\work\\mq\\rocketmq-master\\dataDir\\checkpoint
# abort文件存储路径
abortFile=D:\\work\\mq\\rocketmq-master\\dataDir\\abort
  • 创建数据文件夹dataDir
  • 启动BrokerStartup,配置broker.conf和ROCKETMQ_HOME
    在这里插入图片描述
    在这里插入图片描述
3) 发送消息
  • 进入example模块的org.apache.rocketmq.example.quickstart
  • 指定Namesrv地址
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
  • 运行main方法,发送消息

4) 消费消息
  • 进入example模块的org.apache.rocketmq.example.quickstart
  • 指定Namesrv地址
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");```
- 运行main方法,发送消息
```javascript
D:\install\jdk\jdk-8u131-windows-x64\bin\java.exe -javaagent:D:\install\idea\ideaIU-2018.3.5.win\lib\idea_rt.jar=57007:D:\install\idea\ideaIU-2018.3.5.win\bin -Dfile.encoding=UTF-8 -classpath D:\install\jdk\jdk-8u131-windows-x64\jre\lib\charsets.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\deploy.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\access-bridge-64.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\cldrdata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\dnsns.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jaccess.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\jfxrt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\localedata.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\nashorn.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunec.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunjce_provider.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunmscapi.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\sunpkcs11.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\ext\zipfs.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\javaws.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jce.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfr.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jfxswt.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\jsse.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\management-agent.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\plugin.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\resources.jar;D:\install\jdk\jdk-8u131-windows-x64\jre\lib\rt.jar;D:\work\mq\rocketmq-master\example\target\classes;D:\work\mq\rocketmq-master\client\target\classes;D:\work\mq\rocketmq-master\common\target\classes;C:\Users\muxu\.m2\repository\org\apache\commons\commons-lang3\3.4\commons-lang3-3.4.jar;D:\work\mq\rocketmq-master\srvutil\target\classes;D:\work\mq\rocketmq-master\remoting\target\classes;C:\Users\muxu\.m2\repository\com\alibaba\fastjson\1.2.51\fastjson-1.2.51.jar;C:\Users\muxu\.m2\repository\io\netty\netty-all\4.0.42.Final\netty-all-4.0.42.Final.jar;C:\Users\muxu\.m2\repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;C:\Users\muxu\.m2\repository\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;C:\Users\muxu\.m2\repository\com\google\guava\guava\19.0\guava-19.0.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-classic\1.0.13\logback-classic-1.0.13.jar;C:\Users\muxu\.m2\repository\ch\qos\logback\logback-core\1.0.13\logback-core-1.0.13.jar;C:\Users\muxu\.m2\repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar;C:\Users\muxu\.m2\repository\org\javassist\javassist\3.20.0-GA\javassist-3.20.0-GA.jar;C:\Users\muxu\.m2\repository\io\openmessaging\openmessaging-api\0.3.1-alpha\openmessaging-api-0.3.1-alpha.jar;D:\work\mq\rocketmq-master\openmessaging\target\classes;D:\work\mq\rocketmq-master\acl\target\classes;D:\work\mq\rocketmq-master\logging\target\classes;C:\Users\muxu\.m2\repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;C:\Users\muxu\.m2\repository\commons-codec\commons-codec\1.9\commons-codec-1.9.jar org.apache.rocketmq.example.quickstart.Consumer
22:16:16.493 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=2, sysFlag=0, bornTimestamp=1721571319594, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319598, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000000642, commitLogOffset=1602, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382412, UNIQ_KEY=0200000111C818B4AAC26BC5B72A000A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 48], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=11, sysFlag=0, bornTimestamp=1721571319797, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319798, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001F6E, commitLogOffset=8046, bodyCRC=529756006, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382418, UNIQ_KEY=0200000111C818B4AAC26BC5B7F5002E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52, 54], transactionId='null'}]] 
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=3, sysFlag=0, bornTimestamp=1721571319624, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319627, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F000000000000090E, commitLogOffset=2318, bodyCRC=216726031, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B748000E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 52], transactionId='null'}]] 
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=6, sysFlag=0, bornTimestamp=1721571319698, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319702, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000001172, commitLogOffset=4466, bodyCRC=1237960928, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382419, UNIQ_KEY=0200000111C818B4AAC26BC5B792001A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 54], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=18, sysFlag=0, bornTimestamp=1721571319880, bornHost=/2.0.0.1:56937, storeTimestamp=1721571319881, storeHost=/2.0.0.1:10911, msgId=0200000100002A9F0000000000003302, commitLogOffset=13058, bodyCRC=1521507721, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1721571382425, UNIQ_KEY=0200000111C818B4AAC26BC5B848004A, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 52], transactionId='null'}]] 
。。。。。。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【ARM Hypervisor And SMMU 系列 5 -- SMMU 和 IOMMU技术】
  • Python爬虫实战训练:爬取网络小说
  • plsql表格怎么显示中文 plsql如何导入表格数据
  • ant design pro access.ts 是如何控制多角色的权限的
  • 网络编程UDP和TCP
  • Java二十三种设计模式-责任链模式(17/23)
  • 做谷歌seo如何确保网站的速度快?
  • Python版《超级玛丽+源码》-Python制作超级玛丽游戏
  • [Linux CMD] 查询占用进程 fuser
  • tp5php7.4配置sqlserver问题汇总
  • Windows 11 24H2 终于允许多个应用程序同时使用摄像头
  • Java重修笔记 第三十八天 String翻转
  • 初阶数据结构之计数排序
  • 【电子通识】IPC-A-600中对验收标准的定义
  • chromedriver下载地址大全(包括124.*后)以及替换exe后仍显示版本不匹配的问题
  • 【跃迁之路】【519天】程序员高效学习方法论探索系列(实验阶段276-2018.07.09)...
  • Android组件 - 收藏集 - 掘金
  • Apache的基本使用
  • exif信息对照
  • JAVA之继承和多态
  • Laravel Mix运行时关于es2015报错解决方案
  • mysql 数据库四种事务隔离级别
  • Three.js 再探 - 写一个跳一跳极简版游戏
  • Vue ES6 Jade Scss Webpack Gulp
  • VUE es6技巧写法(持续更新中~~~)
  • 产品三维模型在线预览
  • 探索 JS 中的模块化
  • 我看到的前端
  • 用mpvue开发微信小程序
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • 【运维趟坑回忆录】vpc迁移 - 吃螃蟹之路
  • # 数论-逆元
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • (1)svelte 教程:hello world
  • (13)Latex:基于ΤΕΧ的自动排版系统——写论文必备
  • (代码示例)使用setTimeout来延迟加载JS脚本文件
  • (三维重建学习)已有位姿放入colmap和3D Gaussian Splatting训练
  • (一)Spring Cloud 直击微服务作用、架构应用、hystrix降级
  • (原創) 如何優化ThinkPad X61開機速度? (NB) (ThinkPad) (X61) (OS) (Windows)
  • (转载)虚函数剖析
  • ./include/caffe/util/cudnn.hpp: In function ‘const char* cudnnGetErrorString(cudnnStatus_t)’: ./incl
  • .NET/ASP.NETMVC 大型站点架构设计—迁移Model元数据设置项(自定义元数据提供程序)...
  • .NET基础篇——反射的奥妙
  • .NET运行机制
  • @ConditionalOnProperty注解使用说明
  • [ solr入门 ] - 利用solrJ进行检索
  • [BPU部署教程] 教你搞定YOLOV5部署 (版本: 6.2)
  • [C#][DevPress]事件委托的使用
  • [C#]猫叫人醒老鼠跑 C#的委托及事件
  • [C++]C++入门--引用
  • [codeforces]Checkpoints
  • [C语言]编译和链接
  • [Debugger]调试Arm设备
  • [delphi]保证程序只运行一个实例