当前位置: 首页 > news >正文

Flink实时开发添加水印的案例分析

在Flink中,处理时间序列数据时,通常需要考虑事件时间和水印(watermarks)的处理。以下是修改前后的代码对比分析:

修改前的代码:

val systemDS = unitDS.map(dp => {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. unitDS 经过一个 map 操作,将每个元素的 deviceCode 转换为系统设备码。
  2. 使用 keyBy(_.getDeviceCode) 对转换后的设备码进行分组。
  3. 定义了一个基于事件时间的滚动窗口,窗口大小为60秒。
  4. 使用 process 操作应用自定义的窗口函数 HPageSystemWinF 来处理每个窗口中的数据。

注意:修改前的代码没有显示地处理水印(watermarks),这可能导致在处理乱序数据或延迟数据时出现问题。

修改后的代码:

val systemDS = unitDS.map(dp => {dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))dp
}).keyBy(_.getDeviceCode)
.assignTimestampsAndWatermarks(WatermarkStrategy.<boundedOutOfOrdernessDaysPower>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 假设这里应该是.forBoundedOutOfOrderness而不是.forBoundedOutOfOrdernessDaysPower.withIdleness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner[DaysPower] {override def extractTimestamp(element: DaysPower, recordTimestamp: Long): Long = {Math.max(element.getEventTime, recordTimestamp)}})
).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. 与修改前相同的部分:mapkeyBy, 和 window 操作。
  2. 添加了 assignTimestampsAndWatermarks 方法来处理事件时间和水印:
    • 使用 WatermarkStrategy.forBoundedOutOfOrderness 允许一定程度的乱序数据(这里是5秒)。
    • .withIdleness(Duration.ofSeconds(5)) 设置了空闲超时时间为5秒,用于处理不活跃的键。
    • 使用 withTimestampAssigner 自定义了时间戳分配器,确保使用的事件时间是元素中的 eventTime 和记录的 recordTimestamp 中的较大值。

不同点和适用场景:

  • 事件时间和水印处理:修改后的代码显式地处理了事件时间和水印,这对于处理乱序数据、延迟数据以及确保正确的时间窗口计算是非常重要的。如果您的数据流中存在乱序或延迟数据,或者您希望更严格地保证处理时间窗口的正确性,那么应该使用修改后的代码。
  • 空闲超时:通过设置空闲超时,可以处理那些长时间不活跃的键,避免因为某些键长时间没有新数据而导致整个程序挂起。
  • 延迟数据处理:如果数据有可能晚到,但仍然需要被纳入正确的窗口进行计算,水印可以帮助界定数据的“迟到”界限。
    精确的时间窗口分析:对于需要基于事件实际发生时间而非数据处理时间进行分析的场景,如实时监控、金融交易分析等,事件时间模型是必须的。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【Qt】之【Bug】error:C1083 无法打开包括文件
  • 第七章 单片机的串行口
  • 小程序为什么要做分包处理
  • [Unity]碰撞器的接触捕获层详解
  • springboot 重新注册 bean
  • 【C语言】全面解析冒泡排序
  • vscode通过ssh链接远程服务器上的docker
  • 基于深度学习的车距检测系统
  • vi 编辑器快捷生成 main 函数和基本框架
  • python的readline()和readlines()
  • Hadoop基础组件介绍!
  • 【Git】Git Submodules 介绍(通俗易懂,总结了工作完全够用的 submodule 命令)
  • 签名优化:请求数据类型不是`application/json`,将只对随机数进行签名计算,例如文件上传接口。
  • 网络编程-TCP 协议的三次握手和四次挥手做了什么
  • Spark安装
  • 08.Android之View事件问题
  • 2017 前端面试准备 - 收藏集 - 掘金
  • Idea+maven+scala构建包并在spark on yarn 运行
  • Linux Process Manage
  • Spark VS Hadoop:两大大数据分析系统深度解读
  • swift基础之_对象 实例方法 对象方法。
  • Webpack4 学习笔记 - 01:webpack的安装和简单配置
  • 快速体验 Sentinel 集群限流功能,只需简单几步
  • 前端之Sass/Scss实战笔记
  • No resource identifier found for attribute,RxJava之zip操作符
  • ​​​​​​​开发面试“八股文”:助力还是阻力?
  • ​2020 年大前端技术趋势解读
  • #stm32整理(一)flash读写
  • #我与Java虚拟机的故事#连载04:一本让自己没面子的书
  • (1)SpringCloud 整合Python
  • (3)医疗图像处理:MRI磁共振成像-快速采集--(杨正汉)
  • (4)logging(日志模块)
  • (C语言)深入理解指针2之野指针与传值与传址与assert断言
  • (Redis使用系列) Springboot 实现Redis消息的订阅与分布 四
  • (八)c52学习之旅-中断实验
  • (办公)springboot配置aop处理请求.
  • (几何:六边形面积)编写程序,提示用户输入六边形的边长,然后显示它的面积。
  • (六)DockerCompose安装与配置
  • (排序详解之 堆排序)
  • (数据结构)顺序表的定义
  • (四)Controller接口控制器详解(三)
  • (一)Java算法:二分查找
  • (转)EOS中账户、钱包和密钥的关系
  • (转)程序员疫苗:代码注入
  • (转载)(官方)UE4--图像编程----着色器开发
  • ***汇编语言 实验16 编写包含多个功能子程序的中断例程
  • .NET 6 在已知拓扑路径的情况下使用 Dijkstra,A*算法搜索最短路径
  • .NET CORE 第一节 创建基本的 asp.net core
  • .NET 反射的使用
  • .NET4.0并行计算技术基础(1)
  • .NET的数据绑定
  • .NET企业级应用架构设计系列之技术选型
  • .NET值类型变量“活”在哪?
  • [ CTF ] WriteUp- 2022年第三届“网鼎杯”网络安全大赛(朱雀组)
  • [20171102]视图v$session中process字段含义