01 RocketMQ - NameServer 源码分析
文章目录
- NameServer 整体流程
- NameServer 启动流程
- 入口源码
- createNamesrvController()
- start()
- initialize()
- start()
- 存储
NameServer 整体流程
NameServer 是整个 RocketMQ 的“大脑”,它是 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动NameServer 再启动 Rocket中的 Broker。
-
NameServer 启动
启动监听,等待Broker、Producer、Comsumer连接。- Broker在启动时向所有NameServer注册
- 生产者在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行消息发送
- 消费者在订阅某个主题的消息之前从NamerServer获取Broker服务器地址列表(有可能是集群),但是消费者选择从Broker中订阅消息,订阅规则由 Broker 配置决定
-
路由注册
Broker启动后向所有NameServer发送路由及心跳信息。 -
路由剔除
移除心跳超时的Broker相关路由信息。NameServer与每台Broker服务保持长连接,并间隔10S检查Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。
NameServer 启动流程
从源码的启动可知,NameServer 单独启动。
入口类:NamesrvController
核心方法:NamesrvController 类中 main()->main0-> createNamesrvController()->start() -> initialize()
流程图如下:
入口源码
public static NamesrvController main0(String[] args) {
try {
// 组装 NamesrvController 数据
NamesrvController controller = createNamesrvController(args);
// 启动
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
createNamesrvController()
- 设置启动端口号
- 解析启动-c参数(如:-c E:\jar\RocketMQ\conf\broker.conf)
- 解析启动-p参数(测试的参数,会打印所有的参数信息)
- 将启动参数填充到namesrvConfig、nettyServerConfig
- 创建NameServerController
- 写入所有的配置
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//创建NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//创建NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//1. 设置启动端口号
nettyServerConfig.setListenPort(9876);
//2. 解析启动-c参数(如:-c E:\jar\RocketMQ\conf\broker.conf)
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//3. 解析启动-p参数(测试的参数,会打印所有的参数信息)
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//4. 将启动参数填充到namesrvConfig、nettyServerConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//5. 创建NameServerController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//6. 写入所有的配置
controller.getConfiguration().registerConfig(properties);
return controller;
}
start()
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 1.初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//钩子方法
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 2.启动
controller.start();
return controller;
}
initialize()
- 加载KV配置
- 创建NettyServer网络处理对象
- 开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
- 开启定时任务:每隔10min打印一次KV配置
public boolean initialize() {
//1.加载KV配置
this.kvConfigManager.load();
//2.创建NettyServer网络处理对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//3.开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
//每隔10s扫描一次为活跃Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//todo 每隔10s扫描一次为活跃Broker
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//4.开启定时任务:每隔10min打印一次KV配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
scanNotActiveBroker()
//超过120s,则认为Broker失效
public void scanNotActiveBroker() {
//获得brokerLiveTable
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
//遍历brokerLiveTable
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
//如果收到心跳包的时间距当时时间是否超过120s
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
//关闭连接
RemotingUtil.closeChannel(next.getValue().getChannel());
//移除broker
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
//维护路由表(更新路由表)
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
start()
public void start() throws Exception {
this.remotingServer.start();//启动NRS组件
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
存储
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//读写锁,一对锁,一个读锁,一个写锁,控制下列各个Map在并发读写下的安全性: 适用于 读多、写少, 效率特别高
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//topic对应队列信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broker信息
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//broker集群信息,key-集群名称 value-对应集群中所有broker名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//broker地址及对应对应broker存活信息(心跳信息)
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//broker地址及对应Filter Server列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
- ReentrantReadWriteLock
使用读写锁,针对于此读多写少的情况,使得并发性相比一般的排他锁有很大提升 - 存储基于内存
NameServer 的实现基于内存,并不会持久化路由信息,提高了 NameServer 的处理能力