当前位置: 首页 > news >正文

03 RocketMQ - Broker 源码分析

文章目录

    • Broker 整体流程
    • Broker 消息存储
    • Broker 启动源码
    • Broker 消息写入源码
    • Broker 核心设计
      • NRS 与 NRC 的功能号设计
      • Commitlog 写入时锁的选择
      • MMAP
      • 堆外内存

Broker 整体流程

Broker 消息存储

  • CommitLog
    消息存储文件,所有的消息都存储在 CommitLog 中
    在这里插入图片描述
    CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享。每一个消息的存储长度是不固定的,前4个字节存储该条消息的总长度。在 CommitLog 中顺序写,随机读。CommitLog 文件默认大小为 1G,当文件满了,写入下一个文件,可通过在 broker 配置文件中设置 mapedFileSizeCommitLog 属性来改变默认大小。

  • ConsumeQueue
    ConsumeQueue 消息消费队列,可以看成是基于topic的commitlog索引文件,存储的是消息的物理存储的地址。每个Topic下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件,引入的目的主要是提高消息消费的性能。
    在这里插入图片描述
    在这里插入图片描述
    ConsumeQueue中存储的是消息条目,为了加速 ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个 Consumequeue条目不会存储消息的全量信息,消息条目如下:

    ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。

  • IndexFile
    IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。

    在这里插入图片描述

  • Config
    config 文件夹中存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在此。
    consumerFilter.json :主题消息过滤信息
    consumerOffset.json :集群消费模式消息消进度
    delayOffset.json :延时消息队列拉取进度
    subscriptionGroup.json :消息消费组配置信息
    topics.json : topic 配置属性

    在这里插入图片描述

Broker 启动源码


BrokerStartup.class

public static void main(String[] args) {
        start(createBrokerController(args));
    }

BrokerStartup::createBrokerController()

  1. 解析命令行参数
  2. 是否选择 dleger技术(raft)
  3. 创建 BrokerController
  4. 初始化 BrokerController
