当前位置: 首页 > news >正文

解决Spark流处理产生的小文件问题

做流批一体,湖仓一体的大数据架构,常见的做法就是:

数据源->spark Streaming->ODS(数据湖)->spark streaming->DWD(数据湖)->...

那么数据源->spark Streaming->ODS,以这段为例,在数据源通过spark structured streaming写入ODS在数据湖(Delta Lake)落盘时候必然会产生很多小文件。

图片

1、目的

为了在批处理spark-sql运行更快,也避免因为小文件而导致报错。

2、影响

WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: 
Too many open files
  1. 小文件在批处理数据IO消耗巨大,程序可能卡死。

  2. 小文件块都有对应的元数据,元数据放在NameNode,导致需要的内存大大增大,增加NameNode压力,这样会限制了集群的扩展。

  3. 在HDFS或者对象储存中,小文件的读写处理速度要远远小于大文件,(寻址耗时)。

3、解决思路

3.1、事前

(1)避免写入时候产生过多小文件 做好分区partitionBy(年,月,日), 避免小文件过于分散 Trigger触发时间可以设置为1分钟,这样会攒一批一写入,避免秒级别写入而产生大量小文件(但是使用spark structured 想要做real-time不能这样,只适合做准实时)

(2)打开自适应框架的开关

spark.sql.adaptive.enabled true

(3)通过spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) //(true表示是否shuffle)
val rdd3 = rdd1.repartition(8)

coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,如果是生成一个窄依赖的结果,那么可以不发生shuffle,分区的数量发生激烈的变化,计算节点不足,不设置true可能会出错。

repartition:coalesce()方法shuffle为true的情况。

3.2、事后(小文件引起已经产生)

(1)优化 Delta 表的写入,避免小文件产生 在开源版 Spark 中,每个 executor 向 partition 中写入数据时,都会创建一个表文件进行写入,最终会导致一个 partition 中产生很多的小文件。

Databricks 对 Delta 表的写入过程进行了优化,对每个 partition,使用一个专门的 executor 合并其他 executor 对该 partition 的写入,从而避免了小文件的产生。

图片

该特性由表属性 delta.autoOptimize.optimizeWrite 来控制:可以在创建表时指定

CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

也可以修改表属性

ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

该特性有两个优点:

(1)通过减少被写入的表文件数量,提高写数据的吞吐量;

(2)避免小文件的产生,提升查询性能。

缺点:

其缺点也是显而易见的,由于使用了一个 executor 来合并表文件的写入,从而降低了表文件写入的并行度,此外,多引入的一层 executor 需要对写入的数据进行 shuffle,带来额外的开销。因此,在使用该特性时,需要对场景进行评估。

场景:

该特性适用的场景:频繁使用 MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT 等 SQL 语句的场景;

该特性不适用的场景:写入 TB 级以上数据。

(2)自动合并小文件

在流处理场景中,比如流式数据入湖场景下,需要持续的将到达的数据插入到 Delta 表中,每次插入都会创建一个新的表文件用于存储新到达的数据,假设每10s触发一次,那么这样的流处理作业一天产生的表文件数量将达到8640个,且由于流处理作业通常是 long-running 的,运行该流处理作业100天将产生上百万个表文件。这样的 Delta 表,仅元数据的维护就是一个很大的挑战,查询性能更是急剧恶化。

为了解决上述问题,Databricks 提供了小文件自动合并功能,在每次向 Delta 表中写入数据之后,会检查 Delta 表中的表文件数量,如果 Delta 表中的小文件(size < 128MB 的视为小文件)数量达到阈值,则会执行一次小文件合并,将 Delta 表中的小文件合并为一个新的大文件。

该特性由表属性 delta.autoOptimize.autoCompact 控制,和特性 delta.autoOptimize.optimizeWrite 相同,可以在创建表时指定,也可以对已创建的表进行修改。自动合并的阈值由 spark.databricks.delta.autoCompact.minNumFiles 控制,默认为50,即小文件数量达到50会执行表文件合并;

合并后产生的文件最大为128MB,如果需要调整合并后的目标文件大小,可以通过调整配置 spark.databricks.delta.autoCompact.maxFileSize 实现。

(3)手动合并小文件(我常用,每天定时运行合并分区内小文件,再去处理批任务)

自动小文件合并会在对 Delta 表进行写入,且写入后表中小文件达到阈值时被触发。除了自动合并之外,Databricks 还提供了 Optimize 命令使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。

