当前位置: 首页 > news >正文

【消息中间件】RocketMQ如何实现Producer的负载均衡

目录

一、前言

二、实现Producer的负载均衡

1、负载均衡选取一条消息队列并且高可用

1.1、模拟随机递增取模消息队列数为5

1.2、模拟随机递增取模消息队列数为6

1.3、判断Broker代理是否可用

2、更新故障项维护startTimestamp字段

2.1、退避运算

2.2、更新故障项维护startTimestamp字段

3、总结


一、前言

    Producer端在发送消息的时候,会先根据主题Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550L ms,就退避30000L ms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

二、实现Producer的负载均衡

 上一节我们讲解到了生产者消息发送与重试的那块逻辑,也提到了容错策略,本篇文章我们先回顾一下以便接下来的内容分析。更新故障项updateFaultItem()方法中接收的数据为Broker代理的服务名、第二个入参数据就是发送消息远程调用到响应成功或失败所花的时间、第3个是boolean类型(成功或者中断异常情况下为false、其它情况下为true)

1、负载均衡选取一条消息队列并且高可用

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // 随机递增计算索引
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                // 轮询,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 取模
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }

                // 由于更新故障项,可能导致上面判断broker代理都不可用的情况,但不是绝对的,这里选取一个broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    // 同样是采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

该方法的作用就是负载均衡选取一条高可用的消息队列。

  1. sendLatencyFaultEnable开关打开,采用轮询的方式;采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样,即负载均衡;判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
  2. 由于更新故障项,可能导致上面判断broker代理都不可用的情况,但不是绝对的,这里选取一个broker;同样是采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息

1.1、模拟随机递增取模消息队列数为5

    public static void main(String[] args) {
        ThreadLocalIndex threadLocalIndex = new ThreadLocalIndex();
        for (int i = 0; i < 10; i++) {
            int incrementAndGet = threadLocalIndex.incrementAndGet();
            // 模拟消息队列数为5
            int pos1 = Math.abs(incrementAndGet++)%5;
            System.out.println("第 " + i + " 为:" + incrementAndGet + "  5选取消息队列为:" + pos1);
        }
    }

实验结果:

实验结论:

由实验结果可以看出随机递增取模的方式选取队列,每次选取都是不同的消息队列,这就是负载均衡,也是轮询。

1.2、模拟随机递增取模消息队列数为6

    public static void main(String[] args) {
        ThreadLocalIndex threadLocalIndex = new ThreadLocalIndex();
        for (int i = 0; i < 10; i++) {
            int incrementAndGet = threadLocalIndex.incrementAndGet();
            // 模拟消息队列数为6
            int pos2 = Math.abs(incrementAndGet++)%6;
            System.out.println("第 " + i + " 为:" + incrementAndGet + "  6选取消息队列为:" + pos2);
        }
    }

实验结果:

实验结论:

同样由实验结果可以看出随机递增取模的方式选取队列,每次选取都是不同的消息队列,这就是负载均衡,也采用了轮询。虽然说这样可以保证在上次发送成功或失败的情况下不再选取同一条消息队列,即多负载了或认为其不可用,但是你仅通过轮询以及随机递增取模是不可以保证下一条消息队列就是可用的。故判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

1.3、判断Broker代理是否可用

    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    /**
     * 故障项
     */
    class FaultItem implements Comparable<FaultItem> {
        /**
         * brokerName
         */
        private final String name;
        /**
         * 这次发送消息到出现异常的时间
         */
        private volatile long currentLatency;
        /**
         * 在这个时间点以前,这个brokerName都会标记为可能存在故障
         */
        private volatile long startTimestamp;

        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
    }
  • 判断brokerName对应下Broker代理是否可用,不是绝对的。
  • startTimestampLatencyFaultToleranceImpl内部类故障项FaultItem中维护的字段,作用是在这个时间点以前,这个brokerName对应下Broker代理都会标记为可能存在故障,即不可用。
  • 为了避免选择故障项,startTimestamp需要通过setter更新维护,而故障项如果恢复的话,你也要更新维护。那么维护它的时机是在哪里呢?前面有提及了更新故障项,那么是否也在该时机执行呢?

2、更新故障项维护startTimestamp字段

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    /**
     * 潜伏期最大值
     */
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    /**
     * 不可用持续时间
     */
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};


    /**
     * 该方法在发送消息成功或失败后执行
     *
     * @param brokerName 代理服务器服务名
     * @param currentLatency 当前等待时间 = 执行远程调用完毕时间 - 开始时间
     * @param isolation isolation
     */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // 计算不可用时间
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // 延迟错误误差
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

latencyMaxnotAvailableDuration是一一对应关系:latencyMax中数值越小即整个请求处理成功或失败所花的时间越少,越少那么往往都是成功的,故对应notAvailableDuration的值越小甚至0,以便isAvailable()判断Broker代理是否可用不过滤;相反,latencyMax中数值越大,那么故障的可能性越大,这时就尽量不再选择它,退避处理。

