当前位置: 首页 > news >正文

大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink Time 详解
  • 示例内容分析
  • Watermark

在这里插入图片描述

Watermark

Watermark 在窗口计算中的作用

在使用基于事件时间的窗口时,Flink 依赖 Watermark 来决定何时触发窗口计算。例如,如果你有一个每 10 秒的滚动窗口,当 Watermark 达到某个窗口的结束时间后,Flink 才会触发该窗口的计算。

假设有一个 10 秒的窗口,并且 Watermark 达到 12:00:10,此时 Flink 会触发 12:00:00 - 12:00:10 的窗口计算。

如何处理迟到事件

尽管 Watermark 能有效解决乱序问题,但总有可能会出现事件在生成 Watermark 之后才到达的情况(即“迟到事件”)。为此,Flink 提供了处理迟到事件的机制:

  • 允许一定的延迟处理:可以配置窗口允许迟到的时间。
  • 迟到事件的侧输出流(Side Output):可以将迟到的事件发送到一个侧输出流中,以便后续处理。
DataStream<Tuple2<String, Integer>> mainStream = stream.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(5)).sideOutputLateData(lateOutputTag);

代码实现

数据格式

01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000
01,1586489574000
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000

编写代码

这段代码实现了:

  • 通过 socket 获取实时流数据。
  • 将流数据映射成带有时间戳的二元组形式。
  • 应用了一个允许 5 秒乱序的水印策略,确保 Flink 可以处理乱序的事件流。
  • 按照事件的 key 进行分组,并在事件时间的基础上进行 5 秒的滚动窗口计算。
  • 最后输出每个窗口内事件的时间范围、窗口开始和结束时间等信息。

其中,这里对流数据进行了按 key(事件的第一个字段)分组,并且使用了 滚动窗口(Tumbling Window),窗口长度为 5 秒。
在 apply 方法中,你收集窗口中的所有事件,并根据事件时间戳进行排序,然后输出每个窗口的开始和结束时间,以及窗口中最早和最晚事件的时间戳。

SingleOutputStreamOperator<String> res = waterMark.keyBy(new KeySelector<Tuple2<String, Long>, String>() {@Overridepublic String getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {List<Long> list = new ArrayList<>();for (Tuple2<String, Long> next : input) {list.add(next.f1);}Collections.sort(list);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))+ ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());out.collect(result);}});

水印的策略,定义了一个Bounded Out-of-Orderness 的水印策略,允许最多 5 秒的事件乱序,在 extractTimestamp 中,提取了事件的时间戳,并打印出每个事件的 key 和对应的事件时间。还维护了一个 currentMaxTimestamp 来记录当前最大的事件时间戳:

WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));return element.f1;}});

完整代码如下所示,代码实现了一个基于事件时间的流处理系统,并通过水印(Watermark)机制来处理乱序事件:

package icu.wzk;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;public class WatermarkTest01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<String> data = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] split = value.split(",");return new Tuple2<>(split[0], Long.valueOf(split[1]));}});WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));return element.f1;}});SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped.assignTimestampsAndWatermarks(watermarkStrategy);SingleOutputStreamOperator<String> res = waterMark.keyBy(new KeySelector<Tuple2<String, Long>, String>() {@Overridepublic String getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {List<Long> list = new ArrayList<>();for (Tuple2<String, Long> next : input) {list.add(next.f1);}Collections.sort(list);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))+ ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());out.collect(result);}});res.print();env.execute();}
}

运行代码

在这里插入图片描述

传入数据

在控制台中,输入如下的数据:
在这里插入图片描述

查看结果

控制台运行结果如下:
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 拥抱分布式云:云基础设施的下个新时代
  • 11、Django Admin启用对计算字段的过滤
  • [ios]准备好app后使用xcode发布ios操作
  • 用SpringBoot API实现识别pdf文件是否含有表格
  • AI建模——AI生成3D内容算法产品介绍与模型免费下载
  • 【人工智能/机器学习/机器人】数学基础-学习笔记
  • Z Product | AI教母李飞飞AI创业,4 个月估值达 10 亿美金,目标是使AI能够像人类一样理解和推理三维物理世界
  • 口语笔记——定语
  • 进程管理中的三态模型
  • 828华为云征文 | Flexus X实例与华为云EulerOS的Tomcat安装指南
  • 智能监测,守护未来:QY-19 GNSS位移监测站
  • 揭秘IP地址与SSL证书:构建数字世界的信任桥梁
  • LabVIEW如何自学成为专业开发者
  • yocto传递宏(bitbake传递宏)
  • 联盟营销案例:策略与成功故事
  • gulp 教程
  • IndexedDB
  • JSDuck 与 AngularJS 融合技巧
  • Spring Cloud(3) - 服务治理: Spring Cloud Eureka
  • 阿里云容器服务区块链解决方案全新升级 支持Hyperledger Fabric v1.1
  • 分布式熔断降级平台aegis
  • 高度不固定时垂直居中
  • 日剧·日综资源集合(建议收藏)
  • 线性表及其算法(java实现)
  • 限制Java线程池运行线程以及等待线程数量的策略
  • 一些基于React、Vue、Node.js、MongoDB技术栈的实践项目
  • 树莓派用上kodexplorer也能玩成私有网盘
  • ​520就是要宠粉,你的心头书我买单
  • ​DB-Engines 12月数据库排名: PostgreSQL有望获得「2020年度数据库」荣誉?
  • # 20155222 2016-2017-2 《Java程序设计》第5周学习总结
  • #免费 苹果M系芯片Macbook电脑MacOS使用Bash脚本写入(读写)NTFS硬盘教程
  • #知识分享#笔记#学习方法
  • %@ page import=%的用法
  • (C++哈希表01)
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (补充)IDEA项目结构
  • (二)hibernate配置管理
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (四)软件性能测试
  • (一)pytest自动化测试框架之生成测试报告(mac系统)
  • (转)PlayerPrefs在Windows下存到哪里去了?
  • (自适应手机端)响应式服装服饰外贸企业网站模板
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • .bat文件调用java类的main方法
  • .MSSQLSERVER 导入导出 命令集--堪称经典,值得借鉴!
  • .mysql secret在哪_MYSQL基本操作(上)
  • .NET Framework杂记
  • .NET Standard 支持的 .NET Framework 和 .NET Core
  • .net 连接达梦数据库开发环境部署
  • .netcore 如何获取系统中所有session_如何把百度推广中获取的线索(基木鱼,电话,百度商桥等)同步到企业微信或者企业CRM等企业营销系统中...
  • .net反混淆脱壳工具de4dot的使用
  • .NET精简框架的“无法找到资源程序集”异常释疑
  • .NET业务框架的构建
  • @component注解的分类
  • [ 渗透测试面试篇 ] 渗透测试面试题大集合(详解)(十)RCE (远程代码/命令执行漏洞)相关面试题