在实现上 Optimize 使用 bin-packing 算法,该算法不但会合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我们要对 Delta 表 student 的表文件进行优化,仅需执行如下命令即可实现:(Optimize 命令不但支持全表小文件的合并,还支持特定的分区的表文件的合并)。

OPTIMIZE student WHERE date >= '2024-01-01'

4、附加

面试官可能会问,我运行optimize合并小文件,但是小文件太多了,直接卡死运行不了程序(某互联网面试题)

(1)首先停掉程序,这里注意deltalake因为有历史版本这个概念,所以不存在运行一半覆盖原来版本情况,可以基于上一个版本重新运行(考点)。

(2)第二点,大数据思想分而治之,“分”,即把复杂的任务分解为若干个“简单的任务”来处理。

OPTIMIZE student WHERE date > '2024-01-01' and date < '2024-01-02'

因为前面做了partitionby(年月日),那么缩小optimize范围,在遍历这个月的每一天日期,分治处理。

(3)第三点,大数据思想,自己不行找兄弟,加节点,加计算资源。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • C语言考试内容
  • LangChain + ChatGLM 实现本地知识库问答
  • 【C++】函数模板和类模版
  • 《精通ChatGPT:从入门到大师的Prompt指南》附录C:专业术语表
  • SpringBoot+Vue实现前后端分离基本的环境搭建
  • 王学岗鸿蒙开发(北向)——————(七、八)ArkUi的各种装饰器
  • Kafka 架构
  • 快速排序(Quick_Sort)
  • python一点通: Async异步函数很好,但是如何有效执行阻塞任务?
  • chatgpt 推荐的一些关于提高认知的书,我先存一下
  • OJ3829大石头的搬运工
  • 定时器更新界面,线程报错
  • Hack The Box(黑客盒子)Redeemer篇
  • C++设计模式-外观模式,游戏引擎管理多个子系统,反汇编
  • STM32F103C8移植uCOSIII并以不同周期点亮两个LED灯(HAL库方式)【uCOS】【STM32开发板】【STM32CubeMX】
  • 《Javascript高级程序设计 (第三版)》第五章 引用类型
  • 《剑指offer》分解让复杂问题更简单
  • 【刷算法】求1+2+3+...+n
  • AngularJS指令开发(1)——参数详解
  • javascript从右向左截取指定位数字符的3种方法
  • javascript数组去重/查找/插入/删除
  • jquery cookie
  • python学习笔记 - ThreadLocal
  • ReactNative开发常用的三方模块
  • Redis学习笔记 - pipline(流水线、管道)
  • 服务器之间,相同帐号,实现免密钥登录
  • 干货 | 以太坊Mist负责人教你建立无服务器应用
  • 关于 Cirru Editor 存储格式
  • 跨域
  • 前端学习笔记之观察者模式
  • HanLP分词命名实体提取详解
  • NLPIR智能语义技术让大数据挖掘更简单
  • !!【OpenCV学习】计算两幅图像的重叠区域
  • #includecmath
  • (52)只出现一次的数字III
  • (70min)字节暑假实习二面(已挂)
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (poj1.3.2)1791(构造法模拟)
  • (数据大屏)(Hadoop)基于SSM框架的学院校友管理系统的设计与实现+文档
  • (转)Google的Objective-C编码规范
  • (转)linux下的时间函数使用
  • *p=a是把a的值赋给p,p=a是把a的地址赋给p。
  • .dat文件写入byte类型数组_用Python从Abaqus导出txt、dat数据
  • .net core 使用js,.net core 使用javascript,在.net core项目中怎么使用javascript
  • .Net Framework 4.x 程序到底运行在哪个 CLR 版本之上
  • .NET Remoting Basic(10)-创建不同宿主的客户端与服务器端
  • .NET 某和OA办公系统全局绕过漏洞分析
  • .NET 项目中发送电子邮件异步处理和错误机制的解决方案
  • .NET 中让 Task 支持带超时的异步等待
  • .NET 中什么样的类是可使用 await 异步等待的?
  • .NET单元测试使用AutoFixture按需填充的方法总结
  • @DateTimeFormat 和 @JsonFormat 注解详解
  • [ Linux 长征路第二篇] 基本指令head,tail,date,cal,find,grep,zip,tar,bc,unname
  • [C/C++]数据结构 堆的详解
  • [codeforces]Recover the String