当前位置: 首页 > news >正文

Hive/Spark小文件解决方案(企业级实战)–参数和SQL优化

重点是后面的参数优化

一、小文件的定义

在Hadoop的上下文中,小文件的定义是相对于Hadoop分布式文件系统(HDFS)的块(Block)大小而言的。HDFS是Hadoop生态系统中的核心组件之一,它设计用于存储和处理大规模数据集。在HDFS中,数据被分割成多个块,每个块的大小是固定的,这个大小在Hadoop的不同版本和配置中可能有所不同,但常见的默认块大小包括128MB、256MB等。

基于这个背景,我们可以将小文件定义为那些其大小明显小于HDFS块大小的文件。具体来说,如果一个文件的大小远小于HDFS的块大小(比如,文件大小只有几KB、几MB,而HDFS的块大小是128MB或更大),那么这个文件就可以被认为是小文件。

需要注意的是,小文件的定义并不是绝对的,它取决于HDFS的块大小配置以及具体的应用场景。在一些场景下,即使文件大小接近块大小的一半,也可能被视为小文件,特别是当这些文件数量众多时,它们会对HDFS的性能和可扩展性产生显著影响。

小文件问题在Hadoop中是一个常见的挑战,因为它们会显著增加NameNode的内存负担(因为每个文件都需要在NameNode中存储元数据),同时也会影响MapReduce等作业的性能(因为小文件会增加作业启动时间和增加任务调度的复杂性)。因此,在Hadoop应用中,通常需要采取一些策略来管理和优化小文件,如合并小文件、使用归档技术等。

二、小文件产生过多的原因

  1. 写操作不当: 如果在写数据时,设置的分区策略不当,或者没有指定合适的压缩策略,则可能产生大量小文件。

  2. 数据倾斜: 如果spark任务处理的数据,某一个分区的数据量远远大于其他分区时,可能会导致该分区产生大量小文件。

    简单来说就是

    1.由于分区键太多导致的小文件问题
    2.由于写入文件太多导致的小文件问题

三、小文件过多的影响

1. 存储空间浪费
  • 块空间未充分利用: HDFS是为存储大文件而设计的,每个文件无论大小都会占用一个或多个完整的块(Block)。小文件通常只占用块的一小部分空间,导致块内剩余空间被浪费。
  • 元数据开销: 每个文件在HDFS中都需要存储相应的元数据,包括文件名、权限、时间戳等信息。小文件数量多,意味着需要存储的元数据数量也相应增加,从而占用更多的NameNode内存空间。
2. NameNode内存压力增大
  • 内存资源紧张: NameNode负责维护整个文件系统的命名空间,包括文件名、目录结构和文件属性等。随着小文件数量的增加,NameNode需要存储和管理的元数据量急剧上升,可能导致NameNode内存资源紧张,影响系统的稳定性和可扩展性。
  • 性能下降: NameNode在处理大量小文件的元数据请求时,可能会因为内存资源不足而导致性能下降,进而影响整个Hadoop集群的响应速度和吞吐量。
3. 数据访问效率低下
  • 启动时间延长: 在MapReduce等计算框架中,每个小文件都可能被当作一个独立的输入分片来处理。因此,处理大量小文件会显著增加作业的启动时间,因为系统需要为每个小文件分配资源并启动相应的任务。
  • 寻址开销增加: HDFS中的数据是分布式存储的,访问小文件时可能需要跨多个节点进行寻址和传输数据,这会增加额外的网络开销和磁盘I/O操作,从而降低数据访问的效率。
  • **增加文件读取时间: ** 当HDFS中某个表包含大量小文件时,访问该表数据需先从NameNode获取元数据,再从DataNode读取数据。这会导致频繁访问,增加IO操作,从而降低读写效率。
4. 任务调度和数据处理复杂性增加
  • 任务分布不均: 在处理MapReduce等作业时,HDFS会根据数据块的位置来进行任务调度。大量小文件会导致数据块分布不均,进而使得任务调度变得复杂和困难。
  • 数据处理效率低下:每个小文件都可能生成一个或多个Map任务,这会增加系统的任务管理开销和上下文切换成本,从而降低数据处理的整体效率。
  • **容易导致task数量过多: ** 在Spark计算过程中,每个小文件通常作为一个独立的分区,Spark会为每个分区启动一个或多个任务。过多的小文件将导致生成过多的任务,可能引起内存溢出的错误。此外,任务的启动与结束也会消耗时间,进一步影响效率。

四、如何查看小文件

