当前位置: 首页 > news > 正文

第六章-Broker使用的通用类

6.1 BrokerConfig

public class BrokerConfig {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);//从配置文件中拿到rocketmq主目录路径,如果没有设置,默认取环境变量ROCKETMQ_HOME的值private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));@ImportantFieldprivate String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));@ImportantFieldprivate String brokerIP1 = RemotingUtil.getLocalAddress();private String brokerIP2 = RemotingUtil.getLocalAddress();@ImportantFieldprivate String brokerName = localHostName();@ImportantFieldprivate String brokerClusterName = "DefaultCluster";@ImportantFieldprivate long brokerId = MixAll.MASTER_ID;private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;private int defaultTopicQueueNums = 8;@ImportantFieldprivate boolean autoCreateTopicEnable = true;private boolean clusterTopicEnable = true;private boolean brokerTopicEnable = true;@ImportantFieldprivate boolean autoCreateSubscriptionGroup = true;private String messageStorePlugIn = "";@ImportantFieldprivate String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;@ImportantFieldprivate boolean traceTopicEnable = false;/*** thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default* value is 1.*/private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();private int adminBrokerThreadPoolNums = 16;private int clientManageThreadPoolNums = 32;private int consumerManageThreadPoolNums = 32;private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());/*** Thread numbers for EndTransactionProcessor*/private int endTransactionThreadPoolNums = 8 + 
Runtime.getRuntime().availableProcessors() * 2;private int flushConsumerOffsetInterval = 1000 * 5;private int flushConsumerOffsetHistoryInterval = 1000 * 60;@ImportantFieldprivate boolean rejectTransactionMessage = false;@ImportantFieldprivate boolean fetchNamesrvAddrByAddressServer = false;private int sendThreadPoolQueueCapacity = 10000;private int pullThreadPoolQueueCapacity = 100000;private int queryThreadPoolQueueCapacity = 20000;private int clientManagerThreadPoolQueueCapacity = 1000000;private int consumerManagerThreadPoolQueueCapacity = 1000000;private int heartbeatThreadPoolQueueCapacity = 50000;private int endTransactionPoolQueueCapacity = 100000;private int filterServerNums = 0;private boolean longPollingEnable = true;private long shortPollingTimeMills = 1000;private boolean notifyConsumerIdsChangedEnable = true;private boolean highSpeedMode = false;private boolean commercialEnable = true;private int commercialTimerCount = 1;private int commercialTransCount = 1;private int commercialBigCount = 1;private int commercialBaseCount = 1;private boolean transferMsgByHeap = true;private int maxDelayTime = 40;private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;private int registerBrokerTimeoutMills = 6000;private boolean slaveReadEnable = false;private boolean disableConsumeIfConsumerReadSlowly = false;private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;private boolean brokerFastFailureEnable = true;private long waitTimeMillsInSendQueue = 200;private long waitTimeMillsInPullQueue = 5 * 1000;private long waitTimeMillsInHeartbeatQueue = 31 * 1000;private long waitTimeMillsInTransactionQueue = 3 * 1000;private long startAcceptSendRequestTimeStamp = 0L;private boolean traceOn = true;// Switch of filter bit map calculation.// If switch on:// 1. Calculate filter bit map when construct queue.// 2. 
Filter bit map will be saved to consume queue extend file if allowed.private boolean enableCalcFilterBitMap = false;// Expect num of consumers will use filter.private int expectConsumerNumUseFilter = 32;// Error rate of bloom filter, 1~100.private int maxErrorRateOfBloomFilter = 20;//how long to clean filter data after dead.Default: 24hprivate long filterDataCleanTimeSpan = 24 * 3600 * 1000;// whether do filter when retry.private boolean filterSupportRetry = false;private boolean enablePropertyFilter = false;private boolean compressedRegister = false;private boolean forceRegister = true;/*** This configurable item defines interval of topics registration of broker to name server. Allowing values are* between 10, 000 and 60, 000 milliseconds.*/private int registerNameServerPeriod = 1000 * 30;/*** The minimum time of the transactional message  to be checked firstly, one message only exceed this time interval* that can be checked.*/@ImportantFieldprivate long transactionTimeOut = 6 * 1000;/*** The maximum number of times the message was checked, if exceed this value, this message will be discarded.*/@ImportantFieldprivate int transactionCheckMax = 15;/*** Transaction message check interval.*/@ImportantFieldprivate long transactionCheckInterval = 60 * 1000;/*** Acl feature switch*/@ImportantFieldprivate boolean aclEnable = false;
}

6.2 MixAll

/**
 * Shared constants: environment-variable and system-property names, default group and topic
 * names, topic prefixes, and a few values computed once when the class is initialised.
 *
 * <p>Only the constant declarations are visible in this excerpt. The
 * {@code getLocalInetAddress()}, {@code localhost()} and {@code getPID()} helpers used by
 * some initialisers are not shown here and are expected to be declared elsewhere in the class.
 */
public class MixAll {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    // Name of the environment variable for the RocketMQ home directory.
    public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
    // Property name for the RocketMQ home directory
    // (original author's note: it can be set in the configuration file).
    public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
    // Name of the environment variable for the name server address.
    public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
    // Property name for the name server address.
    public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";
    // Property name for the message compression level.
    // Original author's note (not verifiable from this excerpt): levels range from 0 to 9,
    // 0 means no compression, 9 means best compression (and slowest); the default is 5.
    public static final String MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel";

    // Original author's note: the following settings apply when no name server address is
    // configured at startup and the broker has to look one up by itself; this is rarely used
    // in production.
    public static final String DEFAULT_NAMESRV_ADDR_LOOKUP = "jmenv.tbsite.net";
    // System property "rocketmq.namesrv.domain", defaulting to DEFAULT_NAMESRV_ADDR_LOOKUP.
    public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
    // System property "rocketmq.namesrv.domain.subgroup", defaulting to "nsaddr".
    public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
    // Disabled declaration kept for reference; with the defaults above it would evaluate to
    // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
    //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;

