当前位置: 首页 > news >正文

pyflink的窗口

PyFlink 中的窗口操作教程

在流处理应用中,窗口(Window)是一个非常重要的概念,它用于对无界的数据流进行切分,使得我们可以对流中的数据执行聚合、计数、排序等操作。PyFlink 提供了丰富的窗口类型和操作,可以对流数据进行时间和计数等维度的切片,进行实时的数据处理。

在本教程中,我们将介绍 PyFlink 中的几种常见窗口类型,并展示如何使用窗口进行数据处理。

1. 安装 PyFlink

在开始之前,确保你已经安装了 PyFlink:

pip install apache-flink

2. 什么是窗口?

窗口(Window)是 Flink 处理无界数据流的核心技术,它将无限的数据流划分为有限的块,这样可以对这些块进行聚合、计数等操作。常见的窗口类型包括:

  • 滚动窗口(Tumbling Window):将数据流划分为不重叠的固定长度时间段。
  • 滑动窗口(Sliding Window):将数据流划分为固定长度的时间段,这些时间段可以相互重叠。
  • 会话窗口(Session Window):基于数据的活动时间来划分数据流,窗口之间有间隔(即活动的间歇)。
  • 计数窗口(Count Window):基于事件的数量而非时间划分窗口。

3. PyFlink 中的窗口操作

在 PyFlink 中,窗口通常和时间、事件一起使用,通过对数据流应用窗口函数来执行聚合操作。以下是几种常见的窗口操作。

4. 流环境设置

在 PyFlink 中,窗口操作通常在流模式下进行。首先,我们需要设置流环境并定义一些基础数据流。

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings# 创建流执行环境
env = StreamExecutionEnvironment.get_execution_environment()# 创建 Table 环境
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

5. 时间特性设置

时间特性分为两种类型:事件时间(Event Time)和 处理时间(Processing Time)。事件时间基于事件生成时的时间,而处理时间基于 Flink 系统处理事件的时间。

设置事件时间(Event Time)

事件时间需要通过在数据流中添加时间戳和水印(Watermark)来支持。

# 设置事件时间属性
t_env.get_config().set_local_timezone('UTC')  # 使用 UTC 时区

6. 创建窗口

6.1 滚动窗口(Tumbling Window)

滚动窗口会将数据流划分为固定长度的时间段,并且这些时间段互不重叠。

from pyflink.table.window import Tumble
from pyflink.table import expressions as expr# 创建示例表
t_env.execute_sql("""CREATE TEMPORARY TABLE source_table (user_id STRING,item STRING,amount DOUBLE,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND) WITH ('connector' = 'datagen')
""")# 定义滚动窗口,窗口大小为10分钟
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()
6.2 滑动窗口(Sliding Window)

滑动窗口将数据划分为固定长度的时间段,这些时间段可以相互重叠。窗口的滑动步长定义了相邻窗口的开始时间。

from pyflink.table.window import Slide# 定义滑动窗口,窗口大小为10分钟,滑动步长为5分钟
result_table = t_env.from_path("source_table") \.window(Slide.over(expr.lit(10).minutes).every(expr.lit(5).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()
6.3 会话窗口(Session Window)

会话窗口基于数据的活动时间和不活动时间来划分数据流。如果一段时间内没有新的事件到达,窗口会结束。

from pyflink.table.window import Session# 定义会话窗口,不活动间隔为30分钟
result_table = t_env.from_path("source_table") \.window(Session.with_gap(expr.lit(30).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()
6.4 计数窗口(Count Window)

计数窗口是基于记录的数量来划分窗口,而不是基于时间。例如,每 1000 条记录形成一个窗口。

from pyflink.table.window import Tumble# 定义计数窗口,每 1000 条记录形成一个窗口
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(1000).rows).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()

7. 自定义窗口聚合函数

除了使用内置的窗口聚合函数(如 SUM, COUNT 等),你还可以自定义窗口聚合逻辑。

自定义聚合函数
from pyflink.table.udf import AggregateFunction, udafclass AvgAggregateFunction(AggregateFunction):def get_value(self, accumulator):return accumulator[0] / accumulator[1] if accumulator[1] > 0 else 0def create_accumulator(self):return [0, 0]  # sum, countdef accumulate(self, accumulator, value):accumulator[0] += valueaccumulator[1] += 1# 注册自定义聚合函数
avg_udaf = udaf(AvgAggregateFunction(), result_type='DOUBLE')# 使用自定义聚合函数计算窗口内平均值
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), avg_udaf(expr.col("amount")).alias("average_spent"), expr.col("w").end.alias("window_end"))# 输出查询结果
result_table.execute().print()

8. 完整示例

以下是一个包含窗口操作的完整 PyFlink 示例:

from pyflink.table.window import Tumble
from pyflink.table import expressions as expr
from pyflink.table.udf import AggregateFunction, udaf
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings# 设置流执行环境
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)# 创建示例表
t_env.execute_sql("""CREATE TEMPORARY TABLE source_table (user_id STRING,item STRING,amount DOUBLE,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND) WITH ('connector' = 'datagen')
""")# 定义滚动窗口和自定义聚合函数
class AvgAggregateFunction(AggregateFunction):def get_value(self, accumulator):return accumulator[0] / accumulator[1] if accumulator[1] > 0 else 0def create_accumulator(self):return [0, 0]def accumulate(self, accumulator, value):accumulator[0] += valueaccumulator[1] += 1avg_udaf = udaf(AvgAggregateFunction(), result_type='DOUBLE')# 使用滚动窗口和自定义聚合函数
result_table = t_env.from_path("source_table") \.window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \.group_by(expr.col("w"), expr.col("user_id")) \.select(expr.col("user_id"), avg_udaf(expr.col("amount")).alias("average_spent"), expr.col("w").end.alias("window_end"))# 输出结果
result_table.execute().print()

