当前位置: 首页 > news >正文

RocketMQ 消费者拉取消息(Pull) 解析——图解、源码级解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2022年9月30日

🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 消费者拉取消息(Pull)示例
  • fetchSubscribeMessageQueues
  • 拉取消息的核心代码

消费者拉取消息(Pull)示例

消费者使用Pull方式拉取消息的流程和Push消息的流程基本类似,包括创建消费者对象、设置组名、启动消费者消费。代码如下:

package com.wjw;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PullConsumer {
    // 存储队列offset
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();

    public static void main(String[] args) throws Exception{
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group A");
        // 启动消费者
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Target Topic");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume message from " + mq);
            // 拉取消息
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, OFFSET_TABLE.get(mq), 32);
            System.out.println("pullResult : " + pullResult);
            // 设置该MQ的offset
            OFFSET_TABLE.put(mq, pullResult.getNextBeginOffset());
        }
        consumer.shutdown();
    }
}

将上面的流程概括一下:

  1. 创建Pull模式的消费者对象
  2. 启动消费者消费
  3. 调用fetchSubscribeMessageQueues方法,根据Topic名称查询对应的MQ,主动拉取消息
  4. 循环遍历MQ,对于遍历到的每个MQ,取出一条消息

fetchSubscribeMessageQueues

获取所有MQ的方法源码如下,该方法位于org/apache/rocketmq/client/impl/MQAdminImpl.java中:

public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
        	// 从注册中心获取路由信息
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            // 如果路由信息不为空则获取路由信息中的队列集合
            if (topicRouteData != null) {
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }

        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }

上述代码首先从注册中心中获取TopicRouteData,其中存储了路由信息:

在这里插入图片描述

  • orderTopicConf:顺序消息配置

它的格式为:BrokerName1:QueueId1;BrokerName2:QueueId2;…BrokerNameN:QueueIdN;

  • queueDatas:队列数据数组
  • brokerAddr:Broker数据数组
  • filterServerTable:Broker地址和Filter Server之间的映射

如果拿到的TopicRouteData不为空,则提取TopicRouteData内的QueueData生成MQ,这个MQ就是当前订阅的Topic下的。如果队列集合不为空,就会直接返回。


拉取消息的核心代码

拉取消息的核心方法是pullSyncImpl,在这个方法里实现了消息的拉取

private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.isRunning();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        // 拉取消息
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );
        
        // 对消息数据进行处理
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        // 如果namespace不是空的,则重置没有命名空间的Topic。
        this.resetTopic(pullResult.getMsgFoundList());
        
        // 把消息数据设置到上下文对象ConsumeMessageContext里
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }

相关文章:

  • 字符串函数的详解
  • 【BOOST C++指针专题07】Boost.Pool
  • C# 文件/文件夹操作(文本写入,追加,覆盖,清空,文件/文件夹新建,复制,删除,移动)+驱动器+目录+路径+Path类大全
  • Less预处理——变量和嵌套
  • BEVerse 中数据集预处理代码浅析
  • 算法每日一题(合并两个有序的数组)
  • SpringBoot整合Swagger
  • ProcExp的利用
  • C++内存管理
  • 33、Java 异常掌握这些就够了(图解 Java 中的异常)
  • springboot-方法处理4-消息转换器
  • FPGA底层资源综述
  • CLIP扩展
  • 从一维卷积、因果卷积(Causal CNN)、扩展卷积(Dilation CNN) 到 时间卷积网络 (TCN)
  • 高等数学(第七版)同济大学 习题8-2 个人解答
  • 【159天】尚学堂高琪Java300集视频精华笔记(128)
  • 【EOS】Cleos基础
  • CSS实用技巧
  • Date型的使用
  • Java 23种设计模式 之单例模式 7种实现方式
  • Java 9 被无情抛弃,Java 8 直接升级到 Java 10!!
  • JavaScript创建对象的四种方式
  • JAVA之继承和多态
  • Laravel深入学习6 - 应用体系结构:解耦事件处理器
  • Making An Indicator With Pure CSS
  • OpenStack安装流程(juno版)- 添加网络服务(neutron)- controller节点
  • PaddlePaddle-GitHub的正确打开姿势
  • ViewService——一种保证客户端与服务端同步的方法
  • 工程优化暨babel升级小记
  • 关键词挖掘技术哪家强(一)基于node.js技术开发一个关键字查询工具
  • 基于 Ueditor 的现代化编辑器 Neditor 1.5.4 发布
  • 讲清楚之javascript作用域
  • 聊聊springcloud的EurekaClientAutoConfiguration
  • 前端之Sass/Scss实战笔记
  • 探索 JS 中的模块化
  • RDS-Mysql 物理备份恢复到本地数据库上
  • ​3ds Max插件CG MAGIC图形板块为您提升线条效率!
  • (官网安装) 基于CentOS 7安装MangoDB和MangoDB Shell
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • ******之网络***——物理***
  • .cn根服务器被攻击之后
  • .NET / MSBuild 扩展编译时什么时候用 BeforeTargets / AfterTargets 什么时候用 DependsOnTargets?
  • .Net Redis的秒杀Dome和异步执行
  • .NET开源的一个小而快并且功能强大的 Windows 动态桌面软件 - DreamScene2
  • .net最好用的JSON类Newtonsoft.Json获取多级数据SelectToken
  • /etc/fstab 只读无法修改的解决办法
  • /etc/sudoer文件配置简析
  • @ConfigurationProperties注解对数据的自动封装
  • @LoadBalanced 和 @RefreshScope 同时使用,负载均衡失效分析
  • [ CTF ] WriteUp-2022年春秋杯网络安全联赛-冬季赛
  • [20150321]索引空块的问题.txt
  • [BZOJ] 2006: [NOI2010]超级钢琴
  • [C++] new和delete
  • [DAU-FI Net开源 | Dual Attention UNet+特征融合+Sobel和Canny等算子解决语义分割痛点]
  • [Docker]十一.Docker Swarm集群raft算法,Docker Swarm Web管理工具