当前位置: 首页 > news >正文

大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动基于事件驱动

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink DataSet
  • Flink DataSet 转换操作
  • Flink DataSet 输出
  • 容错机制、对比、发展方向

在这里插入图片描述

Flink Window 背景

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。

通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。

Flink Window 总览

基本概念

  • Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
  • Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
  • 而Window窗口是从Streaming到Batch的一个桥梁。
  • Flink提供了非常完善的窗口机制
  • 在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
  • 当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
  • 在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
  • 窗口可以基于时间驱动、也可以基于事件驱动
  • 同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
  • Flink要操作窗口,先要将StreamSource转换成WindowedStream

转换步骤

  • 获取流数据源
  • 获取窗口
  • 操作窗口数据
  • 输出窗口数据

滚动时间窗口

在这里插入图片描述

类型特点

将数据依据固定的窗口长度对数据进行切分:

  • 时间对齐
  • 窗口长度固定,没有重叠

Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。

在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。

关键点

  • 事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
  • 窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
  • 触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));timeWindow.apply(new MyTimeWindowFunction()).print();env.execute("TumblingWindow");}}

我们实现一个 MyTimeWindowFunction,滚动时间窗口:

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {/*** 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)* @author wzk* @date 16:58 2024/7/26**/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}out.collect("key: " + tuple.getField(0) + ", value: " + sum  +", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));}
}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream.countWindow(3);globalWindow.apply(new MyCountWindowFuntion());env.execute("TumblingWindow");}}

编写一个事件驱动的类:MyCountWindowFuntion

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {/*** 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。* @author wzk* @date 17:11 2024/7/26**/@Overridepublic void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}// 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间long maxTimestamp = window.maxTimestamp();out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"+ maxTimestamp + "," + format.format(maxTimestamp));}}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Word封面对齐技巧
  • 数据库中的逐行数据处理
  • FPGA随记——OSERDESE2和IERDESE2
  • (纯JS)图片裁剪
  • PyTorch 创建数据集
  • 《论系统安全架构设计及其应用》写作框架,软考高级系统架构设计师
  • 面经学习(hbkj实习)
  • 如何在Mac中修改pip的镜像源
  • 【MySQL】批量插入数据造数-存储过程
  • 在Windows系统上部署PPTist并实现远程访问
  • IntelliJ IDEA下载安装
  • 01 Shell Script概述
  • HTTP 三、http在springboot中得应用
  • 好看的个人导航页面html源码
  • 使用Fign进行客户端远程调用和SpringFormEncoder的使用
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • IE报vuex requires a Promise polyfill in this browser问题解决
  • Java 最常见的 200+ 面试题:面试必备
  • java正则表式的使用
  • LeetCode算法系列_0891_子序列宽度之和
  • Rancher如何对接Ceph-RBD块存储
  • Redux系列x:源码分析
  • SQLServer插入数据
  • Stream流与Lambda表达式(三) 静态工厂类Collectors
  • webgl (原生)基础入门指南【一】
  • 翻译:Hystrix - How To Use
  • 跨域
  • 聊聊hikari连接池的leakDetectionThreshold
  • 聊聊spring cloud的LoadBalancerAutoConfiguration
  • 排序算法学习笔记
  • 区块链分支循环
  • 如何用Ubuntu和Xen来设置Kubernetes?
  • 如何在 Tornado 中实现 Middleware
  • 入门级的git使用指北
  • 使用 5W1H 写出高可读的 Git Commit Message
  • 腾讯视频格式如何转换成mp4 将下载的qlv文件转换成mp4的方法
  • 小程序测试方案初探
  • 一起来学SpringBoot | 第十篇:使用Spring Cache集成Redis
  • 做一名精致的JavaScripter 01:JavaScript简介
  • elasticsearch-head插件安装
  • 仓管云——企业云erp功能有哪些?
  • 微龛半导体获数千万Pre-A轮融资,投资方为国中创投 ...
  • 整理一些计算机基础知识!
  • ​十个常见的 Python 脚本 (详细介绍 + 代码举例)
  • # Java NIO(一)FileChannel
  • #70结构体案例1(导师,学生,成绩)
  • (02)Hive SQL编译成MapReduce任务的过程
  • (C++17) std算法之执行策略 execution
  • (八十八)VFL语言初步 - 实现布局
  • (官网安装) 基于CentOS 7安装MangoDB和MangoDB Shell
  • (十)【Jmeter】线程(Threads(Users))之jp@gc - Stepping Thread Group (deprecated)
  • (十三)Flink SQL
  • (算法)前K大的和
  • (一)C语言之入门:使用Visual Studio Community 2022运行hello world
  • (游戏设计草稿) 《外卖员模拟器》 (3D 科幻 角色扮演 开放世界 AI VR)