当前位置: 首页 > news >正文

storm togolopy转换jstorm topology

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

本文参照https://my.oschina.net/shyloveliyi/blog/785812中代码,进行转换。

1、转换依赖

    storm

<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>1.0.2</version>
		</dependency>

    转换成

<dependency>
			<groupId>com.alibaba.jstorm</groupId>
			<artifactId>jstorm-core</artifactId>
			<version>2.1.0</version>
			<scope>provided</scope>
		</dependency>

2、切换import

    转换依赖后,项目报错,挨个打开代码,将import全部删掉,然后重新导入依赖,eclipse快捷键ctrl+shift+o,以下是切换后的代码:

    FromMysqlSpout

package scc.storm;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @ClassName: MysqlSpout
 * @Description: TODO(这里用一句话描述这个类的作用)
 * @author shangchengcai@voole.com
 * @date 2016年11月10日 下午4:26:38
 * 
 */
public class FromMysqlSpout extends BaseRichSpout {
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    /*
     * (非 Javadoc) <p>Title: open</p> <p>Description: </p>
     * 
     * @param conf
     * 
     * @param context
     * 
     * @param collector
     * 
     * @see org.apache.storm.spout.ISpout#open(java.util.Map, org.apache.storm.task.TopologyContext,
     * org.apache.storm.spout.SpoutOutputCollector)
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.collector = collector;
        this.context = context;
    }

    /*
     * (非 Javadoc) <p>Title: nextTuple</p> <p>Description: </p>
     * 
     * @see org.apache.storm.spout.ISpout#nextTuple()
     */
    public void nextTuple() {
        JdbcUtils jdbcUtils = new JdbcUtils();
        try {
            if (new Random().nextInt(100) > 50) {
                return;
            }
            List<Map<String, Object>> data = jdbcUtils.findModeResult("select * from sds limit 1",
                    new ArrayList<Object>());
            this.collector.emit(new Values(data));
        } catch (SQLException e) {
            e.printStackTrace();
            this.collector.reportError(e);
        }
    }