9. 总结

在 PyFlink 中,窗口是流处理的核心概念之一,允许你对无界数据流进行聚合、计算和操作。Flink 提供了丰富的窗口类型,包括滚动窗口、滑动窗口、会话窗口和计数窗口,以满足不同场景下的需求。通过本教程,你可以学习如何在 PyFlink 中使用窗口对流数据进行处理,并通过自定义函数来实现更复杂的计算逻辑。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • MySQL系列—7.内存结构
  • ❤《实战纪录片 1 》原生开发小程序中遇到的问题和解决方案
  • AcWing算法基础课-786第k个数-Java题解
  • 论文速读|利用局部性提高机器人操作的样本效率
  • Peewee+Postgresql+PooledPostgresqlDatabase重连机制
  • 数据结构————栈、队列
  • Uniapp基础学习(二)
  • Anchor Alignment Metric来优化目标检测的标签分配和损失函数。
  • [数据集][目标检测]西红柿成熟度检测数据集VOC+YOLO格式3241张5类别
  • Ubuntu 修改IP
  • 发烧时眼睛胀痛的多种原因
  • 基于Netty框架的桩直连协议(云快充协议1.5)
  • C++相关概念和易错语法(32)(单例模式、类型转换)
  • leetcode:516 最长回文字序列 动态规划
  • C++基础(7.Stack_Quene_List)
  • 08.Android之View事件问题
  • java正则表式的使用
  • JS基础之数据类型、对象、原型、原型链、继承
  • session共享问题解决方案
  • vuex 学习笔记 01
  • 爱情 北京女病人
  • 闭包--闭包作用之保存(一)
  • 简单基于spring的redis配置(单机和集群模式)
  • 紧急通知:《观止-微软》请在经管柜购买!
  • 类orAPI - 收藏集 - 掘金
  • 每天10道Java面试题,跟我走,offer有!
  • 目录与文件属性:编写ls
  • 前端面试题总结
  • 前嗅ForeSpider中数据浏览界面介绍
  • 如何使用 OAuth 2.0 将 LinkedIn 集成入 iOS 应用
  • 手机端车牌号码键盘的vue组件
  • 手写双向链表LinkedList的几个常用功能
  • 小程序 setData 学问多
  • 小程序开发之路(一)
  • 一起来学SpringBoot | 第十篇:使用Spring Cache集成Redis
  • 树莓派用上kodexplorer也能玩成私有网盘
  • ​​​​​​​sokit v1.3抓手机应用socket数据包: Socket是传输控制层协议,WebSocket是应用层协议。
  • #if 1...#endif
  • #QT(串口助手-界面)
  • #使用清华镜像源 安装/更新 指定版本tensorflow
  • (6)添加vue-cookie
  • (android 地图实战开发)3 在地图上显示当前位置和自定义银行位置
  • (第9篇)大数据的的超级应用——数据挖掘-推荐系统
  • (附源码)springboot猪场管理系统 毕业设计 160901
  • (附源码)小程序 交通违法举报系统 毕业设计 242045
  • (过滤器)Filter和(监听器)listener
  • (精确度,召回率,真阳性,假阳性)ACC、敏感性、特异性等 ROC指标
  • (算法)区间调度问题
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • (限时免费)震惊!流落人间的haproxy宝典被找到了!一切玄妙尽在此处!
  • (一)【Jmeter】JDK及Jmeter的安装部署及简单配置
  • (转)编辑寄语:因为爱心,所以美丽
  • (自用)gtest单元测试
  • .Net Core缓存组件(MemoryCache)源码解析
  • .NET/C# 使用反射注册事件