当前位置: 首页 > news >正文

HIVE 3 使用 MR 引擎多表关联 (JOIN) 导致丢数的问题复现、问题根源及解决方案 (附代码)

概述

本文意图解决 HIVE 3 版本中使用 MR 作为运算引擎进行 JOIN 操作时导致的丢数情况。

问题描述

Apache Hive 在 2.3 版本后宣布放弃维护 MapReduce 作为底层执行引擎,并转而使用 Tez 作为默认的查询引擎。但是由于 Tez 在大作业量和高并发时的严重性能问题,导致许多任务不得不继续使用 MapReduce 进行操作,因此就需要开发者自行维护 Hive 对于 MR 的可用性。

然而,在 Hive 升级至 Hive 3 版本中,继续使用 MapReduce 会导致非常严重的恶性错误。例如,即使进行非常简单的 JOIN 操作,都会导致部分应该被关联上的数据丢失。

本文档意图提供测试场景浮现上述恶性漏洞,并阐述其根本原因,最后对出现问题部分的源代码进行修改,以彻底修复该问题。

问题复现

场景1: 多表 (超过三张表) 时数据丢失

在复现开始之前先对 Hive 的部分参数进行设置:

SET hive.execution.engine=mr;
SET mapred.reduce.tasks=2;
SET hive.auto.convert.join=false;

首先,创建三张表。这三张表除了表名不一样,其他包括列信息甚至数据在内完全相同。

建表语句如下。我们使用文件的形式快速插入,当然为了复现这个问题您也可以手动插入如下数据:

USE default;

create table table_a(id string, name string, addr string) stored as orc;
create table table_b(id string, name string, addr string) stored as orc;
create table table_c(id string, name string, addr string) stored as orc;

LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_a_data.orc" INTO TABLE table_a;
LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_b_data.orc" INTO TABLE table_b;
LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_c_data.orc" INTO TABLE table_c;
  

通过以下语句查看三张表的内容,可以看到其中的数据完全一致。

hive> select * from table_a;
OK
11      a       aaa
22      b       bbb
33      c       ccc
44      d       ddd
55      e       eee
66      f       fff
77      g       ggg
88      h       hhh
99      i       iii
00      j       jjj
Time taken: 0.157 seconds, Fetched: 10 row(s)
hive> select * from table_b;
OK
11      a       aaa
22      b       bbb
33      c       ccc
44      d       ddd
55      e       eee
66      f       fff
77      g       ggg
88      h       hhh
99      i       iii
00      j       jjj
Time taken: 0.471 seconds, Fetched: 10 row(s)
hive> select * from table_c;
OK
11      a       aaa
22      b       bbb
33      c       ccc
44      d       ddd
55      e       eee
66      f       fff
77      g       ggg
88      h       hhh
99      i       iii
00      j       jjj
Time taken: 0.186 seconds, Fetched: 10 row(s)  

在确认三张表的数据准确无误后,使用如下关联语句对三张表进行关联:

select a.id as a_id, b.name as b_name, c.addr as c_addr from table_a a join table_b b on(a.id=b.id) join table_c c on(c.name=b.name);  

关联结果如下,数据丢失的结果令人咋舌。

MapReduce Jobs Launched: 
Stage-Stage-1:  HDFS Read: 23439 HDFS Write: 5508 SUCCESS
Stage-Stage-2:  HDFS Read: 26292 HDFS Write: 5508 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
22      b       bbb
66      f       fff
88      h       hhh
55      e       eee
99      i       iii
Time taken: 3.343 seconds, Fetched: 5 row(s)

可以非常明显地看到,本来应该被完全关联在一起的 10 条数据,居然出现了严重的数据丢失。有一半的数据竟然没有被成功关联。如果多次运行关联语句,可以发现这不是偶然情况。每次关联 2 张表以上的数据都会出现极为严重的数据丢失问题。