查看文件数量
hadoop fs -ls + 文件路径 | wc -l
查看文件大小
hadoop fs -du -h + 文件路径

以上是命令方式,另外还可以使用三方工具、使用Hadoop Web界面等等方式

五、如何解决小文件问题

解决思路
  • 读取前优化: 优化文件存储,比如Hive分区存储,控制每个文件数据量等;
  • 读取时优化: 在读取hive或者文件时优化(重点);
  • 读取后优化: 在读取文件后进行优化。
常用手段

读取前优化: 在文件读取前优化文件,比如Hive分区、分桶、压缩存储,控制每个文件数据量等;
**读取时优化: ** 在读取hive或者文件时优化(支持的文件格式:Parquet, JSON and ORC.)(推荐);
使用:spark.sql.files.maxPartitionBytes(非常有用),设置读取文件时每个分区的最大字节数,从而控制每个Task处理的 数据量。通过减小这个参数的值,可以增加分区的数量,从而增加Task的数量。
使用:spark.sql.files.openCostInBytes,设置打开文件的固定开销,影响文件分区的决策。在计算分区数时,Spark会考虑打开文件的成本。如果文件很小,Spark可能会将多个文件合并到同一个分区中,即使这意味着超过maxPartitionBytes的限制。在处理大量小文件时,调整这个参数可以帮助Spark更合理地分配Task。

读取后优化: 在读取文件后进行优化。

  1. 使用repartition或coalesce
  2. 使用repartitionAndSortWithinPartitions
  3. 使用spark.sql.shuffle.partitions
1、distribute by rand()

distribute by可以确保相同键值的数据分配到相同的分区中,减少数据的移动,提高聚合、连接、排序等操作的效率。

少用动态分区,如果场景下必须使用时,那么记得在SQL语句最后添加上distribute by

假设现在有20个分区,我们可以将dt(分区键)相同的数据放到同一个Reduce处理,这样最多也就产生20个文件,dt相同的数据放到同一个Reduce可以使用DISTRIBUTE BY dt实现

insert overwrite table ads.student_info partition(dt)
select * from dwd.student_info
DISTRIBUTE BY dt;

修改后的SQL运行良好,但出现了新的问题:20个分区目录下每个都只有一个文件。这导致使用计算框架(MR/Spark)读取计算时,Mapper/Task数量受限于文件数量,并发度不高,直接影响了SQL的运行速度。

可以通过DISTRIBUTE BY rand()在Hive中均匀分配数据到Reduce端。此方法利用随机数来划分数据,确保每次数据分配都尽可能随机,从而使得每个Reduce处理的数据量相对均衡,提升作业并发度和执行效率。

如果想要具体最后落地生成多少个文件数,使用 distribute by ceil(rand() * N) 这里的N是指具体最后落地生成多少个文件数,那么最终就是每个分区目录下生成5个 文件大小基本一致的文件。修改的SQL如下

insert overwrite table ads.student_info partition(dt)
select * from dwd.student_info
DISTRIBUTE BY dt , cast(rand() * 5 as int) -- 或者 ceil(rand() * 5);
2、我们可以通过配置以下参数来达到目的
set hive.exec.reducers.bytes.per.reducer=67108864;  --设置每个reducer处理大约64MB数据。
set hive.merge.mapfiles=true;  --在Map任务完成后合并小文件。
set hive.merge.mapredfiles=true; --在Reduce任务完成后合并小文件。
set hive.merge.smallfiles.avgsize=16000000; --当输出文件的平均大小小于该值时,会启动一个独立的MapReduce任务进行文件merge。
set hive.merge.size.per.task=256*1000*1000; --设置当输出文件大小小于这个值时,触发文件合并。

如果用spark3.0以上版本,我们可以通过配置sparksql的partiton大小来达到目的

set spark.sql.adaptive.enabled=true  --开启spark自适应优化
set spark.sql.adaptive.coalescePartitions.enabled=true --Spark 会根据目标大小(由 指定) spark.sql.adaptive.advisoryPartitionSizeInBytes 合并连续的随机分区,以避免过多的小任务。
set spark.sql.adaptive.coalescePartitions.minPartitionNum=5  --合并后的最小随机分区数。如果未设置,则默认值为 Spark 群集的默认并行度。此配置仅在同时启用和 spark.sql.adaptive.coalescePartitions.enabled 启用时 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.coalescePartitions.initialPartitionNum=200 --合并前的初始随机分区数。如果未设置,则等于 spark.sql.shuffle.partitions 。此配置仅在同时启用和 spark.sql.adaptive.coalescePartitions.enabled 启用时 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.advisoryPartitionSizeInBytes=64*1024*1024 --自适应优化期间随机分区的建议大小(以字节为单位)(当为 true 时 spark.sql.adaptive.enabled )。当 Spark 合并小的随机分区或拆分倾斜的随机分区时,它就会生效。

