当前位置: 首页 > news >正文

storm-kafka(storm spout作为kafka的消费端)

storm是grovvy写的

kafka是scala写的

storm-kafka  storm连接kafka consumer的插件

下载地址:

https://github.com/wurstmeister/storm-kafka-0.8-plus




除了需要storm和kafka相关jar包还需要google-collections-1.0.jar

以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar

以前由com.netflix.curator组织开发现在归到org.apache.curator下面



1.Kafka Consumer即Storm Spout代码

package demo;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class MyKafkaSpout {
public static void main(String[] args) {
    
    String topic ="track";
    ZkHosts zkhosts  = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181");
    
    SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,
            "/MyKafka", //偏移量offset的根目录
            "MyTrack");//子目录对应一个应用    
    List<String> zkServers=new ArrayList<String>();
    //zkServers.add("192.168.1.107");
    //zkServers.add("192.168.1.108");
    for(String host:zkhosts.brokerZkStr.split(","))
    {
        zkServers.add(host.split(":")[0]);
    }
    
    spoutConfig.zkServers=zkServers;
    spoutConfig.zkPort=2181;
    spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成false的
    spoutConfig.socketTimeoutMs=60;
    spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定义输出为string类型
    
    TopologyBuilder builder=new TopologyBuilder();
    builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并发度设为1
    builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout");
    
    Config config =new Config();
    config.setDebug(true);//上线之前都要改成false否则日志会非常多
    if(args.length>0){
        
        try {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } catch (AlreadyAliveException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }else{
        
        LocalCluster localCluster=new LocalCluster();
        localCluster.submitTopology("mytopology", config,  builder.createTopology());
        //本地模式在一个进程里面模拟一个storm集群的所有功能
    }
    
    
    
}
}


2.Bolt代码只是简单打印输出,覆写execute方法即可

package demo;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class MyKafkaBolt implements IBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector arg1) {
    String kafkaMsg =input.getString(0);
    System.err.println("bolt"+kafkaMsg);

    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}






本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1701258

相关文章:

  • js没有重载
  • 【索引】Oracle之不可见索引和虚拟索引的比对
  • 分区
  • class文件概述
  • 关于 LVM 逻辑卷管理
  • mysql学习之旅-数据库自动备份和手动恢复
  • 寻找二叉查找树中的下一个结点
  • nginx的安装及基本配置,及多个域名服务
  • 滚动字幕标记marquee/marquee
  • 观察者模式实现非直接耦合
  • 怎么把Maven项目转为动态Web项目?
  • mysql外键的使用
  • 08.Switch的使用方法
  • Python学习笔记11—函数
  • iOS - AppStores App 上架
  • 【从零开始安装kubernetes-1.7.3】2.flannel、docker以及Harbor的配置以及作用
  • C++类中的特殊成员函数
  • canvas实际项目操作,包含:线条,圆形,扇形,图片绘制,图片圆角遮罩,矩形,弧形文字...
  • Druid 在有赞的实践
  • express.js的介绍及使用
  • Java新版本的开发已正式进入轨道,版本号18.3
  • Python中eval与exec的使用及区别
  • SpriteKit 技巧之添加背景图片
  • SQLServer之创建显式事务
  • TypeScript实现数据结构(一)栈,队列,链表
  • Yeoman_Bower_Grunt
  • 构建工具 - 收藏集 - 掘金
  • 回顾2016
  • 解析带emoji和链接的聊天系统消息
  • 前端相关框架总和
  • 数据库写操作弃用“SELECT ... FOR UPDATE”解决方案
  • 移动端 h5开发相关内容总结(三)
  • 原生js练习题---第五课
  • elasticsearch-head插件安装
  • ​低代码平台的核心价值与优势
  • # 再次尝试 连接失败_无线WiFi无法连接到网络怎么办【解决方法】
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • (Matalb回归预测)PSO-BP粒子群算法优化BP神经网络的多维回归预测
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)
  • (四)Controller接口控制器详解(三)
  • (淘宝无限适配)手机端rem布局详解(转载非原创)
  • (转)程序员疫苗:代码注入
  • (转)详解PHP处理密码的几种方式
  • (转载)深入super,看Python如何解决钻石继承难题
  • (转载)微软数据挖掘算法:Microsoft 时序算法(5)
  • **PyTorch月学习计划 - 第一周;第6-7天: 自动梯度(Autograd)**
  • .desktop 桌面快捷_Linux桌面环境那么多,这几款优秀的任你选
  • .gitignore文件—git忽略文件
  • .NET 8.0 发布到 IIS
  • .Net Core/.Net6/.Net8 ,启动配置/Program.cs 配置
  • .net FrameWork简介,数组,枚举
  • .NET Standard 支持的 .NET Framework 和 .NET Core
  • .net连接MySQL的方法
  • .NET运行机制
  • .pyc文件还原.py文件_Python什么情况下会生成pyc文件?