当前位置: 首页 > news >正文

Flink SQL 中的流式概念:状态算子

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

传统的关系模型和 SQL 最开始都是为了批式处理而设计的,当把一个关系型查询应用到流式处理上时,在实现和转换的过程中,会有很多和批处理场景非常不同的地方,典型的例子就是:为了实现 SQL 的某些语义,Flink 必须在流上维持状态,典型的代表就是:连接、聚合 、去重 这些操作,它们都是“状态算子”,本质原因还是因为:流处理的表是无界的,流式查询是持续不停的,所以在流上维持状态是必须的。

此外,我们应意识到:由于 Table API & SQL 程序是声明式的,管道会哪里维持状态以及状态如何被使用都是不明确的,就是说不能从 SQL 直接简单地推断出来,另外,Flink 还会对查询进行优化,尽可能地减少“状态”的使用。

下面是官方文档给出的一个状态算子的示例:

CREATE TABLE doc (word STRING
) WITH ('connector' = '...'
);
CREATE TABLE word_cnt (word STRING PRIMARY KEY NOT ENFORCED,cnt  BIGINT
) WITH ('connector' = '...'
);INSERT INTO word_cnt
SELECT word, COUNT(1) AS cnt
FROM doc
GROUP BY word;

这里的聚合函数 count 就需要状态维持,同时又由于分组(group by)的存在,要维持的状态数据就一下变多了,每一个单词都要独立维护一个对应的状态。下图是针对上面的查询语句“编译”(转换)出的流式程序的图解:

img

在这张详细的图解中,我们应该注意这些重点:

  1. count函数是一个状态算子,它的要维持状态数据,也就是每个单词的词频,这些状态数据又同时是下游的输入数据
  2. 状态数据需要实时地推送到下游,状态数据的变更也是以 changelog 形式传导的,所以才会有 +U('hello', 2)-U('hello', 1)这样的消息产生

除了 连接、聚合 、去重 这些显式的状态算子,还有一些“隐式”的状态算子,按官方文档的介绍是说:由优化器隐式推导出来的。这里面的实现机理暂时还不清楚,但是例子是非常典型的!我们在《Flink 实时数仓关键技术解读:Upsert Kafka 和 动态表(Dynamic Table)》这篇文章中曾经详细地解读过 upsert-kafka 作为 sink 时写入到 kafka 中的数据,当再次以这些数据作为 source 进行流式读取时,upsert-kafka 是能够完整推导出 changelog 数据的,利用的就是这里所谓的“隐式推导”能力,具体地说就是一个叫 ChangelogNormalize 的状态算子。

在持续运行的流上维持状态可能是一个成分非常大的操作,因为流是不会停止的,随着时间的推移和大量数据的涌入,状态数据可能会越积越多,导致内存挤爆。所以 Flink 提供了状态的 TTL 机制,当状态在一定时间内没有被更新后就会被自动移除,这个参数就是:table.exec.state.ttl

定义了状态的键在被更新后要保持多长时间才被移除。 在之前的查询例子中,word 的数目会在配置的时间内未更新时立刻被移除。

通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是对应键的第一条记录。上述例子中意味着 cnt 会再次从 0 开始计数。


补充介绍:

管道 (Pipeline):Flink 文档中会反复出现这个名词,在 Flink 中,它指的是一个流式查询从 Source 到 Sink 的完整 DAG,中间是各种算子,简单地说就是:一个查询被“翻译”成一个流后的所有的处理环节。

相关文章:

  • 240Hz高刷电竞显示器 - HKC VG253KM
  • 【分布式事务 XA模式】MySQL XA模式详解
  • 如何操作系统缓冲区减少了磁盘碎片化?
  • springboot-基础-添加model和controller的简单例子+常用注解含义
  • vscode不能远程连接ubuntu18.04.6
  • ElasticSearch之找到乔丹的空中大灌篮电影
  • Android 接入指纹识别
  • 【QT+QGIS跨平台编译】之五十四:【QGIS_CORE跨平台编译】—【qgssqlstatementlexer.cpp生成】
  • 【Linux】部署前后端分离项目---(Nginx自启,负载均衡)
  • SpringMVC(2)
  • 如何在 CentOS 上安装 ONLYOFFICE 文档 8.0
  • MySQL集群 双主架构(配置命令)
  • Nginx 和 Apache 的比较
  • Python 鼠标模拟
  • 【电子通识】为什么单片机芯片上会有多组VDD电源?
  • 【挥舞JS】JS实现继承,封装一个extends方法
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • create-react-app做的留言板
  • docker容器内的网络抓包
  • js算法-归并排序(merge_sort)
  • Material Design
  • Python利用正则抓取网页内容保存到本地
  • webpack项目中使用grunt监听文件变动自动打包编译
  • yii2权限控制rbac之rule详细讲解
  • 实习面试笔记
  • 网络应用优化——时延与带宽
  • 微服务核心架构梳理
  • 我建了一个叫Hello World的项目
  • 想使用 MongoDB ,你应该了解这8个方面!
  • 在 Chrome DevTools 中调试 JavaScript 入门
  • 支付宝花15年解决的这个问题,顶得上做出十个支付宝 ...
  • 智能情侣枕Pillow Talk,倾听彼此的心跳
  • !!java web学习笔记(一到五)
  • #NOIP 2014#Day.2 T3 解方程
  • #周末课堂# 【Linux + JVM + Mysql高级性能优化班】(火热报名中~~~)
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第2节(共同的基类)
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (附源码)计算机毕业设计SSM疫情社区管理系统
  • (数据结构)顺序表的定义
  • (一)【Jmeter】JDK及Jmeter的安装部署及简单配置
  • (原)本想说脏话,奈何已放下
  • (源码版)2024美国大学生数学建模E题财产保险的可持续模型详解思路+具体代码季节性时序预测SARIMA天气预测建模
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在
  • .NET Core使用NPOI导出复杂,美观的Excel详解
  • .NET 发展历程
  • .NET 跨平台图形库 SkiaSharp 基础应用
  • .NET和.COM和.CN域名区别
  • .net下简单快捷的数值高低位切换
  • ??eclipse的安装配置问题!??
  • @CacheInvalidate(name = “xxx“, key = “#results.![a+b]“,multi = true)是什么意思
  • @configuration注解_2w字长文给你讲透了配置类为什么要添加 @Configuration注解
  • [ 英语 ] 马斯克抱水槽“入主”推特总部中那句 Let that sink in 到底是什么梗?
  • [100天算法】-x 的平方根(day 61)
  • [2016.7 Day.4] T1 游戏 [正解:二分图 偏解:奇葩贪心+模拟?(不知如何称呼不过居然比std还快)]