当前位置: 首页 > news >正文

RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

前言

SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。


源码版本:4.9.3

源码架构图

核心数据结构

主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// Map<消费组名称,订阅组配置>private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);// 内存数据版本号private final DataVersion dataVersion = new DataVersion();
}

深入看下SubscriptionGroupConfig 的数据结构。

public class SubscriptionGroupConfig {// 消费组名称private String groupName;// 是否开启消费private boolean consumeEnable = true;// 是否允许消费最早消息private boolean consumeFromMinEnable = true;// 是否允许广播消费private boolean consumeBroadcastEnable = true;// 重试队列数private int retryQueueNums = 1;// 重试最大次数private int retryMaxTimes = 16;// brokerIdprivate long brokerId = MixAll.MASTER_ID;// 当产生慢消费时,选择第几个brokerprivate long whichBrokerWhenConsumeSlowly = 1;// 是否通知消费者ids变化private boolean notifyConsumerIdsChangedEnable = true;
}

核心数据行为

数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {public SubscriptionGroupManager() {this.init();}public SubscriptionGroupManager(BrokerController brokerController) {this.brokerController = brokerController;this.init();}private void init() {{// 初始化系统消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化过滤服务消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化自测消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化http代理消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PULL消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PERMISSION消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_OWNER消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);}}// 更新订阅配置,且更新内存数据版本号public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);if (old != null) {log.info("update subscription group config, old: {} new: {}", old, config);} else {log.info("create new subscription group, {}", config);}this.dataVersion.nextVersion();this.persist();}// 失效消费组public void disableConsume(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {old.setConsumeEnable(false);this.dataVersion.nextVersion();}}// 查找指定消费组的订阅配置public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}this.dataVersion.nextVersion();this.persist();}}return subscriptionGroupConfig;}// 将内存数据结构编码成字符串@Overridepublic String encode() {return this.encode(false);}// 获取配置文件路径@Overridepublic String configFilePath() {return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 从字符串中恢复数据,写回内存数据结构@Overridepublic void decode(String jsonString) {if (jsonString != null) {SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);this.dataVersion.assignNewOne(obj.dataVersion);this.printLoadDataWhenFirstBoot(obj);}}}// 将内存数据结构编码成字符串public String encode(final boolean prettyFormat) {return RemotingSerializable.toJson(this, prettyFormat);}// 当第一次启动时,打印加载数据时的日志private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionGroupConfig> next = it.next();log.info("load exist subscription group, {}", next.getValue().toString());}}public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {return subscriptionGroupTable;}public DataVersion getDataVersion() {return dataVersion;}// 删除指定消费组的订阅配置public void deleteSubscriptionGroupConfig(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);if (old != null) {log.info("delete subscription group OK, subscription group:{}", old);this.dataVersion.nextVersion();this.persist();} else {log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);}}
}

相关文章:

  • fastapi-amis-admin快速创建一个后台管理系统增加音乐管理功能(3)
  • 前端接入若依后,页面白屏问题排查
  • 玩转大数据11:数据可视化与交互式分析
  • scala编码
  • 《算法面试宝典》--机器学习常见问题汇总
  • [ndss 2023]确保联邦敏感主题分类免受中毒攻击
  • “新华三杯”第十届成都信息工程大学ACM程序设计竞赛(同步赛)L. 怎么走啊(最短路+二分 分段函数)
  • 视频监控案例分析
  • Jenkins离线安装部署教程简记
  • SQL Server ,使用递归查询具有层级关系的数据。
  • 家政预约小程序带商城,图文详解
  • LeetCode刷题--- 二叉搜索树中第K小的元素
  • 如何使用 Flutter 和地理位置 API 构建基于位置的移动应用程序?
  • 【前端】HTML5 CSS3新特性(学习笔记)
  • 12. IO
  • SegmentFault for Android 3.0 发布
  • AHK 中 = 和 == 等比较运算符的用法
  • Android 架构优化~MVP 架构改造
  • Eureka 2.0 开源流产,真的对你影响很大吗?
  • Flex布局到底解决了什么问题
  • JavaScript实现分页效果
  • js如何打印object对象
  • MYSQL 的 IF 函数
  • Mysql优化
  • Terraform入门 - 1. 安装Terraform
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • 初探 Vue 生命周期和钩子函数
  • 大数据与云计算学习:数据分析(二)
  • 高性能JavaScript阅读简记(三)
  • 理清楚Vue的结构
  • 面试题:给你个id,去拿到name,多叉树遍历
  • 入口文件开始,分析Vue源码实现
  • 入手阿里云新服务器的部署NODE
  • 什么是Javascript函数节流?
  • 微信支付JSAPI,实测!终极方案
  • 一道闭包题引发的思考
  • 用Canvas画一棵二叉树
  • 智能合约Solidity教程-事件和日志(一)
  • 测评:对于写作的人来说,Markdown是你最好的朋友 ...
  • 整理一些计算机基础知识!
  • ​【原创】基于SSM的酒店预约管理系统(酒店管理系统毕业设计)
  • ​比特币大跌的 2 个原因
  • #include到底该写在哪
  • #常见电池型号介绍 常见电池尺寸是多少【详解】
  • #快捷键# 大学四年我常用的软件快捷键大全,教你成为电脑高手!!
  • #我与Java虚拟机的故事#连载04:一本让自己没面子的书
  • (ZT)薛涌:谈贫说富
  • (附源码)spring boot基于Java的电影院售票与管理系统毕业设计 011449
  • (附源码)计算机毕业设计ssm电影分享网站
  • (附源码)计算机毕业设计SSM疫情居家隔离服务系统
  • (蓝桥杯每日一题)平方末尾及补充(常用的字符串函数功能)
  • (原创) cocos2dx使用Curl连接网络(客户端)
  • (转)拼包函数及网络封包的异常处理(含代码)
  • (轉貼) 2008 Altera 亞洲創新大賽 台灣學生成果傲視全球 [照片花絮] (SOC) (News)
  • .bat批处理(二):%0 %1——给批处理脚本传递参数