当前位置: 首页 > news >正文

Storm- 使用Storm实现累积求和的操作

 

需求:1+2+3+... = ???

实现方案

  Spout发出数字作为input

  使用Bolt来处理业务逻辑:求和

  将结果输出到控制台

拓扑设计:DataSourceSpout -->SumBolt→输出

 

package com.imooc.bigdata;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * 使用Storm实现累计求和的操作
 */
public class LocalSumStormTopology {
    /**
     * spout 需要继承BaseRichSpout
     * 数据源需要产生数据并发射
     */
    public static class DataSourceSpout extends BaseRichSpout{

        private SpoutOutputCollector collector;

        /**
         * 初始化方法,只会被调用一次
         * @param conf 配置参数
         * @param context 上下文
         * @param collector 数据发射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        int number = 0;
        /**
         * 会产生数据,在生产上肯定是从消息队列中获取数据
         *
         * 这个方法是一个死循环,会一直不停的执行
         */
        @Override
        public void nextTuple() {
            this.collector.emit(new Values(++number));

            System.out.println("Spout:"+number);

            //防止数据产生太快
            Utils.sleep(1000);
        }

        /**
         * 声明输出字段
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }


    /**
     * 数据的累计求和Bolt:接收数据并处理
     */
    public static class SumBolt extends BaseRichBolt{

        /**
         * 初始化方法,会被执行一次
         * @param stormConf
         * @param context
         * @param collector
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        int sum= 0;
        /**
         * 其实也是一个死循环,职责:获取Spout发送过来的数据
         * @param input
         */
        @Override
        public void execute(Tuple input) {

            // Bolt中获取值可以根据index获取,也可以根据上一个环节中定义的field的名称获取(建议使用该方式)
            Integer value = input.getIntegerByField("num");
            sum += value;

            System.out.println("Bolt:sum = ["+sum +"]");
        }

        /**
         *
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) {

        // TopologyBuilder根据Spout和Bolt来构建Topology
        // Storm中任何一个作业都是通过Topology的方式进行提交的
        // Topology中需要指定Spout和Bolt的执行顺序
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");

        // 创建一个本地的Storm集群:本地模式运行,不需要搭建Storm集群
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalSumStormTopology", new Config(), builder.createTopology());
    }


}

 

转载于:https://www.cnblogs.com/RzCong/p/9382940.html

相关文章:

  • 性能报告产生形式
  • LOJ121 动态图连通性(LCT)
  • 暗黑3有严重BUG
  • 数组(冒泡,选择,排序)
  • 游戏中的UI问题(一)
  • 排序算法之一冒泡排序
  • 游戏停止测试标准(四)
  • NYOJ 122 Triangular Sums
  • 游戏产业制作名人录(一)
  • Vue生命周期学习
  • 容错恢复测试(一)
  • fastjson转换json时,碰到的那些首字母大小写转换的坑!(转)
  • 容错恢复性测试(二)
  • [bzoj4010][HNOI2015]菜肴制作_贪心_拓扑排序
  • 测试中的单纯性划分
  • [译] 怎样写一个基础的编译器
  • __proto__ 和 prototype的关系
  • 【附node操作实例】redis简明入门系列—字符串类型
  • create-react-app做的留言板
  • ES6系统学习----从Apollo Client看解构赋值
  • javascript 哈希表
  • JavaScript 基础知识 - 入门篇(一)
  • javascript面向对象之创建对象
  • Java反射-动态类加载和重新加载
  • js算法-归并排序(merge_sort)
  • Netty+SpringBoot+FastDFS+Html5实现聊天App(六)
  • PHP CLI应用的调试原理
  • Spring Boot MyBatis配置多种数据库
  • supervisor 永不挂掉的进程 安装以及使用
  • 阿里云Kubernetes容器服务上体验Knative
  • 每天10道Java面试题,跟我走,offer有!
  • 终端用户监控:真实用户监控还是模拟监控?
  • 《码出高效》学习笔记与书中错误记录
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • #快捷键# 大学四年我常用的软件快捷键大全,教你成为电脑高手!!
  • #中的引用型是什么意识_Java中四种引用有什么区别以及应用场景
  • $HTTP_POST_VARS['']和$_POST['']的区别
  • (2020)Java后端开发----(面试题和笔试题)
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (LeetCode) T14. Longest Common Prefix
  • (python)数据结构---字典
  • (九)信息融合方式简介
  • (牛客腾讯思维编程题)编码编码分组打印下标题目分析
  • (四)模仿学习-完成后台管理页面查询
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (一)插入排序
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • .\OBJ\test1.axf: Error: L6230W: Ignoring --entry command. Cannot find argumen 'Reset_Handler'
  • .NET C#版本和.NET版本以及VS版本的对应关系
  • .net mvc actionresult 返回字符串_.NET架构师知识普及
  • .net redis定时_一场由fork引发的超时,让我们重新探讨了Redis的抖动问题
  • .NET 将混合了多个不同平台(Windows Mac Linux)的文件 目录的路径格式化成同一个平台下的路径
  • .NET 事件模型教程(二)
  • .NET6使用MiniExcel根据数据源横向导出头部标题及数据
  • .net之微信企业号开发(一) 所使用的环境与工具以及准备工作