当前位置: 首页 > news >正文

发一个 Storm 消费 Kafka 的小份代码

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

之前写的storm消费kafka代码,记录一下

处理数据bolt

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.huawei.day2;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that extracts the Kafka record value from the raw spout tuple and
 * re-emits it under the single output field {@code "mobile"}.
 *
 * <p>With the storm-kafka-client spout's default record translator, tuples
 * arrive as (topic, partition, offset, key, value), so index 4 is the
 * record value. All other Kafka metadata is dropped.
 */
public class DealBolt extends BaseBasicBolt {

	/** Position of the Kafka record value in the spout tuple (default translator layout). */
	private static final int VALUE_INDEX = 4;

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		// Forward only the record value; topic/partition/offset/key are discarded.
		collector.emit(new Values(tuple.getValue(VALUE_INDEX)));
	}

	@Override
	public void cleanup() {
		// Visual separator in the worker log when the topology is torn down.
		// (The original also printed a Map field that was never populated and
		// therefore always printed "{}"; that dead state has been removed.)
		System.out.println("---------------------------------------------------------------------------------------------------------------");
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("mobile"));
	}
}

计算数据bolt

package com.huwei.bolt;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

/**
 * Windowed bolt that counts occurrences of each {@code "mobile"} value within
 * the current tuple window and emits the whole count map as one tuple under
 * the output field {@code "result"}.
 *
 * <p>A fresh map is built per window invocation, so counts never leak across
 * windows.
 */
public class SlidingWindowBolt extends BaseWindowedBolt {

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(TupleWindow inputWindow) {
		List<Tuple> tuples = inputWindow.get();
		if (tuples == null) {
			return;
		}
		// Per-window aggregation: mobile value -> occurrence count.
		Map<Object, AtomicLong> counts = new HashMap<Object, AtomicLong>();
		for (Tuple tuple : tuples) {
			Object mobile = tuple.getValueByField("mobile");
			// computeIfAbsent replaces the original null-check/re-put idiom:
			// one map lookup, and no redundant put on the increment path.
			counts.computeIfAbsent(mobile, key -> new AtomicLong()).incrementAndGet();
		}
		collector.emit(new Values(counts));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("result"));
	}
}

输出bolt

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.huawei.day2;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Terminal (sink) bolt that prints every incoming tuple's values to stdout.
 *
 * <p>Declares no output fields, so no downstream bolt can subscribe to it.
 */
public class PrinterBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		System.out.println(tuple.getValues());
	}

	@Override
	public void cleanup() {
		// Visual separator in the worker log when the topology is torn down.
		System.out.println("---------------------------------------------------------------------------------------------------------------");
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Intentionally empty: this bolt is a pure sink and emits nothing.
	}
}

 

消费代码,时间窗口

package com.huwei.consumer;

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;

import com.huawei.day2.DealBolt;
import com.huawei.day2.PrinterBolt;
import com.huwei.bolt.SlidingWindowBolt;

/**
 * Local test driver that wires the topology
 * KafkaSpout -> DealBolt -> SlidingWindowBolt -> PrinterBolt
 * and submits it to an in-process {@link LocalCluster}.
 */
public class StormConsumer {

	private static final String BOOTSTRAP_SERVERS = "localhost:9092";
	private static final String TOPIC = "test";
	private static final String GROUP_ID = "test";
	private static final String TOPOLOGY_NAME = "test";
	/** Window length: one day of tuples. */
	private static final int WINDOW_LENGTH_SECS = 3600 * 24;
	/** Window slides (re-evaluates) once per second. */
	private static final int SLIDING_INTERVAL_SECS = 1;

