当前位置: 首页 > news >正文

Hive小文件处理

MR任务

mr任务参考链接

set hive.exec.reducers.max=3

set hive.exec.dynamic.partition.mode = true; --使用动态分区时,设置为ture。 set hive.exec.dynamic.partition.mode = nonstrict; --动态分区模式,默认值:strict,表示必须指定一个分区为静态分区;nonstrict模式表示允许所有的分区字段都可以使用动态分区。一般需要设置为nonstrict。 set hive.exec.max.dynamic.partitions.pernode =10; --在每个执行MR的节点上,最多可以创建多少个动态分区,默认值:100。 set hive.exec.max.dynamic.partitions =1000; --在所有执行MR的节点上,最多一共可以创建多少个动态分区,默认值:1000。 set hive.exec.max.created.files = 100000; --整个MR Job中最多可以创建多少个HDFS文件,默认值:100000。 set hive.error.on.empty.partition = false; --当有空分区产生时,是否抛出异常,默认值:false。 Hive文件产生大量小文件的原因: 一是文件本身的原因:小文件多,以及文件的大小; 二是使用动态分区,可能会导致产生大量分区,从而产生很多小文件,也会导致产生很多Mapper; 三是Reduce数量较多,Hive SQL输出文件的数量和Reduce的个数是一样的。 小文件带来的影响: 文件的数量和大小决定Mapper任务的数量,小文件越多,Mapper任务越多,每一个Mapper都会启动一个JVM来运行,所以这些任务的初始化和执行会花费大量的资源,严重影响性能。 在NameNode中每个文件大约占150字节,小文件多,会严重影响NameNode性能。 解决小文件问题: 如果动态分区数量不可预测,最好不用。如果用,最好使用distributed by分区字段,这样会对字段进行一个hash操作,把相同的分区给同一个Reduce处理; 减少Reduce数量; 进行以一些参数调整。

Hdfs文件数

指定目录下的文件夹,文件,容量大小
[root@mz-hadoop-01 ~]# hdfs dfs -count  /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed568         7433         6065483664 /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed[root@mz-hadoop-01 ~]# hdfs dfs -count -h /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed568        7.3 K              5.6 G /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed

Hive文件数

SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS WHERE tbl_id='97387'
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC;TBL_ID  file_cnts  
------  -----------97387         2082

所有文件数

SELECT SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idfile_cnts  
-----------340323

表文件数topN

