当前位置: 首页 > news >正文

大数据Flink(一百一十八):SQL水印操作(Watermark)

文章目录

​​​​​​SQL水印操作(Watermark)

一、为什么要有WaterMark

二、​​​​​​​Watermark解决的问题

三、​​​​​​​​​​​​​​代码演示


​​​​​​SQL水印操作(Watermark)

一、​​​​​​​为什么要有WaterMark

当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:

假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据

二、​​​​​​​​​​​​​​Watermark解决的问题

上面的问题在于如何将迟来的EventTime 为11的元素正确处理?

当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下: 

通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。

watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。

三、​​​​​​​​​​​​​​代码演示

  • 使用Socket模拟接收数据
  • 设置WaterMark
    • 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
  • 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

若输入第一条数据:hello,2022-03-25 16:39:45

那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示

1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线

2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000

3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算

4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算

根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50

 Flink输出结果:

Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。

数据乱序的场景

上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:

其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink结果:

从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃

乱序时间的设置:

为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据

修改最大乱序时间(新建的表仅水印与之前不同)

CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;

在监听终端中,输入数据

hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55

Flink输出结果:  

可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 项目实战 (11)---搜索进度
  • 人工智能辅助汽车造型设计
  • 第307题|快速掌握 反常积分敛散性判定的方法|武忠祥老师每日一题
  • 基于k8s手动部署rabbitmq集群(Manually Deploying RabbitMQ Cluster Based on k8s)
  • Spring Cloud集成Gateaway
  • 【视频教程】基于python深度学习遥感影像地物分类与目标识别、分割实践技术应用
  • Linux shell编程学习笔记80:gzip命令——让文件瘦身
  • redis底层—数据结构
  • 在职研生活学习--20240908
  • JQuery简介 - 什么是jQuery
  • SpringBoot实现房产销售系统全解析
  • vs code: pnpm : 无法加载文件 C:\Program Files\nodejs\pnpm.ps1,因为在此系统上禁止运行脚本
  • modbus调试助手/mqtt调试工具/超轻巧物联网组件/多线程实时采集/各种协议支持
  • Linux编程获取指定网口MAC地址
  • 9.11 QT ( Day 4)
  • [笔记] php常见简单功能及函数
  • Android交互
  • C# 免费离线人脸识别 2.0 Demo
  • C++类中的特殊成员函数
  • Docker入门(二) - Dockerfile
  • Go 语言编译器的 //go: 详解
  • nginx(二):进阶配置介绍--rewrite用法,压缩,https虚拟主机等
  • Python中eval与exec的使用及区别
  • seaborn 安装成功 + ImportError: DLL load failed: 找不到指定的模块 问题解决
  • vue和cordova项目整合打包,并实现vue调用android的相机的demo
  • 服务器从安装到部署全过程(二)
  • 汉诺塔算法
  • 简单基于spring的redis配置(单机和集群模式)
  • 面试总结JavaScript篇
  • 如何学习JavaEE,项目又该如何做?
  • 使用iElevator.js模拟segmentfault的文章标题导航
  • 携程小程序初体验
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • #pragam once 和 #ifndef 预编译头
  • #pragma multi_compile #pragma shader_feature
  • (1)(1.9) MSP (version 4.2)
  • (1/2)敏捷实践指南 Agile Practice Guide ([美] Project Management institute 著)
  • (51单片机)第五章-A/D和D/A工作原理-A/D
  • (Bean工厂的后处理器入门)学习Spring的第七天
  • (Mac上)使用Python进行matplotlib 画图时,中文显示不出来
  • (八)Spring源码解析:Spring MVC
  • (差分)胡桃爱原石
  • (附源码)spring boot车辆管理系统 毕业设计 031034
  • (精确度,召回率,真阳性,假阳性)ACC、敏感性、特异性等 ROC指标
  • (论文阅读11/100)Fast R-CNN
  • (三)Kafka离线安装 - ZooKeeper开机自启
  • .helper勒索病毒的最新威胁:如何恢复您的数据?
  • .NET 中 GetHashCode 的哈希值有多大概率会相同(哈希碰撞)
  • .NET编程C#线程之旅:十种开启线程的方式以及各自使用场景和优缺点
  • /dev下添加设备节点的方法步骤(通过device_create)
  • @media screen 针对不同移动设备
  • @RequestBody与@ResponseBody的使用
  • [ vulhub漏洞复现篇 ] ThinkPHP 5.0.23-Rce
  • [20171102]视图v$session中process字段含义
  • [AIGC codze] Kafka 的 rebalance 机制