当前位置: 首页 > news >正文

聊聊storm client的nimbus.seeds参数

本文主要研究一下storm client的nimbus.seeds参数

NIMBUS_SEEDS

storm-core-1.1.0-sources.jar!/org/apache/storm/Config.java

    /**
     * The host that the master server is running on, added only for backward compatibility,
     * the usage deprecated in favor of nimbus.seeds config.
     */
    @Deprecated
    @isString
    public static final String NIMBUS_HOST = "nimbus.host";

    /**
     * List of seed nimbus hosts to use for leader nimbus discovery.
     */
    @isStringList
    public static final String NIMBUS_SEEDS = "nimbus.seeds";
复制代码
  • 可以看到这里废除了 nimbus.host 参数,而 nimbus.seeds 参数主要用于发现 nimbus leader

StormSubmitter

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        stormConf.putAll(prepareZookeeperAuthentication(conf));

        validateConfs(conf, topology);

        Map<String,String> passedCreds = new HashMap<>();
        if (opts != null) {
            Credentials tmpCreds = opts.get_creds();
            if (tmpCreds != null) {
                passedCreds = tmpCreds.get_creds();
            }
        }
        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
        if (!fullCreds.isEmpty()) {
            if (opts == null) {
                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
            }
            opts.set_creds(new Credentials(fullCreds));
        }
        try {
            if (localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                if (opts!=null) {
                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    localNimbus.submitTopology(name, stormConf, topology);
                }
                LOG.info("Finished submitting topology: " +  name);
            } else {
                String serConf = JSONValue.toJSONString(stormConf);
                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
                    if (topologyNameExists(name, client)) {
                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                    }

                    // Dependency uploading only makes sense for distributed mode
                    List<String> jarsBlobKeys = Collections.emptyList();
                    List<String> artifactsBlobKeys;

                    DependencyUploader uploader = new DependencyUploader();
                    try {
                        uploader.init();

                        jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);

                        artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
                    } catch (Throwable e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        uploader.deleteBlobs(jarsBlobKeys);
                        uploader.shutdown();
                        throw e;
                    }

                    try {
                        setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
                        submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
                    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        // Note that we don't handle TException to delete jars blobs
                        // because it's safer to leave some blobs instead of topology not running
                        uploader.deleteBlobs(jarsBlobKeys);
                        throw e;
                    } finally {
                        uploader.shutdown();
                    }
                }
            }
        } catch(TException e) {
            throw new RuntimeException(e);
        }
        invokeSubmitterHook(name, asUser, conf, topology);

    }
复制代码
  • StormSubmitter 的 submitTopologyAs 通过 NimbusClient.getConfiguredClientAs(conf, asUser) 创建 NimbusClient

NimbusClient

storm-core-1.1.0-sources.jar!/org/apache/storm/utils/NimbusClient.java

    public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
        if (conf.containsKey(Config.STORM_DO_AS_USER)) {
            if (asUser != null && !asUser.isEmpty()) {
                LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
                        , asUser, conf.get(Config.STORM_DO_AS_USER));
            }
            asUser = (String) conf.get(Config.STORM_DO_AS_USER);
        }

        List<String> seeds;
        if(conf.containsKey(Config.NIMBUS_HOST)) {
            LOG.warn("Using deprecated config {} for backward compatibility. Please update your storm.yaml so it only has config {}",
                     Config.NIMBUS_HOST, Config.NIMBUS_SEEDS);
            seeds = Lists.newArrayList(conf.get(Config.NIMBUS_HOST).toString());
        } else {
            seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
        }

        for (String host : seeds) {
            int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
            NimbusSummary nimbusSummary;
            NimbusClient client = null;
            try {
                client = new NimbusClient(conf, host, port, null, asUser);
                nimbusSummary = client.getClient().getLeader();
                if (nimbusSummary != null) {
                    String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
                    LOG.info("Found leader nimbus : {}", leaderNimbus);
                    if (nimbusSummary.get_host().equals(host) && nimbusSummary.get_port() == port) {
                        NimbusClient ret = client;
                        client = null;
                        return ret;
                    }
                    try {
                        return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
                    } catch (TTransportException e) {
                        throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
                                 + ". will retry with a different seed host.", e);
                continue;
            } finally {
                if (client != null) {
                    client.close();
                }
            }
            throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try " +
                                                            "again after some time.");
        }
        throw new NimbusLeaderNotFoundException(
                "Could not find leader nimbus from seed hosts " + seeds + ". " +
                        "Did you specify a valid list of nimbus hosts for config " +
                        Config.NIMBUS_SEEDS + "?");
    }