    // Name of the default auto-created topic.
    public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
    public static final String BENCHMARK_TOPIC = "BenchmarkTest";

    public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
    public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
    public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
    public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER";
    public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER";
    public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
    public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP";
    public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP";
    public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
    public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";

    public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY";
    public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION";
    public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
    public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
    public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";

    // Computed once at class initialisation by helpers that are outside this excerpt.
    public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress();
    public static final String LOCALHOST = localhost();
    public static final String DEFAULT_CHARSET = "UTF-8";
    public static final long MASTER_ID = 0L;
    public static final long CURRENT_JVM_PID = getPID();

    public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
    public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";

    public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
    public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
    public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";

    public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
    public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
    public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
    public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
}

6.3 BrokerPathConfigHelper

/**
 * Builds the file-system paths of the broker's configuration files.
 *
 * <p>The broker properties path is kept in a mutable static field that can be replaced via
 * {@link #setBrokerConfigPath(String)}. Every other path is derived on each call from the
 * root directory supplied by the caller.
 */
public class BrokerPathConfigHelper {

    /** Defaults to {@code <user.home>/store/config/broker.properties}. */
    private static String brokerConfigPath =
        inConfigDir(System.getProperty("user.home") + File.separator + "store", "broker.properties");

    /**
     * @return the currently configured path of the broker properties file
     */
    public static String getBrokerConfigPath() {
        return brokerConfigPath;
    }

    /**
     * Replaces the path of the broker properties file. The value is stored as given.
     *
     * @param path the new path
     */
    public static void setBrokerConfigPath(String path) {
        brokerConfigPath = path;
    }

    /**
     * @param rootDir root directory that contains the {@code config} directory
     * @return {@code <rootDir>/config/topics.json}
     */
    public static String getTopicConfigPath(final String rootDir) {
        return inConfigDir(rootDir, "topics.json");
    }

    /**
     * @param rootDir root directory that contains the {@code config} directory
     * @return {@code <rootDir>/config/consumerOffset.json}
     */
    public static String getConsumerOffsetPath(final String rootDir) {
        return inConfigDir(rootDir, "consumerOffset.json");
    }

    /**
     * @param rootDir root directory that contains the {@code config} directory
     * @return {@code <rootDir>/config/subscriptionGroup.json}
     */
    public static String getSubscriptionGroupPath(final String rootDir) {
        return inConfigDir(rootDir, "subscriptionGroup.json");
    }

    /**
     * @param rootDir root directory that contains the {@code config} directory
     * @return {@code <rootDir>/config/consumerFilter.json}
     */
    public static String getConsumerFilterPath(final String rootDir) {
        return inConfigDir(rootDir, "consumerFilter.json");
    }

    /** Joins {@code parent}, the literal {@code config} directory and {@code fileName} with the platform separator. */
    private static String inConfigDir(final String parent, final String fileName) {
        return parent + File.separator + "config" + File.separator + fileName;
    }
}

相关文章:

  • 精酿啤酒:特殊酵母的发酵特性与风味表现
  • 前端 CSS 经典:grid 栅格布局
  • C# 类的操作
  • 树状数组及应用
  • RuoYi-Vue-Plus(登录流程)
  • k8s1.28.8版本安装prometheus并持久化数据
  • 【Java】IDEA集成开发工具中英文切换
  • 面试题 之 webpack
  • STL标准模板库(C++)
  • Pillow教程06:将图片中出现的黄色和红色,改成绿色
  • Android仿微信视频聊天本地与远程切换功能
  • 基于springboot+vue+Mysql的超市进销存系统
  • 【JVM】JVM 运行时数据区简介
  • IntelliJ中的非JVM技术
  • 51单片机学习笔记8 中断系统及定时器
  • 345-反转字符串中的元音字母
  • 5、React组件事件详解
  • canvas 高仿 Apple Watch 表盘
  • CentOS 7 防火墙操作
  • eclipse的离线汉化
  • Java到底能干嘛?
  • js学习笔记
  • Lsb图片隐写
  • Mysql优化
  • Python打包系统简单入门
  • redis学习笔记(三):列表、集合、有序集合
  • Redux 中间件分析
  • Spark RDD学习: aggregate函数
  • Spring Cloud Alibaba迁移指南(一):一行代码从 Hystrix 迁移到 Sentinel
  • SpringCloud集成分布式事务LCN (一)
  • Terraform入门 - 1. 安装Terraform
  • 半理解系列--Promise的进化史
  • 观察者模式实现非直接耦合
  • 规范化安全开发 KOA 手脚架
  • 基于Vue2全家桶的移动端AppDEMO实现
  • 开源中国专访:Chameleon原理首发,其它跨多端统一框架都是假的?
  • 聊聊springcloud的EurekaClientAutoConfiguration
  • 前端设计模式
  • 深度解析利用ES6进行Promise封装总结
  • 十年未变!安全,谁之责?(下)
  • 数组大概知多少
  • 责任链模式的两种实现
  • - 转 Ext2.0 form使用实例
  • ​TypeScript都不会用,也敢说会前端?
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • ###项目技术发展史
  • #{} 和 ${}区别
  • #define用法
  • $L^p$ 调和函数恒为零
  • (三) diretfbrc详解
  • (十)c52学习之旅-定时器实验
  • (十一)c52学习之旅-动态数码管
  • (一)基于IDEA的JAVA基础12
  • (转)AS3正则:元子符,元序列,标志,数量表达符
  • ... 是什么 ?... 有什么用处?