当前位置: 首页 > news >正文

大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink Sink JDBC
  • Flink Sink Kafka

在这里插入图片描述

注意事项

DataSetAPI 和 DataStream API一样有三个部分组成,各部分的作用对应一致,此处不再赘述。

FlinkDataSet

在 Apache Flink 中,DataSet API 是 Flink 批处理的核心接口,它主要用于处理静态数据集。虽然 Flink 的 DataStream API 被广泛用于流式数据处理,但 DataSet API 适用于大规模批处理场景,如数据清洗、ETL、分析等。虽然近年来 Flink 更多地向流处理方向发展,但批处理仍然是数据处理中的一个重要场景。

DataSource

对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个 DataSource 组件:

  • 基于集合:fromCollection 主要是为了方便测试
  • 基于文件:readTextFile,基于HDFS中的数据进行计算分析

基本概念

Flink 的 DataSet API 是一个功能强大的批处理 API,专为处理静态、离线数据集设计。DataSet 中的数据是有限的,处理时系统会先等待整个数据集加载完毕。DataSet 可以通过多种方式创建,例如从文件、数据库、集合等加载数据,然后通过一系列转换操作(如 map、filter、join 等)进行处理。

核心特性

  • 支持丰富的转换操作。
  • 提供多种输入输出数据源。
  • 支持复杂的数据类型,包括基本类型、元组、POJO、列表等。
  • 支持优化计划,例如通过 cost-based optimizer 来优化查询执行计划。

DataSet 创建

在 Flink 中,可以通过多种方式创建 DataSet。以下是常见的数据源:

从本地文件读取

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("path/to/file");

从 CSV 文件读取

DataSet<Tuple3<Integer, String, Double>> csvData = env.readCsvFile("path/to/file.csv").types(Integer.class, String.class, Double.class);

从集合中创建

List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2<>("Alice", 1),new Tuple2<>("Bob", 2)
);
DataSet<Tuple2<String, Integer>> dataSet = env.fromCollection(data);

从数据库中读取

可以通过自定义的输入格式(如 JDBC 输入格式)从数据库中读取数据,虽然 Flink 本身并没有内置 JDBC 源的批处理 API,但可以通过自定义实现。

DataSet 的转换操作(Transformation)

Flink 的 DataSet API 提供了丰富的转换操作,可以对数据进行各种变换,以下是常用的转换操作:
在这里插入图片描述
在这里插入图片描述

Map

将 DataSet 中的每一条记录进行映射操作,生成新的 DataSet。

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);

Filter

过滤掉不满足条件的记录。

DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);

FlatMap

类似于 map,但允许一条记录生成多条输出记录。

DataSet<String> lines = env.fromElements("hello world", "flink is great");
DataSet<String> words = lines.flatMap((line, collector) -> {for (String word : line.split(" ")) {collector.collect(word);}
});

Reduce

将数据集根据某种聚合逻辑进行合并

DataSet<Integer> sum = numbers.reduce((n1, n2) -> n1 + n2);

GroupBy 和 Reduce

对数据集进行分组,然后在每个组上执行聚合操作

DataSet<Tuple2<String, Integer>> wordCounts = words.map(word -> new Tuple2<>(word, 1)).groupBy(0).reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1));

Join

类似于 SQL 中的连接操作,连接两个 DataSet。

DataSet<Tuple2<Integer, String>> persons = env.fromElements(new Tuple2<>(1, "Alice"),new Tuple2<>(2, "Bob")
);
DataSet<Tuple2<Integer, String>> cities = env.fromElements(new Tuple2<>(1, "Berlin"),new Tuple2<>(2, "Paris")
);
DataSet<Tuple2<String, String>> personWithCities = persons.join(cities).where(0).equalTo(0).with((p, c) -> new Tuple2<>(p.f1, c.f1));

DataSet 输出

DataSet API 提供多种方式将数据写出到外部系统:

写入文件

wordCounts.writeAsCsv("output/wordcounts.csv", "\n", ",");

写入数据库

虽然 DataSet API 没有直接提供 JDBC Sink,可以通过自定义 Sink 实现写入数据库功能。

打印控制台

wordCounts.print();

批处理的优化

