publicclassBrokerConfig{privatestaticfinalInternalLogger log =InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);//从配置文件中拿到rocketmq主目录路径,如果没有设置,默认取环境变量ROCKETMQ_HOME的值privateString rocketmqHome =System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,System.getenv(MixAll.ROCKETMQ_HOME_ENV));@ImportantFieldprivateString namesrvAddr =System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,System.getenv(MixAll.NAMESRV_ADDR_ENV));@ImportantFieldprivateString brokerIP1 =RemotingUtil.getLocalAddress();privateString brokerIP2 =RemotingUtil.getLocalAddress();@ImportantFieldprivateString brokerName =localHostName();@ImportantFieldprivateString brokerClusterName ="DefaultCluster";@ImportantFieldprivatelong brokerId =MixAll.MASTER_ID;privateint brokerPermission =PermName.PERM_READ|PermName.PERM_WRITE;privateint defaultTopicQueueNums =8;@ImportantFieldprivateboolean autoCreateTopicEnable =true;privateboolean clusterTopicEnable =true;privateboolean brokerTopicEnable =true;@ImportantFieldprivateboolean autoCreateSubscriptionGroup =true;privateString messageStorePlugIn ="";@ImportantFieldprivateString msgTraceTopicName =MixAll.RMQ_SYS_TRACE_TOPIC;@ImportantFieldprivateboolean traceTopicEnable =false;/*** thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default* value is 1.*/privateint sendMessageThreadPoolNums =1;//16 + Runtime.getRuntime().availableProcessors() * 4;privateint pullMessageThreadPoolNums =16+Runtime.getRuntime().availableProcessors()*2;privateint queryMessageThreadPoolNums =8+Runtime.getRuntime().availableProcessors();privateint adminBrokerThreadPoolNums =16;privateint clientManageThreadPoolNums =32;privateint consumerManageThreadPoolNums =32;privateint heartbeatThreadPoolNums =Math.min(32,Runtime.getRuntime().availableProcessors());/*** Thread numbers for EndTransactionProcessor*/privateint endTransactionThreadPoolNums =8+Runtime.getRuntime().availableProcessors()*2;privateint flushConsumerOffsetInterval =1000*5;privateint flushConsumerOffsetHistoryInterval =1000*60;@ImportantFieldprivateboolean rejectTransactionMessage =false;@ImportantFieldprivateboolean fetchNamesrvAddrByAddressServer =false;privateint sendThreadPoolQueueCapacity =10000;privateint pullThreadPoolQueueCapacity =100000;privateint queryThreadPoolQueueCapacity =20000;privateint clientManagerThreadPoolQueueCapacity =1000000;privateint consumerManagerThreadPoolQueueCapacity =1000000;privateint heartbeatThreadPoolQueueCapacity =50000;privateint endTransactionPoolQueueCapacity =100000;privateint filterServerNums =0;privateboolean longPollingEnable =true;privatelong shortPollingTimeMills =1000;privateboolean notifyConsumerIdsChangedEnable =true;privateboolean highSpeedMode =false;privateboolean commercialEnable =true;privateint commercialTimerCount =1;privateint commercialTransCount =1;privateint commercialBigCount =1;privateint commercialBaseCount =1;privateboolean transferMsgByHeap =true;privateint maxDelayTime =40;privateString regionId =MixAll.DEFAULT_TRACE_REGION_ID;privateint registerBrokerTimeoutMills =6000;privateboolean slaveReadEnable =false;privateboolean disableConsumeIfConsumerReadSlowly =false;privatelong consumerFallbehindThreshold =1024L*1024*1024*16;privateboolean brokerFastFailureEnable =true;privatelong waitTimeMillsInSendQueue =200;privatelong waitTimeMillsInPullQueue =5*1000;privatelong waitTimeMillsInHeartbeatQueue =31*1000;privatelong waitTimeMillsInTransactionQueue =3*1000;privatelong startAcceptSendRequestTimeStamp =0L;privateboolean traceOn =true;// Switch of filter bit map calculation.// If switch on:// 1. Calculate filter bit map when construct queue.// 2. Filter bit map will be saved to consume queue extend file if allowed.privateboolean enableCalcFilterBitMap =false;// Expect num of consumers will use filter.privateint expectConsumerNumUseFilter =32;// Error rate of bloom filter, 1~100.privateint maxErrorRateOfBloomFilter =20;//how long to clean filter data after dead.Default: 24hprivatelong filterDataCleanTimeSpan =24*3600*1000;// whether do filter when retry.privateboolean filterSupportRetry =false;privateboolean enablePropertyFilter =false;privateboolean compressedRegister =false;privateboolean forceRegister =true;/*** This configurable item defines interval of topics registration of broker to name server. Allowing values are* between 10, 000 and 60, 000 milliseconds.*/privateint registerNameServerPeriod =1000*30;/*** The minimum time of the transactional message to be checked firstly, one message only exceed this time interval* that can be checked.*/@ImportantFieldprivatelong transactionTimeOut =6*1000;/*** The maximum number of times the message was checked, if exceed this value, this message will be discarded.*/@ImportantFieldprivateint transactionCheckMax =15;/*** Transaction message check interval.*/@ImportantFieldprivatelong transactionCheckInterval =60*1000;/*** Acl feature switch*/@ImportantFieldprivateboolean aclEnable =false;}
6.2 MixAll
publicclassMixAll{privatestaticfinalInternalLogger log =InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);//rocketmq环境变量的名字publicstaticfinalStringROCKETMQ_HOME_ENV="ROCKETMQ_HOME";//roketmq主目录路径,可在配置文件中设置publicstaticfinalStringROCKETMQ_HOME_PROPERTY="rocketmq.home.dir";//namesrv环境变更publicstaticfinalStringNAMESRV_ADDR_ENV="NAMESRV_ADDR";//namesrv地址配置项publicstaticfinalStringNAMESRV_ADDR_PROPERTY="rocketmq.namesrv.addr";//设置消息的压缩级别,只有0-9级,0表示不压缩,9表示最佳压缩(当然速度就最慢),rocket默认是折中为5publicstaticfinalStringMESSAGE_COMPRESS_LEVEL="rocketmq.message.compressLevel";//下面几项设置都是当启动时没有设置namesrv地址时,broker自己去找合适的namesrv地址,生产一般不用这种情况,所以也不去研究这块publicstaticfinalStringDEFAULT_NAMESRV_ADDR_LOOKUP="jmenv.tbsite.net";publicstaticfinalStringWS_DOMAIN_NAME=System.getProperty("rocketmq.namesrv.domain",DEFAULT_NAMESRV_ADDR_LOOKUP);publicstaticfinalStringWS_DOMAIN_SUBGROUP=System.getProperty("rocketmq.namesrv.domain.subgroup","nsaddr");//http://jmenv.tbsite.net:8080/rocketmq/nsaddr//public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;//默认自动创建的topic名publicstaticfinalStringAUTO_CREATE_TOPIC_KEY_TOPIC="TBW102";// Will be created at broker when isAutoCreateTopicEnablepublicstaticfinalStringBENCHMARK_TOPIC="BenchmarkTest";publicstaticfinalStringDEFAULT_PRODUCER_GROUP="DEFAULT_PRODUCER";publicstaticfinalStringDEFAULT_CONSUMER_GROUP="DEFAULT_CONSUMER";publicstaticfinalStringTOOLS_CONSUMER_GROUP="TOOLS_CONSUMER";publicstaticfinalStringFILTERSRV_CONSUMER_GROUP="FILTERSRV_CONSUMER";publicstaticfinalStringMONITOR_CONSUMER_GROUP="__MONITOR_CONSUMER";publicstaticfinalStringCLIENT_INNER_PRODUCER_GROUP="CLIENT_INNER_PRODUCER";publicstaticfinalStringSELF_TEST_PRODUCER_GROUP="SELF_TEST_P_GROUP";publicstaticfinalStringSELF_TEST_CONSUMER_GROUP="SELF_TEST_C_GROUP";publicstaticfinalStringSELF_TEST_TOPIC="SELF_TEST_TOPIC";publicstaticfinalStringOFFSET_MOVED_EVENT="OFFSET_MOVED_EVENT";publicstaticfinalStringONS_HTTP_PROXY_GROUP="CID_ONS-HTTP-PROXY";publicstaticfinalStringCID_ONSAPI_PERMISSION_GROUP="CID_ONSAPI_PERMISSION";publicstaticfinalStringCID_ONSAPI_OWNER_GROUP="CID_ONSAPI_OWNER";publicstaticfinalStringCID_ONSAPI_PULL_GROUP="CID_ONSAPI_PULL";publicstaticfinalStringCID_RMQ_SYS_PREFIX="CID_RMQ_SYS_";publicstaticfinalList<String>LOCAL_INET_ADDRESS=getLocalInetAddress();publicstaticfinalStringLOCALHOST=localhost();publicstaticfinalStringDEFAULT_CHARSET="UTF-8";publicstaticfinallongMASTER_ID=0L;publicstaticfinallongCURRENT_JVM_PID=getPID();publicstaticfinalStringRETRY_GROUP_TOPIC_PREFIX="%RETRY%";publicstaticfinalStringDLQ_GROUP_TOPIC_PREFIX="%DLQ%";publicstaticfinalStringSYSTEM_TOPIC_PREFIX="rmq_sys_";publicstaticfinalStringUNIQUE_MSG_QUERY_FLAG="_UNIQUE_KEY_QUERY";publicstaticfinalStringDEFAULT_TRACE_REGION_ID="DefaultRegion";publicstaticfinalStringCONSUME_CONTEXT_TYPE="ConsumeContextType";publicstaticfinalStringRMQ_SYS_TRANS_HALF_TOPIC="RMQ_SYS_TRANS_HALF_TOPIC";publicstaticfinalStringRMQ_SYS_TRACE_TOPIC="RMQ_SYS_TRACE_TOPIC";publicstaticfinalStringRMQ_SYS_TRANS_OP_HALF_TOPIC="RMQ_SYS_TRANS_OP_HALF_TOPIC";publicstaticfinalStringCID_SYS_RMQ_TRANS="CID_RMQ_SYS_TRANS";publicstaticfinalStringACL_CONF_TOOLS_FILE="/conf/tools.yml";}