当前位置: 首页 > news >正文

Flink状态State | 大数据技术

简单说两句

✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主

CSDN个人主页:小叮当撩代码

🔎GZH哆啦A梦撩代码

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

Flink状态

image-20240602200907231

Flink中的State

image-20240602192616430

State概念

在 Flink 中,状态是流处理程序中非常重要的一部分,它允许你保存和访问数据,以实现复杂的计算逻辑。

可以简单理解为: 历史计算结果

Flink中的算子任务的State分类通常分为两类

1️⃣ 有状态

有状态需要考虑历史的数据,相同的输入可能会得到不同的输出

比如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

2️⃣ 无状态

无状态简单说就是不需要考虑历史的数据,相同的输入得到相同的结果

比如map、filter、flatmap算子都属于无状态,不需要依赖其他数据

Flink默认已经支持了无状态和有状态计算!

状态分类

Flink中有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)

Managed State是由Flink管理的,Flink帮忙存储、恢复和优化

Raw State是开发者自己管理的,需要自己序列化

❇️通常情况下,我们采用托管状态来实现我们的需求!!!

托管状态

​ Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以Flink 能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效

​ 很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。

🎨所以:我们又可以将托管状态分为两类:算子状态按键分区状态

键控状态Keyed State

详细内容可以瞅瞅官网:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/fault-tolerance/state/

Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。

需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(…) 来得到 KeyedStream 。

img

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。

ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。

ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。

AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。

FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。

MapState:维护 Map 类型的状态。


Code实操

例子1

使用KeyState中的ValueState来模拟实现maxBy

代码清单


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tiancx*/
public class StateMaxByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//加载数据DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("北京", 1),Tuple2.of("上海", 2),Tuple2.of("广州", 3),Tuple2.of("北京", 4),Tuple2.of("上海", 5),Tuple2.of("广州", 6),Tuple2.of("北京", 3)).keyBy(t -> t.f0);source.map(new RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {//定义状态,用于存储最大值ValueState<Integer> maxValueState = null;//进行初始化@Overridepublic void open(Configuration parameters) throws Exception {//创建状态描述器ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("maxValueState", Integer.class);maxValueState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple3<String, Integer, Integer> map(Tuple2<String, Integer> value) throws Exception {//获取当前值Integer currentVal = value.f1;Integer currentMax = maxValueState.value();if (currentMax == null || currentVal > currentMax) {maxValueState.update(currentVal);}return Tuple3.of(value.f0, value.f1, maxValueState.value());}}).print();env.execute();}
}

运行看结果

5c1eb573f51d5a9cec2032e503b0dee3

例子2

如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]

代码清单


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.List;/*** @author tiancx*/
public class StateDemo01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> source = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(" ");return Tuple2.of(split[0], Integer.parseInt(split[1]));}}).keyBy(t -> t.f0);source.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, List<Integer>>>() {ListState<Integer> listState = null;//存放超过38度的次数ValueState<Integer> valueState = null;@Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<Integer>("listState", Integer.class);ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("valueState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);valueState = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, List<Integer>>> out) throws Exception {System.out.println("进入flatMap");Integer val = value.f1;if (valueState.value() == null) {valueState.update(0);}if (val > 38) {listState.add(val);valueState.update(valueState.value() + 1);}if (valueState.value() >= 3) {List<Integer> list = (List<Integer>) listState.get();out.collect(Tuple2.of(value.f0, list));listState.clear();valueState.clear();}}}).print();env.execute();}
}

输入

image-20240602100424957

运行结果

image-20240602100441746

算子状态OperatorState

​ 算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。

​ 算 子 状 态 也 支 持 不 同 的 结 构 类 型 , 主 要 有 三 种 : ListState 、 UnionListState 和BroadcastState。


code实操

例子1:

在 map 算子中计算数据的个数