场景2: 表的某些属性 (e.g. bucketing_version) 不同时,即使两张表关联也会导致数据丢失

使用如下数据进行数据建表关联。在建表时使用不同的 bucketing_version 进行表的初始化。

数据文件如下:

0,Kurt,vulnedcasey@yahoo.co.uk
1,Rolland,naejose@gmx.com
2,Cortez,blategarfield@yahoo.com
3,Tyron,tameprobes@gmail.com
4,Matthew,wellezekiel@yahoo.co.uk
5,Jeffrey,fabingeborg@comcast.net
6,Gerard,oughtoutgo@att.net
7,Hal,coursedmauro@hotmail.com
8,Virgil,squintprude@gmail.com
9,Hector,lewddillon@email.com  

利用如下语句建表,并在建表时使用不同的 bucketing_version 属性。

CREATE TABLE `join_test_1`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='1');

LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_1;

CREATE TABLE `join_test_2`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='2');

LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_2;  

运行关联操作的 SQL 语句:

SET hive.execution.engine=mr;
SET mapred.reduce.tasks=2;
SET hive.auto.convert.join=false;
SELECT * from (SELECT id from join_test_1) as tbl1 LEFT JOIN (SELECT id from join_test_2) as tbl2 on tbl1.id = tbl2.id;  

查询关联结果,令人惊讶的事情再一次发生:

Ended Job = job_local184369678_0005
MapReduce Jobs Launched: 
Stage-Stage-1:  HDFS Read: 28434 HDFS Write: 7956 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
0       NULL
2       NULL
4       NULL
6       NULL
8       8
1       NULL
3       NULL
5       5
7       NULL
9       NULL

蕴含同样数据的两张表,仅仅由于建表时的某些属性不同,就导致了绝大部分数据的关联都不成功。数据最基本的准确性都无法得到保障,这毫无疑问是 HIVE 3 中非常致命的问题。

问题根源

由于该问题影响过于严重,导致许多使用 HIVE 的开发者第一时间发现了本问题并及时进行了 Bug Report。在 HIVE Jira 上面可以看到非常多的针对该问题的问题报告和可能的解决方案。

本文主要采用了 HIVE JIRA 中编号为 HIVE-22098 的问题描述和相应解决思路。

根据 HIVE-22098 的问题描述,究其根源,是由于 HIVE 2 与 HIVE 3 在 JOIN 操作时使用了不同的 Hash 算法,导致同样的值在关联时被不同的 Hash 算法映射成了不同的值,而这些不同的 Hash 值在进行关联时无法被相互匹配。最终导致本来该被关联在一起的数据由于 Hash 值得不同未能被关联在一起。而决定到底应用哪套 Hash 值算法则是根据 bucketing_version 的值来进行评判的。

特别地,在进行多表关联时,即使相同 bucketing_version 的 Hive 表,由于其关联的中间过程所产生的中间表,在源代码中 bucketing_version 值会被置为 -1,因此该中间表再与第三张乃至更多的表关联时会直接导致 Hash 算法的混乱计算。

因此,为了保障关联的数据准确性,必须要确保 bucketing_version 在进行多表关联或者多版本表关联时的稳定。即,保障 bucketing_version 的稳定性就是保证 Hive 3 数据关联时的准确性。

此外,HIVE 社区已经针对 bucketing_version 不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看 JIRA: HIVE-21304 了解系统性的 bucketing_version 稳定性提高方法,此处不做过多赘述。

Ps: 该问题还可能导致许多其他异常的出现,比如 HIVE-18983HIVE-20164HIVE-22429 等诸多问题的出现。因此该 BUG 的严重级别是最高的。修复了本问题,其余数十个问题也就都可以迎刃而解。

解决思路

由于 HIVE 中 JOIN 操作执行流程的本质是一个二叉树,因此我们只需要通过算法在关联时遍历每个节点,并将每个节点的 bucketing_version 在关联前手动设置为该二叉树中的最高版本,即可保证 bucketing_version 在关联时的稳定,也就可以保障关联不丢数。