2.1、退避运算

    private long computeNotAvailableDuration(final long currentLatency) {
        // 倒序,一旦发送失败,那么尽最大可能避免再次选择发送失败的服务器
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }

这里逻辑就是退避运算:这里数组索引是倒序的,一旦发送失败,那么尽最大可能避免再次选择发送失败的服务器。感兴趣的读者可以自己去模拟下实验,拿几组数据自己去探究探究。

2.2、更新故障项维护startTimestamp字段

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // 根据代理服务器服务名brokerName,从本地缓存获取故障项
        FaultItem old = this.faultItemTable.get(name);
        // 没有新建再缓存
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            //
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // 存在则直接更新
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

根据代理服务器服务名brokerName,从本地缓存获取故障项,没有新建再缓存;有则直接setter关系维护startTimestamp字段

3、总结

本篇文章涉及到的重点就是负载均衡的算法实现以及退避运算中涉及的设计思想等:采用轮询的方式;采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样,即负载均衡;判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在,内部两个二维数组字段参与到退避运算中,计算出的结果维护到startTimestamp字段,以便判断broker代理是否可用、达到高可用目的。

相关文章:

  • 阳了在家没事干?教大家用python在家做一个万能看视频软件,绝对正经啦~
  • Footprint Analytics 如何帮助区块链研究人员进行数据研究
  • 代码质量管理平台实战| SonarQube 安装、配置及 JaCoCo、Maven 集成
  • 黑马Hive+Spark离线数仓工业项目--数仓事实层DWB层构建(2)
  • VIAVI唯亚威光纤高分辨率多模 OTDR 测试方案
  • rust program英文和汉语混合笔记(4)
  • Attention:何为注意力机制?
  • 高级网络复习——防火墙,OSPF协议,rip协议,三层,DHCP中继知识题解(带答案)
  • 【BF算法】
  • 多线程与高并发(三)
  • 【Spring(二)】IoC入门案例(XML版)
  • 剑指offer----C语言版----第一天
  • 量子计算(十八):量子计算机
  • c语言操作符(上)
  • 计算机基础知识(基础入门小白专属)五
  • echarts的各种常用效果展示
  • Essential Studio for ASP.NET Web Forms 2017 v2,新增自定义树形网格工具栏
  • express + mock 让前后台并行开发
  • JavaScript创建对象的四种方式
  • JavaScript设计模式之工厂模式
  • 基于阿里云移动推送的移动应用推送模式最佳实践
  • 前端之React实战:创建跨平台的项目架构
  • 前嗅ForeSpider采集配置界面介绍
  • 软件开发学习的5大技巧,你知道吗?
  • 为什么要用IPython/Jupyter?
  • 为物联网而生:高性能时间序列数据库HiTSDB商业化首发!
  • 主流的CSS水平和垂直居中技术大全
  • 教程:使用iPhone相机和openCV来完成3D重建(第一部分) ...
  • ######## golang各章节终篇索引 ########
  • (2024.6.23)最新版MAVEN的安装和配置教程(超详细)
  • (BAT向)Java岗常问高频面试汇总:MyBatis 微服务 Spring 分布式 MySQL等(1)
  • (done) NLP “bag-of-words“ 方法 (带有二元分类和多元分类两个例子)词袋模型、BoW
  • (阿里云万网)-域名注册购买实名流程
  • (四) 虚拟摄像头vivi体验
  • (四)opengl函数加载和错误处理
  • (转)ObjectiveC 深浅拷贝学习
  • .locked1、locked勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .NET Core Web APi类库如何内嵌运行?
  • .Net IE10 _doPostBack 未定义
  • .NET 设计模式—简单工厂(Simple Factory Pattern)
  • .NET的微型Web框架 Nancy
  • .NET建议使用的大小写命名原则
  • @Autowired多个相同类型bean装配问题
  • @Autowired和@Resource装配
  • @select 怎么写存储过程_你知道select语句和update语句分别是怎么执行的吗?
  • [ C++ ] STL---string类的模拟实现
  • [3D游戏开发实践] Cocos Cyberpunk 源码解读-高中低端机性能适配策略
  • [AIGC 大数据基础]hive浅谈
  • [Android Pro] AndroidX重构和映射
  • [Android]通过PhoneLookup读取所有电话号码
  • [AutoSar]BSW_Memory_Stack_003 NVM与APP的显式和隐式同步
  • [BZOJ] 2044: 三维导弹拦截
  • [EFI]NUC11电脑 Hackintosh 黑苹果efi引导文件
  • [ERROR] ocp-server-ce-py_script_start_check-4.2.1 RuntimeError: ‘tenant_name‘
  • [JavaWeb]—Spring入门