当前位置: 首页 > news >正文

大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • FlinkDataStreamAPI 自定义输入源
  • 非并行源介绍与代码
  • 并行源介绍与代码

在这里插入图片描述

Rich并行源

基本介绍

在 Apache Flink 中,RichSourceFunction 是一种增强的源函数(Source Function),它允许开发者在定义源操作时,能够访问 Flink 的生命周期方法、状态管理、配置访问等更多功能。RichSourceFunction 是并行源的一个扩展,它继承自 RichFunction 接口,而 RichFunction 提供了更丰富的功能,比如访问运行时上下文、管理状态、以及在作业开始和结束时执行初始化或清理操作。

主要特点

  • 生命周期方法:RichSourceFunction 提供了 open() 和 close() 方法,分别在作业开始时和结束时调用。这允许你在数据读取前进行初始化操作(如打开连接、加载配置),以及在作业结束时进行清理工作(如关闭连接、释放资源)。
  • 访问运行时上下文:通过 getRuntimeContext() 方法,RichSourceFunction 可以访问 Flink 的运行时上下文,获取并行度信息、任务名称、指标管理器,以及与状态相关的操作。
  • 状态管理:RichSourceFunction 可以结合 Flink 的状态管理机制,保存和恢复状态。这对于需要在流处理中维护中间状态的源函数非常有用,尤其是在故障恢复时,状态可以帮助恢复到故障前的状态。
  • 并行执行:与普通的 SourceFunction 类似,RichSourceFunction 也可以通过设置并行度来并行执行,这使得它可以处理大规模的数据
    源。

状态管理

RichFunction 与 Flink 的状态管理系统高度集成,允许你在分布式环境中维护和管理操作符的中间状态。Flink 支持两种主要类型的状态:ValueState 和 ListState,以及更复杂的 MapState 和 ReducingState。

  • ValueState: 适用于需要保存单个值的场景,如计数器、标志位等。
  • ListState: 适用于需要保存多个值的场景,如窗口计算中的中间结果。
  • MapState: 适用于需要维护键值对的场景,特别是在进行复杂的数据关联或聚合时。
  • ReducingState: 适用于需要持续聚合数据的场景,比如计数、求和等。

示例代码