SELECT e.*,f.*
FROM 
(SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC
) cLEFT JOIN (SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id) e LEFT JOIN
(SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2

库文件数topN

select 
db_id,db_name,DB_LOCATION_URI,sum(file_cnts) as file_cnts
from (SELECT e.*,f.*
FROM 
(SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC
) cLEFT JOIN (SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id) e LEFT JOIN
(SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2)g group by db_id,db_name,DB_LOCATION_URI order by file_cnts desc

小文件压缩任务

package com.mingzhi.common.universalimport com.mingzhi.common.interf.{IDate, MySaveMode}
import com.mingzhi.common.utils.{HiveUtil, SinkUtil, SparkUtils, TableUtils}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 处理只有一个分区dt的表*/
object table_compress_process {private var hive_dbs: String = "paascloud"private var hive_tables: String = "dwd_order_info_abi"private var dt: String = "2023-06-30"private var dt1: String = "2023-06-30"def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val builder = SparkUtils.getBuilderif (System.getProperties.getProperty("os.name").contains("Windows")) {builder.master("local[*]")} else {hive_dbs = args(0)hive_tables = args(1)dt = args(2)dt1 = args(3)}val spark: SparkSession = builder.appName("clean_process").getOrCreate()HiveUtil.openDynamicPartition(spark)spark.sql("set spark.sql.shuffle.partitions=1")if ("all".equalsIgnoreCase(hive_dbs)) {val builder = new StringBuilder()val frame_db = spark.sql("show databases").select("databaseName")frame_db.show(false)frame_db.collect().foreach(db => {builder.append(db.toString().replace("[", "").replace("]", ","))})println("dbs:" + builder.toString())hive_dbs = builder.toString()}hive_dbs.split(",").foreach(db => {if (StringUtils.isNotBlank(db)) {if ("all".equalsIgnoreCase(hive_tables)) {compress_all_table(spark, db)} else {hive_tables.split(",").foreach(t => {compress_the_table(spark, db, t)})}}})spark.stop()}private def compress_the_table(spark: SparkSession, hive_db: String, table: String): Unit = {println("compress_the_table======>:" + hive_db + "." + table)spark.sql(s"use $hive_db")if (TableUtils.tableExists(spark, hive_db, table)) {try {new IDate {override def onDate(dt: String): Unit = {/*** 建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)*/val f1 = spark.sql(s"""||select * from $hive_db.$table where dt='$dt'|""".stripMargin).persist(StorageLevel.MEMORY_ONLY)val r_ck: (DataFrame, String) = SparkUtils.persistDataFrame(spark, f1)val f2 = r_ck._1println("f2 show===>")f2.show(false)val type_ = TableUtils.getCompressType(spark, hive_db, table)if ("HiveFileFormat".equalsIgnoreCase(type_)) {println("sink HiveFileFormat table:" + table)SinkUtil.sink_to_hive_HiveFileFormat(spark, f2, hive_db, table, null)} else {//spark表SinkUtil.sink_to_hive(dt, spark, f2, hive_db, table, type_, MySaveMode.OverWriteByDt, 1)}spark.sql(s"drop table ${r_ck._2} ")}}.invoke(dt, dt1)} catch {case e: org.apache.spark.sql.AnalysisException => {println("exception1:" + e)}case e: Exception => println("exception:" + e)}}}private def compress_all_table(spark: SparkSession, hive_db: String): Unit = {spark.sql(s"use $hive_db")val frame_table = spark.sql(s"show tables")frame_table.show(100, false)frame_table.printSchema()frame_table.filter(r => {!r.getAs[Boolean]("isTemporary")}).select("tableName").collect().foreach(r => {//r:[ads_order_topn]val table = r.toString().replace("[", "").replace("]", "")println("compress table:" + hive_db + "." + table)if (TableUtils.tableExists(spark, hive_db, table)) {try {new IDate {override def onDate(dt: String): Unit = {val f1 = spark.sql(s"""||select * from $hive_db.$table where dt='$dt'|""".stripMargin)SinkUtil.sink_to_hive(dt, spark, f1, hive_db, table, "orc", MySaveMode.OverWriteByDt, 1)}}.invoke(dt, dt1)} catch {case e: org.apache.spark.sql.AnalysisException => {println("exception1:" + e)}case e: Exception => println("exception:" + e)}}})}
}

相关文章:

  • 记一次struct2漏洞获取服务器
  • SystemV
  • Postman接口测试 —— Jenkins实现持续集成构建流程!
  • API之 要求接口上传pdf 以 合同PDF的二进制数据,multpart方式上传
  • MySQL -- DQL
  • 麻雀搜索优化算法MATLAB实现,SSA-BP网络
  • 开发仿抖音APP遇到的问题和解决方案
  • 记华为荣耀手机调试H5
  • 4D毫米波雷达和3D雷达、激光雷达全面对比
  • JVM对象创建与内存分配
  • String类常用方法总结
  • Spring依赖注入之@autowire注解详解
  • 拼多多官方开放平台接口app商品详情接口获取实时商品详情数据演示
  • 数据结构-快速排序“人红是非多”?看我见招拆招
  • uniapp+vue3使用pinia,安卓端报错白屏
  • 【159天】尚学堂高琪Java300集视频精华笔记(128)
  • Android Studio:GIT提交项目到远程仓库
  • CentOS7简单部署NFS
  • CSS 提示工具(Tooltip)
  • ECMAScript入门(七)--Module语法
  • JS基础之数据类型、对象、原型、原型链、继承
  • Netty 4.1 源代码学习:线程模型
  • 读懂package.json -- 依赖管理
  • 工作踩坑系列——https访问遇到“已阻止载入混合活动内容”
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 使用 QuickBI 搭建酷炫可视化分析
  • 线性表及其算法(java实现)
  • 小程序 setData 学问多
  • 走向全栈之MongoDB的使用
  • CMake 入门1/5:基于阿里云 ECS搭建体验环境
  • 第二十章:异步和文件I/O.(二十三)
  • ​水经微图Web1.5.0版即将上线
  • #NOIP 2014# day.2 T2 寻找道路
  • $(selector).each()和$.each()的区别
  • $.ajax,axios,fetch三种ajax请求的区别
  • (4)通过调用hadoop的java api实现本地文件上传到hadoop文件系统上
  • (草履虫都可以看懂的)PyQt子窗口向主窗口传递参数,主窗口接收子窗口信号、参数。
  • (第61天)多租户架构(CDB/PDB)
  • (附源码)springboot社区居家养老互助服务管理平台 毕业设计 062027
  • (附源码)ssm码农论坛 毕业设计 231126
  • (接口自动化)Python3操作MySQL数据库
  • (转)Android学习笔记 --- android任务栈和启动模式
  • (转)iOS字体
  • (转)菜鸟学数据库(三)——存储过程
  • .naturalWidth 和naturalHeight属性,
  • .NET Core 项目指定SDK版本
  • .NET 使用 XPath 来读写 XML 文件
  • .net项目IIS、VS 附加进程调试
  • @Bean有哪些属性
  • [Android] Implementation vs API dependency
  • [BUUCTF 2018]Online Tool
  • [C\C++]读入优化【技巧】
  • [C++] 多线程编程-thread::yield()-sleep_for()
  • [C语言]——内存函数
  • [lintcode easy]Maximum Subarray