代码清单


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tiancx*/
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);stream.map(new MyCountMapFunction()).print();env.execute();}public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> listState;@Overridepublic Long map(String value) throws Exception {return ++count;}/*** 本地变量持久化:将 本地变量拷贝到算子状态中,开启checkpoint 时才会调用 snapshotState 方法** @param context the context for drawing a snapshot of the operator* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("MyCountMapFunction.snapshotState");listState.clear();listState.add(count);}/*** 初始化本地变量:程序启动和恢复时,从状态中把数据添加到本地变量,每个子任务调用一次** @param context the context for initializing the operator* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("MyCountMapFunction.initializeState");//从上下文初始化状态listState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("listState", Types.LONG()));//从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Long aLong : listState.get()) {count += aLong;}}}}
}

输入

image-20240602110341359

运行结果

image-20240602110403448

【都看到这了,点点赞点点关注呗,爱你们】😚😚

蓝白色微信公众号大学生校园清新简单纸飞机动态引导关注简洁新媒体分享中文动态引导关注

💬

✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主

CSDN个人主页:小叮当撩代码

🔎GZH哆啦A梦撩代码

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

相关文章:

  • elementUI - 折叠以及多选的组件
  • Java1.8+ idea hbuilder+ uniapp、vue上门家政小程序APP源码开发
  • 【Spring Cloud】微服务日志收集系统-ELK+Kafka
  • AndroidFlutter混合开发
  • 一个小时搞定JAVA面向对象(4)——继承
  • LeetCode-239.滑动窗口最大值
  • 用增之Google
  • 24、Linux网络端口
  • 详解redis配置文件
  • SQL常用语句--模糊查询LIKE
  • Android 编译 C 文件报错 fatal error: ‘jni.h‘ file not found
  • 网络安全中攻击溯源方法
  • 对人脸图像进行性别和年龄的判断
  • 结构体指针
  • 【Java高级教程】集合部分
  • [分享]iOS开发 - 实现UITableView Plain SectionView和table不停留一起滑动
  • 《Java编程思想》读书笔记-对象导论
  • 10个确保微服务与容器安全的最佳实践
  • ES2017异步函数现已正式可用
  • gops —— Go 程序诊断分析工具
  • hadoop集群管理系统搭建规划说明
  • Mysql5.6主从复制
  • vue+element后台管理系统,从后端获取路由表,并正常渲染
  • vue-router 实现分析
  • Vue组件定义
  • Webpack入门之遇到的那些坑,系列示例Demo
  • 大整数乘法-表格法
  • 电商搜索引擎的架构设计和性能优化
  • 复杂数据处理
  • 基于Mobx的多页面小程序的全局共享状态管理实践
  • 浅谈Golang中select的用法
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • 微龛半导体获数千万Pre-A轮融资,投资方为国中创投 ...
  • (1)安装hadoop之虚拟机准备(配置IP与主机名)
  • (2021|NIPS,扩散,无条件分数估计,条件分数估计)无分类器引导扩散
  • (机器学习-深度学习快速入门)第一章第一节:Python环境和数据分析
  • (四)Android布局类型(线性布局LinearLayout)
  • (原創) 系統分析和系統設計有什麼差別? (OO)
  • (转载)在C#用WM_COPYDATA消息来实现两个进程之间传递数据
  • (轉)JSON.stringify 语法实例讲解
  • .FileZilla的使用和主动模式被动模式介绍
  • .NET Core 版本不支持的问题
  • .NET中的Event与Delegates,从Publisher到Subscriber的衔接!
  • .pub是什么文件_Rust 模块和文件 - 「译」
  • /etc/fstab 只读无法修改的解决办法
  • @RequestMapping用法详解
  • [ 渗透测试面试篇 ] 渗透测试面试题大集合(详解)(十)RCE (远程代码/命令执行漏洞)相关面试题
  • [20150629]简单的加密连接.txt
  • [20181219]script使用小技巧.txt
  • [4.9福建四校联考]
  • [Android] 240204批量生成联系人,短信,通话记录的APK
  • [Android]竖直滑动选择器WheelView的实现
  • [Apio2012]dispatching 左偏树
  • [AutoSar]工程中的cpuload陷阱(三)测试
  • [C#]实现GRPC通讯的服务端和客户端实例