public static BrokerController createBrokerController(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 131072;
        }

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 131072;
        }

        try {
            //PackageConflictDetect.detectFastjson();
            // 1.解析命令行参数
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }

            final BrokerConfig brokerConfig = new BrokerConfig();
            //netty服务器配置,与生产者通信
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            //netty客户端配置,与NameSever通信
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();

            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            nettyServerConfig.setListenPort(10911);
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            //如果是从节点
            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }
            //解析命令行中 -c 的参数
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }

            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }
            //获取nameSever地址
            String namesrvAddr = brokerConfig.getNamesrvAddr();
            if (null != namesrvAddr) {
                try {
                    String[] addrArray = namesrvAddr.split(";");
                    for (String addr : addrArray) {
                        RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exception e) {
                    System.out.printf(
                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                        namesrvAddr);
                    System.exit(-3);
                }
            }
            //主从设置
            switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }

                    break;
                default:
                    break;
            }
            // 2.是否选择 dleger技术 raft:类似于ZK选举,但是性能不好,BUG比较多,推荐不使用
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                brokerConfig.setBrokerId(-1);
            }

            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

            if (commandLine.hasOption('p')) {
                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                MixAll.printObjectProperties(console, nettyClientConfig);
                MixAll.printObjectProperties(console, messageStoreConfig);
                System.exit(0);
            } else if (commandLine.hasOption('m')) {
                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig, true);
                MixAll.printObjectProperties(console, nettyServerConfig, true);
                MixAll.printObjectProperties(console, nettyClientConfig, true);
                MixAll.printObjectProperties(console, messageStoreConfig, true);
                System.exit(0);
            }

            log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
            MixAll.printObjectProperties(log, brokerConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            MixAll.printObjectProperties(log, nettyClientConfig);
            MixAll.printObjectProperties(log, messageStoreConfig);
            // 3.创建 BrokerController
            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
            //4. 初始化 BrokerController
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
            //jvm关闭的勾子函数
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

BrokerController::initialize()
1.加载 config 文件夹里面的相关信息
2.创建消息存储管理组件
3.存储组件启动(包含零拷贝MMAP技术)
4.创建处理消息的线程池
5.注册各种处理消息请求(功能号设计)
6.开启持久化配置文件相关定时任务
7.处理事务相关消息

public boolean initialize() throws CloneNotSupportedException {
        //1.加载 config 信息
        //todo 加载Broker中的主题信息  json
        boolean result = this.topicConfigManager.load();
        //todo 加载消费进度
        result = result && this.consumerOffsetManager.load();
        //todo 加载订阅信息
        result = result && this.subscriptionGroupManager.load();
        //todo 加载订消费者过滤信息
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                //2.创建消息存储管理组件
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                //如果开启了多副本机制,会为集群节点选主器添加roleChangeHandler事件处理器,即节点发送变更后的事件处理器。
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                }
                //broker的统计组件
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }
        //3.存储组件启动(包含零拷贝MMAP技术)

        result = result && this.messageStore.load();
        if (result) {
            //构建netty服务端(监听10911)
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            //在RocketMQ 4.5.1之后,10909端口vipChannelEnabled默认为了false)
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            //4.创建处理消息的线程池
            //发送消息的线程池
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));
            //pull消息的线程池
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));
            //应答线程池
            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.replyThreadPoolQueue,
                new ThreadFactoryImpl("ProcessReplyMessageThread_"));
            //查询线程池
            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));

            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                    "AdminBrokerThread_"));

            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));
            //心跳处理的线程池
            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true));

            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.endTransactionThreadPoolQueue,
                new ThreadFactoryImpl("EndTransactionThread_"));

            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));
            //5.注册各种处理消息请求(功能号设计)
            this.registerProcessor();
            //6.开启持久化配置文件相关定时任务
            final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            //检查broker的状态
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);
            //consumerOffset
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
                } else {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }

            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;

                            @Override
                            public void onChanged(String path) {
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    log.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
                                    log.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }

                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
            //7.处理事务相关消息
            initialTransaction();
            initialAcl();
            initialRpcHooks();
        }
        return result;
    }

BrokerStartup::start()

public static BrokerController start(BrokerController controller) {
        try {
            //启动
            controller.start();

            String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

            if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
            }

            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

BrokerController::start()

public void start() throws Exception {
        //启动消息存储组件
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        //启动netty服务器
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
        //todo 对外通信组件,例如给namesever发心跳
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }
        //开启DLedger多副本主从切换
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            //开启事务状态回查处理器
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            //处理从节点的元数据同步
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }
        //todo broker每隔30s向NameServer发送心跳包
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //todo 向NameServer发送心跳包
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }


    }

Broker 消息写入源码

存储设计层次:
image.png

  • 业务层:也可以称之为网络层,就是收到消息之后,一般交给 SendMessageProcessor 来分配(交给哪个业务来处理)。DefaultMessageStore,这个是存储层最核心的入口。
  • 存储逻辑层:主要负责各种存储的逻辑,里面有很多跟存储同名的类。
  • 存储I/O层:主要负责存储的具体的消息与I/O处理。

普通消息写入流程:
RocketMQ 使用 Netty 处理网络,broker 收到消息写入的请求就会进入 SendMessageProcessor类 中processRequest 方法。

最终进入DefaultMessageStore类中asyncPutMessage方法进行消息的存储

然后消息进入commitlog类中的asyncPutMessage方法进行消息的存储

Broker 核心设计

NRS 与 NRC 的功能号设计

客户端发送消息时定义一个code,对应一个功能,服务端注册一个业务处理,对应一个code的业务处理。code对应码表RequestCode类。

客户端:
RocketMQ 的通讯使用的是 Netty,作为客户端核心类有两种:RemotingCommand与NettyRemotingClient。

  • RemotingCommand 主要处理消息的组装:包括消息头、消息序列化与反序列化。
  • NettyRemotingClient 主要处理消息的发送:包括同步、异步、单向、注册等操作。