复制代码
  • 这里仍然兼容NIMBUS_HOST 参数,如果有 NIMBUS_HOST 参数则从中读取 seeds,没有则从 NIMBUS_SEEDS 参数获取
  • 之后遍历 seeds,根据每个 seed 创建 NimbusClient,然后调用 client.getClient().getLeader() 获取 leader 信息,如果获取成功,则判断 leader 是否当前连接的 seed,如果是则直接返回,如果不是则根据 leader 的 host 和 port 创建新的 NimbusClient 返回
  • 如果 nimbusSummary 为 null,则会抛出 NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.")
  • 如果连接 leader 出现异常,则遍历下一个 seed,进行 retry 操作,如果所有 seed 都 retry 失败,则跳出循环,最后抛出 NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?")

小结

  • 对于storm client 来说,nimbus.seeds 参数用于 client 进行寻找 nimbus leader,而 nimbus.host 参数已经被废弃
  • 寻找 nimbus leader 的过程就是挨个遍历 seeds 配置的 host,进行连接,然后获取 leader 的信息,如果获取成功但是 nimbusSummary 为 null,则抛出 NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.")。
  • 如果有异常则遍历下一个 seed 进行 retry,如果都不成功,则最后跳出循环,抛出 NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?")

doc

  • Setting-up-a-Storm-cluster

相关文章:

  • 深入源码分析Java线程池的实现原理
  • 第15讲 | 深入区块链技术(七):哈希与加密算法
  • Babel配置的不完全指南
  • IP数据报
  • 过了技术面却在HR面被刷?必备40问!从容应对HR,斩获N多大厂offer!
  • 如何解决 Django 前后端分离开发的跨域问题
  • JSP学习-02隐式对象
  • R1 学习记录
  • 167. Two Sum II - Input array is sorted
  • 想用Unity3D引擎技术赚点钱的看过来
  • python3爬取墨迹天气并发送给微信好友,附源码
  • 晒一晒老司机写的“超融合私有云”解决方案
  • 4种删除Word空白页的小技巧,都是你需要用到的!
  • ASP.NET Core 2.2.0-preview3 发布
  • LaTeX-用polynom宏包排版多项式的除法
  • 10个确保微服务与容器安全的最佳实践
  • iOS 颜色设置看我就够了
  • Java方法详解
  • JS笔记四:作用域、变量(函数)提升
  • js面向对象
  • MySQL主从复制读写分离及奇怪的问题
  • node入门
  • PHP的类修饰符与访问修饰符
  • SQLServer之创建显式事务
  • ViewService——一种保证客户端与服务端同步的方法
  • 订阅Forge Viewer所有的事件
  • 工作手记之html2canvas使用概述
  • 关于Flux,Vuex,Redux的思考
  • 基于HAProxy的高性能缓存服务器nuster
  • 聚类分析——Kmeans
  • 什么是Javascript函数节流?
  • 协程
  • 再次简单明了总结flex布局,一看就懂...
  • ###项目技术发展史
  • #设计模式#4.6 Flyweight(享元) 对象结构型模式
  • $var=htmlencode(“‘);alert(‘2“); 的个人理解
  • (003)SlickEdit Unity的补全
  • (2009.11版)《网络管理员考试 考前冲刺预测卷及考点解析》复习重点
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (function(){})()的分步解析
  • (阿里巴巴 dubbo,有数据库,可执行 )dubbo zookeeper spring demo
  • (超简单)构建高可用网络应用:使用Nginx进行负载均衡与健康检查
  • (第二周)效能测试
  • (二)PySpark3:SparkSQL编程
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统
  • (每日持续更新)jdk api之FileFilter基础、应用、实战
  • (算法)Travel Information Center
  • (图)IntelliTrace Tools 跟踪云端程序
  • (一)kafka实战——kafka源码编译启动
  • (转)Android学习系列(31)--App自动化之使用Ant编译项目多渠道打包
  • (转)EXC_BREAKPOINT僵尸错误
  • (转)jdk与jre的区别
  • (转)使用VMware vSphere标准交换机设置网络连接
  • .NET Framework 服务实现监控可观测性最佳实践