	public static void main(String[] args) {
		KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(
				KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
						.setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
						.setProp(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID)
						.build());

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout", kafkaSpout);
		builder.setBolt("dealBolt", new DealBolt(), 1).shuffleGrouping("spout");
		builder.setBolt("slidingWindowBolt",
				new SlidingWindowBolt().withWindow(
						new Duration(WINDOW_LENGTH_SECS, TimeUnit.SECONDS),
						new Duration(SLIDING_INTERVAL_SECS, TimeUnit.SECONDS)),
				1).shuffleGrouping("dealBolt");
		builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("slidingWindowBolt");

		Config conf = new Config();
		conf.setDebug(false);
		// Must exceed the window length (86400s here), otherwise windowed
		// tuples time out and are replayed before the window can emit.
		conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);

		LocalCluster cluster = new LocalCluster();
		StormTopology topology = builder.createTopology();
		cluster.submitTopology(TOPOLOGY_NAME, conf, topology);

		// Tear the in-process cluster down cleanly on JVM termination (Ctrl-C).
		// The original left kill/shutdown commented out and relied on a hard kill.
		Runtime.getRuntime().addShutdownHook(new Thread(() -> {
			cluster.killTopology(TOPOLOGY_NAME);
			cluster.shutdown();
		}));
	}
}

 时间窗口

 

转载于:https://my.oschina.net/internetafei/blog/1806264

相关文章:

  • Windows系统基本设置
  • MySQL单机多实例配置实战
  • Java获取泛型T的类型 T.class
  • OSChina 周六乱弹 —— 他曾经是个王者,后来出车祸了
  • 环境搭建,8种基本类型,Static,package和import,log4j
  • 矩阵快速幂求斐波那契数列
  • Appium 自动化测试环境部署篇
  • 纯OC实现iOS DLNA投屏功能了解一下
  • Odoo domain写法及运用
  • jQuery学习小结
  • 重塑旅游业的颠覆者:虚拟现实技术和人工智能
  • IntelliJ IDEA 乱码解决方案 (项目代码、控制台等)
  • Confluence 6 升级自定义的站点和空间应用你的自定义布局
  • php对字符串的操作
  • js,H5本地存储
  • 【面试系列】之二:关于js原型
  • Angular 响应式表单之下拉框
  • angular学习第一篇-----环境搭建
  • canvas 绘制双线技巧
  • CAP理论的例子讲解
  • ES10 特性的完整指南
  • HTML中设置input等文本框为不可操作
  • jdbc就是这么简单
  • JS实现简单的MVC模式开发小游戏
  • PHP 小技巧
  • quasar-framework cnodejs社区
  • Redux系列x:源码分析
  • Shadow DOM 内部构造及如何构建独立组件
  • spark本地环境的搭建到运行第一个spark程序
  • SpiderData 2019年2月23日 DApp数据排行榜
  • Spring Cloud(3) - 服务治理: Spring Cloud Eureka
  • text-decoration与color属性
  • vue数据传递--我有特殊的实现技巧
  • 分布式任务队列Celery
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 前端面试之CSS3新特性
  • 前嗅ForeSpider采集配置界面介绍
  • 深度学习入门:10门免费线上课程推荐
  • 数组的操作
  • 学习笔记:对象,原型和继承(1)
  • 正则表达式
  • 2017年360最后一道编程题
  • #Linux杂记--将Python3的源码编译为.so文件方法与Linux环境下的交叉编译方法
  • (4)事件处理——(7)简单事件(Simple events)
  • (52)只出现一次的数字III
  • (C语言)求出1,2,5三个数不同个数组合为100的组合个数
  • (iPhone/iPad开发)在UIWebView中自定义菜单栏
  • (Java岗)秋招打卡!一本学历拿下美团、阿里、快手、米哈游offer
  • (react踩过的坑)antd 如何同时获取一个select 的value和 label值
  • (阿里巴巴 dubbo,有数据库,可执行 )dubbo zookeeper spring demo
  • (附源码)springboot助农电商系统 毕业设计 081919
  • (论文阅读22/100)Learning a Deep Compact Image Representation for Visual Tracking
  • (七)理解angular中的module和injector,即依赖注入
  • (三) prometheus + grafana + alertmanager 配置Redis监控
  • (四)docker:为mysql和java jar运行环境创建同一网络,容器互联