当前位置: 首页 > news >正文

Flink State 状态原理解析 | 京东物流技术团队

一、Flink State 概念

State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。

Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBackend 实现将相关数据存储到 FS 文件系统或者 RocksDB 数据库中。在Flink应用运行过程中,通过 checkpoint 快照定期地保存状态数据。并在 Flink 应用重启时加载checkpoint/savepoint 来实现状态的恢复,从而让 Flink 应用继续完成之前的数据计算,实现数据精确一次向下游传递。

1.1 Apache Flink 中 State 的存储实现 StateBackend 分类

分为以下3类:

  • 基于内存的 HeapStateBackend。状态存储在内存中。
  • 基于 HDFS 或 OSS 的 FsStateBackend。状态存储在内存,并在做 cp(checkpoint)时存到远端。
  • 基于 RocksDB 的 RocksDBStateBackend。将对象序列化成二进制存在内存和本地磁盘的 RocksDB 数据中,并在 cp 时存到远端。

HeapStateBackend 和 RocksDBStateBackend 分别对应在 TaskManager 内存模型中的位置:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

RocksDBStateBackend 中存储结构:

namespace: 在不同的 namespace 下存在相同名称的状态。

1.1.1 State 状态持久化

通过 Chandy-Lamport 分布式快照算法进行 checkpoint 完成状态数据的持久化。然后在 Flink 应用重启时读取 State 状态数据,进行运行现场的还原。

chekcpoint 分类:

  • 基于内存的全量 checkpoint
  • HDFS 全量 checkpoint
  • RocksDB 全量 checkpoint/增量 checkpoint

1.2 State 基于算子和数据分组的分类

State 可分为 Operator State 和 Keyed State 两类。

  • Operator State(称为 non-keyed state)

常常存在于Source, Sink中。具体实现类例如:

  • BroadcastState

例:Kafka Source 中用 OperatorState 记录 offset。

  • Keyed State

任何类型的 keyed state 都可以有有效期(TTL),所有状态类型都支持单元素的 TTL。 这意味着 List 元素和 Map 映射元素将独立到期。

例:SQL GroupBy/PartitionBy 后的窗口中的数据,每个 key 都有对应的 State。key 与 key 之间的 State 数据不可见。

keyed state 的具体实现类:

  • ValueState
  • MapState
  • ListState
  • AggregatingState
  • ReducingState
  • 。。。。。

Flink State思维导图:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Keyed StateOperator State
适用算子类型只适用于KeyedStream上的算子可用于所有算子
状态分配每个Key对应一个状态一个算子子任务对应一个状态
横向扩展状态随着keyBy的分组KeyGroup自动在多个算子子任务上迁移有多种状态重新分配的方式
创建和访问方式自定义算子(重写RichFunction,通过State 名称从 getRuntimeContext方法创建或获得 State )实现 CheckpointedFunction 等接口
支持数据结构ValueState、ListState、MapState等ListState、BroadcastState等

二、常见状态相关处理流程

2.1 Flink 应用中状态是如何存储的?

1. Kafka Source 如何存储 OperatorState?

class FlinkKafkaConsumerBase {private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; // state名称:"topic-partition-offset-states"
// 特殊的State类型:Union State 
}

unionOffsetStates这个变量就是 OperatorState类型的。

2. Map算子如何存储需要累计的数据?

  • ValueState/MapState/ListState/…

思考:keyby 后的数据分发与多并行度 subtask 之间的关系是怎样的?

首先,datastream 中数据经过 keyby 之后,会划分到各个 KeyedStream 中。每个 KeyedStream 有自己的 KeyedState(如ValueState/ListState/MapState)。

其次,KeyedStream 中的数据会以 KeyGroup 方式组织在一起。KeyGroup 是 Flink 重新分发 key state 的最小单元。

最后,KeyGroup 中的数据会通过取模最大并行度的方式分散到各个 subtask 中。以下是关键源码:

KeyGroupStreamPartitioner#selectChannel(record)
{K key;key = keySelector.getKey(record.getInstance().getValue());return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}
--KeyGroupRangeAssignment#assignKeyToParallelOperator(){return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}--KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup()公式:OperatorIndex = keyGroupId * parallelism / maxParallelism--KeyGroupRangeAssignment#assignToKeyGroup(){return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}

2.2 修改并行度场景时 State 状态存储的变化

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2.3 State 与 Checkpoint 关系

分布式快照 Checkpoint 的概念,定期将 State 持久化到 外部存储系统(HDFS/OSS) 上。用户可以通过实现 CheckpointedFunction 接口来使用 operator state。通过 barrier 来对齐 checkpoint,等待 State 持久化完成(此过程参数不同也可能是异步的)。