六、总结

  • 静态分区下保证只有一个分区参数如下
SET spark.sql.shuffle.partitions = 1;
  • 动态分区下合并小文件参数如下
set hive.merge.mapfiles = true:在只有map的作业结束时合并小文件
set hive.merge.mapredfiles = true:在Map-Reduce的任务结束时合并小文件,默认为False;
set hive.merge.size.per.task = 256000000; 合并后每个文件的大小,默认256000000
set hive.merge.smallfiles.avgsize=256000000; 当输出文件的平均大小小于该值时并且(mapfiles和mapredfiles为true),hive将会启动一个独立的map-reduce任务进行输出文件的merge。
set hive.merge.orcfile.stripe.level=false; 当这个参数设置为true,orc文件进行stripe Level级别的合并,当设置为false,orc文件进行文件级别的合并。
建议参数:
建议将参数avgsize 设置大点进而减少小文件数量.
set hive.merge.smallfiles.avgsize=256000000;-- Hive优化–自动合并输出小文件的参数推荐:
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize=256000000;
set hive.merge.orcfile.stripe.level=false;

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Redis在Spring Boot中的应用详细讲解和案例示范
  • C语言实现经典排序算法
  • Go语言的前世今生与未来展望
  • 解析 Agent 在国外智慧金融领域的一个落地场景:智能顾投
  • Burp Suite Professional 2024.8 for macOS x64 ARM64 - 领先的 Web 渗透测试软件
  • java.lang.NoSuchMethodException:方法不存在异常。当访问某个类的不存在的方法时抛出该异常
  • 读取xml的内容并显示在textEdit中,导出xml文件
  • 程序设计—基于网络爬虫的股票价格分析系统 项目源码27486
  • axios发送post请求实例
  • Python基本语法知识点
  • 高级java每日一道面试题-2024年9月01日-基础篇-事物的隔离级别?
  • 【AI大模型】基于docker部署向量数据库Milvus和可视化工具Attu详解步骤
  • 【ArcGIS/GeoScenePro】Portal和Server关系
  • Https 加密原理
  • hyperf json-rpc
  • [ 一起学React系列 -- 8 ] React中的文件上传
  • Js基础知识(一) - 变量
  • leetcode386. Lexicographical Numbers
  • Linux各目录及每个目录的详细介绍
  • mysql 5.6 原生Online DDL解析
  • quasar-framework cnodejs社区
  • 番外篇1:在Windows环境下安装JDK
  • 事件委托的小应用
  • 一起来学SpringBoot | 第十篇:使用Spring Cache集成Redis
  • 一些css基础学习笔记
  • 硬币翻转问题,区间操作
  • 新海诚画集[秒速5センチメートル:樱花抄·春]
  • ​猴子吃桃问题:每天都吃了前一天剩下的一半多一个。
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • #define、const、typedef的差别
  • #我与Java虚拟机的故事#连载02:“小蓝”陪伴的日日夜夜
  • #中国IT界的第一本漂流日记 传递IT正能量# 【分享得“IT漂友”勋章】
  • $.ajax中的eval及dataType
  • (6) 深入探索Python-Pandas库的核心数据结构:DataFrame全面解析
  • (BFS)hdoj2377-Bus Pass
  • (Forward) Music Player: From UI Proposal to Code
  • (安全基本功)磁盘MBR,分区表,活动分区,引导扇区。。。详解与区别
  • (附源码)SSM环卫人员管理平台 计算机毕设36412
  • (利用IDEA+Maven)定制属于自己的jar包
  • (每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理 第13章 项目资源管理(七)
  • (四)事件系统
  • (转) RFS+AutoItLibrary测试web对话框
  • (转)http-server应用
  • (转)一些感悟
  • ***监测系统的构建(chkrootkit )
  • ***检测工具之RKHunter AIDE
  • .net core 管理用户机密
  • .net 按比例显示图片的缩略图
  • .NET 动态调用WebService + WSE + UsernameToken
  • .Net 路由处理厉害了
  • .net/c# memcached 获取所有缓存键(keys)
  • .NET大文件上传知识整理
  • .NET技术成长路线架构图
  • .net经典笔试题
  • .net与java建立WebService再互相调用