The HQL Execution Process in Hive 3.1.2


Preface

The previous post covered the Beeline execution process in Hive 3.1.2: lizhiyong.blog.csdn.net/article/details/126634843

It can be summarized as follows:

The main method initializes the BeeLine object → calls its single entry method → the load method that reads configuration →
the setupHistory method that enables command history → the addBeelineShutdownHook method for graceful shutdown →
the initializeConsoleReader method that initializes the console reader → the initArgs method that initializes the arguments →
the dispatch method that routes commands → the executeFile method that runs a script file [not always executed] →
finally the execute method [if a script file is passed with -f, it also invokes the dispatch method internally]

By digging into the source code, the conclusion was that Beeline operates Hive through JDBC under the hood.
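
To make that conclusion concrete, here is a minimal sketch of what Beeline effectively does under the hood: open a JDBC connection to HiveServer2 and run a statement. The host, user, and statement below are placeholders rather than values taken from this environment, and 10000 is merely the default HiveServer2 port.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcSketch {
  public static void main(String[] args) throws Exception {
    // The driver class comes from the hive-jdbc artifact.
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://zhiyong2:10000/default", "hadoop", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("show databases")) {
      while (rs.next()) {
        System.out.println(rs.getString(1));
      }
    }
  }
}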

[root@zhiyong2 ~]# cd /opt/usdp-srv/srv/udp/2.0.0.0/hive/bin
[root@zhiyong2 bin]# ls -ltr
total 64
-rwxrwxrwx. 1 hadoop hadoop   884 Aug 23 2019 schematool
-rwxrwxrwx. 1 hadoop hadoop   832 Aug 23 2019 metatool
-rwxrwxrwx. 1 hadoop hadoop  3064 Aug 23 2019 init-hive-dfs.sh
-rwxrwxrwx. 1 hadoop hadoop   880 Aug 23 2019 hplsql
-rwxrwxrwx. 1 hadoop hadoop   885 Aug 23 2019 hiveserver2
-rwxrwxrwx. 1 hadoop hadoop   881 Aug 23 2019 beeline
drwxrwxrwx. 3 hadoop hadoop  4096 Dec 24 2020 ext
-rwxrwxrwx. 1 hadoop hadoop  1981 Dec 14 2021 hive-config.sh
-rwxrwxrwx. 1 hadoop hadoop 10414 Mar  1 2022 hive
-rwxrwxrwx. 1 hadoop hadoop   141 Mar  1 2022 init-metastore-db.sh
-rwxrwxrwx. 1 hadoop hadoop   601 Mar  1 2022 metastore-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop   588 Mar  1 2022 hive-server2-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop   962 Mar  1 2022 check-warehouse-dir.sh
-rwxrwxrwx. 1 hadoop hadoop  1077 Mar  1 2022 check-tez-dir.sh

Under bin in the Hive installation directory there are two shell scripts, beeline and hive. The content of beeline:

#!/usr/bin/env bash

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/hive --service beeline "$@"

Very concise: it just resolves the bin directory and then sources the hive shell script, passing the arguments along. The hive script:

#!/usr/bin/env bash

cygwin=false
case "`uname`" in
   CYGWIN*) cygwin=true;;
esac

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/hive-config.sh

SERVICE=""
HELP=""
SKIP_HBASECP=false
SKIP_HADOOPVERSION=false

SERVICE_ARGS=()
while [ $# -gt 0 ]; do
  case "$1" in
    --version)
      shift
      SERVICE=version
      ;;
    --service)
      shift
      SERVICE=$1
      shift
      ;;
    --rcfilecat)
      SERVICE=rcfilecat
      shift
      ;;
    --orcfiledump)
      SERVICE=orcfiledump
      shift
      ;;
    --llapdump)
      SERVICE=llapdump
      shift
      ;;
    --skiphadoopversion)
      SKIP_HADOOPVERSION=true
      shift
      ;;
    --skiphbasecp)
      SKIP_HBASECP=true
      shift
      ;;
    --help)
      HELP=_help
      shift
      ;;
    --debug*)
      DEBUG=$1
      shift
      ;;
    *)
      SERVICE_ARGS=("${SERVICE_ARGS[@]}" "$1")
      shift
      ;;
  esac
done

if [ "$SERVICE" = "" ] ; then
  if [ "$HELP" = "_help" ] ; then
    SERVICE="help"
  else
    SERVICE="cli"
  fi
fi

if [[ "$SERVICE" == "cli" && "$USE_BEELINE_FOR_HIVE_CLI" == "true" ]] ; then
  SERVICE="beeline"
fi

if [[ "$SERVICE" =~ ^(help|version|orcfiledump|rcfilecat|schemaTool|cleardanglingscratchdir|metastore|beeline|llapstatus|llap)$ ]] ; then
  SKIP_HBASECP=true
fi

if [[ "$SERVICE" =~ ^(help|schemaTool)$ ]] ; then
  SKIP_HADOOPVERSION=true
fi

if [ -f "${HIVE_CONF_DIR}/hive-env.sh" ]; then
  . "${HIVE_CONF_DIR}/hive-env.sh"
fi

if [[ -z "$SPARK_HOME" ]]
then
  bin=`dirname "$0"`
  # many hadoop installs are in dir/{spark,hive,hadoop,..}
  if test -e $bin/../../spark; then
    sparkHome=$(readlink -f $bin/../../spark)
    if [[ -d $sparkHome ]]
    then
      export SPARK_HOME=$sparkHome
    fi
  fi
fi

CLASSPATH="${TEZ_CONF_DIR:-/etc/tez/conf}:${HIVE_CONF_DIR}"

HIVE_LIB=${HIVE_HOME}/lib

# needed for execution
if [ ! -f ${HIVE_LIB}/hive-exec-*.jar ]; then
  echo "Missing Hive Execution Jar: ${HIVE_LIB}/hive-exec-*.jar"
  exit 1;
fi

if [ ! -f ${HIVE_LIB}/hive-metastore-*.jar ]; then
  echo "Missing Hive MetaStore Jar"
  exit 2;
fi

# cli specific code
if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then
  echo "Missing Hive CLI Jar"
  exit 3;
fi

# Hbase and Hadoop use their own log4j jars.  Including hives log4j jars can cause
# log4j warnings.  So save hives log4j jars in LOG_JAR_CLASSPATH, and add it to classpath
# after Hbase and Hadoop calls finish
LOG_JAR_CLASSPATH="";

for f in ${HIVE_LIB}/*.jar; do
  if [[ $f == *"log4j"* ]]; then
    LOG_JAR_CLASSPATH=${LOG_JAR_CLASSPATH}:$f;
  else
    CLASSPATH=${CLASSPATH}:$f;
  fi
done

# add the auxillary jars such as serdes
if [ -d "${HIVE_AUX_JARS_PATH}" ]; then
  hive_aux_jars_abspath=`cd ${HIVE_AUX_JARS_PATH} && pwd`
  for f in $hive_aux_jars_abspath/*.jar; do
    if [[ ! -f $f ]]; then
        continue;
    fi
    if $cygwin; then
	f=`cygpath -w "$f"`
    fi
    AUX_CLASSPATH=${AUX_CLASSPATH}:$f
    if [ "${AUX_PARAM}" == "" ]; then
        AUX_PARAM=file://$f
    else
        AUX_PARAM=${AUX_PARAM},file://$f;
    fi
  done
elif [ "${HIVE_AUX_JARS_PATH}" != "" ]; then
  HIVE_AUX_JARS_PATH=`echo $HIVE_AUX_JARS_PATH | sed 's/,/:/g'`
  if $cygwin; then
      HIVE_AUX_JARS_PATH=`cygpath -p -w "$HIVE_AUX_JARS_PATH"`
      HIVE_AUX_JARS_PATH=`echo $HIVE_AUX_JARS_PATH | sed 's/;/,/g'`
  fi
  AUX_CLASSPATH=${AUX_CLASSPATH}:${HIVE_AUX_JARS_PATH}
  AUX_PARAM="file://$(echo ${HIVE_AUX_JARS_PATH} | sed 's/:/,file:\/\//g')"
fi

# adding jars from auxlib directory
for f in ${HIVE_HOME}/auxlib/*.jar; do
  if [[ ! -f $f ]]; then
      continue;
  fi
  if $cygwin; then
      f=`cygpath -w "$f"`
  fi
  AUX_CLASSPATH=${AUX_CLASSPATH}:$f
  if [ "${AUX_PARAM}" == "" ]; then
    AUX_PARAM=file://$f
  else
    AUX_PARAM=${AUX_PARAM},file://$f;
  fi
done
if $cygwin; then
    CLASSPATH=`cygpath -p -w "$CLASSPATH"`
    CLASSPATH=${CLASSPATH};${AUX_CLASSPATH}
else
    CLASSPATH=${CLASSPATH}:${AUX_CLASSPATH}
fi

# supress the HADOOP_HOME warnings in 1.x.x
export HADOOP_HOME_WARN_SUPPRESS=true

# to make sure log4j2.x and jline jars are loaded ahead of the jars pulled by hadoop
export HADOOP_USER_CLASSPATH_FIRST=true

# pass classpath to hadoop
if [ "$HADOOP_CLASSPATH" != "" ]; then
  export HADOOP_CLASSPATH="${CLASSPATH}:${HADOOP_CLASSPATH}"
else
  export HADOOP_CLASSPATH="$CLASSPATH"
fi

# also pass hive classpath to hadoop
if [ "$HIVE_CLASSPATH" != "" ]; then
  export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${HIVE_CLASSPATH}";
fi

# check for hadoop in the path
HADOOP_IN_PATH=`which hadoop 2>/dev/null`
if [ -f ${HADOOP_IN_PATH} ]; then
  HADOOP_DIR=`dirname "$HADOOP_IN_PATH"`/..
fi
# HADOOP_HOME env variable overrides hadoop in the path
HADOOP_HOME=${HADOOP_HOME:-${HADOOP_PREFIX:-$HADOOP_DIR}}
if [ "$HADOOP_HOME" == "" ]; then
  echo "Cannot find hadoop installation: \$HADOOP_HOME or \$HADOOP_PREFIX must be set or hadoop must be in the path";
  exit 4;
fi

# add distcp to classpath, hive depends on it
for f in ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-distcp-*.jar; do
  export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:$f;
done

HADOOP=$HADOOP_HOME/bin/hadoop
if [ ! -f ${HADOOP} ]; then
  echo "Cannot find hadoop installation: \$HADOOP_HOME or \$HADOOP_PREFIX must be set or hadoop must be in the path";
  exit 4;
fi

if [ "$SKIP_HADOOPVERSION" = false ]; then
  # Make sure we're using a compatible version of Hadoop
  if [ "x$HADOOP_VERSION" == "x" ]; then
      HADOOP_VERSION=$($HADOOP version 2>&2 | awk -F"\t" '/Hadoop/ {print $0}' | cut -d' ' -f 2);
  fi

  # Save the regex to a var to workaround quoting incompatabilities
  # between Bash 3.1 and 3.2
  hadoop_version_re="^([[:digit:]]+)\.([[:digit:]]+)(\.([[:digit:]]+))?.*$"

  if [[ "$HADOOP_VERSION" =~ $hadoop_version_re ]]; then
      hadoop_major_ver=${BASH_REMATCH[1]}
      hadoop_minor_ver=${BASH_REMATCH[2]}
      hadoop_patch_ver=${BASH_REMATCH[4]}
  else
      echo "Unable to determine Hadoop version information."
      echo "'hadoop version' returned:"
      echo `$HADOOP version`
      exit 5
  fi

  if [ "$hadoop_major_ver" -lt "1" -a  "$hadoop_minor_ver$hadoop_patch_ver" -lt "201" ]; then
      echo "Hive requires Hadoop 0.20.x (x >= 1)."
      echo "'hadoop version' returned:"
      echo `$HADOOP version`
      exit 6
  fi
fi

if [ "$SKIP_HBASECP" = false ]; then
  # HBase detection. Need bin/hbase and a conf dir for building classpath entries.
  # Start with BigTop defaults for HBASE_HOME and HBASE_CONF_DIR.
  HBASE_HOME=${HBASE_HOME:-"/usr/lib/hbase"}
  HBASE_CONF_DIR=${HBASE_CONF_DIR:-"/etc/hbase/conf"}
  if [[ ! -d $HBASE_CONF_DIR ]] ; then
    # not explicitly set, nor in BigTop location. Try looking in HBASE_HOME.
    HBASE_CONF_DIR="$HBASE_HOME/conf"
  fi

  # perhaps we've located the HBase config. if so, include it on classpath.
  if [[ -d $HBASE_CONF_DIR ]] ; then
    export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${HBASE_CONF_DIR}"
  fi

  # look for the hbase script. First check HBASE_HOME and then ask PATH.
  if [[ -e $HBASE_HOME/bin/hbase ]] ; then
    HBASE_BIN="$HBASE_HOME/bin/hbase"
  fi
  HBASE_BIN=${HBASE_BIN:-"$(which hbase)"}

  # perhaps we've located HBase. If so, include its details on the classpath
  if [[ -n $HBASE_BIN ]] ; then
    # exclude ZK, PB, and Guava (See HIVE-2055)
    # depends on HBASE-8438 (hbase-0.94.14+, hbase-0.96.1+) for `hbase mapredcp` command
    for x in $($HBASE_BIN mapredcp 2>&2 | tr ':' '\n') ; do
      if [[ $x == *zookeeper* || $x == *protobuf-java* || $x == *guava* ]] ; then
        continue
      fi
      # TODO: should these should be added to AUX_PARAM as well?
      export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${x}"
    done
  fi
fi

if [ "${AUX_PARAM}" != "" ]; then
  if [[ "$SERVICE" != beeline ]]; then
    HIVE_OPTS="$HIVE_OPTS --hiveconf hive.aux.jars.path=${AUX_PARAM}"
  fi
  AUX_JARS_CMD_LINE="-libjars ${AUX_PARAM}"
fi

if [ "$SERVICE" = "hiveserver2" ] ; then
  export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS $HIVE_SERVER2_JMX_OPTS "
fi

if [ "$SERVICE" = "metastore" ] ; then
  export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS $HIVE_METASTORE_JMX_OPTS "
fi

SERVICE_LIST=""

for i in "$bin"/ext/*.sh ; do
  . $i
done

for i in "$bin"/ext/util/*.sh ; do
  . $i
done

if [ "$DEBUG" ]; then
  if [ "$HELP" ]; then
    debug_help
    exit 0
  else
    get_debug_params "$DEBUG"
    export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS $HIVE_MAIN_CLIENT_DEBUG_OPTS"
  fi
fi

TORUN=""
for j in $SERVICE_LIST ; do
  if [ "$j" = "$SERVICE" ] ; then
    TORUN=${j}$HELP
  fi
done

# to initialize logging for all services

export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j2.formatMsgNoLookups=true -Dlog4j.configurationFile=hive-log4j2.properties "

if [ -f "${HIVE_CONF_DIR}/parquet-logging.properties" ]; then
  export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Djava.util.logging.config.file=${HIVE_CONF_DIR}/parquet-logging.properties "
else
  export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Djava.util.logging.config.file=$bin/../conf/parquet-logging.properties "
fi

if [[ "$SERVICE" =~ ^(hiveserver2|beeline|cli)$ ]] ; then
  # If process is backgrounded, don't change terminal settings
  if [[ ( ! $(ps -o stat= -p $$) =~ "+" ) && ! ( -p /dev/stdin ) && ( ! $(ps -o tty= -p $$) =~ "?" ) ]]; then
    export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Djline.terminal=jline.UnsupportedTerminal"
  fi
fi

# include the log4j jar that is used for hive into the classpath
CLASSPATH="${CLASSPATH}:${LOG_JAR_CLASSPATH}"
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${LOG_JAR_CLASSPATH}"

if [ "$TORUN" = "" ] ; then
  echo "Service $SERVICE not found"
  echo "Available Services: $SERVICE_LIST"
  exit 7
else
  set -- "${SERVICE_ARGS[@]}"
  $TORUN "$@"
fi

From the hive script we can see:

for i in "$bin"/ext/*.sh ; do
  . $i
done

for i in "$bin"/ext/util/*.sh ; do
  . $i
done

The hive script sources every script under bin/ext and bin/ext/util of the installation directory! Namely, these scripts:

/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin
[root@zhiyong2 bin]# ll
total 64
-rwxrwxrwx. 1 hadoop hadoop   881 Aug 23 2019 beeline
-rwxrwxrwx. 1 hadoop hadoop  1077 Mar  1 2022 check-tez-dir.sh
-rwxrwxrwx. 1 hadoop hadoop   962 Mar  1 2022 check-warehouse-dir.sh
drwxrwxrwx. 3 hadoop hadoop  4096 Dec 24 2020 ext
-rwxrwxrwx. 1 hadoop hadoop 10414 Mar  1 2022 hive
-rwxrwxrwx. 1 hadoop hadoop  1981 Dec 14 2021 hive-config.sh
-rwxrwxrwx. 1 hadoop hadoop   885 Aug 23 2019 hiveserver2
-rwxrwxrwx. 1 hadoop hadoop   588 Mar  1 2022 hive-server2-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop   880 Aug 23 2019 hplsql
-rwxrwxrwx. 1 hadoop hadoop  3064 Aug 23 2019 init-hive-dfs.sh
-rwxrwxrwx. 1 hadoop hadoop   141 Mar  1 2022 init-metastore-db.sh
-rwxrwxrwx. 1 hadoop hadoop   601 Mar  1 2022 metastore-ctl.sh
-rwxrwxrwx. 1 hadoop hadoop   832 Aug 23 2019 metatool
-rwxrwxrwx. 1 hadoop hadoop   884 Aug 23 2019 schematool
[root@zhiyong2 bin]# cd ext/
[root@zhiyong2 ext]# ll
total 88
-rwxrwxrwx. 1 hadoop hadoop 1591 Aug 23 2019 beeline.sh
-rwxrwxrwx. 1 hadoop hadoop 1113 Aug 23 2019 cleardanglingscratchdir.sh
-rwxrwxrwx. 1 hadoop hadoop 1596 Aug 23 2019 cli.sh
-rwxrwxrwx. 1 hadoop hadoop 3199 Aug 23 2019 debug.sh
-rwxrwxrwx. 1 hadoop hadoop 1531 Aug 23 2019 fixacidkeyindex.sh
-rwxrwxrwx. 1 hadoop hadoop 1456 Aug 23 2019 help.sh
-rwxrwxrwx. 1 hadoop hadoop 1187 Aug 23 2019 hiveburninclient.sh
-rwxrwxrwx. 1 hadoop hadoop 1341 Aug 23 2019 hiveserver2.sh
-rwxrwxrwx. 1 hadoop hadoop 1372 Aug 23 2019 hplsql.sh
-rwxrwxrwx. 1 hadoop hadoop 1424 Aug 23 2019 jar.sh
-rwxrwxrwx. 1 hadoop hadoop 1238 Aug 23 2019 lineage.sh
-rwxrwxrwx. 1 hadoop hadoop 1185 Aug 23 2019 llapdump.sh
-rwxrwxrwx. 1 hadoop hadoop 1669 Aug 23 2019 llap.sh
-rwxrwxrwx. 1 hadoop hadoop 1471 Aug 23 2019 llapstatus.sh
-rwxrwxrwx. 1 hadoop hadoop 1393 Aug 23 2019 metastore.sh
-rwxrwxrwx. 1 hadoop hadoop 1101 Aug 23 2019 metatool.sh
-rwxrwxrwx. 1 hadoop hadoop 1894 Aug 23 2019 orcfiledump.sh
-rwxrwxrwx. 1 hadoop hadoop 1059 Aug 23 2019 rcfilecat.sh
-rwxrwxrwx. 1 hadoop hadoop 1080 Aug 23 2019 schemaTool.sh
-rwxrwxrwx. 1 hadoop hadoop 1078 Aug 23 2019 strictmanagedmigration.sh
-rwxrwxrwx. 1 hadoop hadoop 2871 Aug 23 2019 tokentool.sh
drwxrwxrwx. 2 hadoop hadoop   28 Nov 15 2020 util
-rwxrwxrwx. 1 hadoop hadoop 1271 Aug 23 2019 version.sh
[root@zhiyong2 ext]# cd util/
[root@zhiyong2 util]# ll
total 4
-rwxrwxrwx. 1 hadoop hadoop 1460 Aug 23 2019 execHiveCmd.sh

Apart from a pile of environment-variable lookups and parameter handling, the key requirement is these three dependency jars:

[root@zhiyong2 lib]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/lib
[root@zhiyong2 lib]# ll | grep -e hive-exec -e hive-metastore -e hive-cli
-rwxrwxrwx. 1 hadoop hadoop    46942 Nov 15 2020 hive-cli-3.1.2.jar
-rwxrwxrwx. 1 hadoop hadoop 40625273 Jun 25 2021 hive-exec-3.1.2.jar
-rwxrwxrwx. 1 hadoop hadoop    36888 Nov 15 2020 hive-metastore-3.1.2.jar

These are the three jars; clearly the hive script cannot run without them.
Java API development against Hive also depends heavily on hive-exec [for example when writing UDFs: lizhiyong.blog.csdn.net/article/details/126186377], which is the library that actually executes Hive at runtime. hive-metastore is, of course, Hive's metadata library, and hive-cli is the library behind the hive command line, the entry point through which all commands run. In fact, JDBC and hive-cli differ only in the entry point; the underlying execution [that is, SQL → MapReduce/Tez/Spark] is the same. This time we start from the entry point and dig through the source code to explore how HQL is executed and deepen our understanding of Hive's internals. We must not stay, like the superficial "SQL Boys", at the level of a few surface SQL statements; there is still a long way to go before doing big data development in the true sense.
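As an aside on the hive-exec dependency mentioned above, a minimal UDF sketch looks roughly like this; the class name and logic are purely illustrative, and compiling it needs hive-exec (which provides the UDF base class) plus hadoop-common for Text:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Illustrative only: a trivial UDF that upper-cases its input string.
public class ToUpperUdf extends UDF {
  public Text evaluate(Text input) {
    if (input == null) {
      return null;
    }
    return new Text(input.toString().toUpperCase());
  }
}

Such a jar would then typically be registered in a session with ADD JAR and CREATE TEMPORARY FUNCTION before the function can be used in HQL.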

Entry Points

Note: although the scripts in both places are named hive and beeline, the ones in bin and the ones in bin/ext must differ to some extent, otherwise nobody would bother to separate them by path.

The Real Entry Point of Beeline

[root@zhiyong2 ext]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin/ext
[root@zhiyong2 ext]# cat beeline.sh
# Need arguments [host [port [db]]]
THISSERVICE=beeline
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "

beeline () {
  CLASS=org.apache.hive.beeline.BeeLine;

  # include only the beeline client jar and its dependencies
  beelineJarPath=`ls ${HIVE_LIB}/hive-beeline-*.jar`
  superCsvJarPath=`ls ${HIVE_LIB}/super-csv-*.jar`
  jlineJarPath=`ls ${HIVE_LIB}/jline-*.jar`
  hadoopClasspath=""
  if [[ -n "${HADOOP_CLASSPATH}" ]]
  then
    hadoopClasspath="${HADOOP_CLASSPATH}:"
  fi
  export HADOOP_CLASSPATH="${hadoopClasspath}${HIVE_CONF_DIR}:${beelineJarPath}:${superCsvJarPath}:${jlineJarPath}"
  export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties "

  exec $HADOOP jar ${beelineJarPath} $CLASS $HIVE_OPTS "$@"
}

beeline_help () {
  beeline "--help"
}

This hard-coded org.apache.hive.beeline.BeeLine class matches the earlier analysis of the Beeline execution process in Hive 3.1.2: lizhiyong.blog.csdn.net/article/details/126634843

The entry class guessed there was correct, so that earlier guess is now verified and will not be repeated here.

The Real Entry Point of the CLI

[root@zhiyong2 ext]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin/ext
[root@zhiyong2 ext]# cat cl
cleardanglingscratchdir.sh  cli.sh
[root@zhiyong2 ext]# cat cli.sh
THISSERVICE=cli
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "

# Set old CLI as the default client
# if USE_DEPRECATED_CLI is not set or is not equal to false use old CLI
if [ -z "$USE_DEPRECATED_CLI" ] || [ "$USE_DEPRECATED_CLI" != "false" ]; then
  USE_DEPRECATED_CLI="true"
fi

updateCli() {
  if [ "$USE_DEPRECATED_CLI" == "true" ]; then
    export HADOOP_CLIENT_OPTS=" -Dproc_hivecli $HADOOP_CLIENT_OPTS "
    CLASS=org.apache.hadoop.hive.cli.CliDriver
    JAR=hive-cli-*.jar
  else
    export HADOOP_CLIENT_OPTS=" -Dproc_beeline $HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties"
    CLASS=org.apache.hive.beeline.cli.HiveCli
    JAR=hive-beeline-*.jar
  fi
}

cli () {
  updateCli
  execHiveCmd $CLASS $JAR "$@"
}

cli_help () {
  updateCli
  execHiveCmd $CLASS $JAR "--help"
}

From this we can see that the Hive CLI has two real entry points: org.apache.hive.beeline.cli.HiveCli and org.apache.hadoop.hive.cli.CliDriver.

[root@zhiyong2 util]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/hive/bin/ext/util
[root@zhiyong2 util]# ll
total 4
-rwxrwxrwx. 1 hadoop hadoop 1460 Aug 23 2019 execHiveCmd.sh
[root@zhiyong2 util]# cat execHiveCmd.sh
CLI_JAR="hive-cli-*.jar"
BEELINE_JAR="hive-beeline-*.jar"

execHiveCmd () {
  CLASS=$1;
  shift;

  # if jar is not passed as parameter use corresponding cli jar
  if [ "$1" == "$CLI_JAR" ] || [ "$1" == "$BEELINE_JAR" ]; then
    JAR="$1"
    shift;
  else
    if [ "$USE_DEPRECATED_CLI" == "true" ]; then
      JAR="$CLI_JAR"
    else
      JAR="$BEELINE_JAR"
    fi
  fi

  # cli specific code
  if [ ! -f ${HIVE_LIB}/$JAR ]; then
    echo "Missing $JAR Jar"
    exit 3;
  fi

  if $cygwin; then
    HIVE_LIB=`cygpath -w "$HIVE_LIB"`
  fi

  # hadoop 20 or newer - skip the aux_jars option. picked up from hiveconf
  exec $HADOOP jar ${HIVE_LIB}/$JAR $CLASS $HIVE_OPTS "$@"
}

This is the script that actually runs the jar; based on USE_DEPRECATED_CLI it decides whether to use the deprecated CLI jar or the newer BeeLine jar.

The Difference Between BeeLine and the Hive CLI

A quick primer on the difference: BeeLine is newer and lightweight [a pure client]; SQL commands first go over JDBC to HiveServer2, and HiveServer2 then talks to the MetaStore, which makes access control easier to enforce. The Hive CLI is the deprecated, heavier approach [it compiles HQL locally]; once a SQL command is compiled it accesses the MetaStore directly. Clearly, the two modes offered by the Hive CLI give users a choice.

The BeeLine-Based Entry Point of the Hive CLI

package org.apache.hive.beeline.cli;

import org.apache.hive.beeline.BeeLine;

import java.io.IOException;
import java.io.InputStream;

public class HiveCli {
  private BeeLine beeLine;

  public static void main(String[] args) throws IOException {
    int status = new HiveCli().runWithArgs(args, null);
    System.exit(status);
  }

  public int runWithArgs(String[] cmd, InputStream inputStream) throws IOException {
    beeLine = new BeeLine(false);
    try {
      return beeLine.begin(cmd, inputStream);
    } finally {
      beeLine.close();
    }
  }
}
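
Since runWithArgs is public, the new CLI can also be driven programmatically. A hedged sketch of roughly what `hive -e "show databases"` boils down to when USE_DEPRECATED_CLI=false (the -e statement is only an example):

import org.apache.hive.beeline.cli.HiveCli;

public class HiveCliSketch {
  public static void main(String[] args) throws Exception {
    // runWithArgs builds a BeeLine(false) internally and hands the
    // arguments to begin(cmd, inputStream), as shown above.
    int status = new HiveCli().runWithArgs(new String[]{"-e", "show databases"}, null);
    System.exit(status);
  }
}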

Combined with the earlier analysis of the Beeline execution process in Hive 3.1.2: lizhiyong.blog.csdn.net/article/details/126634843

Both the Hive CLI entry point and the BeeLine entry point ultimately call beeLine.begin(cmd, inputStream), but they pass different arguments to the BeeLine constructor [the Hive CLI sets isBeeLine=false], which leads to different branches in the underlying execution:

if (isBeeLine) {
  int code = initArgs(args);
  if (code != 0) {
    return code;
  }
} else {
  int code = initArgsFromCliVars(args);
  if (code != 0 || exit) {
    return code;
  }
  defaultConnect(false);
}

The corresponding initialization methods also differ:

int initArgsFromCliVars(String[] args) {
  List<String> commands = Collections.emptyList();

  CliOptionsProcessor optionsProcessor = new CliOptionsProcessor();
  if (!optionsProcessor.process(args)) {
    return 1;
  }
  CommandLine commandLine = optionsProcessor.getCommandLine();


  Properties confProps = commandLine.getOptionProperties("hiveconf");
  for (String propKey : confProps.stringPropertyNames()) {
    setHiveConfVar(propKey, confProps.getProperty(propKey));
  }

  Properties hiveVars = commandLine.getOptionProperties("define");
  for (String propKey : hiveVars.stringPropertyNames()) {
    getOpts().getHiveConfVariables().put(propKey, hiveVars.getProperty(propKey));
  }

  Properties hiveVars2 = commandLine.getOptionProperties("hivevar");
  for (String propKey : hiveVars2.stringPropertyNames()) {
    getOpts().getHiveConfVariables().put(propKey, hiveVars2.getProperty(propKey));
  }

  getOpts().setScriptFile(commandLine.getOptionValue("f"));

  if (commandLine.getOptionValues("i") != null) {
    getOpts().setInitFiles(commandLine.getOptionValues("i"));
  }

  dbName = commandLine.getOptionValue("database");
  getOpts().setVerbose(Boolean.parseBoolean(commandLine.getOptionValue("verbose")));
  getOpts().setSilent(Boolean.parseBoolean(commandLine.getOptionValue("silent")));

  int code = 0;
  if (commandLine.getOptionValues("e") != null) {
    commands = Arrays.asList(commandLine.getOptionValues("e"));
  }

  if (!commands.isEmpty() && getOpts().getScriptFile() != null) {
    System.err.println("The '-e' and '-f' options cannot be specified simultaneously");
    optionsProcessor.printCliUsage();
    return 1;
  }

  if (!commands.isEmpty()) {
    embeddedConnect();
    connectDBInEmbededMode();
    for (Iterator<String> i = commands.iterator(); i.hasNext(); ) {
      String command = i.next().toString();
      debug(loc("executing-command", command));
      if (!dispatch(command)) {
        code++;
      }
    }
    exit = true; // execute and exit
  }
  return code;
}

In the end, commands are still routed through the dispatch method:

boolean dispatch(String line) {
  if (line == null) {
    // exit
    exit = true;
    return true;
  }

  if (line.trim().length() == 0) {
    return true;
  }

  if (isComment(line)) {
    return true;
  }

  line = line.trim();

  // save it to the current script, if any
  if (scriptOutputFile != null) {
    scriptOutputFile.addLine(line);
  }

  if (isHelpRequest(line)) {
    line = "!help";
  }

  if (isBeeLine) {
    if (line.startsWith(COMMAND_PREFIX)) {
      // handle SQLLine command in beeline which starts with ! and does not end with ;
      return execCommandWithPrefix(line);
    } else {
      return commands.sql(line, getOpts().getEntireLineAsCommand());
    }
  } else {
    return commands.sql(line, getOpts().getEntireLineAsCommand());
  }
}

It then invokes commands.sql(line, getOpts().getEntireLineAsCommand()):

public boolean sql(String line, boolean entireLineAsCommand) {
  return execute(line, false, entireLineAsCommand);
}

From here on it is much the same as BeeLine: the command is executed over JDBC. After that, executeFile and the final execute method are also invoked, so there is no great difference from ordinary BeeLine usage. No need to go over it again.

The Legacy Entry Point of the Hive CLI

The legacy version is not useless, though: since it compiles HQL locally, we can follow the thread from here to the entry point where HQL is turned into MapReduce, Tez, or Spark tasks.

package org.apache.hadoop.hive.cli;

public class CliDriver {
  public static void main(String[] args) throws Exception {
    int ret = new CliDriver().run(args);
    System.exit(ret);
  }
    
  public  int run(String[] args) throws Exception {

    OptionsProcessor oproc = new OptionsProcessor();
    if (!oproc.process_stage1(args)) {
      return 1;
    }

    // NOTE: It is critical to do this here so that log4j is reinitialized
    // before any of the other core hive classes are loaded
    boolean logInitFailed = false;
    String logInitDetailMessage;
    try {
      logInitDetailMessage = LogUtils.initHiveLog4j();
    } catch (LogInitializationException e) {
      logInitFailed = true;
      logInitDetailMessage = e.getMessage();
    }

    CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
    ss.in = System.in;
    try {
      ss.out = new PrintStream(System.out, true, "UTF-8");
      ss.info = new PrintStream(System.err, true, "UTF-8");
      ss.err = new CachingPrintStream(System.err, true, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      return 3;
    }

    if (!oproc.process_stage2(ss)) {
      return 2;
    }

    if (!ss.getIsSilent()) {
      if (logInitFailed) {
        System.err.println(logInitDetailMessage);
      } else {
        SessionState.getConsole().printInfo(logInitDetailMessage);
      }
    }

    // set all properties specified via command line
    HiveConf conf = ss.getConf();
    for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
      conf.set((String) item.getKey(), (String) item.getValue());
      ss.getOverriddenConfigurations().put((String) item.getKey(), (String) item.getValue());
    }

    // read prompt configuration and substitute variables.
    prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT);
    prompt = new VariableSubstitution(new HiveVariableSource() {
      @Override
      public Map<String, String> getHiveVariable() {
        return SessionState.get().getHiveVariables();
      }
    }).substitute(conf, prompt);
    prompt2 = spacesForString(prompt);

    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
      // Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
      // the session are needed, the corresponding getters and other methods will wait as needed.
      SessionState.beginStart(ss, console);
    } else {
      SessionState.start(ss);
    }

    ss.updateThreadName();

    // Create views registry
    HiveMaterializedViewsRegistry.get().init();

    // execute cli driver work
    try {
      return executeDriver(ss, conf, oproc);
    } finally {
      ss.resetThreadName();
      ss.close();
    }
  }    
}    

Here we have found the legacy entry point of the Hive CLI. The executeDriver(ss, conf, oproc) method is where SQL is actually converted into execution-engine tasks.

The HiveConf created here is very important: it stores all of Hive's configuration items [HiveConf.java has more than 5,000 lines; look things up there when needed]. For all the Hadoop and Hive configuration items available in Beeline, see: lizhiyong.blog.csdn.net/article/details/126634922
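
As a hedged sketch of how these configuration items are accessed (assuming hive-site.xml is on the classpath; the value written here is only an example):

import org.apache.hadoop.hive.conf.HiveConf;

public class HiveConfSketch {
  public static void main(String[] args) {
    // HiveConf loads hive-site.xml from the classpath; every entry in the huge
    // HiveConf.java is reachable through the ConfVars enum.
    HiveConf conf = new HiveConf();
    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
    System.out.println("hive.execution.engine = " + engine);

    // Overriding a value in code (illustrative only), e.g. switching to Tez:
    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
  }
}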

The subsequent conversion of SQL into execution-engine tasks all follows from this executeDriver(ss, conf, oproc) method.

executeDriver, the Main Method That Performs the CLI Work

package org.apache.hadoop.hive.cli;
public class CliDriver {}

/**
 * Execute the cli work
 * @param ss CliSessionState of the CLI driver
 * @param conf HiveConf for the driver session
 * @param oproc Operation processor of the CLI invocation
 * @return status of the CLI command execution
 * @throws Exception
 */
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
    throws Exception {

  CliDriver cli = new CliDriver();
  cli.setHiveVariables(oproc.getHiveVariables());

  // use the specified database if specified
  cli.processSelectDatabase(ss);

  // Execute -i init files (always in silent mode)
  cli.processInitFiles(ss);

  if (ss.execString != null) {
    int cmdProcessStatus = cli.processLine(ss.execString);
    return cmdProcessStatus;
  }

  try {
    if (ss.fileName != null) {
      return cli.processFile(ss.fileName);
    }
  } catch (FileNotFoundException e) {
    System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
    return 3;
  }
  if ("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE))) {
    console.printInfo(HiveConf.generateMrDeprecationWarning());
  }

  setupConsoleReader();

  String line;
  int ret = 0;
  String prefix = "";
  String curDB = getFormattedDb(conf, ss);
  String curPrompt = prompt + curDB;
  String dbSpaces = spacesForString(curDB);

  while ((line = reader.readLine(curPrompt + "> ")) != null) {
    if (!prefix.equals("")) {
      prefix += '\n';
    }
    if (line.trim().startsWith("--")) {
      continue;
    }
    if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
      line = prefix + line;
      ret = cli.processLine(line, true);
      prefix = "";
      curDB = getFormattedDb(conf, ss);
      curPrompt = prompt + curDB;
      dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
    } else {
      prefix = prefix + line;
      curPrompt = prompt2 + dbSpaces;
      continue;
    }
  }

  return ret;
}

The handling of -i init files and other supplied parameters is not the focus here.

It relies on this configuration item:

package org.apache.hadoop.hive.conf;
public class HiveConf extends Configuration {}
HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet(true, "mr", "tez", "spark"),
    "Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR\n" +
    "remains the default engine for historical reasons, it is itself a historical engine\n" +
    "and is deprecated in Hive 2 line. It may be removed without further warning.")
    
  public static String generateMrDeprecationWarning() {
    return "Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. "
        + "Consider using a different execution engine (i.e. " + HiveConf.getNonMrEngines()
        + ") or using Hive 1.X releases.";
  }    

When the execution engine is the default mr, you get this familiar deprecation warning [it still works to this day, although operations such as insert are prone to errors].

For how to change the execution engine, see: lizhiyong.blog.csdn.net/article/details/123436630

According to Cloudera, CDP 7 now keeps only Hive on Tez: https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/hive-introduction/topics/hive-unsupported.html

[Screenshot of the Cloudera documentation page on unsupported Hive interfaces]

So this article focuses mainly on Tez rather than MapReduce and Spark. Only the execution engine differs; operations such as AST parsing are the same.

A quick look narrows things down to ret = cli.processLine(line, true); this method is clearly the one that actually executes the split SQL statements.

The processLine Method That Executes the Split SQL

package org.apache.hadoop.hive.cli;
public class CliDriver {}
/**
 * Processes a line of semicolon separated commands
 *
 * @param line
 *          The commands to process
 * @param allowInterrupting
 *          When true the function will handle SIG_INT (Ctrl+C) by interrupting the processing and
 *          returning -1
 * @return 0 if ok
 */
public int processLine(String line, boolean allowInterrupting) {
  SignalHandler oldSignal = null;
  Signal interruptSignal = null;

  if (allowInterrupting) {
    // Remember all threads that were running at the time we started line processing.
    // Hook up the custom Ctrl+C handler while processing this line
    interruptSignal = new Signal("INT");
    oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
      private boolean interruptRequested;

      @Override
      public void handle(Signal signal) {
        boolean initialRequest = !interruptRequested;
        interruptRequested = true;

        // Kill the VM on second ctrl+c
        if (!initialRequest) {
          console.printInfo("Exiting the JVM");
          System.exit(127);
        }

        // Interrupt the CLI thread to stop the current statement and return
        // to prompt
        console.printInfo("Interrupting... Be patient, this might take some time.");
        console.printInfo("Press Ctrl+C again to kill JVM");

        // First, kill any running MR jobs
        HadoopJobExecHelper.killRunningJobs();
        TezJobExecHelper.killRunningJobs();
        HiveInterruptUtils.interrupt();
      }
    });
  }

  try {
    int lastRet = 0, ret = 0;

    // we can not use "split" function directly as ";" may be quoted
    List<String> commands = splitSemiColon(line);

    String command = "";
    for (String oneCmd : commands) {

      if (StringUtils.endsWith(oneCmd, "\\")) {
        command += StringUtils.chop(oneCmd) + ";";
        continue;
      } else {
        command += oneCmd;
      }
      if (StringUtils.isBlank(command)) {
        continue;
      }

      ret = processCmd(command);
      command = "";
      lastRet = ret;
      boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
      if (ret != 0 && !ignoreErrors) {
        return ret;
      }
    }
    return lastRet;
  } finally {
    // Once we are done processing the line, restore the old handler
    if (oldSignal != null && interruptSignal != null) {
      Signal.handle(interruptSignal, oldSignal);
    }
  }
}

When this method detects Ctrl+C it forcibly kills the running MapReduce and Tez jobs, but curiously there is no call to kill Spark jobs. Then again, hardly anyone runs Hive on Spark in practice; Spark SQL (Spark on Hive) seems the more sensible choice.

It then calls ret = processCmd(command) to execute the actual command-line operation.

The processCmd Method That Executes the Command Line

package org.apache.hadoop.hive.cli;
public class CliDriver {}
public int processCmd(String cmd) {
  CliSessionState ss = (CliSessionState) SessionState.get();
  ss.setLastCommand(cmd);

  ss.updateThreadName();

  // Flush the print stream, so it doesn't include output from the last command
  ss.err.flush();
  String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();
  String[] tokens = tokenizeCmd(cmd_trimmed);
  int ret = 0;

  if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {

    // if we have come this far - either the previous commands
    // are all successful or this is command line. in either case
    // this counts as a successful run
    ss.close();
    System.exit(0);

  } else if (tokens[0].equalsIgnoreCase("source")) {
    String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
    cmd_1 = new VariableSubstitution(new HiveVariableSource() {
      @Override
      public Map<String, String> getHiveVariable() {
        return SessionState.get().getHiveVariables();
      }
    }).substitute(ss.getConf(), cmd_1);

    File sourceFile = new File(cmd_1);
    if (! sourceFile.isFile()){
      console.printError("File: "+ cmd_1 + " is not a file.");
      ret = 1;
    } else {
      try {
        ret = processFile(cmd_1);
      } catch (IOException e) {
        console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
          stringifyException(e));
        ret = 1;
      }
    }
  } else if (cmd_trimmed.startsWith("!")) {
    // for shell commands, use unstripped command
    String shell_cmd = cmd.trim().substring(1);
    shell_cmd = new VariableSubstitution(new HiveVariableSource() {
      @Override
      public Map<String, String> getHiveVariable() {
        return SessionState.get().getHiveVariables();
      }
    }).substitute(ss.getConf(), shell_cmd);

    // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
    try {
      ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
      ret = executor.execute();
      if (ret != 0) {
        console.printError("Command failed with exit code = " + ret);
      }
    } catch (Exception e) {
      console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
          stringifyException(e));
      ret = 1;
    }
  }  else { // local mode
    try {

      try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
        if (proc instanceof IDriver) {
          // Let Driver strip comments using sql parser
          ret = processLocalCmd(cmd, proc, ss);
        } else {
          ret = processLocalCmd(cmd_trimmed, proc, ss);
        }
      }
    } catch (SQLException e) {
      console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
        org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 1;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  ss.resetThreadName();
  return ret;
}

This method closes the session and exits the CLI on the exit and quit keywords. It then does some special handling depending on whether the source keyword is found or the command starts with "!", which is not the focus. The try block in the final else branch is the core part: a CommandProcessor is obtained from the tokens and configuration, and processLocalCmd is called with cmd if proc is an instance of IDriver, or with cmd_trimmed otherwise. IDriver is clearly an important type.

First, CommandProcessor is an interface:

[Figure: the CommandProcessor interface and its implementation classes]

IDriver is a sub-interface of it; the rest are ordinary implementation classes. Those ordinary implementations are the processors for non-SQL commands such as add jar, while actual HQL execution relies on IDriver:
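
A hedged sketch of the factory lookup that processCmd performs (assuming a Hive client configuration is on the classpath; the tokens are only examples): non-SQL commands such as set get a plain processor, while a SQL token falls through to an IDriver.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;

public class ProcessorLookupSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // "set" maps to a dedicated processor; a SQL keyword does not match any
    // Hive command, so the factory returns a Driver, i.e. an IDriver.
    CommandProcessor setProc = CommandProcessorFactory.get(new String[]{"set"}, conf);
    CommandProcessor sqlProc = CommandProcessorFactory.get(new String[]{"select"}, conf);
    System.out.println(setProc.getClass().getSimpleName()); // e.g. SetProcessor
    System.out.println(sqlProc instanceof IDriver);         // true for SQL statements
  }
}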

package org.apache.hadoop.hive.ql;

/**
 * Hive query executer driver
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface IDriver extends CommandProcessor {

  int compile(String string);

  CommandProcessorResponse compileAndRespond(String statement);

  QueryPlan getPlan();

  QueryDisplay getQueryDisplay();

  void setOperationId(String guid64);

  CommandProcessorResponse run();
  @Override
  CommandProcessorResponse run(String command);


  // create some "cover" to the result?
  boolean getResults(List res) throws IOException;

  void setMaxRows(int maxRows);

  FetchTask getFetchTask();

  Schema getSchema();

  boolean isFetchingTable();

  void resetFetch() throws IOException;

  // close&destroy is used in seq coupling most of the time - the difference is either not clear; or not relevant - remove?
  @Override
  void close();
  void destroy();

  HiveConf getConf();

  Context getContext();

  boolean hasResultSet();

}

This interface has a compile method and a getPlan method for retrieving the query plan, so it is clearly the one to focus on.

The processLocalCmd Method That Executes Local Commands

package org.apache.hadoop.hive.cli;
public class CliDriver {}
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
  boolean escapeCRLF = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_ESCAPE_CRLF);
  int ret = 0;

  if (proc != null) {
    if (proc instanceof IDriver) {
      IDriver qp = (IDriver) proc;
      PrintStream out = ss.out;
      long start = System.currentTimeMillis();
      if (ss.getIsVerbose()) {
        out.println(cmd);
      }

      ret = qp.run(cmd).getResponseCode();
      if (ret != 0) {
        qp.close();
        return ret;
      }

      // query has run capture the time
      long end = System.currentTimeMillis();
      double timeTaken = (end - start) / 1000.0;

      ArrayList<String> res = new ArrayList<String>();

      printHeader(qp, out);

      // print the results
      int counter = 0;
      try {
        if (out instanceof FetchConverter) {
          ((FetchConverter) out).fetchStarted();
        }
        while (qp.getResults(res)) {
          for (String r : res) {
                if (escapeCRLF) {
                  r = EscapeCRLFHelper.escapeCRLF(r);
                }
            out.println(r);
          }
          counter += res.size();
          res.clear();
          if (out.checkError()) {
            break;
          }
        }
      } catch (IOException e) {
        console.printError("Failed with exception " + e.getClass().getName() + ":" + e.getMessage(),
            "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }

      qp.close();

      if (out instanceof FetchConverter) {
        ((FetchConverter) out).fetchFinished();
      }

      console.printInfo(
          "Time taken: " + timeTaken + " seconds" + (counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
    } else {
      String firstToken = tokenizeCmd(cmd.trim())[0];
      String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());

      if (ss.getIsVerbose()) {
        ss.out.println(firstToken + " " + cmd_1);
      }
      CommandProcessorResponse res = proc.run(cmd_1);
      if (res.getResponseCode() != 0) {
        ss.out
            .println("Query returned non-zero code: " + res.getResponseCode() + ", cause: " + res.getErrorMessage());
      }
      if (res.getConsoleMessages() != null) {
        for (String consoleMsg : res.getConsoleMessages()) {
          console.printInfo(consoleMsg);
        }
      }
      ret = res.getResponseCode();
    }
  }

  return ret;
}

This method again has two branches: if the proc passed in is an instance of IDriver, it is cast and its run method is invoked to obtain the response code. Printing the results and closing the object are not the focus.

If it is not an IDriver instance, the arguments are parsed and the run method is invoked as well, so in the end the concrete implementation of whichever processor applies gets executed. What we need to focus on is, of course, IDriver's run method.
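
For reference, a hedged sketch of driving the IDriver contract directly with the same run/getResults loop that processLocalCmd uses. It assumes an initialized SessionState and a reachable metastore configured through hive-site.xml; the statement is only an example.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;

public class DriverRunSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(new SessionState(conf)); // Driver.run expects a live session

    IDriver driver = DriverFactory.newDriver(conf);
    CommandProcessorResponse resp = driver.run("show databases");
    if (resp.getResponseCode() == 0) {
      List<String> res = new ArrayList<>();
      while (driver.getResults(res)) { // same fetch loop as processLocalCmd
        res.forEach(System.out::println);
        res.clear();
      }
    }
    driver.close();
  }
}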

Driver's run Method

First, look at the implementations of the interface:

[Figure: implementations of IDriver, namely Driver and ReExecDriver]

There are two implementations, Driver and ReExecDriver. As the names suggest, we should look at Driver first and only then at the re-executing ReExecDriver; in fact, focusing on Driver's run method is enough:

package org.apache.hadoop.hive.ql;

public class Driver implements IDriver {}
public CommandProcessorResponse run(String command) {
  return run(command, false);
}
  public CommandProcessorResponse run(String command, boolean alreadyCompiled) {

    try {
      runInternal(command, alreadyCompiled);
      return createProcessorResponse(0);
    } catch (CommandProcessorResponse cpr) {

    SessionState ss = SessionState.get();
    if(ss == null) {
      return cpr;
    }
    MetaDataFormatter mdf = MetaDataFormatUtils.getFormatter(ss.getConf());
    if(!(mdf instanceof JsonMetaDataFormatter)) {
      return cpr;
    }
    /*Here we want to encode the error in machine readable way (e.g. JSON)
     * Ideally, errorCode would always be set to a canonical error defined in ErrorMsg.
     * In practice that is rarely the case, so the messy logic below tries to tease
     * out canonical error code if it can.  Exclude stack trace from output when
     * the error is a specific/expected one.
     * It's written to stdout for backward compatibility (WebHCat consumes it).*/
    try {
      if(downstreamError == null) {
        mdf.error(ss.out, errorMessage, cpr.getResponseCode(), SQLState);
        return cpr;
      }
      ErrorMsg canonicalErr = ErrorMsg.getErrorMsg(cpr.getResponseCode());
      if(canonicalErr != null && canonicalErr != ErrorMsg.GENERIC_ERROR) {
        /*Some HiveExceptions (e.g. SemanticException) don't set
          canonical ErrorMsg explicitly, but there is logic
          (e.g. #compile()) to find an appropriate canonical error and
          return its code as error code. In this case we want to
          preserve it for downstream code to interpret*/
        mdf.error(ss.out, errorMessage, cpr.getResponseCode(), SQLState, null);
        return cpr;
      }
      if(downstreamError instanceof HiveException) {
        HiveException rc = (HiveException) downstreamError;
        mdf.error(ss.out, errorMessage,
                rc.getCanonicalErrorMsg().getErrorCode(), SQLState,
                rc.getCanonicalErrorMsg() == ErrorMsg.GENERIC_ERROR ?
                        org.apache.hadoop.util.StringUtils.stringifyException(rc)
                        : null);
      }
      else {
        ErrorMsg canonicalMsg =
                ErrorMsg.getErrorMsg(downstreamError.getMessage());
        mdf.error(ss.out, errorMessage, canonicalMsg.getErrorCode(),
                SQLState, org.apache.hadoop.util.StringUtils.
                stringifyException(downstreamError));
      }
    }
    catch(HiveException ex) {
      console.printError("Unable to JSON-encode the error",
              org.apache.hadoop.util.StringUtils.stringifyException(ex));
    }
    return cpr;
    }
  }

At this point alreadyCompiled=false, i.e. the query has not been compiled yet. The part that really matters is the runInternal(command, alreadyCompiled) method.

Driver's runInternal Method

package org.apache.hadoop.hive.ql;

public class Driver implements IDriver {}
private void runInternal(String command, boolean alreadyCompiled) throws CommandProcessorResponse {
  errorMessage = null;
  SQLState = null;
  downstreamError = null;
  LockedDriverState.setLockedDriverState(lDrvState);

  lDrvState.stateLock.lock();
  try {
    if (alreadyCompiled) {
      if (lDrvState.driverState == DriverState.COMPILED) {
        lDrvState.driverState = DriverState.EXECUTING;
      } else {
        errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
        console.printError(errorMessage);
        throw createProcessorResponse(12);
      }
    } else {
      lDrvState.driverState = DriverState.COMPILING;
    }
  } finally {
    lDrvState.stateLock.unlock();
  }

  // a flag that helps to set the correct driver state in finally block by tracking if
  // the method has been returned by an error or not.
  boolean isFinishedWithError = true;
  try {
    HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf,
        alreadyCompiled ? ctx.getCmd() : command);
    // Get all the driver run hooks and pre-execute them.
    try {
      hookRunner.runPreDriverHooks(hookContext);
    } catch (Exception e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg.findSQLState(e.getMessage());
      downstreamError = e;
      console.printError(errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      throw createProcessorResponse(12);
    }

    PerfLogger perfLogger = null;

    if (!alreadyCompiled) {
      // compile internal will automatically reset the perf logger
      compileInternal(command, true);
      // then we continue to use this perf logger
      perfLogger = SessionState.getPerfLogger();
    } else {
      // reuse existing perf logger.
      perfLogger = SessionState.getPerfLogger();
      // Since we're reusing the compiled plan, we need to update its start time for current run
      plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
    }
    // the reason that we set the txn manager for the cxt here is because each
    // query has its own ctx object. The txn mgr is shared across the
    // same instance of Driver, which can run multiple queries.
    ctx.setHiveTxnManager(queryTxnMgr);

    checkInterrupted("at acquiring the lock.", null, null);

    lockAndRespond();

    try {
      if (!isValidTxnListState()) {
        LOG.info("Compiling after acquiring locks");
        // Snapshot was outdated when locks were acquired, hence regenerate context,
        // txn list and retry
        // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
        // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
        // and then, we acquire locks. If snapshot is still valid, we continue as usual.
        // But if snapshot is not valid, we recompile the query.
        retrial = true;
        backupContext.addRewrittenStatementContext(ctx);
        backupContext.setHiveLocks(ctx.getHiveLocks());
        ctx = backupContext;
        conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString());
        if (plan.hasAcidResourcesInQuery()) {
          recordValidWriteIds(queryTxnMgr);
        }

        if (!alreadyCompiled) {
          // compile internal will automatically reset the perf logger
          compileInternal(command, true);
        } else {
          // Since we're reusing the compiled plan, we need to update its start time for current run
          plan.setQueryStartTime(queryDisplay.getQueryStartTime());
        }

        if (!isValidTxnListState()) {
          // Throw exception
          throw handleHiveException(new HiveException("Operation could not be executed"), 14);
        }

        //Reset the PerfLogger
        perfLogger = SessionState.getPerfLogger(true);

        // the reason that we set the txn manager for the cxt here is because each
        // query has its own ctx object. The txn mgr is shared across the
        // same instance of Driver, which can run multiple queries.
        ctx.setHiveTxnManager(queryTxnMgr);
      }
    } catch (LockException e) {
      throw handleHiveException(e, 13);
    }

    try {
      execute();
    } catch (CommandProcessorResponse cpr) {
      rollback(cpr);
      throw cpr;
    }

    //if needRequireLock is false, the release here will do nothing because there is no lock
    try {
      //since set autocommit starts an implicit txn, close it
      if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
        releaseLocksAndCommitOrRollback(true);
      }
      else if(plan.getOperation() == HiveOperation.ROLLBACK) {
        releaseLocksAndCommitOrRollback(false);
      }
      else {
        //txn (if there is one started) is not finished
      }
    } catch (LockException e) {
      throw handleHiveException(e, 12);
    }

    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
    queryDisplay.setPerfLogStarts(QueryDisplay.Phase.EXECUTION, perfLogger.getStartTimes());
    queryDisplay.setPerfLogEnds(QueryDisplay.Phase.EXECUTION, perfLogger.getEndTimes());

    // Take all the driver run hooks and post-execute them.
    try {
      hookRunner.runPostDriverHooks(hookContext);
    } catch (Exception e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg.findSQLState(e.getMessage());
      downstreamError = e;
      console.printError(errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      throw createProcessorResponse(12);
    }
    isFinishedWithError = false;
  } finally {
    if (lDrvState.isAborted()) {
      closeInProcess(true);
    } else {
      // only release the related resources ctx, driverContext as normal
      releaseResources();
    }

    lDrvState.stateLock.lock();
    try {
      lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED;
    } finally {
      lDrvState.stateLock.unlock();
    }
  }
}

The most important parts here are the compileInternal method, which compiles the HQL, and the execute method, which actually runs it.

Driver's compileInternal Method

package org.apache.hadoop.hive.ql;

public class Driver implements IDriver {}
  private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
    Metrics metrics = MetricsFactory.getInstance();
    if (metrics != null) {
      metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
    }

    PerfLogger perfLogger = SessionState.getPerfLogger();
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.WAIT_COMPILE);
    final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
      command);
    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.WAIT_COMPILE);
    if (metrics != null) {
      metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
    }

    if (compileLock == null) {
      throw createProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode());
    }


    try {
      compile(command, true, deferClose);
    } catch (CommandProcessorResponse cpr) {
      try {
        releaseLocksAndCommitOrRollback(false);
      } catch (LockException e) {
        LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e));
      }
      throw cpr;
    } finally {
      compileLock.unlock();
    }

    //Save compile-time PerfLogging for WebUI.
    //Execution-time Perf logs are done by either another thread's PerfLogger
    //or a reset PerfLogger.
    queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());
    queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());
  }

Clearly the compile method is invoked next.

Driver's compile Method

// deferClose indicates if the close/destroy should be deferred when the process has been
// interrupted, it should be set to true if the compile is called within another method like
// runInternal, which defers the close to the called in that method.
private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
  PerfLogger perfLogger = SessionState.getPerfLogger(true);
  perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
  perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
  lDrvState.stateLock.lock();
  try {
    lDrvState.driverState = DriverState.COMPILING;
  } finally {
    lDrvState.stateLock.unlock();
  }

  command = new VariableSubstitution(new HiveVariableSource() {
    @Override
    public Map<String, String> getHiveVariable() {
      return SessionState.get().getHiveVariables();
    }
  }).substitute(conf, command);

  String queryStr = command;

  try {
    // command should be redacted to avoid to logging sensitive data
    queryStr = HookUtils.redactLogString(conf, command);
  } catch (Exception e) {
    LOG.warn("WARNING! Query command could not be redacted." + e);
  }

  checkInterrupted("at beginning of compilation.", null, null);

  if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
    // close the existing ctx etc before compiling a new query, but does not destroy driver
    closeInProcess(false);
  }

  if (resetTaskIds) {
    TaskFactory.resetId();
  }

  LockedDriverState.setLockedDriverState(lDrvState);

  String queryId = queryState.getQueryId();

  if (ctx != null) {
    setTriggerContext(queryId);
  }
  //save some info for webUI for use after plan is freed
  this.queryDisplay.setQueryStr(queryStr);
  this.queryDisplay.setQueryId(queryId);

  LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);

  conf.setQueryString(queryStr);
  // FIXME: sideeffect will leave the last query set at the session level
  if (SessionState.get() != null) {
    SessionState.get().getConf().setQueryString(queryStr);
    SessionState.get().setupQueryCurrentTimestamp();
  }

  // Whether any error occurred during query compilation. Used for query lifetime hook.
  boolean compileError = false;
  boolean parseError = false;

  try {

    // Initialize the transaction manager.  This must be done before analyze is called.
    if (initTxnMgr != null) {
      queryTxnMgr = initTxnMgr;
    } else {
      queryTxnMgr = SessionState.get().initTxnMgr(conf);
    }
    if (queryTxnMgr instanceof Configurable) {
      ((Configurable) queryTxnMgr).setConf(conf);
    }
    queryState.setTxnManager(queryTxnMgr);

    // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
    // if compile is being called multiple times, clear the old shutdownhook
    ShutdownHookManager.removeShutdownHook(shutdownRunner);
    final HiveTxnManager txnMgr = queryTxnMgr;
    shutdownRunner = new Runnable() {
      @Override
      public void run() {
        try {
          releaseLocksAndCommitOrRollback(false, txnMgr);
        } catch (LockException e) {
          LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
              e.getMessage());
        }
      }
    };
    ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);

    checkInterrupted("before parsing and analysing the query", null, null);

    if (ctx == null) {
      ctx = new Context(conf);
      setTriggerContext(queryId);
    }

    ctx.setHiveTxnManager(queryTxnMgr);
    ctx.setStatsSource(statsSource);
    ctx.setCmd(command);
    ctx.setHDFSCleanup(true);

    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);

    // Trigger query hook before compilation
    hookRunner.runBeforeParseHook(command);

    ASTNode tree;
    try {
      tree = ParseUtils.parse(command, ctx);
    } catch (ParseException e) {
      parseError = true;
      throw e;
    } finally {
      hookRunner.runAfterParseHook(command, parseError);
    }
    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);

    hookRunner.runBeforeCompileHook(command);
    // clear CurrentFunctionsInUse set, to capture new set of functions
    // that SemanticAnalyzer finds are in use
    SessionState.get().getCurrentFunctionsInUse().clear();
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);

    // Flush the metastore cache.  This assures that we don't pick up objects from a previous
    // query running in this same thread.  This has to be done after we get our semantic
    // analyzer (this is when the connection to the metastore is made) but before we analyze,
    // because at that point we need access to the objects.
    Hive.get().getMSC().flushCache();

    backupContext = new Context(ctx);
    boolean executeHooks = hookRunner.hasPreAnalyzeHooks();

    HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
    if (executeHooks) {
      hookCtx.setConf(conf);
      hookCtx.setUserName(userName);
      hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
      hookCtx.setCommand(command);
      hookCtx.setHiveOperation(queryState.getHiveOperation());

      tree =  hookRunner.runPreAnalyzeHooks(hookCtx, tree);
    }

    // Do semantic analysis and plan generation
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);

    if (!retrial) {
      openTransaction();
      generateValidTxnList();
    }

    sem.analyze(tree, ctx);

    if (executeHooks) {
      hookCtx.update(sem);
      hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
    }

    LOG.info("Semantic Analysis Completed (retrial = {})", retrial);

    // Retrieve information about cache usage for the query.
    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
      cacheUsage = sem.getCacheUsage();
    }

    // validate the plan
    sem.validate();
    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);

    checkInterrupted("after analyzing query.", null, null);

    // get the output schema
    schema = getSchema(sem, conf);
    plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
      queryState.getHiveOperation(), schema);

    conf.set("mapreduce.workflow.id", "hive_" + queryId);
    conf.set("mapreduce.workflow.name", queryStr);

    // initialize FetchTask right here
    if (plan.getFetchTask() != null) {
      plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());
    }

    //do the authorization check
    if (!sem.skipAuthorization() &&
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {

      try {
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
        doAuthorization(queryState.getHiveOperation(), sem, command);
      } catch (AuthorizationException authExp) {
        console.printError("Authorization failed:" + authExp.getMessage()
            + ". Use SHOW GRANT to get more details.");
        errorMessage = authExp.getMessage();
        SQLState = "42000";
        throw createProcessorResponse(403);
      } finally {
        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
      }
    }

    if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
      String explainOutput = getExplainOutput(sem, plan, tree);
      if (explainOutput != null) {
        LOG.info("EXPLAIN output for queryid " + queryId + " : "
          + explainOutput);
        if (conf.isWebUiQueryInfoCacheEnabled()) {
          queryDisplay.setExplainPlan(explainOutput);
        }
      }
    }
  } catch (CommandProcessorResponse cpr) {
    throw cpr;
  } catch (Exception e) {
    checkInterrupted("during query compilation: " + e.getMessage(), null, null);

    compileError = true;
    ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
    errorMessage = "FAILED: " + e.getClass().getSimpleName();
    if (error != ErrorMsg.GENERIC_ERROR) {
      errorMessage += " [Error "  + error.getErrorCode()  + "]:";
    }

    // HIVE-4889
    if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {
      errorMessage += " " + e.getCause().getMessage();
    } else {
      errorMessage += " " + e.getMessage();
    }

    if (error == ErrorMsg.TXNMGR_NOT_ACID) {
      errorMessage += ". Failed command: " + queryStr;
    }

    SQLState = error.getSQLState();
    downstreamError = e;
    console.printError(errorMessage, "\n"
        + org.apache.hadoop.util.StringUtils.stringifyException(e));
    throw createProcessorResponse(error.getErrorCode());
  } finally {
    // Trigger post compilation hook. Note that if the compilation fails here then
    // before/after execution hook will never be executed.
    if (!parseError) {
      try {
        hookRunner.runAfterCompilationHook(command, compileError);
      } catch (Exception e) {
        LOG.warn("Failed when invoking query after-compilation hook.", e);
      }
    }

    double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00;
    ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");
    queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);

    boolean isInterrupted = lDrvState.isAborted();
    if (isInterrupted && !deferClose) {
      closeInProcess(true);
    }
    lDrvState.stateLock.lock();
    try {
      if (isInterrupted) {
        lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;
      } else {
        lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;
      }
    } finally {
      lDrvState.stateLock.unlock();
    }

    if (isInterrupted) {
      LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
    } else {
      LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
    }
  }
}

Here, tree = ParseUtils.parse(command, ctx) turns the HQL text into an AST; before analysis the driver also flushes the metastore client cache via Hive.get().getMSC().flushCache(). sem.analyze(tree, ctx) then performs semantic analysis and generates the task plan, and sem.validate() validates the generated plan.

BaseSemanticAnalyzer is in fact an abstract class:

package org.apache.hadoop.hive.ql.parse;
public abstract class BaseSemanticAnalyzer {}

  public void analyze(ASTNode ast, Context ctx) throws SemanticException {
    initCtx(ctx);
    init(true);
    analyzeInternal(ast);
  }

  public void validate() throws SemanticException {
    // Implementations may choose to override this
  }

The abstract class by itself tells us little; we have to look at how its concrete subclasses override these methods.

(Figure: implementation classes of BaseSemanticAnalyzer)

As the names suggest, there is a DDLSemanticAnalyzer for DDL statements, an ExplainSemanticAnalyzer for EXPLAIN commands, and a FunctionSemanticAnalyzer for UDF-related statements; the most important one is SemanticAnalyzer itself, which handles regular SQL queries.

At this point the query plan has been obtained.
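
As a quick hands-on check of the parse step, the snippet below feeds an HQL string through ParseDriver (which ParseUtils.parse delegates to) and dumps the resulting AST. This is a minimal sketch that assumes hive-exec 3.1.2 on the classpath; the query text and class name are made up for illustration.

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;

// Minimal sketch: parse an HQL string into the same kind of AST that Driver.compile() works on.
public class AstDumpDemo {
  public static void main(String[] args) throws Exception {
    ParseDriver pd = new ParseDriver();
    ASTNode tree = pd.parse("SELECT id, COUNT(*) FROM demo_table GROUP BY id");
    // dump() prints the tree, e.g. TOK_QUERY -> TOK_FROM / TOK_INSERT -> TOK_SELECT / TOK_GROUPBY ...
    System.out.println(tree.dump());
  }
}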

Driver's execute method

package org.apache.hadoop.hive.ql;

public class Driver implements IDriver {}
private void execute() throws CommandProcessorResponse {
  PerfLogger perfLogger = SessionState.getPerfLogger();
  perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);

  boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME));
  int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
  Metrics metrics = MetricsFactory.getInstance();

  String queryId = queryState.getQueryId();
  // Get the query string from the conf file as the compileInternal() method might
  // hide sensitive information during query redaction.
  String queryStr = conf.getQueryString();

  lDrvState.stateLock.lock();
  try {
    // if query is not in compiled state, or executing state which is carried over from
    // a combined compile/execute in runInternal, throws the error
    if (lDrvState.driverState != DriverState.COMPILED &&
        lDrvState.driverState != DriverState.EXECUTING) {
      SQLState = "HY008";
      errorMessage = "FAILED: unexpected driverstate: " + lDrvState + ", for query " + queryStr;
      console.printError(errorMessage);
      throw createProcessorResponse(1000);
    } else {
      lDrvState.driverState = DriverState.EXECUTING;
    }
  } finally {
    lDrvState.stateLock.unlock();
  }

  maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);

  HookContext hookContext = null;

  // Whether there's any error occurred during query execution. Used for query lifetime hook.
  boolean executionError = false;

  try {
    LOG.info("Executing command(queryId=" + queryId + "): " + queryStr);
    // compile and execute can get called from different threads in case of HS2
    // so clear timing in this thread's Hive object before proceeding.
    Hive.get().clearMetaCallTiming();

    plan.setStarted();

    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().startQuery(queryStr, queryId);
      SessionState.get().getHiveHistory().logPlanProgress(plan);
    }
    resStream = null;

    SessionState ss = SessionState.get();

    // TODO: should this use getUserFromAuthenticator?
    hookContext = new PrivateHookContext(plan, queryState, ctx.getPathToCS(), SessionState.get().getUserName(),
        ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId,
        ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, queryInfo, ctx);
    hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);

    hookRunner.runPreHooks(hookContext);

    // Trigger query hooks before query execution.
    hookRunner.runBeforeExecutionHook(queryStr, hookContext);

    setQueryDisplays(plan.getRootTasks());
    int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
    int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size()
        + Utilities.getSparkTasks(plan.getRootTasks()).size();
    if (jobs > 0) {
      logMrWarning(mrJobs);
      console.printInfo("Query ID = " + queryId);
      console.printInfo("Total jobs = " + jobs);
    }
    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
          String.valueOf(jobs));
      SessionState.get().getHiveHistory().setIdToTableMap(plan.getIdToTableNameMap());
    }
    String jobname = Utilities.abbreviate(queryStr, maxlen - 6);

    // A runtime that launches runnable tasks as separate Threads through
    // TaskRunners
    // As soon as a task isRunnable, it is put in a queue
    // At any time, at most maxthreads tasks can be running
    // The main thread polls the TaskRunners to check if they have finished.

    checkInterrupted("before running tasks.", hookContext, perfLogger);

    DriverContext driverCxt = new DriverContext(ctx);
    driverCxt.prepare(plan);

    ctx.setHDFSCleanup(true);
    this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)

    SessionState.get().setMapRedStats(new LinkedHashMap<>());
    SessionState.get().setStackTraces(new HashMap<>());
    SessionState.get().setLocalMapRedErrors(new HashMap<>());

    // Add root Tasks to runnable
    for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
      // This should never happen, if it does, it's a bug with the potential to produce
      // incorrect results.
      assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
      driverCxt.addToRunnable(tsk);

      if (metrics != null) {
        tsk.updateTaskMetrics(metrics);
      }
    }

    preExecutionCacheActions();

    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
    // Loop while you either have tasks running, or tasks queued up
    while (driverCxt.isRunning()) {
      // Launch upto maxthreads tasks
      Task<? extends Serializable> task;
      while ((task = driverCxt.getRunnable(maxthreads)) != null) {
        TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
        if (!runner.isRunning()) {
          break;
        }
      }

      // poll the Tasks to see which one completed
      TaskRunner tskRun = driverCxt.pollFinished();
      if (tskRun == null) {
        continue;
      }
      /*
        This should be removed eventually. HIVE-17814 gives more detail
        explanation of whats happening and HIVE-17815 as to why this is done.
        Briefly for replication the graph is huge and so memory pressure is going to be huge if
        we keep a lot of references around.
      */
      String opName = plan.getOperationName();
      boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName())
          || opName.equals(HiveOperation.REPLLOAD.getOperationName());
      if (!isReplicationOperation) {
        hookContext.addCompleteTask(tskRun);
      }

      queryDisplay.setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());

      Task<? extends Serializable> tsk = tskRun.getTask();
      TaskResult result = tskRun.getTaskResult();

      int exitVal = result.getExitVal();
      checkInterrupted("when checking the execution result.", hookContext, perfLogger);

      if (exitVal != 0) {
        Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
        if (backupTask != null) {
          setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
          console.printError(errorMessage);
          errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
          console.printError(errorMessage);

          // add backup task to runnable
          if (DriverContext.isLaunchable(backupTask)) {
            driverCxt.addToRunnable(backupTask);
          }
          continue;

        } else {
          setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
          if (driverCxt.isShutdown()) {
            errorMessage = "FAILED: Operation cancelled. " + errorMessage;
          }
          invokeFailureHooks(perfLogger, hookContext,
            errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
          SQLState = "08S01";

          // 08S01 (Communication error) is the default sql state.  Override the sqlstate
          // based on the ErrorMsg set in HiveException.
          if (result.getTaskError() instanceof HiveException) {
            ErrorMsg errorMsg = ((HiveException) result.getTaskError()).
                getCanonicalErrorMsg();
            if (errorMsg != ErrorMsg.GENERIC_ERROR) {
              SQLState = errorMsg.getSQLState();
            }
          }

          console.printError(errorMessage);
          driverCxt.shutdown();
          // in case we decided to run everything in local mode, restore the
          // the jobtracker setting to its initial value
          ctx.restoreOriginalTracker();
          throw createProcessorResponse(exitVal);
        }
      }

      driverCxt.finished(tskRun);

      if (SessionState.get() != null) {
        SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
            Keys.TASK_RET_CODE, String.valueOf(exitVal));
        SessionState.get().getHiveHistory().endTask(queryId, tsk);
      }

      if (tsk.getChildTasks() != null) {
        for (Task<? extends Serializable> child : tsk.getChildTasks()) {
          if (DriverContext.isLaunchable(child)) {
            driverCxt.addToRunnable(child);
          }
        }
      }
    }
    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);

    postExecutionCacheActions();

    // in case we decided to run everything in local mode, restore the
    // the jobtracker setting to its initial value
    ctx.restoreOriginalTracker();

    if (driverCxt.isShutdown()) {
      SQLState = "HY008";
      errorMessage = "FAILED: Operation cancelled";
      invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
      console.printError(errorMessage);
      throw createProcessorResponse(1000);
    }

    // remove incomplete outputs.
    // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
    // remove them
    HashSet<WriteEntity> remOutputs = new LinkedHashSet<WriteEntity>();
    for (WriteEntity output : plan.getOutputs()) {
      if (!output.isComplete()) {
        remOutputs.add(output);
      }
    }

    for (WriteEntity output : remOutputs) {
      plan.getOutputs().remove(output);
    }


    hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);

    hookRunner.runPostExecHooks(hookContext);

    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
          String.valueOf(0));
      SessionState.get().getHiveHistory().printRowCount(queryId);
    }
    releasePlan(plan);
  } catch (CommandProcessorResponse cpr) {
    executionError = true;
    throw cpr;
  } catch (Throwable e) {
    executionError = true;

    checkInterrupted("during query execution: \n" + e.getMessage(), hookContext, perfLogger);

    ctx.restoreOriginalTracker();
    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
          String.valueOf(12));
    }
    // TODO: do better with handling types of Exception here
    errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
    if (hookContext != null) {
      try {
        invokeFailureHooks(perfLogger, hookContext, errorMessage, e);
      } catch (Exception t) {
        LOG.warn("Failed to invoke failure hook", t);
      }
    }
    SQLState = "08S01";
    downstreamError = e;
    console.printError(errorMessage + "\n"
        + org.apache.hadoop.util.StringUtils.stringifyException(e));
    throw createProcessorResponse(12);
  } finally {
    // Trigger query hooks after query completes its execution.
    try {
      hookRunner.runAfterExecutionHook(queryStr, hookContext, executionError);
    } catch (Exception e) {
      LOG.warn("Failed when invoking query after execution hook", e);
    }

    if (SessionState.get() != null) {
      SessionState.get().getHiveHistory().endQuery(queryId);
    }
    if (noName) {
      conf.set(MRJobConfig.JOB_NAME, "");
    }
    double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00;

    ImmutableMap<String, Long> executionHMSTimings = dumpMetaCallTimingWithoutEx("execution");
    queryDisplay.setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);

    Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
    if (stats != null && !stats.isEmpty()) {
      long totalCpu = 0;
      console.printInfo("MapReduce Jobs Launched: ");
      for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
        console.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
        totalCpu += entry.getValue().getCpuMSec();
      }
      console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
    }
    lDrvState.stateLock.lock();
    try {
      lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
    } finally {
      lDrvState.stateLock.unlock();
    }
    if (lDrvState.isAborted()) {
      LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
    } else {
      LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
    }
  }

  if (console != null) {
    console.printInfo("OK");
  }
}

The key step in this method is TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt): the driver keeps launching runnable tasks, up to the configured maximum number of threads, and polls the TaskRunners in a loop for completed ones, adding their child tasks to the runnable queue.
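
To make that control flow easier to see, here is a self-contained toy model of the loop (plain Java, not Hive code; the class and stage names are invented): launch up to a fixed number of tasks, wait for one to finish, then make its children runnable.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;

// Toy model of Driver.execute()'s scheduling loop: keep at most `maxthreads`
// tasks in flight, poll for finished ones, then enqueue their child tasks.
public class TaskLoopDemo {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  public static void main(String[] args) throws Exception {
    Node root = new Node("Stage-1");
    root.children.add(new Node("Stage-2"));
    root.children.add(new Node("Stage-3"));

    int maxthreads = 2;                                       // cf. hive.exec.parallel.thread.number
    ExecutorService pool = Executors.newFixedThreadPool(maxthreads);
    CompletionService<Node> done = new ExecutorCompletionService<>(pool);

    Queue<Node> runnable = new ArrayDeque<>();
    runnable.add(root);                                       // root tasks have no parents
    int inFlight = 0;

    while (!runnable.isEmpty() || inFlight > 0) {
      while (inFlight < maxthreads && !runnable.isEmpty()) {  // launch up to maxthreads tasks
        Node t = runnable.poll();
        done.submit(() -> { System.out.println("running " + t.name); return t; });
        inFlight++;
      }
      Node finished = done.take().get();                      // wait for one task to complete
      inFlight--;
      runnable.addAll(finished.children);                     // its children become runnable
    }
    pool.shutdown();
  }
}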

Driver's launchTask method

/**
 * Launches a new task
 *
 * @param tsk
 *          task being launched
 * @param queryId
 *          Id of the query containing the task
 * @param noName
 *          whether the task has a name set
 * @param jobname
 *          name of the task, if it is a map-reduce job
 * @param jobs
 *          number of map-reduce jobs
 * @param cxt
 *          the driver context
 */
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
    String jobname, int jobs, DriverContext cxt) throws HiveException {
  if (SessionState.get() != null) {
    SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
  }
  if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
    if (noName) {
      conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
    }
    conf.set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
    Utilities.setWorkflowAdjacencies(conf, plan);
    cxt.incCurJobNo(1);
    console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
  }
  tsk.initialize(queryState, plan, cxt, ctx.getOpContext());
  TaskRunner tskRun = new TaskRunner(tsk);

  cxt.launching(tskRun);
  // Launch Task
  if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) {
    // Launch it in the parallel mode, as a separate thread only for MR tasks
    if (LOG.isInfoEnabled()){
      LOG.info("Starting task [" + tsk + "] in parallel");
    }
    tskRun.start();
  } else {
    if (LOG.isInfoEnabled()){
      LOG.info("Starting task [" + tsk + "] in serial mode");
    }
    tskRun.runSequential();
  }
  return tskRun;
}

When the following parameter from HiveConf.java:

EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel")

is set to true, tasks that can execute in parallel are launched in separate threads via tskRun.start(); with the default value of false, tasks are launched serially via tskRun.runSequential().
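
For example, this is how the two knobs could be set programmatically (a sketch, assuming the usual HiveConf setup on the classpath). In a Beeline/Hive session the equivalent is simply set hive.exec.parallel=true; and set hive.exec.parallel.thread.number=16;, the latter defaulting to 8.

import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative only: enable parallel stage execution and raise the thread cap.
public class ParallelConfDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);           // hive.exec.parallel
    conf.setIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER, 16);   // hive.exec.parallel.thread.number
    System.out.println(conf.getBoolVar(HiveConf.ConfVars.EXECPARALLEL));
    System.out.println(conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER));
  }
}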

TaskRunner's runSequential method

package org.apache.hadoop.hive.ql.exec;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * TaskRunner implementation.
 **/

public class TaskRunner extends Thread {
  protected Task<? extends Serializable> tsk;
  protected TaskResult result;
  protected SessionState ss;
  private static AtomicLong taskCounter = new AtomicLong(0);
  private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() {
    @Override
    protected Long initialValue() {
      return taskCounter.incrementAndGet();
    }
  };

  protected Thread runner;

  private static transient final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);

  public TaskRunner(Task<? extends Serializable> tsk) {
    this.tsk = tsk;
    this.result = new TaskResult();
    ss = SessionState.get();
  }

  public Task<? extends Serializable> getTask() {
    return tsk;
  }

  public TaskResult getTaskResult() {
    return result;
  }

  public Thread getRunner() {
    return runner;
  }

  public boolean isRunning() {
    return result.isRunning();
  }

  @Override
  public void run() {
    runner = Thread.currentThread();
    try {
      SessionState.start(ss);
      runSequential();
    } finally {
      try {
        // Call Hive.closeCurrent() that closes the HMS connection, causes
        // HMS connection leaks otherwise.
        Hive.closeCurrent();
      } catch (Exception e) {
        LOG.warn("Exception closing Metastore connection:" + e.getMessage());
      }
      runner = null;
      result.setRunning(false);
    }
  }

  /**
   * Launches a task, and sets its exit value in the result variable.
   */

  public void runSequential() {
    int exitVal = -101;
    try {
      exitVal = tsk.executeTask(ss == null ? null : ss.getHiveHistory());
    } catch (Throwable t) {
      if (tsk.getException() == null) {
        tsk.setException(t);
      }
      LOG.error("Error in executeTask", t);
    }
    result.setExitVal(exitVal);
    if (tsk.getException() != null) {
      result.setTaskError(tsk.getException());
    }
  }

  public static long getTaskRunnerID () {
    return taskRunnerID.get();
  }
}

runSequential then calls tsk.executeTask:

package org.apache.hadoop.hive.ql.exec;
/**
 * Task implementation.
 **/

public abstract class Task<T extends Serializable> implements Serializable, Node {}
  /**
   * This method is called in the Driver on every task. It updates counters and calls execute(),
   * which is overridden in each task
   *
   * @return return value of execute()
   */
  public int executeTask(HiveHistory hiveHistory) {
    try {
      this.setStarted();
      if (hiveHistory != null) {
        hiveHistory.logPlanProgress(queryPlan);
      }
      int retval = execute(driverContext);
      this.setDone();
      if (hiveHistory != null) {
        hiveHistory.logPlanProgress(queryPlan);
      }
      return retval;
    } catch (IOException e) {
      throw new RuntimeException("Unexpected error: " + e.getMessage(), e);
    }
  }

protected abstract int execute(DriverContext driverContext);

executeTask in turn invokes execute(driverContext) with the driver context. Since execute is once again an abstract method, we first have to find its subclasses/implementations:

(Figure: implementation classes of Task)

Three of these classes, MapredLocalTask, TezTask and SparkTask, are the entry points into the MapReduce, Tez and Spark execution engines respectively.
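
The relationship between executeTask and execute is the classic template-method pattern. The toy model below (plain Java, not Hive classes; all names invented) mirrors it: the final executeTask handles the bookkeeping, while each engine-specific subclass supplies execute.

// Toy model of the Task/executeTask template-method pattern shown above:
// executeTask() does the bookkeeping, execute() is engine-specific.
abstract class ToyTask {
  public final int executeTask() {
    System.out.println("started");            // cf. setStarted()/history logging in Hive's Task
    int rc = execute();
    System.out.println("done, rc=" + rc);
    return rc;
  }
  protected abstract int execute();           // overridden per engine (MR / Tez / Spark)
}

class ToyTezTask extends ToyTask {
  @Override protected int execute() { return 0; }  // a real TezTask would build and submit a DAG here
}

public class TemplateDemo {
  public static void main(String[] args) {
    new ToyTezTask().executeTask();
  }
}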

MapredLocalTask

package org.apache.hadoop.hive.ql.exec.mr;
/**
 * MapredLocalTask represents any local work (i.e.: client side work) that hive needs to
 * execute. E.g.: This is used for generating Hashtables for Mapjoins on the client
 * before the Join is executed on the cluster.
 *
 * MapRedLocalTask does not actually execute the work in process, but rather generates
 * a command using ExecDriver. ExecDriver is what will finally drive processing the records.
 */
public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {}
  @Override
  public int execute(DriverContext driverContext) {
    if (conf.getBoolVar(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD)) {
      // send task off to another jvm
      return executeInChildVM(driverContext);
    } else {
      // execute in process
      return executeInProcess(driverContext);
    }
  }

This method in turn depends on another HiveConf.java setting:

SUBMITLOCALTASKVIACHILD("hive.exec.submit.local.task.via.child", true,
    "Determines whether local tasks (typically mapjoin hashtable generation phase) runs in \n" +
    "separate JVM (true recommended) or not. \n" +
    "Avoids the overhead of spawning new JVM, but can lead to out-of-memory issues.")

A quick look at the MapReduce execution engine is enough, since it is effectively obsolete. It mostly survives as a last resort for extreme data volumes (tens or hundreds of billions of rows) where Tez and Spark tend to blow up, and such MapReduce jobs can easily run for one or two weeks. For companies with only tens of millions to a few hundred million rows of data, an MPP database such as Doris or StarRocks can handle the workload anyway.

TezTask

package org.apache.hadoop.hive.ql.exec.tez;

/**
 *
 * TezTask handles the execution of TezWork. Currently it executes a graph of map and reduce work
 * using the Tez APIs directly.
 *
 */
@SuppressWarnings({"serial"})
public class TezTask extends Task<TezWork> {}
  @Override
  public int execute(DriverContext driverContext) {
    int rc = 1;
    boolean cleanContext = false;
    Context ctx = null;
    Ref<TezSessionState> sessionRef = Ref.from(null);

    try {
      // Get or create Context object. If we create it we have to clean it later as well.
      ctx = driverContext.getCtx();
      if (ctx == null) {
        ctx = new Context(conf);
        cleanContext = true;
        // some DDL task that directly executes a TezTask does not setup Context and hence TriggerContext.
        // Setting queryId is messed up. Some DDL tasks have executionId instead of proper queryId.
        String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
        WmContext wmContext = new WmContext(System.currentTimeMillis(), queryId);
        ctx.setWmContext(wmContext);
      }
      // Need to remove this static hack. But this is the way currently to get a session.
      SessionState ss = SessionState.get();
      // Note: given that we return pool sessions to the pool in the finally block below, and that
      //       we need to set the global to null to do that, this "reuse" may be pointless.
      TezSessionState session = sessionRef.value = ss.getTezSession();
      if (session != null && !session.isOpen()) {
        LOG.warn("The session: " + session + " has not been opened");
      }

      // We only need a username for UGI to use for groups; getGroups will fetch the groups
      // based on Hadoop configuration, as documented at
      // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
      String userName = getUserNameForGroups(ss);
      List<String> groups = null;
      if (userName == null) {
        userName = "anonymous";
      } else {
        try {
          groups = UserGroupInformation.createRemoteUser(userName).getGroups();
        } catch (Exception ex) {
          LOG.warn("Cannot obtain groups for " + userName, ex);
        }
      }
      MappingInput mi = new MappingInput(userName, groups,
          ss.getHiveVariables().get("wmpool"), ss.getHiveVariables().get("wmapp"));

      WmContext wmContext = ctx.getWmContext();
      // jobConf will hold all the configuration for hadoop, tez, and hive
      JobConf jobConf = utils.createConfiguration(conf);
      // Get all user jars from work (e.g. input format stuff).
      String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf);
      // DAG scratch dir. We get a session from the pool so it may be different from Tez one.
      // TODO: we could perhaps reuse the same directory for HiveResources?
      Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), conf);
      CallerContext callerContext = CallerContext.create(
          "HIVE", queryPlan.getQueryId(), "HIVE_QUERY_ID", queryPlan.getQueryStr());

      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_GET_SESSION);
      session = sessionRef.value = WorkloadManagerFederation.getSession(
          sessionRef.value, conf, mi, getWork().getLlapMode(), wmContext);
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_GET_SESSION);

      try {
        ss.setTezSession(session);
        LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(),
          wmContext.getQueryId());

        // Ensure the session is open and has the necessary local resources.
        // This would refresh any conf resources and also local resources.
        ensureSessionHasResources(session, allNonConfFiles);

        // This is a combination of the jar stuff from conf, and not from conf.
        List<LocalResource> allNonAppResources = session.getLocalizedResources();
        logResources(allNonAppResources);

        Map<String, LocalResource> allResources = DagUtils.createTezLrMap(
            session.getAppJarLr(), allNonAppResources);

        // next we translate the TezWork to a Tez DAG
        DAG dag = build(jobConf, work, scratchDir, ctx, allResources);
        dag.setCallerContext(callerContext);

        // Note: we no longer call addTaskLocalFiles because all the resources are correctly
        //       updated in the session resource lists now, and thus added to vertices.
        //       If something breaks, dag.addTaskLocalFiles might need to be called here.

        // Check isShutdown opportunistically; it's never unset.
        if (this.isShutdown) {
          throw new HiveException("Operation cancelled");
        }
        DAGClient dagClient = submit(jobConf, dag, sessionRef);
        session = sessionRef.value;
        boolean wasShutdown = false;
        synchronized (dagClientLock) {
          assert this.dagClient == null;
          wasShutdown = this.isShutdown;
          if (!wasShutdown) {
            this.dagClient = dagClient;
          }
        }
        if (wasShutdown) {
          closeDagClientOnCancellation(dagClient);
          throw new HiveException("Operation cancelled");
        }

        // finally monitor will print progress until the job is done
        TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx);
        rc = monitor.monitorExecution();

        if (rc != 0) {
          this.setException(new HiveException(monitor.getDiagnostics()));
        }

        // fetch the counters
        try {
          Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
          counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
        } catch (Exception err) {
          // Don't fail execution due to counters - just don't print summary info
          LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete. " + err, err);
          counters = null;
        }
      } finally {
        // Note: due to TEZ-3846, the session may actually be invalid in case of some errors.
        //       Currently, reopen on an attempted reuse will take care of that; we cannot tell
        //       if the session is usable until we try.
        // We return this to the pool even if it's unusable; reopen is supposed to handle this.
        wmContext = ctx.getWmContext();
        try {
          if (sessionRef.value != null) {
            sessionRef.value.returnToSessionManager();
          }
        } catch (Exception e) {
          LOG.error("Failed to return session: {} to pool", session, e);
          throw e;
        }

        if (!conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("none") &&
          wmContext != null) {
          if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("json")) {
            wmContext.printJson(console);
          } else if (conf.getVar(HiveConf.ConfVars.TEZ_SESSION_EVENTS_SUMMARY).equalsIgnoreCase("text")) {
            wmContext.print(console);
          }
        }
      }

      if (LOG.isInfoEnabled() && counters != null
          && (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
          Utilities.isPerfOrAboveLogging(conf))) {
        for (CounterGroup group: counters) {
          LOG.info(group.getDisplayName() +":");
          for (TezCounter counter: group) {
            LOG.info("   "+counter.getDisplayName()+": "+counter.getValue());
          }
        }
      }
    } catch (Exception e) {
      LOG.error("Failed to execute tez graph.", e);
      // rc will be 1 at this point indicating failure.
    } finally {
      Utilities.clearWork(conf);

      // Clear gWorkMap
      for (BaseWork w : work.getAllWork()) {
        JobConf workCfg = workToConf.get(w);
        if (workCfg != null) {
          Utilities.clearWorkMapForConf(workCfg);
        }
      }

      if (cleanContext) {
        try {
          ctx.clear();
        } catch (Exception e) {
          /*best effort*/
          LOG.warn("Failed to clean up after tez job", e);
        }
      }
      // need to either move tmp files or remove them
      DAGClient dagClient = null;
      synchronized (dagClientLock) {
        dagClient = this.dagClient;
        this.dagClient = null;
      }
      // TODO: not clear why we don't do the rest of the cleanup if dagClient is not created.
      //       E.g. jobClose will be called if we fail after dagClient creation but no before...
      //       DagClient as such should have no bearing on jobClose.
      if (dagClient != null) {
        // rc will only be overwritten if close errors out
        rc = close(work, rc, dagClient);
      }
    }
    return rc;
  }

As the class-level comment says, TezTask drives the work through the Tez Java APIs directly, and the body of execute shows the call pattern: it obtains a TezSessionState (via SessionState and WorkloadManagerFederation.getSession), translates the TezWork into a DAG with build(...), submits it to get a DAGClient, monitors progress with TezJobMonitor, and finally fetches the DAG counters. It even resolves the user's groups through UserGroupInformation (UGI).
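
To see what "using the Tez Java API directly" looks like outside of Hive, here is a schematic single-vertex submission. It assumes tez-api 0.10.x on the classpath; com.example.MyProcessor is a placeholder for a real processor implementation and no payloads or local resources are configured, so treat this as a shape sketch rather than a runnable job.

import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

// Schematic Tez API usage: create a client, build a DAG, submit it, wait for completion.
// TezTask.execute() above does the same dance, but builds the DAG from Hive's TezWork
// and obtains its session through WorkloadManagerFederation.
public class TezHelloDag {
  public static void main(String[] args) throws Exception {
    TezConfiguration tezConf = new TezConfiguration();
    TezClient tezClient = TezClient.create("hello-dag", tezConf);
    tezClient.start();
    try {
      DAG dag = DAG.create("hello-dag");
      // Placeholder processor class name; a real job points this at the class doing the work.
      dag.addVertex(Vertex.create("v1",
          ProcessorDescriptor.create("com.example.MyProcessor"), 1));
      tezClient.waitTillReady();
      DAGClient dagClient = tezClient.submitDAG(dag);
      DAGStatus status = dagClient.waitForCompletion();
      System.out.println("DAG finished with state: " + status.getState());
    } finally {
      tezClient.stop();
    }
  }
}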

SparkTask

package org.apache.hadoop.hive.ql.exec.spark;
public class SparkTask extends Task<SparkWork> {}
  @Override
  public int execute(DriverContext driverContext) {

    int rc = 0;
    perfLogger = SessionState.getPerfLogger();
    SparkSession sparkSession = null;
    SparkSessionManager sparkSessionManager = null;
    try {
      printConfigInfo();
      sparkSessionManager = SparkSessionManagerImpl.getInstance();
      sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);

      SparkWork sparkWork = getWork();
      sparkWork.setRequiredCounterPrefix(getOperatorCounters());

      // Submit the Spark job
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
      submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
      jobRef = sparkSession.submit(driverContext, sparkWork);
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);

      // If the driver context has been shutdown (due to query cancellation) kill the Spark job
      if (driverContext.isShutdown()) {
        LOG.warn("Killing Spark job");
        killJob();
        throw new HiveException("Operation is cancelled.");
      }

      // Get the Job Handle id associated with the Spark job
      sparkJobHandleId = jobRef.getJobId();

      // Add Spark job handle id to the Hive History
      addToHistory(Keys.SPARK_JOB_HANDLE_ID, jobRef.getJobId());

      LOG.debug("Starting Spark job with job handle id " + sparkJobHandleId);

      // Get the application id of the Spark app
      jobID = jobRef.getSparkJobStatus().getAppID();

      // Start monitoring the Spark job, returns when the Spark job has completed / failed, or if
      // a timeout occurs
      rc = jobRef.monitorJob();

      // Get the id the Spark job that was launched, returns -1 if no Spark job was launched
      sparkJobID = jobRef.getSparkJobStatus().getJobId();

      // Add Spark job id to the Hive History
      addToHistory(Keys.SPARK_JOB_ID, Integer.toString(sparkJobID));

      // Get the final state of the Spark job and parses its job info
      SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
      getSparkJobInfo(sparkJobStatus, rc);

      if (rc == 0) {
        sparkStatistics = sparkJobStatus.getSparkStatistics();
        printExcessiveGCWarning();
        if (LOG.isInfoEnabled() && sparkStatistics != null) {
          LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID));
        }
        LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " +
                jobID + " and task ID " + getId());
      } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
        // TODO: If the timeout is because of lack of resources in the cluster, we should
        // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
        // it's difficult to do it on hive side alone. See HIVE-12650.
        LOG.debug("Failed to submit Spark job with job handle id " + sparkJobHandleId);
        LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(jobID)
                ? "UNKNOWN" : jobID));
        killJob();
      } else if (rc == 4) {
        LOG.info("The spark job or one stage of it has too many tasks" +
            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
        killJob();
      }

      if (this.jobID == null) {
        this.jobID = sparkJobStatus.getAppID();
      }
      sparkJobStatus.cleanup();
    } catch (Exception e) {
      String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";

      // Has to use full name to make sure it does not conflict with
      // org.apache.commons.lang.StringUtils
      console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
      LOG.error(msg, e);
      setException(e);
      if (e instanceof HiveException) {
        HiveException he = (HiveException) e;
        rc = he.getCanonicalErrorMsg().getErrorCode();
      } else {
        rc = 1;
      }
    } finally {
      startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING);
      // The startTime may not be set if the sparkTask finished too fast,
      // because SparkJobMonitor will sleep for 1 second then check the state,
      // right after sleep, the spark job may be already completed.
      // In this case, set startTime the same as submitTime.
      if (startTime < submitTime) {
        startTime = submitTime;
      }
      finishTime = perfLogger.getEndTime(PerfLogger.SPARK_RUN_JOB);
      Utilities.clearWork(conf);
      if (sparkSession != null && sparkSessionManager != null) {
        rc = close(rc);
        try {
          sparkSessionManager.returnSession(sparkSession);
        } catch (HiveException ex) {
          LOG.error("Failed to return the session to SessionManager", ex);
        }
      }
    }
    return rc;
  }

Note that Hive does not shell out to spark-submit here; sparkSession.submit(driverContext, sparkWork) submits the job directly through the API. Hive on Spark sees little use these days: with Spark on Hive (Spark SQL over the Hive metastore) you also get the Catalyst optimizer, and Spark 3.3.0 adds many join optimizations over 2.4.0 that SQL developers appreciate, which is arguably more attractive than Tez.
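
For comparison, the "Spark on Hive" route mentioned above looks like this from the Java side. This is a sketch assuming the spark-sql and spark-hive dependencies plus a reachable Hive metastore; the database and table names are made up.

import org.apache.spark.sql.SparkSession;

// Spark on Hive: Spark SQL reads the Hive metastore directly and plans the query with Catalyst.
public class SparkOnHiveDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("spark-on-hive-demo")
        .enableHiveSupport()                 // use the Hive metastore as the catalog
        .getOrCreate();
    spark.sql("SELECT COUNT(*) FROM demo_db.demo_table").show();
    spark.stop();
  }
}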

The Tez execution engine

As shown above, once Hive has finished AST parsing, semantic analysis and plan generation, it calls the Tez APIs to actually run the job. Tuning Hive's own parameters alone is therefore not enough; Tez's parameters also need adjusting to get better performance. Offline batch processing is not very latency-sensitive and the performance bar is not extreme, but running faster never hurts, and the saved compute can always be put to use elsewhere, even if only for HBase archival merges.

Tez runtime parameters

See the official documentation: https://tez.apache.org/releases/0.10.1/tez-runtime-library-javadocs/configs/TezRuntimeConfiguration.html

TezRuntimeConfiguration


Each entry below is listed as: property name (type, default value) [private / unstable / evolving flags, shown only when true]: description.

tez.runtime.cleanup.files.on.interrupt (boolean, default false) [private, evolving]: Used only for internal testing. Strictly not recommended to be used elsewhere. This parameter could be changed/dropped later.
tez.runtime.combiner.class (string, default null): Specifies a combiner class (primarily for Shuffle).
tez.runtime.combine.min.spills (integer, default 3)
tez.runtime.compress (boolean, default null)
tez.runtime.compress.codec (string, default null)
tez.runtime.convert.user-payload.to.history-text (string, default false): Value: Boolean. Whether to publish configuration information to History logger. Default false.
tez.runtime.empty.partitions.info-via-events.enabled (boolean, default true)
tez.runtime.enable.final-merge.in.output (boolean, default true): Expert level setting. Enable final merge in ordered (defaultsorter/pipelinedsorter) outputs. Speculative execution needs to be turned off when disabling this parameter. //TODO: TEZ-2132
tez.runtime.group.comparator.class (string, default null)
tez.runtime.ifile.readahead (boolean, default true): Configuration key to enable/disable IFile readahead.
tez.runtime.ifile.readahead.bytes (integer, default 4194304): Configuration key to set the IFile readahead length in bytes.
tez.runtime.index.cache.memory.limit.bytes (integer, default 1048576)
tez.runtime.task.input.post-merge.buffer.percent (float, default null)
tez.runtime.internal.sorter.class (string, default null)
tez.runtime.io.sort.factor (integer, default 100)
tez.runtime.io.sort.mb (integer, default 100)
tez.runtime.key.class (string, default null)
tez.runtime.key.comparator.class (string, default null)
tez.runtime.key.secondary.comparator.class (string, default null)
tez.runtime.optimize.local.fetch (boolean, default true): If the shuffle input is on the local host, bypass the http fetch and access the files directly.
tez.runtime.optimize.shared.fetch (boolean, default false): Share data fetched between tasks running on the same host if applicable.
tez.runtime.partitioner.class (string, default null): Specifies a partitioner class, which is used in Tez Runtime components like OnFileSortedOutput.
tez.runtime.pipelined-shuffle.enabled (boolean, default false): Expert level setting. Enable pipelined shuffle in ordered outputs and in unordered partitioned outputs. In ordered cases, it works with PipelinedSorter. Set tez.runtime.sort.threads to greater than 1 to enable pipelinedsorter. Ensure to set tez.runtime.enable.final-merge.in.output=false. Speculative execution needs to be turned off when using this parameter. //TODO: TEZ-2132
tez.runtime.pipelined.sorter.lazy-allocate.memory (boolean, default false): Setting this to true would enable the sorter to auto-allocate memory on a need basis in progressive fashion. Setting it to false would allocate all available memory during initialization of the sorter. In such cases TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB would be honored and the memory specified in TEZ_RUNTIME_IO_SORT_MB would be initialized upfront.
tez.runtime.pipelined.sorter.min-block.size.in.mb (integer, default 2000): Tries to allocate TEZ_RUNTIME_IO_SORT_MB in chunks specified in this parameter.
tez.runtime.pipelined.sorter.sort.threads (integer, default 2)
tez.runtime.merge.progress.records (integer, default 10000) [private, evolving]
tez.runtime.report.partition.stats (string, default null): Report partition statistics (e.g. better scheduling in ShuffleVertexManager). TEZ-2496: this can be enabled/disabled at vertex level. org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats defines the list of values that can be specified. TODO TEZ-3303: given that ShuffleVertexManager doesn't consume precise stats yet, do not set the value to "precise" yet when ShuffleVertexManager is used.
tez.runtime.shuffle.acceptable.host-fetch.failure.fraction (float, default 0.2) [private, evolving]
tez.runtime.shuffle.batch.wait (integer, default -1): Expert level setting. How long the ShuffleManager should wait for batching before sending the events, in milliseconds. Set to -1 to not wait.
tez.runtime.shuffle.buffersize (integer, default 8192)
tez.runtime.shuffle.connect.timeout (integer, default null)
tez.runtime.shuffle.memory-to-memory.enable (boolean, default false)
tez.runtime.shuffle.ssl.enable (boolean, default false)
tez.runtime.shuffle.failed.check.since-last.completion (boolean, default true) [private, evolving]
tez.runtime.shuffle.fetcher.use-shared-pool (boolean, default false) [private, evolving]
tez.runtime.shuffle.fetch.buffer.percent (float, default 0.9)
tez.runtime.shuffle.fetch.failures.limit (integer, default 5)
tez.runtime.shuffle.fetch.max.task.output.at.once (integer, default 20)
tez.runtime.shuffle.fetch.verify-disk-checksum (boolean, default true): Controls verification of data checksums when fetching data directly to disk. Enabling verification allows the fetcher to detect corrupted data and report the failure against the upstream task before the data reaches the Processor and causes the fetching task to fail.
tez.runtime.shuffle.host.penalty.time.limit (integer, default 600000): Specifies in milliseconds the maximum delay a penalized host can have before being retried; defaults to 10 minutes.
tez.runtime.shuffle.keep-alive.enabled (boolean, default false)
tez.runtime.shuffle.keep-alive.max.connections (integer, default 20)
tez.runtime.shuffle.max.allowed.failed.fetch.fraction (float, default 0.5) [private, evolving]
tez.runtime.shuffle.max.stall.time.fraction (float, default 0.5) [private, evolving]
tez.runtime.shuffle.memory.limit.percent (float, default 0.25)
tez.runtime.shuffle.memory-to-memory.segments (integer, default null)
tez.runtime.shuffle.merge.percent (float, default 0.9)
tez.runtime.shuffle.min.failures.per.host (integer, default 4) [private, evolving]
tez.runtime.shuffle.min.required.progress.fraction (float, default 0.5) [private, evolving]
tez.runtime.shuffle.notify.readerror (boolean, default true)
tez.runtime.shuffle.parallel.copies (integer, default 20)
tez.runtime.shuffle.read.timeout (integer, default 180000)
tez.runtime.shuffle.src-attempt.abort.limit (integer, default -1) [private, evolving]
tez.runtime.shuffle.use.async.http (boolean, default false)
tez.runtime.sorter.class (string, default null): String value. Which sorter implementation to use. Valid values: LEGACY, PIPELINED (default). See org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl.
tez.runtime.sort.spill.percent (float, default 0.8)
tez.runtime.unordered.output.buffer.size-mb (integer, default 100): Size of the buffer to use if not writing directly to disk.
tez.runtime.unordered.output.max-per-buffer.size-bytes (integer, default null) [private]: Maximum size for individual buffers used in the UnsortedPartitionedOutput. This is only meant to be used by unit tests for now.
tez.runtime.unordered-partitioned-kvwriter.buffer-merge-percent (int, default 0): Integer value. Percentage of buffer to be filled before we spill to disk. Default value is 0, which will spill for every buffer.
tez.runtime.value.class (string, default null)

There are quite a few tunable runtime parameters.
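
In practice these keys are usually set per session with set tez.runtime.io.sort.mb=512; and the like, or cluster-wide in tez-site.xml. Since the TezTask excerpt above shows Hive packing Hadoop, Tez and Hive settings into one JobConf, a plain Hadoop Configuration is enough to illustrate the idea. The values below are arbitrary examples for the sketch, not tuning recommendations.

import org.apache.hadoop.conf.Configuration;

// Illustrative only: a few of the runtime knobs listed above, set on a Hadoop Configuration.
public class TezRuntimeConfDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("tez.runtime.io.sort.mb", 512);                       // sort buffer for ordered outputs
    conf.setFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.8f);  // shuffle fetch buffer share
    conf.setBoolean("tez.runtime.compress", true);
    conf.set("tez.runtime.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    // Print only the keys we just set (iterating a Configuration also yields defaults).
    conf.forEach(e -> {
      if (e.getKey().startsWith("tez.runtime.")) {
        System.out.println(e.getKey() + " = " + e.getValue());
      }
    });
  }
}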

Tez static parameters

See the official documentation: https://tez.apache.org/releases/0.10.1/tez-api-javadocs/configs/TezConfiguration.html

TezConfiguration


Each entry below is listed as: property name (type, default value) [private / unstable / evolving flags, shown only when true]: description.

tez.dag.recovery.enabled (boolean, default true): Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the incomplete DAGs from the previous instance of the app master.
tez.dag.recovery.io.buffer.size (integer, default 8192): Int value. Size in bytes for the IO buffer size while processing the recovery file. Expert level setting.
tez.dag.recovery.flush.interval.secs (integer, default 30): Int value. Interval, in seconds, between flushing recovery data to the recovery log.
tez.dag.recovery.max.unflushed.events (integer, default 100): Int value. Number of recovery events to buffer before flushing them to the recovery log.
tez.task.heartbeat.timeout.check-ms (integer, default 30000): Int value. Time interval, in milliseconds, between checks for lost tasks. Expert level setting.
tez.task.timeout-ms (integer, default 300000): Int value. Time interval, in milliseconds, within which a task must heartbeat to the app master before it is considered lost. Expert level setting.
tez.am.acls.enabled (boolean, default true): Boolean value. Configuration to enable/disable ACL checks.
tez.allow.disabled.timeline-domains (boolean, default false) [private]: Boolean value. Allow disabling of Timeline Domains even if Timeline is being used.
tez.am.client.am.port-range (string, default null): String value. Range of ports that the AM can use when binding for client connections. Leave blank to use all possible ports. Expert level setting. It is the standard Hadoop range configuration, for example 50000-50050,50100-50200.
tez.am.client.heartbeat.timeout.secs (integer, default -1): Int value. Time interval (in seconds). If the Tez AM does not receive a heartbeat from the client within this time interval, it will kill any running DAG and shut down. Required to re-cycle orphaned Tez applications where the client is no longer alive. A negative value can be set to disable this check. For a positive value, the minimum value is 10 seconds; values between 0 and 10 seconds will be reset to the minimum value. Only relevant in session mode. This is disabled by default, i.e. by default the Tez AM will go on to complete the DAG and only kill itself after hitting the DAG submission timeout defined by TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS.
tez.am.client.am.thread-count (integer, default 2): Int value. Number of threads to handle client RPC requests. Expert level setting.
tez.am.commit-all-outputs-on-dag-success (boolean, default true): Boolean value. Determines when the final outputs to data sinks are committed. Commit is an output-specific operation and typically involves making the output visible for consumption. If the config is true, then the outputs are committed at the end of DAG completion after all constituent vertices have completed. If false, outputs for each vertex are committed after that vertex succeeds. Depending on the desired output visibility and downstream consumer dependencies this value must be appropriately chosen. Defaults to the safe choice of true.
tez.am.containerlauncher.thread-count-limit (integer, default 500): Int value. Upper limit on the number of threads used to launch containers in the app master. Expert level setting.
tez.am.container.idle.release-timeout-max.millis (long, default 10000): Int value. The maximum amount of time to hold on to a container if no task can be assigned to it immediately. Only active when reuse is enabled. The value must be positive and >= TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS. Containers will have an expire time set to a random value between TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS and TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This creates a graceful reduction in the amount of idle resources held.
tez.am.container.idle.release-timeout-min.millis (integer, default 5000): Int value. The minimum amount of time to hold on to a container that is idle. Only active when reuse is enabled. Set to -1 to never release idle containers (not recommended).
tez.am.container.reuse.enabled (boolean, default true): Boolean value. Configuration to specify whether containers should be reused across tasks. This improves performance by not incurring recurring launch overheads.
tez.am.container.reuse.locality.delay-allocation-millis (long, default 250): Int value. The amount of time to wait before assigning a container to the next level of locality (NODE -> RACK -> NON_LOCAL). Delay scheduling parameter. Expert level setting.
tez.am.container.reuse.new-containers.enabled (boolean, default false): Boolean value. Whether to reuse new containers that could not be immediately assigned to pending requests. If enabled, newly assigned containers that cannot be immediately allocated will be held for potential reuse as if they were containers that had just completed a task. If disabled, newly assigned containers that cannot be immediately allocated will be released. Active only if container reuse is enabled.
tez.am.container.reuse.non-local-fallback.enabled (boolean, default false): Boolean value. Whether to reuse containers for non-local tasks. Active only if reuse is enabled. Turning this on can severely affect locality and can be bad for jobs with high data volume being read from the primary data sources.
tez.am.container.reuse.rack-fallback.enabled (boolean, default true): Boolean value. Whether to reuse containers for rack local tasks. Active only if reuse is enabled.
tez.am.credentials-merge (boolean, default null): Boolean value. If true then Tez will add the ApplicationMaster credentials to all task credentials.
tez.am.dag.appcontext.thread-count-limit (integer, default 10): Int value. Upper limit on the number of threads used by app context (vertex management and input init events).
tez.am.dag.cleanup.on.completion (boolean, default false): Boolean value. Instructs the AM to delete the DAG directory upon completion.
tez.am.dag.deletion.thread-count-limit (integer, default 10): Int value. Upper limit on the number of threads used to delete DAG directories on nodes.
tez.am.dag.scheduler.class (string, default org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder): String value. The class to be used for DAG scheduling. Expert level setting.
tez.am.deletion.tracker.class (string, default org.apache.tez.dag.app.launcher.DeletionTrackerImpl): String value that is a class name. Specify the class to use for deletion tracking.
tez.am.disable.client-version-check (boolean, default false) [private]: Boolean value. Disable version check between client and AM/DAG. Default false.
tez.am.task.estimator.exponential.lambda.ms (long, default null) [evolving]: Long value. Specifies the amount of time (in ms) of the lambda value in the smoothing function of the task estimator.
tez.am.task.estimator.exponential.skip.initials (integer, default 24) [evolving]: The number of initial readings that the estimator ignores before giving a prediction. At the beginning the smooth estimator won't be accurate in prediction.
tez.am.task.estimator.exponential.stagnated.ms (long, default null) [evolving]: The window length in the simple exponential smoothing that considers the task attempt stagnated.
tez.am.inline.task.execution.enabled (boolean, default false) [private]: Tez AM Inline Mode flag. Not valid until TEZ-684 gets checked in.
tez.am.inline.task.execution.max-tasks (integer, default 1): Int value. The maximum number of tasks running in parallel within the app master process.
tez.am.launch.cluster-default.cmd-opts (string, default "-server -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN"): String value. Command line options which will be prepended to TEZ_AM_LAUNCH_CMD_OPTS during the launch of the AppMaster process. This property will typically be configured to include default options meant to be used by all jobs in a cluster. If required, the values can be overridden per job.
tez.am.launch.cluster-default.env (string, default null): String value. Env settings will be merged with TEZ_AM_LAUNCH_ENV during the launch of the AppMaster process. This property will typically be configured to include default system env meant to be used by all jobs in a cluster. If required, the values can be appended to per job.
tez.am.launch.cmd-opts (string, default null): String value. Command line options provided during the launch of the Tez AppMaster process. It is recommended not to set any Xmx or Xms in these launch opts so that Tez can determine them automatically.
tez.am.launch.env (string, default empty): String value. Env settings for the Tez AppMaster process. Should be specified as comma-separated key-value pairs where each pair is defined as KEY=VAL, e.g. "LD_LIBRARY_PATH=.,USERNAME=foo". These take least precedence compared to other methods of setting env. These get added to the app master environment prior to launching it. This setting will prepend existing settings in the cluster default.
tez.am.legacy.speculative.single.task.vertex.timeout (long, default -1) [evolving]: Long value. Specifies the timeout after which tasks on a single task vertex must be speculated. A negative value means not to use a timeout for speculation of single task vertices.
tez.am.legacy.speculative.slowtask.threshold (float, default null) [evolving]: Float value. Specifies how many standard deviations away from the mean task execution time should be considered an outlier/slow task.
tez.am.log.level (string, default INFO): Root logging level passed to the Tez app master. Simple configuration: set the log level for all loggers, e.g. INFO. Advanced configuration: set the log level for all classes along with a different level for some, e.g. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO sets the log level for all loggers to DEBUG except for org.apache.hadoop.ipc and org.apache.hadoop.security, which are set to INFO. Note: the global log level must always be the first parameter; DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid, org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not.
tez.am.max.allowed.time-sec.for-read-error (integer, default 300): Int value. Represents the maximum time in seconds for which a consumer attempt can report a read error against its producer attempt, after which the producer attempt will be re-run to re-generate the output. There are other heuristics which determine the retry and mainly try to guard against a flurry of re-runs due to intermittent read errors (due to network issues). This configuration puts a time limit on those heuristics to ensure jobs don't hang indefinitely due to lack of closure in those heuristics. Expert level setting.
tez.am.max.app.attempts (integer, default 2): Int value. Specifies the number of times the app master can be launched in order to recover from app master failure. Typically app master failures are non-recoverable. This parameter is for cases where the app master is not at fault but is lost due to system errors. Expert level setting.
tez.am.maxtaskfailures.per.node (integer, default 10): Int value. Specifies the number of task failures on a node before the node is considered faulty.
tez.am.minimum.allowed.speculative.tasks (integer, default 10) [evolving]: Integer value. The minimum allowed tasks that can be speculatively re-executed at any time.
tez.am.modify-acls (string, default null): String value. AM modify ACLs. This allows the specified users/groups to run modify operations on the AM such as submitting DAGs, pre-warming the session, killing DAGs or shutting down the session. Comma separated list of users, followed by whitespace, followed by a comma separated list of groups.
tez.am.node-blacklisting.enabled (boolean, default true): Boolean value. Enables blacklisting of nodes that are considered faulty. These nodes will not be used to execute tasks.
tez.am.node-blacklisting.ignore-threshold-node-percent (integer, default 33): Int value. Specifies the percentage of nodes in the cluster that may be considered faulty. This limits the number of nodes that are blacklisted in an effort to minimize the effects of temporary surges in failures (e.g. due to network outages).
tez.am.node-unhealthy-reschedule-tasks (boolean, default false): Boolean value. Enable task rescheduling for node updates. When enabled, the task scheduler will reschedule task attempts that are associated with an unhealthy node to avoid potential data transfer errors from downstream tasks.
tez.am.preemption.heartbeats-between-preemptions (integer, default 3): Int value. The number of RM heartbeats to wait after preempting running tasks before preempting more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the RM can act on the released resources and assign new ones to us. Expert level setting.
tez.am.preemption.max.wait-time-ms (integer, default 60000): Int value. Time (in millisecs) that an unsatisfied request will wait before preempting other resources. In rare cases, the cluster says there are enough free resources but does not end up getting enough on a node to actually assign it to the job. This configuration tries to put a deadline on such a wait to prevent indefinite job hangs.
tez.am.preemption.percentage (integer, default 10): Int value. Specifies the percentage of tasks eligible to be preempted that will actually be preempted in a given round of Tez internal preemption. This slows down preemption and gives more time for free resources to be allocated by the cluster (if any) and gives more time for preemptable tasks to finish. Valid values are 0-100. Higher values will preempt quickly at the cost of losing work. Setting to 0 turns off preemption. Expert level setting.
tez.am.proportion.running.tasks.speculatable (double, default 0.1) [evolving]: Double value. The max percent (0-1) of running tasks that can be speculatively re-executed at any time.
tez.am.proportion.total.tasks.speculatable (double, default 0.01) [evolving]: Double value. The max percent (0-1) of all tasks that can be speculatively re-executed at any time.
tez.am.resource.cpu.vcores (integer, default 1): Int value. The number of virtual cores to be used by the app master.
tez.am.resource.memory.mb (integer, default 1024): Int value. The amount of memory in MB to be used by the AppMaster.
tez.am.am-rm.heartbeat.interval-ms.max (integer, default 1000): Int value. The maximum heartbeat interval between the AM and RM in milliseconds. Increasing this reduces the communication between the AM and the RM and can help in scaling up. Expert level setting.
tez.am.session.min.held-containers (integer, default 0): Int value. The minimum number of containers that will be held in session mode. Not active in non-session mode. Enables an idle session (not running any DAG) to hold on to a minimum number of containers to provide fast response times for the next DAG.
tez.am.mode.session (boolean, default false): Boolean value. Execution mode for the Tez application. True implies session mode. If the client code is written according to best practices then the same code can execute in either mode based on this configuration. Session mode is more aggressive in reserving execution resources and is typically used for interactive applications where multiple DAGs are submitted in quick succession by the same user. For long running applications, one-off executions, batch jobs etc. non-session mode is recommended. If session mode is enabled then container reuse is recommended.
tez.am.shuffle.auxiliary-service.id (string, default mapreduce_shuffle): String value. Specifies the name of the shuffle auxiliary service.
tez.am.soonest.retry.after.no.speculate (long, default 1000) [evolving]: Long value. Specifies the amount of time (in ms) that needs to elapse to do the next round of speculation if there is no task speculated in this round.
tez.am.soonest.retry.after.speculate (long, default 15000) [evolving]: Long value. Specifies the amount of time (in ms) that needs to elapse to do the next round of speculation if there are tasks speculated in this round.
tez.am.speculation.enabled (boolean, default false) [evolving]
tez.am.speculator.class (string, default null): The class that should be used for speculative execution calculations.
tez.staging-dir (string, default null): String value. Specifies a directory where Tez can create temporary job artifacts.
tez.am.staging.scratch-data.auto-delete (boolean, default true): Boolean value. If true then Tez will try to automatically delete temporary job artifacts that it creates within the specified staging dir. Does not affect any user data.
tez.am.task.estimator.class (string, default null): The class that should be used for task runtime estimation.
tez.am.task.listener.thread-count (integer, default 30): Int value. The number of threads used to listen to task heartbeat requests. Expert level setting.
tez.am.task.max.attempts (integer, default 0): Int value. The maximum number of attempts that can run for a particular task before the task is failed. This counts every attempt, including failed and killed attempts. Task failure results in DAG failure. Default is 0, which disables this feature.
tez.am.task.max.failed.attempts (integer, default 4): Int value. The maximum number of attempts that can fail for a particular task before the task is failed. This does not count killed attempts. Task failure results in DAG failure.
tez.am.task.reschedule.higher.prioritytrueBoolean value. Specifies whether a re-scheduled attempt of a task, caused by previous failures gets higher prioritybooleanfalsefalsefalse
tez.am.task.reschedule.relaxed.localitytrueBoolean value. Specifies whether a re-scheduled attempt of a task, caused by previous failure get relaxed localitybooleanfalsefalsefalse
tez.am.tez-ui.history-url.templateHISTORY_URL_BASE/#/tez-app/APPLICATION_IDString value Tez UI URL template for the application. Expert level setting. The AM will redirect the user to the Tez UI via this url. Template supports the following parameters to be replaced with the actual runtime information: APPLICATION_ID : Replaces this with application ID HISTORY_URL_BASE: replaces this with TEZ_HISTORY_URL_BASE For example, "http://uihost:9001/#/tez-app/APPLICATION_ID/ will be replaced to http://uihost:9001/#/tez-app/application_1421880306565_0001/stringfalsefalsefalse
tez.am.vertex.max-task-concurrency-1Int value. The maximum number of attempts that can run concurrently for a given vertex. Setting <=0 implies no limitintegerfalsefalsefalse
tez.am.view-aclsnullString value. AM view ACLs. This allows the specified users/groups to view the status of the AM and all DAGs that run within this AM. Comma separated list of users, followed by whitespace, followed by a comma separated list of groupsstringfalsefalsefalse
tez.am.tez-ui.webservice.enabletrueString value Allow disabling of the Tez AM webservice. If set to false the Tez-UI wont show progress updates for running application.booleanfalsefalsefalse
tez.am.yarn.scheduler.classorg.apache.tez.dag.app.rm.YarnTaskSchedulerServiceString value. The class to be used for the YARN task scheduler. Expert level setting.stringfalsefalsefalse
tez.application.tagsnullString value. Tags for the job that will be passed to YARN at submission time. Queries to YARN for applications can filter on these tags.stringfalsefalsefalse
tez.aux.urisnullAuxiliary resources to be localized for the Tez AM and all its containers. Value is comma-separated list of fully-resolved directories or file paths. All resources are made available into the working directory of the AM and/or containers i.e. $CWD. If directories are specified, they are not traversed recursively. Only files directly under the specified directory are localized. All duplicate resources are ignored.stringfalsefalsefalse
tez.cancel.delegation.tokens.on.completiontruebooleantruefalsefalse
tez.classpath.add-hadoop-conffalseBoolean value. If this value is true then tez explicitly adds hadoop conf directory into classpath for AM and task containers. Default is false.booleantruefalsetrue
tez.client.asynchronous-stoptrueBoolean value. Backwards compatibility setting. Changes TezClient stop to be a synchronous call waiting until AM is in a final state before returning to the user. Expert level setting.booleanfalsefalsefalse
tez.client.diagnostics.wait.timeout-ms3000Long value Time to wait (in milliseconds) for yarn app’s diagnotics is available Workaround for YARN-2560longtruefalsefalse
tez.client.timeout-ms30000Long value. Time interval, in milliseconds, for client to wait during client-requested AM shutdown before issuing a hard kill to the RM for this application. Expert level setting.longfalsefalsefalse
tez.java.opts.checker.classnullString value. Ability to provide a different implementation to check/verify java opts defined for vertices/tasks. Class has to be an instance of JavaOptsCheckerstringtruefalsefalse
tez.java.opts.checker.enabledtrueBoolean value. Default true. Ability to disable the Java Opts Checkerbooleantruefalsefalse
tez.task.concurrent.edge.trigger.typenullString value. In the presence of concurrent input edge to a vertex, this describes the timing of scheduling downstream vertex tasks. It may be closely related to the type of event that will contribute to a scheduling decision.stringfalsefalsefalse
tez.container.max.java.heap.fraction0.8Double value. Tez automatically determines the Xmx for the JVMs used to run Tez tasks and app masters. This feature is enabled if the user has not specified Xmx or Xms values in the launch command opts. Doing automatic Xmx calculation is preferred because Tez can determine the best value based on actual allocation of memory to tasks the cluster. The value if used as a fraction that is applied to the memory allocated Factor to size Xmx based on container memory size. Value should be greater than 0 and less than 1. Set this value to -1 to allow Tez to use different default max heap fraction for different container memory size. Current policy is to use 0.7 for container smaller than 4GB and use 0.8 for larger container.floatfalsefalsefalse
tez.counters.counter-name.max-length64Int value. Configuration to limit the length of counter names. This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting.integerfalsefalsetrue
tez.counters.group-name.max-length256Int value. Configuration to limit the counter group names per app master. This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting.integerfalsefalsetrue
tez.counters.max1200Int value. Configuration to limit the counters per dag (AppMaster and Task). This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting.integerfalsefalsetrue
tez.counters.max.groups500Int value. Configuration to limit the number of counter groups for a DAG. This can be used to limit the amount of memory being used in the app master to store the counters. Expert level setting.integerfalsefalsetrue
tez.credentials.pathnullString value that is a file path. Path to a credentials file (with serialized credentials) located on the local file system.stringfalsefalsefalse
tez.dag.status.pollinterval-ms500Long value Status Poll interval in Milliseconds used when getting DAG status with timeout.longfalsefalsefalse
tez.generate.debug.artifactsfalsebooleanfalsefalsetrue
tez.history.logging.log.levelnullEnum value. Config to limit the type of events published to the history logging service. The valid log levels are defined in the enum {@link HistoryLogLevel}. The default value is defined in {@link HistoryLogLevel#DEFAULT}.stringfalsefalsefalse
tez.history.logging.proto-base-dirnullString value. The base directory into which history data will be written when proto history logging service is used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}. If this is not set, then logging is disabled for ProtoHistoryLoggingService.stringfalsefalsefalse
tez.history.logging.proto-doasfalseLong value. The amount of time in seconds to wait to ensure all events for a day is synced to disk. This should be maximum time variation b/w machines + maximum time to sync file content and metadata.booleanfalsefalsefalse
tez.history.logging.queue.size100000Int value. Maximum queue size for proto history event logger.integerfalsefalsefalse
tez.history.logging.split-dag-startfalseBoolean value. Set this to true, if the underlying file system does not support flush (Ex: s3). The dag submitted, initialized and started events are written into a file and closed. The rest of the events are written into another file.booleanfalsefalsefalse
tez.history.logging.proto-sync-window-secs60Long value. The amount of time in seconds to wait to ensure all events for a day is synced to disk. This should be maximum time variation b/w machines + maximum time to sync file content and metadata.longfalsefalsefalse
tez.history.logging.service.classorg.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingServiceString value that is a class name. Specify the class to use for logging history data. To disable, set this to “org.apache.tez.dag.history.logging.impl.DevNullHistoryLoggingService”stringfalsefalsefalse
tez.history.logging.taskattempt-filtersnullList of comma separated enum values. Specifies the list of task attempt termination causes, which have to be suppressed from being logged to ATS. The valid filters are defined in the enum TaskAttemptTerminationCause. The filters are applied only if tez.history.logging.log.level is set to TASK_ATTEMPT.stringfalsefalsefalse
tez.history.logging.timeline-cache-plugin.old-num-dags-per-groupnullComma separated list of Integers. These are the values that were set for the config value for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so that the groupIds generated previously will continue to be generated by the plugin. If an older value is not present then the UI may not show information for DAGs which were created with a different grouping value. Note: Do not add too many values here as it will affect the performance of Yarn Timeline Server/Tez UI due to the need to scan for more log files.stringtruefalsetrue
tez.history.logging.timeline.num-dags-per-group1Integer value. Number of DAGs to be grouped together. This is used by the history logging service to generate groupIds such that numDagsPerGroup will have same groupId in a given session. If the value is set to 1 then we disable grouping. This config is used to control the number of DAGs written into one log file, and hence controls number of files created in the Filesystem used by YARN Timeline.integertruefalsetrue
tez.tez-ui.history-url.basenullString value Tez-UI Url base. This gets replaced in the TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE ex http://ui-host:9001 or if its hosted with a prefix http://ui-host:9001/~user if the ui is hosted on the default port (80 for http and 443 for https), the port should not be specified.stringfalsefalsefalse
tez.ignore.lib.urisnullBoolean value. Allows to ignore ‘tez.lib.uris’. Useful during development as well as raw Tez application where classpath is propagated with application via {@link LocalResource}s. This is mainly useful for developer/debugger scenarios.booleanfalsefalsetrue
tez.ipc.payload.reserved.bytes5242880Int value. SubmitDAGPlanRequest cannot be larger than Max IPC message size minus this number; otherwise, it will be serialized to HDFS and we transfer the path to server. Server will deserialize the request from HDFS.inttruefalsefalse
tez.job.fs-serversnullAcquire all FileSystems info. e.g., all namenodes info of HDFS federation cluster.stringfalsefalsefalse
tez.job.fs-servers.token-renewal.excludenullSkip delegation token renewal for specified FileSystems.stringfalsefalsefalse
tez.tez.jvm.system-properties-to-lognullString value. Determines what JVM properties will be logged for debugging purposes in the AM and Task runtime logs.stringfalsefalsefalse
tez.lib.urisnullString value to a file path. The location of the Tez libraries which will be localized for DAGs. This follows the following semanticsTo use .tar.gz or .tgz files (generated by the tez or hadoop builds), the full path to this file (including filename) should be specified. The internal structure of the uncompressed tgz will be defined by 'tez.lib.uris.classpath’If a single file is specified without the above mentioned extensions - it will be treated as a regular file. This means it will not be uncompressed during runtime.If multiple entries existRegular Files: will be treated as regular files (not uncompressed during runtime)Archive Files: will be treated as archives and will be uncompressed during runtimeDirectories: all files under the directory (non-recursive) will be made available (but not uncompressed during runtime).stringfalsefalsefalse
tez.lib.uris.classpathnullSpecify additional user classpath information to be used for Tez AM and all containers. This will be appended to the classpath after PWD ‘tez.lib.uris.classpath’ defines the relative classpath into the archives that are set in ‘tez.lib.uris’stringfalsefalsefalse
tez.local.cache.root.folder.String value. TezLocalCacheManager uses this folder as a root for temp and localized files.stringfalsefalsefalse
tez.local.modefalseBoolean value. Enable local mode execution in Tez. Enables tasks to run in the same process as the app master. Primarily used for debugging.booleanfalsefalsefalse
tez.local.mode.without.networkfalseBoolean value. Enable local mode execution in Tez without using network for communicating with DAGAppMaster. This option only makes sense when {@link #TEZ_LOCAL_MODE} is true. When TEZ_LOCAL_MODE_WITHOUT_NETWORK is turned on, LocalClient will call DAGAppMaster’s methods directly.booleanfalsefalsefalse
tez.mrreader.config.update.propertiesnullComma-separated list of properties that MRReaderMapred should return (if present) when calling for config updates.stringfalsefalsefalse
tez.queue.namenullString value. The queue name for all jobs being submitted from a given client.stringfalsefalsefalse
tez.session.am.dag.submit.timeout.secs300Int value. Time (in seconds) for which the Tez AM should wait for a DAG to be submitted before shutting down. Only relevant in session mode. Any negative value will disable this check and allow the AM to hang around forever in idle mode.integerfalsefalsefalse
tez.session.client.timeout.secs120Int value. Time (in seconds) to wait for AM to come up when trying to submit a DAG from the client. Only relevant in session mode. If the cluster is busy and cannot launch the AM then this timeout may be hit. In those case, using non-session mode is recommended if applicable. Otherwise increase the timeout (set to -1 for infinity. Not recommended)integerfalsefalsefalse
tez.simple.history.logging.dirnullString value. The directory into which history data will be written. This defaults to the container logging directory. This is relevant only when SimpleHistoryLoggingService is being used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}stringfalsefalsefalse
tez.simple.history.max.errors10Int value. Maximum errors allowed while logging history data. After crossing this limit history logging gets disabled. The job continues to run after this.integerfalsefalsefalse
tez.task.am.heartbeat.counter.interval-ms.max4000Int value. Interval, in milliseconds, after which counters are sent to AM in heartbeat from tasks. This reduces the amount of network traffice between AM and tasks to send high-volume counters. Improves AM scalability. Expert level setting.integerfalsefalsefalse
tez.task.am.heartbeat.interval-ms.max100Int value. The maximum heartbeat interval, in milliseconds, between the app master and tasks. Increasing this can help improve app master scalability for a large number of concurrent tasks. Expert level setting.integerfalsefalsefalse
tez.task.generate.counters.per.iofalseWhether to generate counters per IO or not. Enabling this will rename CounterGroups / CounterNames to making them unique per Vertex + Src|Destinationbooleantruefalsetrue
tez.task.get-task.sleep.interval-ms.max200Int value. The maximum amount of time, in milliseconds, to wait before a task asks an AM for another task. Increasing this can help improve app master scalability for a large number of concurrent tasks. Expert level setting.integerfalsefalsefalse
tez.task.initialize-processor-firstfalseBoolean value. Backwards compatibility setting for initializing IO processor before inputs and outputs. Expert level setting.booleanfalsefalsefalse
tez.task.initialize-processor-io-seriallyfalseBoolean value. Backwards compatibility setting for initializing inputs and outputs serially instead of the parallel default. Expert level setting.booleanfalsefalsefalse
tez.task.launch.cluster-default.cmd-opts-server -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARNString value. Command line options which will be prepended to {@link #TEZ_TASK_LAUNCH_CMD_OPTS} during the launch of Tez tasks. This property will typically be configured to include default options meant to be used by all jobs in a cluster. If required, the values can be overridden per job.stringfalsefalsefalse
tez.task.launch.cluster-default.envnullString value. Env settings will be merged with {@link #TEZ_TASK_LAUNCH_ENV} during the launch of the task process. This property will typically be configured to include default system env meant to be used by all jobs in a cluster. If required, the values can be appended to per job.stringfalsefalsefalse
tez.task.launch.cmd-optsnullString value. Command line options provided during the launch of Tez Task processes. Its recommended to not set any Xmx or Xms in these launch opts so that Tez can determine them automatically.stringfalsefalsefalse
tez.task.launch.envString value. Env settings for the Tez Task processes. Should be specified as a comma-separated of key-value pairs where each pair is defined as KEY=VAL e.g. “LD_LIBRARY_PATH=.,USERNAME=foo” These take least precedence compared to other methods of setting env These get added to the task environment prior to launching it. This setting will prepend existing settings in the cluster defaultstringfalsefalsefalse
tez.task.log.levelINFORoot Logging level passed to the Tez tasks. Simple configuration: Set the log level for all loggers. e.g. INFO This sets the log level to INFO for all loggers. Advanced configuration: Set the log level for all classes, along with a different level for some. e.g. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO This sets the log level for all loggers to DEBUG, expect for the org.apache.hadoop.ipc and org.apache.hadoop.security, which are set to INFO Note: The global log level must always be the first parameter. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not validstringfalsefalsefalse
tez.task.max-events-per-heartbeat500Int value. Maximum number of of events to fetch from the AM by the tasks in a single heartbeat. Expert level setting. Expert level setting.integerfalsefalsefalse
tez.task.max-event-backlog10000Int value. Maximum number of pending task events before a task will stop asking for more events in the task heartbeat. Expert level setting.integerfalsefalsefalse
tez.task.progress.stuck.interval-ms-1Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output components need to make successive progress notifications. If the progress is not notified for this interval then the task will be considered hung and terminated. The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS} and larger than 2 times the value of {@link TezConfiguration#TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS}. A config value <=0 disables this.stringfalsefalsefalse
tez.task.resource.calculator.process-tree.classnullstringtruefalsetrue
tez.task.resource.cpu.vcores1Int value. The number of virtual cores to be used by tasks.integerfalsefalsefalse
tez.task.resource.memory.mb1024Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across all vertices. Setting it to the same value for all tasks is helpful for container reuse and thus good for performance typically.integerfalsefalsefalse
tez.task.scale.memory.additional-reservation.fraction.maxnullfloattruefalsetrue
tez.task.scale.memory.additional-reservation.fraction.per-ionullFraction of available memory to reserve per input/output. This amount is removed from the total available pool before allocation and is for factoring in overheads.floattruefalsetrue
tez.task.scale.memory.allocator.classorg.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributorThe allocator to use for initial memory allocationstringtruefalsetrue
tez.task.scale.memory.enabledtrueWhether to scale down memory requested by each component if the total exceeds the available JVM memorybooleantruefalsetrue
tez.task.scale.memory.reserve-fraction0.3The fraction of the JVM memory which will not be considered for allocation. No defaults, since there are pre-existing defaults based on different scenarios.doubletruefalsetrue
tez.task.scale.memory.ratiosnullstringtruefalsetrue
tez.task-specific.launch.cmd-optsnullAdditional launch command options to be added for specific tasks. VERTEX_NAME and TASK_INDEX can be specified, which would be replaced at runtime by vertex name and task index. e.g tez.task-specific.launch.cmd-opts= “-agentpath:libpagent.so,dir=/tmp/VERTEX_NAME/TASK_INDEXstringfalsefalsetrue
tez.task-specific.launch.cmd-opts.listnullSet of tasks for which specific launch command options need to be added. Format: “vertexName[csv of task ids];vertexName[csv of task ids]…” Valid e.g: v[0,1,2] - Additional launch-cmd options for tasks 0,1,2 of vertex v v[1,2,3];v2[5,6,7] - Additional launch-cmd options specified for tasks of vertices v and v2. v[1:5,20,30];v2[2:5,60,7] - Additional launch-cmd options for 1,2,3,4,5,20,30 of vertex v; 2, 3,4,5,60,7 of vertex v2 Partial ranges like :5, 1: are not supported. v[] - Additional launch-cmd options for all tasks in vertex vstringfalsefalsetrue
tez.task-specific.log.levelnullTask specific log level. Simple configuration: Set the log level for all loggers. e.g. INFO This sets the log level to INFO for all loggers. Advanced configuration: Set the log level for all classes, along with a different level for some. e.g. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO This sets the log level for all loggers to DEBUG, expect for the org.apache.hadoop.ipc and org.apache.hadoop.security, which are set to INFO Note: The global log level must always be the first parameter. DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not validstringfalsefalsetrue
tez.test.minicluster.app.wait.on.shutdown.secs30Long value. Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown.longtruefalsefalse
tez.user.classpath.firsttrueBoolean value. Specify whether the user classpath takes precedence over the Tez framework classpath.booleanfalsefalsefalse
tez.use.cluster.hadoop-libsfalseBoolean value. Specify whether hadoop libraries required to run Tez should be the ones deployed on the cluster. This is disabled by default - with the expectation being that tez.lib.uris has a complete tez-deployment which contains the hadoop libraries.booleanfalsefalsefalse
tez.yarn.ats.acl.domains.auto-createtruebooleanfalsefalsefalse
tez.yarn.ats.event.flush.timeout.millis-1Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown. Expert level setting.longfalsefalsefalse
tez.yarn.ats.max.events.per.batch5Int value. Max no. of events to send in a single batch to ATS. Expert level setting.integerfalsefalsefalse
tez.yarn.ats.max.polling.time.per.event.millis10Int value. Time, in milliseconds, to wait for an event before sending a batch to ATS. Expert level setting.integerfalsefalsefalse

No need to doubt it... the static parameters really do outnumber the runtime parameters. A quick sketch of how these keys are consumed in code follows.
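As a hedged illustration (not from the original post): every key in the table above is an ordinary Hadoop Configuration entry, so once tez-site.xml is on the classpath it can be read back with the plain Hadoop API. The class name TezStaticConfPeek and the fallback queue value "default" are invented for this sketch; the keys and default values mirror the table above.

```java
import org.apache.hadoop.conf.Configuration;

// Throwaway demo class (hypothetical name): reads a few of the Tez static
// parameters listed above as plain Hadoop Configuration entries.
public class TezStaticConfPeek {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // If tez-site.xml is on the classpath it is picked up here;
        // otherwise the second argument acts as the default from the table above.
        conf.addResource("tez-site.xml");

        int amMemoryMb   = conf.getInt("tez.am.resource.memory.mb", 1024);
        int taskMemoryMb = conf.getInt("tez.task.resource.memory.mb", 1024);
        boolean session  = conf.getBoolean("tez.am.mode.session", false);
        // tez.queue.name has no default in the table; "default" is only a demo fallback.
        String queue     = conf.get("tez.queue.name", "default");

        System.out.printf("AM memory=%dMB, task memory=%dMB, session=%b, queue=%s%n",
                amMemoryMb, taskMemoryMb, session, queue);
    }
}
```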

Summary

By digging through the shell scripts and the source code, we can see that Hive's command line has three entry points (a small sketch of the three entry classes follows the list):

Beeline: the modern path. It goes straight into the main method of org.apache.hive.beeline.BeeLine, whose constructor sets isBeeLine=true.
Hive CLI: the legacy path. It can use either the Beeline-based entry (org.apache.hive.beeline.cli.HiveCli, whose constructor sets isBeeLine=false)
		or the old CLI entry (org.apache.hadoop.hive.cli.CliDriver).
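As a minimal, hedged sketch (assuming the standard public static main() entry points shipped with Hive 3.1.2 are on the classpath): all three entry classes are plain Java main classes, which is why the hive/beeline shell wrappers can dispatch to them. The wrapper class HiveEntryDemo below is invented purely for illustration; in practice these classes are launched through the shell scripts, not like this.

```java
// Hypothetical demo: the three command-line entry classes are ordinary Java main() classes.
// In a real deployment they are started via the hive/beeline shell wrappers.
public class HiveEntryDemo {
    public static void main(String[] args) throws Exception {
        String mode = args.length > 0 ? args[0] : "beeline";
        String[] rest = java.util.Arrays.copyOfRange(args, Math.min(1, args.length), args.length);
        switch (mode) {
            case "beeline":      // modern path, isBeeLine = true inside the constructor
                org.apache.hive.beeline.BeeLine.main(rest);
                break;
            case "hivecli":      // Beeline-based Hive CLI, isBeeLine = false
                org.apache.hive.beeline.cli.HiveCli.main(rest);
                break;
            case "clidriver":    // legacy CLI entry
                org.apache.hadoop.hive.cli.CliDriver.main(rest);
                break;
            default:
                System.err.println("unknown mode: " + mode);
        }
    }
}
```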

The legacy CliDriver entry point shows the concrete execution flow of an HQL statement:

Enter through org.apache.hadoop.hive.cli.CliDriver → the run() method → the executeDriver method, which performs the SQL parsing and launches the compute tasks.

The executeDriver method itself breaks down into the following steps (a simplified sketch follows the chain):

processLine executes each statement after the input is split → processCmd runs the individual SQL command → depending on whether the source keyword is detected or the command starts with "!", some special-case handling is applied →
processLocalCmd invokes the run method to do the actual computation → processLocalCmd then renders the results.
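The block below is only a simplified paraphrase of that dispatch flow, not the real CliDriver source: statement splitting, quoting and error handling are collapsed, and handleSourceFile / handleShellCommand are invented placeholder names for the corresponding branches.

```java
// Simplified paraphrase of the CliDriver dispatch flow described above.
// NOT the actual Hive 3.1.2 source: handleSourceFile/handleShellCommand are placeholders.
public class CliDispatchSketch {

    public int processLine(String line) {
        int ret = 0;
        // executeDriver hands each input line here; a line may contain several statements.
        for (String oneCmd : line.split(";")) {
            String cmd = oneCmd.trim();
            if (!cmd.isEmpty()) {
                ret = processCmd(cmd);
            }
        }
        return ret;
    }

    public int processCmd(String cmd) {
        String trimmed = cmd.trim();
        if (trimmed.toLowerCase().startsWith("source ")) {
            // in the real code the referenced file is read and fed back through the same pipeline
            return handleSourceFile(trimmed.substring("source ".length()));
        } else if (trimmed.startsWith("!")) {
            // shell out to the operating system
            return handleShellCommand(trimmed.substring(1));
        } else {
            // ordinary HQL goes to the Driver
            return processLocalCmd(trimmed);
        }
    }

    public int processLocalCmd(String hql) {
        // In the real code this calls org.apache.hadoop.hive.ql.Driver#run(hql)
        // and then fetches and prints the result rows.
        return 0;
    }

    private int handleSourceFile(String path) { return 0; }
    private int handleShellCommand(String shellCmd) { return 0; }
}
```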

What processLocalCmd ultimately calls is Driver's run method (sketched below):

If the statement is not yet compiled, runInternal is executed →
compileInternal compiles the SQL
(mainly the inner compile method: parse the statement into an AST, refresh functions and the MetaStore, sem.analyze for semantic analysis and plan generation, sem.validate to validate the generated plan) →
execute launches the tasks (launchTask for parallel launch, runSequential for serial launch)
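Again a hedged, heavily simplified sketch of that compile-then-execute sequence, not the real org.apache.hadoop.hive.ql.Driver: locking, hooks and error handling are omitted, and the class name DriverRunSketch is invented.

```java
// Simplified sketch of the Driver.run() sequence described above.
// NOT the actual Hive source; hooks, locking and error handling are omitted.
public class DriverRunSketch {
    private boolean compiled = false;

    public int run(String hql) {
        return runInternal(hql);
    }

    private int runInternal(String hql) {
        if (!compiled) {
            int ret = compileInternal(hql);   // parse -> semantic analysis -> plan validation
            if (ret != 0) {
                return ret;
            }
        }
        return execute();                     // launch the generated tasks
    }

    private int compileInternal(String hql) {
        // Inside the real compile():
        //   1. the HQL text is parsed into an AST
        //   2. functions and MetaStore metadata are refreshed
        //   3. sem.analyze() performs semantic analysis and builds the plan
        //   4. sem.validate() validates the generated plan
        compiled = true;
        return 0;
    }

    private int execute() {
        // The real execute() walks the task DAG and either launches runnable
        // tasks in parallel (launchTask) or runs them serially (runSequential).
        return 0;
    }
}
```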

Leaving a placeholder for now: the most central and most complex part, the compile method and the Calcite-based parsing and optimization, still needs to be studied step by step...

Please credit the original source when reposting...