发送消息传入 code 值源码
在这里插入图片描述


拉取消息传入 code 值源码
在这里插入图片描述

服务端:
Broker 启动时将服务端需要处理的 ExecutorService 存放到 processorTable 中

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

在启动流程中 BrokerController 类中的 initialize() 中
image.png
image.png

Commitlog 写入时锁的选择

RocketMQ 在写入消息到 CommitLog 中时,使用了锁机制,即同一时刻只有一个线程可以写CommitLog文件。CommitLog 中使用了两种锁,一个是自旋锁,另一个是重入锁

在这里插入图片描述
根据配置可选择使用自旋锁或重入锁,在CommitLog 的构造方法中可看到

//useReentrantLockWhenPutMessage参数默认是false,使用自旋锁。异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ?
                new PutMessageReentrantLock() : new PutMessageSpinLock();

//重入锁
public class PutMessageReentrantLock implements PutMessageLock {
    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

    @Override
    public void lock() {
        putMessageNormalLock.lock();
    }

    @Override
    public void unlock() {
        putMessageNormalLock.unlock();
    }
}

//自旋锁
public class PutMessageSpinLock implements PutMessageLock {
    //true: Can lock, false : in lock.
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

    @Override
    public void lock() {
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        }
        while (!flag);
    }

    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}

RocketMQ 官方文档优化建议:异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁

  • 同步刷盘时,锁竞争激烈,会有较多的线程处于等待阻塞等待锁的状态,如果采用自旋锁会浪费很多的CPU时间,所以“同步刷盘建议使用重入锁”。

  • 异步刷盘时,间隔一定的时间刷一次盘,锁竞争不激烈,不会存在大量阻塞等待锁的线程,偶尔锁等待就自旋等待一下很短的时间,不要进行上下文切换了,所以采用自旋锁更合适。

MMAP

10分钟了解什么是内存映射MMAP!
mmap ---- 内存映射原理

RocketMQ底层对commitLog、consumeQueue之类的磁盘文件的读写操作都采用了mmap技术。具体到代码里面就是利用JDK里面NIO的MapperByteBuffer的map()函数,来先将磁盘文件(CommitLog文件、consumeQueue文件)映射到内存里来。

实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read、write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

相关源码:
MappedFile::init()
将磁盘文件映射到内存

private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            //文件通道 fileChannel
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            //FileChannel配合着ByteBuffer,将读写的数据缓存到内存中(操纵大文件时可以显著提高效率)
            //MappedByteBuffer (零拷贝之内存映射:mmap)
            //FileChannel 定义了一个 map() 方法,它可以把一个文件从 position 位置开始的 size 大小的区域映射为内存映像文件
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

            //原子操作类 ---CAS的原子操作类--多线程效率(加锁)
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

