当前位置: 首页 > news >正文

rocketmq-producer

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

producer业务流程

 

1.选择namesrv

/**
 * 选择namesrv
 * @return
 * @throws InterruptedException
 */
private Channel getAndCreateNameserverChannel() throws InterruptedException {
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    //获取namesrv所有地址
    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }

            //顺序选择namesrv
            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}

2.获取默认的topic(TBW102)

第一次由于broker上面不存在自定义topic,会进行第二次获取,根据TBW102去获取。

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    // 获取topic = TBW102 的broker
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            //默认设置为4个队列
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    //根据topic查询
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // Update sub info
                        {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                }
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}

3.选择messageQueue

/**
 * 随机取模获取messageQueue
 * @return
 */
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

4.sendMessage

组装requestHeader

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }

}
switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            msg.setBody(prevBody);
        }
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }

        //发送
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}

//RequestCode.SEND_MESSAGE

RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);

 

转载于:https://my.oschina.net/penghaozhong/blog/3050481

相关文章:

  • 谈谈javascript语法里一些难点问题(一)
  • 火箭还是飞机?——DevOps 的两种模式
  • linux下smb打印服务器部署过程(对windows终端)
  • 从JEditorPane入手,分析其中的MVC模式
  • mmc控制台,访问不了目标主机
  • 网易有道——招聘
  • 使用IntelliJ IDEA 配置Maven
  • 【BZOJ】1082: [SCOI2005]栅栏(二分+dfs)
  • magento Mage custom helper not found
  • 通过递归组合多维数组!
  • 95th percentile concentration
  • 用oracle查询当前数据库中的所有表
  • Node图文教程(第四章:express)
  • ArcGIS查找空洞多边形
  • ARC066E/AtCoder2273 Addition and Subtraction Hard
  • [笔记] php常见简单功能及函数
  • Angular js 常用指令ng-if、ng-class、ng-option、ng-value、ng-click是如何使用的?
  • Asm.js的简单介绍
  • Dubbo 整合 Pinpoint 做分布式服务请求跟踪
  • JavaScript创建对象的四种方式
  • JS笔记四:作用域、变量(函数)提升
  • LeetCode算法系列_0891_子序列宽度之和
  • MySQL用户中的%到底包不包括localhost?
  • Python学习之路16-使用API
  • scrapy学习之路4(itemloder的使用)
  • sessionStorage和localStorage
  • SQLServer之索引简介
  • use Google search engine
  • 关于extract.autodesk.io的一些说明
  • 想写好前端,先练好内功
  • 小程序、APP Store 需要的 SSL 证书是个什么东西?
  • ​草莓熊python turtle绘图代码(玫瑰花版)附源代码
  • # Panda3d 碰撞检测系统介绍
  • (c语言)strcpy函数用法
  • (PHP)设置修改 Apache 文件根目录 (Document Root)(转帖)
  • (黑客游戏)HackTheGame1.21 过关攻略
  • (论文阅读22/100)Learning a Deep Compact Image Representation for Visual Tracking
  • (论文阅读40-45)图像描述1
  • (四)库存超卖案例实战——优化redis分布式锁
  • (详细版)Vary: Scaling up the Vision Vocabulary for Large Vision-Language Models
  • (一)kafka实战——kafka源码编译启动
  • (转)JVM内存分配 -Xms128m -Xmx512m -XX:PermSize=128m -XX:MaxPermSize=512m
  • ***通过什么方式***网吧
  • . Flume面试题
  • .NET 设计模式—适配器模式(Adapter Pattern)
  • .NET/C# 检测电脑上安装的 .NET Framework 的版本
  • .NET设计模式(11):组合模式(Composite Pattern)
  • .NET运行机制
  • @Data注解的作用
  • @Repository 注解
  • @select 怎么写存储过程_你知道select语句和update语句分别是怎么执行的吗?
  • [ C++ ] STL---stack与queue
  • [ element-ui:table ] 设置table中某些行数据禁止被选中,通过selectable 定义方法解决
  • [ Linux Audio 篇 ] 音频开发入门基础知识
  • [04] Android逐帧动画(一)