    /*
     * (非 Javadoc) <p>Title: declareOutputFields</p> <p>Description: </p>
     * 
     * @param declarer
     * 
     * @see org.apache.storm.topology.IComponent#declareOutputFields(org.apache.storm.topology.OutputFieldsDeclarer)
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("data"));
    }

}

ToFileBolt

package scc.storm;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * @ClassName: ToFileBolt
 * @Description: TODO(这里用一句话描述这个类的作用)
 * @author shangchengcai@voole.com
 * @date 2016年11月10日 下午4:44:09
 * 
 */
public class ToFileBolt extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    /*
     * (非 Javadoc) <p>Title: prepare</p> <p>Description: </p>
     * 
     * @param stormConf
     * 
     * @param context
     * 
     * @param collector
     * 
     * @see org.apache.storm.task.IBolt#prepare(java.util.Map, org.apache.storm.task.TopologyContext,
     * org.apache.storm.task.OutputCollector)
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = this.conf;
        this.collector = collector;
        this.context = context;
    }

    /*
     * (非 Javadoc) <p>Title: execute</p> <p>Description: </p>
     * 
     * @param input
     * 
     * @see org.apache.storm.task.IBolt#execute(org.apache.storm.tuple.Tuple)
     */
    public void execute(Tuple input) {
        List<Map<String, Object>> data = (List<Map<String, Object>>) input.getValueByField("data");
        String outdata = data.toString() + "\r\n";
        File file = new File("\\opt\\jstorm\\stormtest.txt");
        if (!file.exists()) {
            new File("\\opt\\jstorm").mkdirs();
            try {
                new File("\\opt\\jstorm\\stormtest.txt").createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        FileOutputStream fos;
        try {
            fos = new FileOutputStream(file, true);
            fos.write(outdata.getBytes());
            fos.flush();
            fos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /*
     * (非 Javadoc) <p>Title: declareOutputFields</p> <p>Description: </p>
     * 
     * @param declarer
     * 
     * @see org.apache.storm.topology.IComponent#declareOutputFields(org.apache.storm.topology.OutputFieldsDeclarer)
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }

}

MyTopology

package scc.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * @ClassName: MyTopology
 * @Description: TODO(这里用一句话描述这个类的作用)
 * @author shangchengcai@voole.com
 * @date 2016年11月10日 下午4:52:05
 * 
 */
public class MyTopology {
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("frommysql", new FromMysqlSpout());
        builder.setBolt("tofile", new ToFileBolt()).shuffleGrouping("frommysql");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("firstTopo", conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology("firstTopo");
            cluster.shutdown();
        }
    }
}

JdbcUtils

/**   
 *
 * @Title: HiveJdbcTest.java 
 * @Package com.scc.hive 
 * @Description: TODO(用一句话描述该文件做什么) 
 * @author scc
 * @date 2016年11月9日 上午10:16:32   
 */
package scc.storm;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 *
 * @ClassName: HiveJdbcTest
 * @Description: TODO(这里用一句话描述这个类的作用)
 * @author scc
 * @date 2016年11月9日 上午10:16:32
 * 
 */
public class JdbcUtils {
    // 数据库用户名
    private static final String USERNAME = "root";
    // 数据库密码
    private static final String PASSWORD = "1234";
    // 驱动信息
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    // 数据库地址
    private static final String URL = "jdbc:mysql://10.5.3.24:3306/hive";
    private Connection connection;
    private PreparedStatement pstmt;
    private ResultSet resultSet;

    public JdbcUtils() {
        // TODO Auto-generated constructor stub
        try {
            Class.forName(DRIVER);
            System.out.println("数据库连接成功!");
            this.getConnection();
        } catch (Exception e) {

        }
    }

    /**
     * 获得数据库的连接
     * 
     * @return
     */
    public Connection getConnection() {
        try {
            this.connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return this.connection;
    }

    /**
     * 增加、删除、改
     * 
     * @param sql
     * @param params
     * @return
     * @throws SQLException
     */
    public boolean updateByPreparedStatement(String sql, List<Object> params) throws SQLException {
        boolean flag = false;
        int result = -1;
        this.pstmt = this.connection.prepareStatement(sql);
        int index = 1;
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        result = this.pstmt.executeUpdate();
        flag = result > 0 ? true : false;
        return flag;
    }

    /**
     * 查询单条记录
     * 
     * @param sql
     * @param params
     * @return
     * @throws SQLException
     */
    public Map<String, Object> findSimpleResult(String sql, List<Object> params) throws SQLException {
        Map<String, Object> map = new HashMap<String, Object>();
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery();// 返回查询结果
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int col_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            for (int i = 0; i < col_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                map.put(cols_name, cols_value);
            }
        }
        return map;
    }

    /**
     * 查询多条记录
     * 
     * @param sql
     * @param params
     * @return
     * @throws SQLException
     */
    public List<Map<String, Object>> findModeResult(String sql, List<Object> params) throws SQLException {
        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery();
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int cols_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            Map<String, Object> map = new HashMap<String, Object>();
            for (int i = 0; i < cols_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                map.put(cols_name, cols_value);
            }
            list.add(map);
        }
        // this.connection.close();
        return list;
    }

    /**
     * 通过反射机制查询单条记录
     * 
     * @param sql
     * @param params
     * @param cls
     * @return
     * @throws Exception
     */
    public <T> T findSimpleRefResult(String sql, List<Object> params, Class<T> cls) throws Exception {
        T resultObject = null;
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery();
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int cols_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            // 通过反射机制创建一个实例
            resultObject = cls.newInstance();
            for (int i = 0; i < cols_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                Field field = cls.getDeclaredField(cols_name);
                field.setAccessible(true); // 打开javabean的访问权限
                field.set(resultObject, cols_value);
            }
        }
        return resultObject;

    }

    /**
     * 通过反射机制查询多条记录
     * 
     * @param sql
     * @param params
     * @param cls
     * @return
     * @throws Exception
     */
    public <T> List<T> findMoreRefResult(String sql, List<Object> params, Class<T> cls) throws Exception {
        List<T> list = new ArrayList<T>();
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery();
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int cols_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            // 通过反射机制创建一个实例
            T resultObject = cls.newInstance();
            for (int i = 0; i < cols_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                Field field = cls.getDeclaredField(cols_name);
                field.setAccessible(true); // 打开javabean的访问权限
                field.set(resultObject, cols_value);
            }
            list.add(resultObject);
        }
        return list;
    }

    /**
     * 释放数据库连接
     */
    public void releaseConn() {
        if (this.resultSet != null) {
            try {
                this.resultSet.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new JdbcUtils().getConnection());
    }
}

这里补充一下打包及运行操作

1、打包

    使用maven对项目执行

mvn clean package

2、运行

    将打包上传至服务器,然后执行

#jstorm jar jar包地址 main方法主类 参数(空格分割)
jstorm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt

3、查看

    一种方式是执行

./jstorm list

    另一种是在webui界面上查看

转载于:https://my.oschina.net/shyloveliyi/blog/789565

相关文章:

  • maven公共仓库
  • JavaScript函数表达式
  • lolcat彩虹色输出文本
  • Bootstrap学习笔记系列5------Bootstrap图片显示
  • 二.第十一单元 系统恢复
  • linux学习笔记8
  • MarkdownPad2.5 注册码
  • Git 菜鸟记录
  • DTMF三种模式(SIPINFO,RFC2833,INBAND)
  • Oracle学习(一):Oracle数据库基础
  • js 单例
  • Nginx 指定不产生日志类型(不记录图片日志)
  • php对二维数组求差集
  • uplodidy代码
  • jQery简单Tab选项卡效果
  • [译]Python中的类属性与实例属性的区别
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • es的写入过程
  • input的行数自动增减
  • JavaScript-Array类型
  • JS数组方法汇总
  • Less 日常用法
  • PHP面试之三:MySQL数据库
  • vue和cordova项目整合打包,并实现vue调用android的相机的demo
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 力扣(LeetCode)56
  • 正则表达式
  • elasticsearch-head插件安装
  • 关于Kubernetes Dashboard漏洞CVE-2018-18264的修复公告
  • ​虚拟化系列介绍(十)
  • # 睡眠3秒_床上这样睡觉的人,睡眠质量多半不好
  • #DBA杂记1
  • #Linux(权限管理)
  • (3)(3.5) 遥测无线电区域条例
  • (7)STL算法之交换赋值
  • (android 地图实战开发)3 在地图上显示当前位置和自定义银行位置
  • (C语言)fread与fwrite详解
  • (Redis使用系列) SpringBoot 中对应2.0.x版本的Redis配置 一
  • (大众金融)SQL server面试题(1)-总销售量最少的3个型号的车及其总销售量
  • (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  • (转)大型网站架构演变和知识体系
  • (转载)Linux 多线程条件变量同步
  • .cfg\.dat\.mak(持续补充)
  • .java 9 找不到符号_java找不到符号
  • .NET Core 实现 Redis 批量查询指定格式的Key
  • .net/c# memcached 获取所有缓存键(keys)
  • .Net程序帮助文档制作
  • .NET导入Excel数据
  • .NET多线程执行函数
  • .NET开源项目介绍及资源推荐:数据持久层
  • .NET框架
  • [ element-ui:table ] 设置table中某些行数据禁止被选中,通过selectable 定义方法解决
  • [\u4e00-\u9fa5] //匹配中文字符
  • [ABC294Ex] K-Coloring
  • [Android] Upload package to device fails #2720