源码修改及编译上传

将如下代码替换至原代码即可修复本问题。

下面给出 Patch 中的 Git 代码,对相应的类进行修复。Patch 的代码在 Jira HIVE-22098 中都可以找到。在此衷心感谢各位 Code Contributors 对于 HIVE 社区的贡献。

 From c0774da927451008ba78ed7b8637a1a4899d9e12 Mon Sep 17 00:00:00 2001
From: luguangming <luguangming1@huawei.com>
Date: Mon, 12 Aug 2019 14:24:05 +0800
Subject: [PATCH]HIVE-22098
---
 .../apache/hadoop/hive/ql/exec/mr/ExecMapper.java  | 41 ++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 99b33a3..d0c847e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -20,10 +20,12 @@

 import java.io.IOException;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -104,6 +106,10 @@ public void configure(JobConf job) {
       // initialize map operator
       mo.initialize(job, null);
       mo.setChildren(job);
+
+      // defined self balance ReduceSinkOperator of bucketVersion
+      balanceRSOpbucketVersion(mo);
+
       l4j.info(mo.dump(0));
       // initialize map local work
       localWork = mrwork.getMapRedLocalWork();
@@ -138,6 +144,41 @@ public void configure(JobConf job) {
       }
     }
   }
+
+  /**
+   * defined-self balance ReduceSinkOperator of bucketVersion, keep values to sameness
+   * @param rootOp
+   */
+  private static void balanceRSOpbucketVersion(Operator rootOp){
+    List<Operator<? extends OperatorDesc>> needDealOps = new ArrayList<Operator<? extends OperatorDesc>>();
+    visitChildGetRSOps(rootOp, needDealOps);
+    int bucketVersion = -1;
+    for(Operator<? extends OperatorDesc> rsop : needDealOps){
+      if(rsop.getBucketingVersion() != 2 && rsop.getBucketingVersion() != 1){
+        rsop.setBucketingVersion(-1);
+      }
+      if(rsop.getBucketingVersion() > bucketVersion){
+        bucketVersion = rsop.getBucketingVersion();
+      }
+    }
+    for(Operator<? extends OperatorDesc> rsop : needDealOps){
+      l4j.info("update reduceSinkOperator name="+rsop.getName()+", opId="+rsop.getOperatorId()+", oldBucketVersion="+rsop.getBucketingVersion()+", newBucketVersion="+bucketVersion);
+      rsop.setBucketingVersion(bucketVersion);
+    }
+    needDealOps.clear();
+  }
+  private static void visitChildGetRSOps(Operator rootOp, List<Operator<? extends OperatorDesc>> needDealOps){
+    List<Operator<? extends OperatorDesc>> ops = rootOp.getChildOperators();
+    if(ops == null || ops.isEmpty()){
+      return;
+    }
+    for(Operator<? extends OperatorDesc> op : ops) {
+      if (op instanceof ReduceSinkOperator) {
+        needDealOps.add(op);
+      }
+      visitChildGetRSOps(op, needDealOps);
+    }
+  }
   @Override
   public void map(Object key, Object value, OutputCollector output,
       Reporter reporter) throws IOException {
-- 
2.9.2

编译对应的模块 hive-exec-3.1.2.jar,并将该 Jar 包替换 Hive 3 自带的 Jar 包。编译 HIVE 的命令可以去查询 HIVE 的官方文档,这里不做过多赘述。

重启 Hive 让其重新加载我们修改源码后的 Jar 包,再次重复上述两个场景,即可观察到 MapReduce 的结果正常,该问题被成功修复。

关联结果示例:

Ended Job = job_local184369678_0006
MapReduce Jobs Launched: 
Stage-Stage-1:  HDFS Read: 22434 HDFS Write: 7966 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
0       0
1       1
2       2
4       4
6       6
8       8
3       3
5       5
7       7
9       9

小结

HIVE JIRA 中有许多关于异常信息和报错的讨论。经常性地浏览社区,配合阅读源代码可以对 HIVE 的理解更加深入。

再次,HIVE 社区已经针对 bucketing_version 不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看 JIRA: HIVE-21304 了解系统性的 bucketing_version 稳定性提高方法,此处不做过多赘述。

希望本篇文章对您的 HIVE 使用有所帮助。

References

  • JIRA: HIVE-22098
  • JIRA: HIVE-21304

相关文章:

  • 计算机毕业设计Java网上求职招聘系统(源码+系统+mysql数据库+Lw文档)
  • C#构造函数
  • 【Node.js+koa--后端管理系统】用户注册接口设计 | 连接Mysql数据库 | 校验注册权限
  • 30岁年薪28W,我还是没顶住压力跳槽了····
  • boost之string_ref
  • Java实现拼图小游戏(1)—— JFrame的认识及界面搭建
  • Java成品网站推荐 毕设从这起步就够了
  • P4 开发实践 — NG-SDN Tutorial — Exercise 5: IPv6 Routing
  • Android Studio Dolphin | 2021.3.1 发布,快来看看有什么更新吧~
  • 常见软件---SQLite3的C语言下使用
  • 嵌入式C语言(入门必看)
  • 神经网络解决优化问题,神经网络 样本不平衡
  • java 使用curl 超时无返回结果问题 有请求 无响应 卡死问题
  • BERT知识蒸馏Distilled BiLSTM
  • 启明智显分享|基于ESP32-S3方案的4寸86盒开发板快速开发及烧录
  • JavaScript-如何实现克隆(clone)函数
  • 分享一款快速APP功能测试工具
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • JavaSE小实践1:Java爬取斗图网站的所有表情包
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • Mac转Windows的拯救指南
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • Storybook 5.0正式发布:有史以来变化最大的版本\n
  • unity如何实现一个固定宽度的orthagraphic相机
  • 阿里云爬虫风险管理产品商业化,为云端流量保驾护航
  • 包装类对象
  • 创建一个Struts2项目maven 方式
  • 复杂数据处理
  • 基于MaxCompute打造轻盈的人人车移动端数据平台
  • 使用API自动生成工具优化前端工作流
  • 小李飞刀:SQL题目刷起来!
  • 字符串匹配基础上
  • 你对linux中grep命令知道多少?
  • 阿里云API、SDK和CLI应用实践方案
  • #NOIP 2014#Day.2 T3 解方程
  • #微信小程序(布局、渲染层基础知识)
  • #我与Java虚拟机的故事#连载10: 如何在阿里、腾讯、百度、及字节跳动等公司面试中脱颖而出...
  • $.each()与$(selector).each()
  • ()、[]、{}、(())、[[]]命令替换
  • (¥1011)-(一千零一拾一元整)输出
  • (附源码)ssm捐赠救助系统 毕业设计 060945
  • (十二)springboot实战——SSE服务推送事件案例实现
  • (转)编辑寄语:因为爱心,所以美丽
  • (转)视频码率,帧率和分辨率的联系与区别
  • .NET core 自定义过滤器 Filter 实现webapi RestFul 统一接口数据返回格式
  • .net Stream篇(六)
  • .net 获取url的方法
  • .Net6使用WebSocket与前端进行通信
  • .NET开发者必备的11款免费工具
  • @transaction 提交事务_【读源码】剖析TCCTransaction事务提交实现细节
  • [.NET 即时通信SignalR] 认识SignalR (一)
  • [AIR] NativeExtension在IOS下的开发实例 --- IOS项目的创建 (一)
  • [android] 天气app布局练习
  • [AutoSar]BSW_Memory_Stack_003 NVM与APP的显式和隐式同步
  • [C#]C# winform部署yolov8目标检测的openvino模型