当前位置: 首页 > news >正文

flink 设置空闲等待推进水位线,避免子任务上游最小的水位线迟迟未达到触发时间

文章目录

    • 1. 空闲等待
      • 1.1 空闲等待
      • 1.2 withIdleness
      • 1.3 源码


1. 空闲等待

1.1 空闲等待

  多并行度的flink作业,watermark水位线的传递遵循接收到上游多个水位线时取最小往下游多个子任务发送水位线时进行广播。此时,如果有其中一个子任务没有数据,导致当前Task的水位线无法推进,窗口无法触发,需要等待上游最小的水位线达到触发时间。于是,flink添加了空闲等待的设置

1.2 withIdleness

  在设置WatermarkStrategy时,添加.withIdleness(Duration.ofSeconds(5))

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy//升序的watermark,没有等待时间.<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {return new MyWatermark<>(Duration.ofSeconds(3));}})//指定时间戳分配器,从数据中提取时间戳.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);//返回的数据为毫秒return element.getTs() * 1000;}}).withIdleness(Duration.ofSeconds(5));

1.3 源码

  其核心逻辑为:

@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {private final WatermarkGenerator<T> watermarks;private final IdlenessTimer idlenessTimer;private boolean isIdleNow = false;/*** Creates a new WatermarksWithIdleness generator to the given generator idleness detection with* the given timeout.** @param watermarks The original watermark generator.* @param idleTimeout The timeout for the idleness detection.*/public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {this(watermarks, idleTimeout, SystemClock.getInstance());}@VisibleForTestingWatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {checkNotNull(idleTimeout, "idleTimeout");checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),"idleTimeout must be greater than zero");this.watermarks = checkNotNull(watermarks, "watermarks");this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);}@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {watermarks.onEvent(event, eventTimestamp, output);idlenessTimer.activity();isIdleNow = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {if (idlenessTimer.checkIfIdle()) {if (!isIdleNow) {output.markIdle();isIdleNow = true;}} else {watermarks.onPeriodicEmit(output);}}// ------------------------------------------------------------------------@VisibleForTestingstatic final class IdlenessTimer {/** The clock used to measure elapsed time. */private final Clock clock;/** Counter to detect change. No problem if it overflows. */private long counter;/** The value of the counter at the last activity check. */private long lastCounter;/*** The first time (relative to {@link Clock#relativeTimeNanos()}) when the activity check* found that no activity happened since the last check. Special value: 0 = no timer.*/private long startOfInactivityNanos;/** The duration before the output is marked as idle. */private final long maxIdleTimeNanos;IdlenessTimer(Clock clock, Duration idleTimeout) {this.clock = clock;long idleNanos;try {idleNanos = idleTimeout.toNanos();} catch (ArithmeticException ignored) {// long integer overflowidleNanos = Long.MAX_VALUE;}this.maxIdleTimeNanos = idleNanos;}public void activity() {counter++;}public boolean checkIfIdle() {if (counter != lastCounter) {// activity since the last check. we reset the timerlastCounter = counter;startOfInactivityNanos = 0L;return false;} else // timer started but has not yet reached idle timeoutif (startOfInactivityNanos == 0L) {// first time that we see no activity since the last periodic probe// begin the timerstartOfInactivityNanos = clock.relativeTimeNanos();return false;} else {return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;}}}
}

checkIfIdle()方法用于判断是否触发水位线推进


相关文章:

  • python的正则表达式
  • 《高性能MYSQL》-架构,锁,事务
  • H264的打包,nal,es,pes,pts,dts,ps,ts
  • 第十四届蓝桥杯大赛B组 JAVA 蜗牛 (递归剪枝)
  • 模版进阶C++
  • AI写的wordpress网站首页模板 你觉得怎么样?
  • [GXYCTF2019]BabyUpload1 -- 题目分析与详解
  • 探讨苹果 Vision Pro 的 AI 数字人形象问题
  • Linux相关小技巧《一》
  • LeetCode每日一题之 移动0
  • C++之结构体以及通讯录管理系统
  • 第四十七回 一丈青单捉王矮虎 宋公明二打祝家庄-强大而灵活的python装饰器
  • 在git中自动把CRLF转换到LF的方法
  • iOS-UILabel调整行间距
  • RK3568开发笔记-qt程序运行报错Failed to move cursor on screen
  • 【Leetcode】101. 对称二叉树
  • 30天自制操作系统-2
  • css的样式优先级
  • golang中接口赋值与方法集
  • gops —— Go 程序诊断分析工具
  • java第三方包学习之lombok
  • js 实现textarea输入字数提示
  • Netty+SpringBoot+FastDFS+Html5实现聊天App(六)
  • nfs客户端进程变D,延伸linux的lock
  • Odoo domain写法及运用
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 前端知识点整理(待续)
  • 实现菜单下拉伸展折叠效果demo
  • 树莓派 - 使用须知
  • 通信类
  • 追踪解析 FutureTask 源码
  • nb
  • ​2021半年盘点,不想你错过的重磅新书
  • # include “ “ 和 # include < >两者的区别
  • #android不同版本废弃api,新api。
  • #LLM入门|Prompt#1.7_文本拓展_Expanding
  • (pytorch进阶之路)CLIP模型 实现图像多模态检索任务
  • (带教程)商业版SEO关键词按天计费系统:关键词排名优化、代理服务、手机自适应及搭建教程
  • (附源码)spring boot基于Java的电影院售票与管理系统毕业设计 011449
  • (七)Java对象在Hibernate持久化层的状态
  • (最优化理论与方法)第二章最优化所需基础知识-第三节:重要凸集举例
  • .net core 控制台应用程序读取配置文件app.config
  • .NET Standard / dotnet-core / net472 —— .NET 究竟应该如何大小写?
  • .NET 读取 JSON格式的数据
  • .NET 中使用 Mutex 进行跨越进程边界的同步
  • .NET6 命令行启动及发布单个Exe文件
  • .NET设计模式(11):组合模式(Composite Pattern)
  • .net使用excel的cells对象没有value方法——学习.net的Excel工作表问题
  • .net图片验证码生成、点击刷新及验证输入是否正确
  • .Net下使用 Geb.Video.FFMPEG 操作视频文件
  • /etc/shadow字段详解
  • @Controller和@RestController的区别?
  • @开发者,一文搞懂什么是 C# 计时器!
  • [8-23]知识梳理:文件系统、Bash基础特性、目录管理、文件管理、文本查看编辑处理...
  • [AIR] NativeExtension在IOS下的开发实例 --- IOS项目的创建 (一)