当前位置: 首页 > news >正文

storm drpc实例

本文主要演示一下storm drpc实例

配置

version: '2'
services:
    supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - zookeeper
        links:
            - nimbus
            - zookeeper
        restart: always
        ports:
            - 6700:6700
            - 6701:6701
            - 6702:6702
            - 6703:6703
            - 8000:8000
    drpc:
        image: storm
        container_name: drpc
        command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - supervisor
            - zookeeper
        links:
            - nimbus
            - supervisor
            - zookeeper
        restart: always
        ports:
            - 3772:3772
            - 3773:3773
            - 3774:3774
  • 这里对supervisor配置drpc.servers及drpc.port、drpc.invocations.port,好让worker通过drpc.invocations.port去访问drpc节点
  • 对于drpc服务,则暴露drpc.port(好让外部的DRPCClient访问)、drpc.invocations.port(让worker访问)

TridentTopology

    @Test
    public void testDeployDRPCStateQuery() throws InterruptedException, TException {
        TridentTopology topology = new TridentTopology();
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);
        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        //NOTE transforms a Stream into a TridentState object
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                        .parallelismHint(6);

        topology.newDRPCStream("words")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        StormTopology stormTopology = topology.build();

        //远程提交 mvn clean package -Dmaven.test.skip=true
        //storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交
        System.setProperty("storm.jar",TOPOLOGY_JAR);

        Config conf = new Config();
        conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1
        conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个
        conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181

        StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
    }
  • 这里newStream创建了一个TridentState,然后newDRPCStream创建了一个DRPCStream,其stateQuery指定为前面创建的TridentState
  • 由于TridentState把结果存储到了MemoryMapState,因而这里的DRPCStream通过drpc进行stateQuery

DRPCClient

    @Test
    public void testLaunchDrpcClient() throws TException {
        Config conf = new Config();
        //NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,不然client直接跑空指针
        conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);
        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
        DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
        System.out.println(client.execute("words", "cat dog the man"));
    }
  • 注意这里的配置项不能少,否则会引发空指针
  • Config.DRPC_THRIFT_TRANSPORT_PLUGIN这里使用的是SimpleTransportPlugin.class.getName(),虽然该类被废弃了,不过还可以跑通
  • 由于使用了SimpleTransportPlugin.class,因而这里要配置Config.DRPC_MAX_BUFFER_SIZE
  • DRPCClient配置了drpc的地址及port
  • client.execute这里要传入newDRPCStream指定的function名称

小结

  • 使用drpc的时候,需要通过storm drpc启动drpc server服务节点,另外要暴露两个端口,一个drpc.port是供外部DRPCClient调用,一个drpc.invocations.port是给worker来访问;drpc.http.port端口是暴露给http协议调用的(DRPCClient使用的是thrift协议调用)
  • supervisor要配置drpc.servers、drpc.invocations.port,好让worker去访问到drpc server
  • DRPCClient使用drpc.port指定的端口来访问,另外client.execute这里要传入newDRPCStream指定的function名称

doc

  • Trident Tutorial
  • Distributed RPC
  • Running Apache Storm Securely

相关文章:

  • 监控CPU(一)
  • RIP
  • Lintcode104 Merge k Sorted Lists solution 题解
  • jQuery基础一
  • heartbeat主配置文件
  • Mysql5.6到5.7升级需要以下操作
  • GDI+绘制极坐标图(Polar Diagram)
  • RHEL7/CentOS7 PXE+Kickstart自动化系统安装
  • Netty环境搭建(源码死磕2)
  • [20171113]修改表结构删除列相关问题4.txt
  • 2018.10.24-dtoj-3984 玩具(toy)
  • lvs健康检查
  • 使用vue-cli创建一个vue项目
  • 替换vShield Manager 5.0证书
  • Good Inflation SPOJ - GOODG 李超树
  • [译] React v16.8: 含有Hooks的版本
  • 【comparator, comparable】小总结
  • Android优雅地处理按钮重复点击
  • docker容器内的网络抓包
  • go语言学习初探(一)
  • in typeof instanceof ===这些运算符有什么作用
  • LeetCode算法系列_0891_子序列宽度之和
  • node学习系列之简单文件上传
  • rc-form之最单纯情况
  • SAP云平台里Global Account和Sub Account的关系
  • SpringBoot几种定时任务的实现方式
  • Sublime Text 2/3 绑定Eclipse快捷键
  • VUE es6技巧写法(持续更新中~~~)
  • vue脚手架vue-cli
  • 产品三维模型在线预览
  • 订阅Forge Viewer所有的事件
  • 给自己的博客网站加上酷炫的初音未来音乐游戏?
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 前端知识点整理(待续)
  • 使用Maven插件构建SpringBoot项目,生成Docker镜像push到DockerHub上
  • 用 vue 组件自定义 v-model, 实现一个 Tab 组件。
  • 你对linux中grep命令知道多少?
  • 《码出高效》学习笔记与书中错误记录
  • Java性能优化之JVM GC(垃圾回收机制)
  • ​​​​​​​GitLab 之 GitLab-Runner 安装,配置与问题汇总
  • ​​​​​​​Installing ROS on the Raspberry Pi
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • (delphi11最新学习资料) Object Pascal 学习笔记---第7章第3节(封装和窗体)
  • (python)数据结构---字典
  • (分布式缓存)Redis分片集群
  • (附源码)ssm码农论坛 毕业设计 231126
  • (原創) 博客園正式支援VHDL語法著色功能 (SOC) (VHDL)
  • (转)Google的Objective-C编码规范
  • (转)winform之ListView
  • (转载)虚幻引擎3--【UnrealScript教程】章节一:20.location和rotation
  • ... 是什么 ?... 有什么用处?
  • .MyFile@waifu.club.wis.mkp勒索病毒数据怎么处理|数据解密恢复
  • .net core 6 使用注解自动注入实例,无需构造注入 autowrite4net
  • .NET Core 中的路径问题
  • .NET Core中Emit的使用