2019独角兽企业重金招聘Python工程师标准>>>
之前写的 Storm 消费 Kafka 的代码,记录一下。
处理数据bolt
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.huawei.day2;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that narrows each incoming tuple down to one value and forwards it on a
 * single-field stream named "mobile".
 *
 * <p>The forwarded value is selected by position ({@link #PAYLOAD_INDEX}).
 * NOTE(review): if the upstream component is a KafkaSpout with its default record
 * translator, the tuple layout is (topic, partition, offset, key, value), which would
 * make position 4 the record value - confirm before configuring a custom translator.
 */
public class DealBolt extends BaseBasicBolt {
    /** Zero-based position, inside the incoming tuple, of the value that is forwarded. */
    private static final int PAYLOAD_INDEX = 4;

    /**
     * Map dumped to stdout by {@link #cleanup()}. Nothing in this class writes to it,
     * so it is printed empty unless other code populates it.
     */
    Map<Object, Object> m = new HashMap<Object, Object>();

    /**
     * Emits a one-value tuple holding the value found at {@link #PAYLOAD_INDEX}.
     *
     * @param tuple     incoming tuple; must contain at least {@code PAYLOAD_INDEX + 1} values
     * @param collector collector the extracted value is emitted through
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        collector.emit(new Values(tuple.getValue(PAYLOAD_INDEX)));
    }

    /** Prints a separator line followed by the contents of {@link #m}. */
    @Override
    public void cleanup() {
        System.out.println("---------------------------------------------------------------------------------------------------------------");
        System.out.println(m);
    }

    /**
     * Declares the single output field, "mobile".
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("mobile"));
    }
}
计算数据bolt
package com.huwei.bolt;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
/**
 * Windowed bolt that, for each window it receives, counts how many tuples carry each
 * distinct value of the "mobile" field and emits the resulting map as one tuple with a
 * single field, "result".
 */
public class SlidingWindowBolt extends BaseWindowedBolt {
    /** Collector captured in {@link #prepare} and used by {@link #execute} to emit. */
    OutputCollector collector;

    /**
     * Remembers the collector for later emits.
     *
     * @param stormConf topology configuration (not used here)
     * @param context   topology context (not used here)
     * @param collector collector to emit through
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Tallies the "mobile" values of every tuple in the window and emits the tally map.
     * Nothing is emitted when the window reports a {@code null} tuple list.
     *
     * @param inputWindow the window of tuples to aggregate
     */
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> windowTuples = inputWindow.get();
        if (windowTuples == null) {
            return;
        }

        Map<Object, AtomicLong> occurrences = new HashMap<Object, AtomicLong>();
        for (Tuple windowTuple : windowTuples) {
            Object mobile = windowTuple.getValueByField("mobile");
            AtomicLong seen = occurrences.get(mobile);
            if (seen != null) {
                // Counter is already registered in the map; bump it in place.
                seen.incrementAndGet();
            } else {
                occurrences.put(mobile, new AtomicLong(1L));
            }
        }
        collector.emit(new Values(occurrences));
    }

    /**
     * Declares the single output field, "result".
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }
}
输出bolt
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.huawei.day2;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
 * Sink bolt: writes the values of every tuple it receives to stdout and emits nothing.
 */
public class PrinterBolt extends BaseBasicBolt {
    /** Separator line written to stdout by {@link #cleanup()}. */
    private static final String SEPARATOR =
        "---------------------------------------------------------------------------------------------------------------";

    /**
     * Prints the tuple's value list to stdout.
     *
     * @param tuple     tuple whose values are printed
     * @param collector unused; this bolt does not emit
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object values = tuple.getValues();
        System.out.println(values);
    }

    /** Prints the separator line. */
    @Override
    public void cleanup() {
        System.out.println(SEPARATOR);
    }

    /**
     * Declares no output fields.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty: no output stream is declared.
    }
}
消费代码(拓扑入口,使用时间窗口)
package com.huwei.consumer;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import com.huawei.day2.DealBolt;
import com.huawei.day2.PrinterBolt;
import com.huwei.bolt.SlidingWindowBolt;
/**
 * Entry point that wires a Kafka spout through {@code DealBolt}, {@code SlidingWindowBolt}
 * and {@code PrinterBolt}, then submits the topology to an in-process {@code LocalCluster}.
 */
public class StormConsumer {
    /**
     * Builds the topology and submits it under the name "test".
     * The local cluster is not shut down by this method.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        TopologyBuilder builder = wireTopology(createSpout());

        Config conf = new Config();
        conf.setDebug(false);
        // 100000 s exceeds the 86400 s window length configured on the windowed bolt.
        // NOTE(review): presumably chosen so tuples held for the whole window do not
        // time out - confirm.
        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);

        LocalCluster cluster = new LocalCluster();
        StormTopology topology = builder.createTopology();
        cluster.submitTopology("test", conf, topology);
    }

    /** Creates the spout reading topic "test" from localhost:9092 as consumer group "test". */
    private static KafkaSpout<String, String> createSpout() {
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:9092", "test")
            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "test")
            .build();
        return new KafkaSpout<String, String>(spoutConfig);
    }

    /** Chains spout -> dealBolt -> slidingWindowBolt -> print, each with parallelism 1. */
    private static TopologyBuilder wireTopology(KafkaSpout<String, String> kafkaSpout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout);
        builder.setBolt("dealBolt", new DealBolt(), 1).shuffleGrouping("spout");

        // Window covers 24 hours of tuples and slides every second.
        Duration windowLength = new Duration(3600 * 24, TimeUnit.SECONDS);
        Duration slidingInterval = new Duration(1, TimeUnit.SECONDS);
        builder.setBolt("slidingWindowBolt",
                new SlidingWindowBolt().withWindow(windowLength, slidingInterval), 1)
            .shuffleGrouping("dealBolt");

        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("slidingWindowBolt");
        return builder;
    }
}
时间窗口