MappedFile::appendMessagesInner()
消息写入

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
        //当前这个MaapedFile的写入位置
        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            //异步输盘时还有两种刷盘模式可以选择
            //TODO 如果writeBuffer!= null开启了堆外内存缓冲,使用writeBuffer,否则使用mappedByteBuffer(也是继承的ByteBuffer)
            //slice方法创建一个新的字节缓冲区
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);//指定ByteBuffer中的position
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBrokerInner) {
                //todo 非批量处理
                //写入具体的数据 commitlog中的数据格式
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

堆外内存

JVM——堆外内存详解

开启堆外内存需要修改配置文件broker:transientStorePoolEnable=true,且必须为异步刷盘和主节点
DefaultMessageStore::DefaultMessageStore()





堆外缓冲区流程

RocketMQ单独创建一个ByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。同时因为是堆外内存,这么设计可以避免频繁的GC。

本质上分为两个阶段:
一个阶段先写入堆外内存,另外一个阶段通过定时任务再写入文件。

从图中可以发现,默认方式,mmap+PageCache的方式,读写消息都走的是pageCache(MappedByteBuffer类),这样读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写(脏页面)。

而如果采用堆外缓冲区,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(MappedByteBuffer类),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

所以使用堆外缓冲区的方式相对来说会比较好,但是肯定的是,需要消耗一定的内存,如果服务器内存吃紧就不推荐这种模式,同时的话,堆外缓冲区的话也需要配合异步刷盘才能使用(因为写数据分成了两步,同步刷盘延迟就会比较大)。

相关文章:

  • Java日志系列——规范化日志
  • 00前言说明-Qt自定义控件大全
  • 简历内容整理
  • 金仓数据库KingbaseES客户端编程接口指南-ado.net(7. Kdbnpg支持的类型和类型映射)
  • CTCLoss原理解读
  • 数字孪生|数字孪生装备-概念与内涵
  • 图像相似度对比分析软件,图像相似度算法有哪些
  • 《深入理解JAVA虚拟机(第2版)》—— 学习笔记2
  • 高并发面试:线程池的七大参数?手写一个线程池?
  • 【音视频—基础】分辨率、码率和帧率
  • 异常点检测的应用场景与检测方法(含代码实操案例)
  • 【存储数据恢复】NetApp存储误删除的数据恢复案例
  • 期货开户保证金能极端行情保安全
  • 如何写出详细且易于阅读PRD?
  • [paper] lift,splat,shooting 论文浅析
  • 2017 年终总结 —— 在路上
  • canvas 五子棋游戏
  • JAVA多线程机制解析-volatilesynchronized
  • mysql 数据库四种事务隔离级别
  • Python 基础起步 (十) 什么叫函数?
  • Vim Clutch | 面向脚踏板编程……
  • 利用jquery编写加法运算验证码
  • 漫谈开发设计中的一些“原则”及“设计哲学”
  • 区块链将重新定义世界
  • 山寨一个 Promise
  • 手机端车牌号码键盘的vue组件
  • 微信小程序开发问题汇总
  • 为物联网而生:高性能时间序列数据库HiTSDB商业化首发!
  • 06-01 点餐小程序前台界面搭建
  • 分布式关系型数据库服务 DRDS 支持显示的 Prepare 及逻辑库锁功能等多项能力 ...
  • 曾刷新两项世界纪录,腾讯优图人脸检测算法 DSFD 正式开源 ...
  • ​LeetCode解法汇总2696. 删除子串后的字符串最小长度
  • #HarmonyOS:软件安装window和mac预览Hello World
  • (01)ORB-SLAM2源码无死角解析-(56) 闭环线程→计算Sim3:理论推导(1)求解s,t
  • (1)Nginx简介和安装教程
  • (5)STL算法之复制
  • (C#)if (this == null)?你在逗我,this 怎么可能为 null!用 IL 编译和反编译看穿一切
  • (C++17) optional的使用
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (提供数据集下载)基于大语言模型LangChain与ChatGLM3-6B本地知识库调优:数据集优化、参数调整、Prompt提示词优化实战
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite
  • .bat批处理(三):变量声明、设置、拼接、截取
  • .Net core 6.0 升8.0
  • .Net Core/.Net6/.Net8 ,启动配置/Program.cs 配置
  • .NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划
  • .NET 的程序集加载上下文
  • .net 前台table如何加一列下拉框_如何用Word编辑参考文献
  • .NET 使用 ILMerge 合并多个程序集,避免引入额外的依赖
  • .net分布式压力测试工具(Beetle.DT)
  • .Net转Java自学之路—基础巩固篇十三(集合)
  • .vue文件怎么使用_我在项目中是这样配置Vue的
  • /bin、/sbin、/usr/bin、/usr/sbin
  • @Validated和@Valid校验参数区别
  • [ vulhub漏洞复现篇 ] Django SQL注入漏洞复现 CVE-2021-35042
  • [ 隧道技术 ] 反弹shell的集中常见方式(四)python反弹shell