当前位置: 首页 > news >正文

Flink系列之:窗口去重

Flink系列之:窗口去重

  • 一、窗口去重
  • 二、示例
  • 三、限制

一、窗口去重

  • 适用于Streaming
  • 窗口去重是一种特殊的去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据。
  • 对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好。通常,窗口去重直接用于 窗口表值函数 上。另外,它可以用于基于 窗口表值函数 的操作。比如 窗口聚合,窗口TopN 和 窗口关联。
  • 窗口Top-N的语法和普通的Top-N相同。 除此之外,窗口去重需要 PARTITION BY 子句包含表的 window_start 和 window_end 列。 否则优化器无法翻译。
  • Flink 使用 ROW_NUMBER() 移除重复数据,就像 窗口 Top-N 一样。理论上,窗口是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的。

下面展示了窗口去重的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]

参数说明:

  • ROW_NUMBER():为每一行分配一个唯一且连续的序号,从1开始。
  • PARTITION BY window_start, window_end [, col_key1…]: 指定分区字段,需要包含window_start, window_end以及其他分区键。
  • ORDER BY time_attr [asc|desc]: 指定排序列,必须是 时间属性。目前 Flink 支持 处理时间属性 和 事件时间属性。 Order by ASC 表示保留第一行,Order by DESC 表示保留最后一行。
  • WHERE (rownum = 1 | rownum <=1 | rownum < 2): 优化器通过 rownum = 1 | rownum <=1 | rownum < 2 来识别查询能否被翻译成窗口去重。

二、示例

下面的示例展示了在10分钟的滚动窗口上保持最后一条记录。

-- tables must have time attribute, e.g. `bidtime` in this table
Flink SQL> DESC Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
|          bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 |  4.00 | C    |
| 2020-04-15 08:07 |  2.00 | A    |
| 2020-04-15 08:09 |  5.00 | D    |
| 2020-04-15 08:11 |  3.00 | B    |
| 2020-04-15 08:13 |  1.00 | E    |
| 2020-04-15 08:17 |  6.00 | F    |
+------------------+-------+------+Flink SQL> SELECT *FROM (SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownumFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))) WHERE rownum <= 1;
+------------------+-------+------+-------------+------------------+------------------+--------+
|          bidtime | price | item | supplier_id |     window_start |       window_end | rownum |
+------------------+-------+------+-------------+------------------+------------------+--------+
| 2020-04-15 08:09 |  5.00 |    D |   supplier4 | 2020-04-15 08:00 | 2020-04-15 08:10 |      1 |
| 2020-04-15 08:17 |  6.00 |    F |   supplier5 | 2020-04-15 08:10 | 2020-04-15 08:20 |      1 |
+------------------+-------+------+-------------+------------------+------------------+--------+

注意: 为了更好地理解窗口行为,这里把 timestamp 值后面的0去掉了。例如:在 Flink SQL Client 中,如果类型是 TIMESTAMP(3) ,2020-04-15 08:05 应该显示成 2020-04-15 08:05:00.000。

三、限制

在窗口表值函数后直接进行窗口去重的限制

  • 目前,Flink 只支持在滚动窗口、滑动窗口和累积窗口的窗口表值函数后进行窗口去重。会话窗口的去重将在未来版本中支持。

根据时间属性排序的限制:

  • 目前,窗口去重只支持根据事件时间属性进行排序。根据处理时间排序将在未来版本中支持。

相关文章:

  • 科技的成就(五十四)
  • Linux:TCP 序列号简介
  • php的Url 安全的base64编码解码类
  • ACM32如何保护算法、协议不被破解或者修改
  • C练习题_3答案
  • [论文笔记] chatgpt系列 SparseMOE—GPT4的MOE结构
  • beebox靶场A1 low 命令注入通关教程(上)
  • 【PostgreSQL】从零开始:(二)PostgreSQL下载与安装
  • 【5G PHY】5G小区类型、小区组和小区节点的概念介绍
  • 音频I2S
  • 08-工厂方法
  • mybatis中oracle的sql没走索引导致特别慢(未加jdbcType的)
  • Nat. Mach. Intell. | 通过深度神经网络联合建模多个切片来构建一个三维全生物体空间图谱
  • jenkins安装
  • 【运维笔记】Hyperf正常情况下Xdebug报错死循环解决办法
  • (ckeditor+ckfinder用法)Jquery,js获取ckeditor值
  • 【译】理解JavaScript:new 关键字
  • 2018天猫双11|这就是阿里云!不止有新技术,更有温暖的社会力量
  • Sublime text 3 3103 注册码
  • Vue--数据传输
  • 安装python包到指定虚拟环境
  • 不用申请服务号就可以开发微信支付/支付宝/QQ钱包支付!附:直接可用的代码+demo...
  • 深入 Nginx 之配置篇
  • 实战:基于Spring Boot快速开发RESTful风格API接口
  • 推荐一个React的管理后台框架
  • 小程序测试方案初探
  • 一道面试题引发的“血案”
  • 湖北分布式智能数据采集方法有哪些?
  • 如何通过报表单元格右键控制报表跳转到不同链接地址 ...
  • 如何在招聘中考核.NET架构师
  • ​​​​​​​Installing ROS on the Raspberry Pi
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • # centos7下FFmpeg环境部署记录
  • #!/usr/bin/python与#!/usr/bin/env python的区别
  • $redis-setphp_redis Set命令,php操作Redis Set函数介绍
  • (22)C#传智:复习,多态虚方法抽象类接口,静态类,String与StringBuilder,集合泛型List与Dictionary,文件类,结构与类的区别
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)
  • (二十三)Flask之高频面试点
  • (附源码)计算机毕业设计ssm电影分享网站
  • (万字长文)Spring的核心知识尽揽其中
  • (幽默漫画)有个程序员老公,是怎样的体验?
  • (转)linux 命令大全
  • (转载)hibernate缓存
  • .NET Core 成都线下面基会拉开序幕
  • .NET 简介:跨平台、开源、高性能的开发平台
  • .Net 中的反射(动态创建类型实例) - Part.4(转自http://www.tracefact.net/CLR-and-Framework/Reflection-Part4.aspx)...
  • .NET6使用MiniExcel根据数据源横向导出头部标题及数据
  • .netcore 如何获取系统中所有session_ASP.NET Core如何解决分布式Session一致性问题
  • @德人合科技——天锐绿盾 | 图纸加密软件有哪些功能呢?
  • [AIGC] MySQL存储引擎详解
  • [AndroidStudio]_[初级]_[修改虚拟设备镜像文件的存放位置]
  • [Angular] 笔记 20:NgContent
  • [C#]OpenCvSharp使用帧差法或者三帧差法检测移动物体
  • [CF226E]Noble Knight's Path
  • [CISCN2019 华北赛区 Day1 Web5]CyberPunk --不会编程的崽