DataSet API 提供了优化机制,通过成本模型和执行计划的分析来优化任务执行。在 Flink 内部,编译器会根据任务定义的转换操作生成一个优化的执行计划,这个过程类似于 SQL 查询优化器的工作原理。

  • DataSet 的分区:Flink 可以根据数据集的分区进行优化。例如,通过 partitionByHash 或 partitionByRange 来手动控制数据的分布方式。
  • DataSet 的缓存:可以通过 rebalance()、hashPartition() 等方法来均衡数据负载,以提高并行度和计算效率。

DataSet API 的容错机制

Flink 的 DataSet API 提供了容错机制,支持在发生故障时重新执行失败的任务。虽然 DataSet API 没有像 DataStream 那样依赖于 Checkpoint 机制,但其批处理特性允许任务从头开始重新执行,确保数据处理的正确性。

DataSet 与 DataStream 的对比

DataSet API 与 DataStream API 之间有一些重要的区别:

请添加图片描述

DataSet API 的未来

需要注意的是,Flink 的官方路线图中已经不再优先开发 DataSet API 的新特性,未来的主要开发将集中在 DataStream API,甚至批处理功能都将通过 DataStream API 来实现。
因此,如果可能,建议新项目尽量使用 DataStream API 来替代 DataSet API。
特别是 Flink 的 Table API 和 SQL API 也适用于批处理和流处理,这些高层 API 提供了更简洁的语法和更强的优化能力。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 在BrowserStack上进行自动化爬虫测试的终极指南
  • Java项目: 基于SpringBoot+mysql高校心理教育辅导管理系统分前后台(含源码+数据库+开题报告+毕业论文)
  • 关于edge浏览器登陆CSDN安全验证不跳出验证码
  • Linux--基础知识点--0--面试
  • C++11(1)
  • 力扣刷题(4)
  • 日系编曲:日系架子鼓写作思路 支点音符 幽灵音 抢拍(重音移位)半速与倍速
  • Kotlin 范型之协变、逆变、不变
  • Springboot-文件的上传和下载
  • 解析 MySQL 数据库的 Python 接口:`mysqlclient` 与 `django-mysql` 实战指南20240904
  • ArcGIS展线/投线教程
  • idea 编译断点运行 tomcat 10.1.28 源码
  • redis的一些重要的基础知识
  • onvif应用--IPC鉴权(认证)
  • Linux起源
  • CentOS学习笔记 - 12. Nginx搭建Centos7.5远程repo
  • eclipse的离线汉化
  • input实现文字超出省略号功能
  • JSDuck 与 AngularJS 融合技巧
  • mac修复ab及siege安装
  • OpenStack安装流程(juno版)- 添加网络服务(neutron)- controller节点
  • Python打包系统简单入门
  • 当SetTimeout遇到了字符串
  • 机器学习 vs. 深度学习
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 浏览器缓存机制分析
  • 码农张的Bug人生 - 见面之礼
  • 前端每日实战 2018 年 7 月份项目汇总(共 29 个项目)
  • 使用common-codec进行md5加密
  • 王永庆:技术创新改变教育未来
  • 怎么将电脑中的声音录制成WAV格式
  • 阿里云服务器购买完整流程
  • ​用户画像从0到100的构建思路
  • ${ }的特别功能
  • (ros//EnvironmentVariables)ros环境变量
  • (附源码)springboot码头作业管理系统 毕业设计 341654
  • (附源码)计算机毕业设计SSM基于健身房管理系统
  • (附源码)流浪动物保护平台的设计与实现 毕业设计 161154
  • (论文阅读31/100)Stacked hourglass networks for human pose estimation
  • (切换多语言)vantUI+vue-i18n进行国际化配置及新增没有的语言包
  • (四)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (已更新)关于Visual Studio 2019安装时VS installer无法下载文件,进度条为0,显示网络有问题的解决办法
  • .form文件_SSM框架文件上传篇
  • .NET 6 在已知拓扑路径的情况下使用 Dijkstra,A*算法搜索最短路径
  • .Net Core 生成管理员权限的应用程序
  • .Net Core 中间件验签
  • .NET 读取 JSON格式的数据
  • .Net 高效开发之不可错过的实用工具
  • .NET 直连SAP HANA数据库
  • .net(C#)中String.Format如何使用
  • .NET+WPF 桌面快速启动工具 GeekDesk
  • .net访问oracle数据库性能问题
  • .net开发日常笔记(持续更新)
  • .NET企业级应用架构设计系列之结尾篇
  • .net图片验证码生成、点击刷新及验证输入是否正确