以下是一个使用 RichParallelSourceFunction 的简单示例,展示了如何在 Flink 中实现一个并行的、具有生命周期管理的源函数:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;public class MyRichParallelSource extends RichParallelSourceFunction<String> {private volatile boolean isRunning = true;@Overridepublic void open(Configuration parameters) throws Exception {// 在任务开始时执行初始化操作System.out.println("Task " + getRuntimeContext().getTaskName() + " is starting.");}@Overridepublic void run(SourceContext<String> ctx) throws Exception {// 模拟数据流的产生while (isRunning) {synchronized (ctx.getCheckpointLock()) {ctx.collect("Data from task " + getRuntimeContext().getIndexOfThisSubtask());}Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void close() throws Exception {// 在任务结束时执行清理操作System.out.println("Task " + getRuntimeContext().getTaskName() + " is closing.");}
}

代码解析

  • open() 方法:在任务开始时调用,适用于进行连接初始化、参数设置等操作。在这个方法中,你可以访问 Flink 的配置和运行时上下文。
  • run() 方法:实现数据源的核心逻辑,这个方法会在源函数启动后被调用。可以使用 ctx.collect() 方法将生成的数据发送到下游处理。
  • cancel() 方法:用于取消任务。当作业被取消或停止时,Flink 会调用这个方法,可以在这里做一些清理工作或者安全地停止数据生成。
  • close() 方法:在任务结束时调用,用于释放资源和进行清理操作。

注意事项

  • 状态一致性:在并行源中,如果需要维护状态,一定要注意状态的一致性和恢复机制,确保在作业恢复时可以正确地恢复数据源的状态。
  • 并行度设置:RichParallelSourceFunction 作为并行源,可以通过 setParallelism 方法设置并行度,确保根据任务的需求合理分配并行实例的数量。

RichParallelSource

package icu.wzk;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;public class RichParallelSourceRich extends RichParallelSourceFunction<String> {private long count = 1L;private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {count ++;ctx.collect(String.valueOf(count));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

RichParallelSourceTest

package icu.wzk;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;public class RichParallelSourceRichTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.getJavaEnv().addSource(new RichParallelSourceRich());data.print();env.execute("RichParallelSourceRichTest");}}

运行结果

3> 10
5> 10
8> 10
6> 10
2> 10
4> 10
7> 10
1> 10
6> 11
5> 11
8> 11
2> 11
3> 11
4> 11
7> 11
1> 11
2> 12
3> 12
...

控制台输出结果如下所示:
在这里插入图片描述

为什么 Rich 类使用广泛

  • 生命周期管理:Rich 类提供了 open() 和 close() 方法,允许开发者在任务开始和结束时执行初始化和清理操作。这对于需要设置资源(如数据库连接、文件读写、外部服务连接)的操作非常有用。
  • 运行时上下文访问:通过 getRuntimeContext(),Rich 类可以访问任务的并行度信息、任务名称、子任务索引、状态管理等。对于需要根据任务上下文调整行为或需要跨并行实例共享状态的场景,这些信息是至关重要的。
  • 状态管理:RichFunction 可以方便地与 Flink 的状态管理结合使用。在状态丰富的应用场景(如需要维护中间计算结果、计数器、缓存等)的流处理中,Rich 类显得非常有用。
  • 性能监控:Rich 类允许开发者在 open() 方法中注册 Flink 的度量指标(Metrics),帮助监控和优化作业的性能。

什么时候不用 Rich 类

  • 简单操作:如果你只是需要进行简单的转换或过滤操作,没有复杂的初始化、状态管理或清理需求,那么 Rich 类的额外功能可能并不必要。
  • 高性能要求的场景:在一些对性能要求极高的场景中,尽量减少复杂的操作和额外的上下文访问,直接使用轻量级的 MapFunction、FilterFunction 等可能会有更好的性能表现。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 国际化产品经理的挑战与机遇:跨文化产品管理的探索
  • 大数据新视界--大数据大厂之MySQL 数据库课程设计:数据安全深度剖析与未来展望
  • CentOS全面停服,国产化提速,央国企信创即时通讯/协同门户如何选型?
  • 开源模型应用落地-LangChain高阶-记忆组件-ConversationTokenBufferMemory正确使用(七)
  • 深度学习-OpenCv的运用(4)
  • 群论 (笔记)
  • uniapp常用标签
  • MATLAB 中的 reshape 函数
  • blender中获取虚拟相机渲染图片上每像素对应的纹理上的像素值
  • 3.美食推荐系统(Java项目springboot和vue)
  • linux 硬件 arm架构 汇编语言
  • 步进电机相关
  • erlang学习:用OTP构建系统23.12练习题
  • day-48 分割回文串
  • 1037 计算数列和
  • 「前端早读君006」移动开发必备:那些玩转H5的小技巧
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • 10个确保微服务与容器安全的最佳实践
  • exif信息对照
  • flask接收请求并推入栈
  • hadoop入门学习教程--DKHadoop完整安装步骤
  • iOS高仿微信项目、阴影圆角渐变色效果、卡片动画、波浪动画、路由框架等源码...
  • Iterator 和 for...of 循环
  • Java 实战开发之spring、logback配置及chrome开发神器(六)
  • Javascript Math对象和Date对象常用方法详解
  • java中的hashCode
  • js
  • markdown编辑器简评
  • Redis学习笔记 - pipline(流水线、管道)
  • Redis中的lru算法实现
  • Redux 中间件分析
  • Vue UI框架库开发介绍
  • vue数据传递--我有特殊的实现技巧
  • webpack入门学习手记(二)
  • 短视频宝贝=慢?阿里巴巴工程师这样秒开短视频
  • 分享一份非常强势的Android面试题
  • 复杂数据处理
  • 后端_MYSQL
  • 基于组件的设计工作流与界面抽象
  • 简单易用的leetcode开发测试工具(npm)
  • 使用Swoole加速Laravel(正式环境中)
  • 小程序开发中的那些坑
  • ​LeetCode解法汇总307. 区域和检索 - 数组可修改
  • #13 yum、编译安装与sed命令的使用
  • #Java第九次作业--输入输出流和文件操作
  • $forceUpdate()函数
  • (TOJ2804)Even? Odd?
  • (附源码)python房屋租赁管理系统 毕业设计 745613
  • (附源码)ssm智慧社区管理系统 毕业设计 101635
  • (力扣)1314.矩阵区域和
  • (六)Flink 窗口计算
  • (深入.Net平台的软件系统分层开发).第一章.上机练习.20170424
  • (游戏设计草稿) 《外卖员模拟器》 (3D 科幻 角色扮演 开放世界 AI VR)
  • (转) ns2/nam与nam实现相关的文件
  • .naturalWidth 和naturalHeight属性,