当前位置: 首页 > news >正文

15-Flink实战项目之实时热销排行

戳更多文章:

1-Flink入门

2-本地环境搭建&构建第一个Flink应用

3-DataSet API 4-DataSteam API

5-集群部署

6-分布式缓存

7-重启策略

8-Flink中的窗口

9-Flink中的Time

需求

某个图书网站,希望看到双十一秒杀期间实时的热销排行榜单。我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5秒钟输出最近一小时内点击量最多的前 N 个商品/图书.

需求分解

将这个需求进行分解我们大概要做这么几件事情:

  • 告诉 Flink 框架基于时间做窗口,我们这里用processingTime,不用自带时间戳
  • 过滤出图书点击行为数据
  • 按一小时的窗口大小,每5秒钟统计一次,做滑动窗口聚合(Sliding Window)
  • 聚合,输出窗口中点击量前N名的商品

代码实现

向Kafka发消息模拟购买事件

public class KafkaProducer {


    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
	    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("topn",new SimpleStringSchema(),properties);
/*
        //event-timestamp事件的发生时间
        producer.setWriteTimestampToKafka(true);
*/
        text.addSink(producer);
        env.execute();
    }
}//
复制代码

其中的:MyNoParalleSource 是作者自己实现的一个并行度为1的发送器,用来向kafka发送数据:

public class MyNoParalleSource implements SourceFunction<String> {//1

    //private long count = 1L;
    private boolean isRunning = true;

    /**
     * 主要的方法
     * 启动一个source
     * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while(isRunning){
            //图书的排行榜
            List<String> books = new ArrayList<>();
            books.add("Pyhton从入门到放弃");//10
            books.add("Java从入门到放弃");//8
            books.add("Php从入门到放弃");//5
            books.add("C++从入门到放弃");//3
            books.add("Scala从入门到放弃");//0-4
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));

            //每1秒产生一条数据
            Thread.sleep(1000);
        }
    }
    //取消一个cancel的时候会调用的方法
    @Override
    public void cancel() {
        isRunning = false;
    }
}

复制代码

可见,我们每过1秒向Kafka的topn这个topic随机发送一本书的名字用来模拟购买行为。

整体实现代码如下:

public class TopN {

	public static void main(String[] args) throws Exception{

		/**
		 *
		 *  书1 书2 书3
		 *  (书1,1) (书2,1) (书3,1)
		 *
		 *
		 */
		//每隔5秒钟 计算过去1小时 的 Top 3 商品
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setParallelism(1);

		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义


		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties);

		//从最早开始消费 位点
		input.setStartFromEarliest();


		DataStream<String> stream = env
				.addSource(input);

		DataStream<Tuple2<String, Integer>> ds = stream
				.flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型


		DataStream<Tuple2<String, Integer>> wcount = ds
				.keyBy(0)
				.window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5)))
				//key之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
				.sum(1);// 将相同的key的元素第二个count值相加

		wcount
				.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx)....
				//所有key元素进入一个5s长的窗口(选5秒是因为上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
				.process(new TopNAllFunction(3))
				.print();
//redis sink  redis -> 接口

		env.execute();
	}//





	private static final class LineSplitter implements
			FlatMapFunction<String, Tuple2<String, Integer>> {

		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
			// normalize and split the line
			//String[] tokens = value.toLowerCase().split("\\W+");

			// emit the pairs
			/*for (String token : tokens) {
				if (token.length() > 0) {
					out.collect(new Tuple2<String, Integer>(token, 1));
				}
			}*/

			//(书1,1) (书2,1) (书3,1)
			out.collect(new Tuple2<String, Integer>(value, 1));
		}
	}

	private static class TopNAllFunction
			extends
			ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> {

		private int topSize = 3;

		public TopNAllFunction(int topSize) {

			this.topSize = topSize;
		}

		public void process(

				ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0,
				Iterable<Tuple2<String, Integer>> input,
				Collector<String> out) throws Exception {

			TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
					new Comparator<Integer>() {

						@Override
						public int compare(Integer y, Integer x) {
							return (x < y) ? -1 : 1;
						}

					}); //treemap按照key降序排列,相同count值不覆盖

			for (Tuple2<String, Integer> element : input) {
				treemap.put(element.f1, element);
				if (treemap.size() > topSize) { //只保留前面TopN个元素
					treemap.pollLastEntry();
				}
			}


			for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap
					.entrySet()) {
				out.collect("=================\n热销图书列表:\n"+ new Timestamp(System.currentTimeMillis()) +  treemap.toString() + "\n===============\n");
			}

		}

	}


}//
复制代码

查看输出:

=================
热销图书列表:
2019-03-05 22:32:40.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}
===============
=================
热销图书列表:
2019-03-05 22:32:45.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}
===============

复制代码

所有代码,我放在了我的公众号,回复Flink可以下载

  • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后可以下载~
  • 更多大数据技术欢迎和作者一起探讨~

相关文章:

  • 随笔之python下载与安装
  • print(1,2,3,sep=':')的输出结果是?
  • windows下安装jdk与jmeter
  • 上海瀚示—电力仓库的电子货位标签应用
  • 宕机的阿里云们正在杀死运维?
  • 算法图解之广度优先算法
  • uboot内存布局(平台imx6 uboot2016)
  • 【干货】Kafka实现淘宝亿万级数据统计(下)
  • 绑定hover事件
  • 腾讯58篇论文入选CVPR 2019,涵盖视觉对抗学习等方向
  • E.华华给月月准备礼物
  • 【NOI2018模拟】Yja
  • npm install -g react-native-cli 报错:errno -4048
  • 如何在GitHub上创建个人博客
  • layDay日期格式不合法报错解决
  • ES6--对象的扩展
  • happypack两次报错的问题
  • java8-模拟hadoop
  • jquery ajax学习笔记
  • Less 日常用法
  • MaxCompute访问TableStore(OTS) 数据
  • October CMS - 快速入门 9 Images And Galleries
  • Perseus-BERT——业内性能极致优化的BERT训练方案
  • spring + angular 实现导出excel
  • VUE es6技巧写法(持续更新中~~~)
  • XForms - 更强大的Form
  • 搞机器学习要哪些技能
  • 官方新出的 Kotlin 扩展库 KTX,到底帮你干了什么?
  • 名企6年Java程序员的工作总结,写给在迷茫中的你!
  • 三栏布局总结
  • 手机app有了短信验证码还有没必要有图片验证码?
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 小试R空间处理新库sf
  • 译有关态射的一切
  • Python 之网络式编程
  • #NOIP 2014# day.1 T2 联合权值
  • (2)MFC+openGL单文档框架glFrame
  • (安全基本功)磁盘MBR,分区表,活动分区,引导扇区。。。详解与区别
  • (三分钟)速览传统边缘检测算子
  • (一)Neo4j下载安装以及初次使用
  • (原创)boost.property_tree解析xml的帮助类以及中文解析问题的解决
  • (原創) 博客園正式支援VHDL語法著色功能 (SOC) (VHDL)
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)项目管理杂谈-我所期望的新人
  • .NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划
  • .NetCore实践篇:分布式监控Zipkin持久化之殇
  • @DateTimeFormat 和 @JsonFormat 注解详解
  • @Transactional 详解
  • [1] 平面(Plane)图形的生成算法
  • [2013AAA]On a fractional nonlinear hyperbolic equation arising from relative theory
  • [20170728]oracle保留字.txt
  • [20180312]进程管理其中的SQL Server进程占用内存远远大于SQL server内部统计出来的内存...
  • [383] 赎金信 js
  • [51nod1610]路径计数
  • [Android]如何调试Native memory crash issue