常见 State 与 CP 相关的问题

  • State 状态过大。现象为多个算子或单个算子多个 subtask 做 checkpoint 慢,可导致 CP 对齐时间长,严重时会导致 CP 超时。
  • 数据倾斜导致某个 subtask 处理不及时。现象为单个算子少数几个 subtask 做 checkpoint 慢,导致 CP 对齐时间长。严重时会导致 CP 超时。
  • 大作业(并行度搞)频繁做 CP,会频繁上传小文件,导致 HDFS 集群小文件过多。

常用解决措施:调大托管内存大小。

三、参考文档:

  • Flink State 官方文档:Flink 状态与容错
  • https://cloud.tencent.com/developer/article/1403939
  • https://www.modb.pro/db/81206

作者:京东物流 吴云涛

来源:京东云开发者社区 自猿其说Tech 转载请注明来源

相关文章:

  • 区块链媒体宣发:揭示优势与趋势,引领信息传播新时代
  • 操作系统笔记——储存系统、文件系统(王道408)
  • 智能优化算法应用:基于缎蓝园丁鸟算法3D无线传感器网络(WSN)覆盖优化 - 附代码
  • 强化学习(二)——Dueling Network(DQN改进)
  • 公务员常考应用文公文格式模板
  • 定时任务特辑 | Quartz、xxl-job、elastic-job、Cron四个定时任务框架对比,和Spring Boot集成实战
  • 定时补偿方案
  • C#图像处理OpenCV开发指南(CVStar,09)——边缘识别之Scharr算法的实例代码
  • Python----内置模块
  • php实现个性化域名(短网址)和个性化登录模版的解决方案
  • 2023-12-05 Qt学习总结3
  • LV.13 D2 开发板启动流程 学习笔记
  • Diffusion Models: A Comprehensive Survey of Methods and Applications
  • mybatis多表映射-对多关联
  • AttributeError: module ‘lib‘ has no attribute ‘X509_V_FLAG_CB_ISSUER_CHECK‘解决方案
  • 【5+】跨webview多页面 触发事件(二)
  • Android框架之Volley
  • CentOS7简单部署NFS
  • Java教程_软件开发基础
  • MYSQL 的 IF 函数
  • Sublime Text 2/3 绑定Eclipse快捷键
  • 好的网址,关于.net 4.0 ,vs 2010
  • 欢迎参加第二届中国游戏开发者大会
  • 如何胜任知名企业的商业数据分析师?
  • 我这样减少了26.5M Java内存!
  • RDS-Mysql 物理备份恢复到本地数据库上
  • ​【已解决】npm install​卡主不动的情况
  • ​TypeScript都不会用,也敢说会前端?
  • ​草莓熊python turtle绘图代码(玫瑰花版)附源代码
  • ​一文看懂数据清洗:缺失值、异常值和重复值的处理
  • # 利刃出鞘_Tomcat 核心原理解析(八)-- Tomcat 集群
  • # 数仓建模:如何构建主题宽表模型?
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • #我与Java虚拟机的故事#连载03:面试过的百度,滴滴,快手都问了这些问题
  • (160)时序收敛--->(10)时序收敛十
  • (42)STM32——LCD显示屏实验笔记
  • (k8s)Kubernetes本地存储接入
  • (windows2012共享文件夹和防火墙设置
  • (二十一)devops持续集成开发——使用jenkins的Docker Pipeline插件完成docker项目的pipeline流水线发布
  • (分布式缓存)Redis持久化
  • (附源码)SSM环卫人员管理平台 计算机毕设36412
  • (附源码)计算机毕业设计ssm电影分享网站
  • (附源码)计算机毕业设计SSM疫情下的学生出入管理系统
  • (三)c52学习之旅-点亮LED灯
  • (原創) X61用戶,小心你的上蓋!! (NB) (ThinkPad) (X61)
  • (转)EXC_BREAKPOINT僵尸错误
  • (转)IOS中获取各种文件的目录路径的方法
  • (转)利用ant在Mac 下自动化打包签名Android程序
  • (轉貼)《OOD启思录》:61条面向对象设计的经验原则 (OO)
  • .NET C# 使用 SetWindowsHookEx 监听鼠标或键盘消息以及此方法的坑
  • .NET 线程 Thread 进程 Process、线程池 pool、Invoke、begininvoke、异步回调
  • .NET编程——利用C#调用海康机器人工业相机SDK实现回调取图与软触发取图【含免费源码】
  • .NET开源的一个小而快并且功能强大的 Windows 动态桌面软件 - DreamScene2
  • .secret勒索病毒数据恢复|金蝶、用友、管家婆、OA、速达、ERP等软件数据库恢复
  • [ vulhub漏洞复现篇 ] ECShop 2.x / 3.x SQL注入/远程执行代码漏洞 xianzhi-2017-02-82239600