当前位置: 首页 > news >正文

【hive和spark】hive和spark数据lineage血缘实现思路

一、背景

hive、spark、flink是hadoop最常用的,三个计算入口。hive最古老,同时有metastore,spark算的快,flink流技术支持最好。目前hive和spark融合度高,flink较为独行。

1.1 spark、hive关系:

hive和spark相互依存,如hive使用spark引擎,进行计算(当然也能使用tez引擎),spark连接hive metastore,获取表的元数据关系。本文不讨论tez引擎的问题。仅讨论hive使用spark引擎,和spark连接spark元数据情形。

hive使用spark引擎:
虽然hive使用spark引擎,但是不会触发spark的钩子函数的。仅作为引擎使用。

spark连接hive metastore。
spark内执行ddl的时候,spark监听ddl可以捕捉到。同时由于修改hive metastore,hive metastore的钩子也能监听到。所以spark 和 hive都能监听到,如果都监听务必仅向数据库写入一份,或者直接都用merge方式写入。

1.2 目前已经有框架

1.2.1、datahub

datahub
领英,创建,完全开源。社区较为活跃,依赖图数据库,文档详尽,支持很多数据源。定义为数据发现平台(Data Discovery Platform ),数据管理平台,集成了元数据管理和数据血缘功能。拥有UI界面。由python和java编写完成,元数据导入等使用python脚本完成。

数据血缘的表示如下:
source
在这里插入图片描述

在这里插入图片描述

1.2.2、atlas

atlas
阿特拉斯,由apache主导,社区活跃略低,较重,依赖hbase。集成数据发现功能,数据定义提较为灵活。
支持

在这里插入图片描述

1.3、血缘粒度

hive在不太老的版本上可以实现column级血缘,下文有介绍。
spark目前较难实现column级血缘。只能实现table级。

1.4、项目架构

其中有些是比较特别的,比如SQL形式的create table ascreate view,和SPARK的save into datasource,都是又是ddl也是dml的,会被执行两次。但由于项目中数据模型的一致性且存储模型均使用merge方式,所以最终存储到图数据的数据不会受到影响。
在这里插入图片描述

二、hive数据血缘实现思路

2.1 hive数据血缘捕捉实现

实现org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext
可以获取到任意类型的sql,比如insert,create table as,explain,use database等等。
需要注意的是:
(1)此类监测的hiveserver2,因此spark、flink的ddl等都不会被此类监测到。
(2)此类能检测到sql文本内容。

此类只有一个方法

void run(HookContext var1) throws Exception;

从HookContext中可以获取到

// 查询计划val plan: QueryPlan = hookContext.getQueryPlan// 查询的operation,即查询类型// 例如 org.apache.hadoop.hive.ql.plan.HiveOperation#QUERY 通过 getOperationName方法可以和op进行匹配val op = plan.getOperationName// 通过SessionState可以获取到当前的SessionId和当前数据库名,还有当前Session内的所有临时temporary表名称,还有当前的HiveConf。val ss: SessionState = SessionState.get// 获取当前Session的用户名称val userName = hookContext.getUgi.getUserName
// 执行时间val queryTime = getQueryTime(plan)// 执行的Sql语句val sql = plan.getQueryStr.trim// 通过HiveMetaStoreClient 可以实时再查询库表的元数据。// 使用ms需要注意:如果此表已经drop或者alter就不要使用ms查询了,否则程序报异常。val ms: HiveMetaStoreClient = getMetastore(ss.getConf)

和血缘有关系的操作有一些内容:
本例中view相关的也纳入血缘范畴。

op match {case dml if (Set(HiveOperation.QUERY.getOperationName  // insert into , HiveOperation.CREATETABLE_AS_SELECT.getOperationName// create table as , HiveOperation.CREATEVIEW.getOperationName // create view, HiveOperation.ALTERVIEW_RENAME.getOperationName // alter view as).contains(op)) => {// 下文将详细讲解}case load if Set(HiveOperation.LOAD.getOperationName).contains(load) => {LOG.info(s"lineage event: ${op}!")// 封装对象 }case truncate if Set(HiveOperation.TRUNCATETABLE.getOperationName).contains(truncate) => {// 封装对象}case other=>  LOG.info(s"lineage event: ${op} passed!")
}

这里着重说明case dml,情形较为复杂:
主要分为finalSelOps是否为空两种情况。

  def toScalaLinkedHashMap[K, V](input: java.util.Map[K, V]): LinkedHashMap[K, V] = {val output: LinkedHashMap[K, V] = LinkedHashMap.emptyoutput.putAll(input)output}// index是指每一列都对应已改索引,血缘信息需要通过index查找到val index: LineageCtx.Index = hookContext.getIndex// finalSelOps就是select最终获取的列的信息// LinkedHashMap[列名称,[SelectOperator,sink表]]val finalSelOps: mutable.LinkedHashMap[String, ObjectPair[SelectOperator, Table]] = toScalaLinkedHashMap(index.getFinalSelectOps)// hive2.3.9以后修复了finalSelOps为空的bug,能获取列的血缘信息。
// 具体参考:https://issues.apache.org/jira/browse/HIVE-14706
if (finalSelOps.values.isEmpty) {// 无法获取最终选取的列的信息,就只能获取表的血缘了。// plan.getInputs获取数据的source,可能是表,视图,临时表,也可能是 insert into values(我是临时表) 生成的临时表 ,另外注意:如insert into table from some view时候,plan.getInputs会将view和其关联的table名都获取到,此时无法区分到底是从视图+表而来,还是只从视图来。// plan.getOutputs获取数据的sink,可能含有database需要过滤,比如create table实际也修改database信息。需要注意:如果师表必须判断是否为temporary表,如果是临时表需要在程序里创建一个临时cache,存储此临时表的血缘信息。之后可能在plan.getInputs中使用到此临时表的血缘信息,将临时表替换为实体表或视图的血缘信息。
}else{// 可以获取列级别血缘// 获取到最终的insert into表的列信息(不一定是全部列)var tgtTblCurrSchemas: Seq[FieldSchema] = plan.getResultSchema.getFieldSchemas.toListfor (pair <- finalSelOps.values) {
// 获取每一列的学院信息finalSelOp就是一列的血缘信息。val finalSelOp: SelectOperator = pair.getFirst// 每一列的血缘可能来自多个表的多个列。val tblDeps: Seq[LineageInfo.Dependency]=index.getDependencies(finalSelOp).values().toSeq// sink表可以通过// (1) pair.getSecond// (2) val tblOutputs: mutable.Set[WriteEntity] = plan.getOutputs.filter(out => Set(Entity.Type.TABLE, Entity.Type.PARTITION).contains(out.getType)) // WriteEntity中含有表信息。
}

LineageInfo.Dependency追溯内容如下:
即能获取到表的列信息。

    public static class Dependency implements Serializable {private static final long serialVersionUID = 1L;private DependencyType type;private String expr;private Set<BaseColumnInfo> baseCols;}public static class BaseColumnInfo implements Serializable {private static final long serialVersionUID = 1L;private TableAliasInfo tabAlias;private FieldSchema column;}

2.2 hive数据血缘debug方式

具体参考
远程remote debug hive的方法,用于hive监听器/钩子编写
只能远程debug,不能本地debug。

三、hive元数据捕捉思路

3.1 全量

使用HiveMetaStoreClient的方法直接遍历即可。

// 实际使用中更简单,只需要配置HIVE_CONF_DIR环境变量,HiveConf类就会读取hive-site.xml等xml文件内容。
val metastore=new HiveMetaStoreClient(new HiveConf())

3.2 增量

实现org.apache.hadoop.hive.metastore.MetaStoreEventListener即可
需要注意的是
(1)此类是监听的hivemetastore,即spark、flink等对Metastore的修改都会被监听到。
(2)此类获取的信息较少,不含sql。
(3)hive启动会自动创建实现类不用担心config: Configuration参数。

class HiveMetastoreHook(config: Configuration) extends MetaStoreEventListener(config) {
...
}

MetaStoreEventListener 类常用的方法如下:

public abstract class MetaStoreEventListener implements Configurable {public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {// contain create view}public void onDropTable(DropTableEvent tableEvent) throws MetaException {// contain drop view}public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {// contain alter view}public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {}public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {}
}

3.3 debug

同样参考参考:
远程remote debug hive的方法,用于hive监听器/钩子编写

四、spark数据血缘实现思路

4.1 批处理血缘实现

实现org.apache.spark.sql.util.QueryExecutionListeneronSuccess方法即可。

  /*** A callback function that will be called when a query executed successfully.** @param funcName name of the action that triggered this query.* @param qe the QueryExecution object that carries detail information like logical plan,*           physical plan, etc.* @param durationNs the execution time for this query in nanoseconds.** @note This can be invoked by multiple different threads.*/@DeveloperApidef onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

需要注意的是:
(1)此方法一般使用post hook,即程序执行完毕后调用此方法,对于以spark-sql、spark-shell作为入口执行spark,则所有sql或者代码都会触发此钩子函数,但是对于spark-submit jar方法提交的任务,由于用户main程序执行完毕后会触发spark session close,程序实际会直接退出,连带这血缘的thread也会退出,导致血缘无法完成,不管是idea本地debug或者yarn执行都如此。
当然也可以使用(pre hook,但产生的血缘关系也是不准确的,因为任务可以取消)
(2)此方法可以加监测到batch,micro batch(stream),ml三种任务的血缘事件。本例忽略了ml机器学习相关的事件。
(3)当在spark中对hive表执行load操作也会被监测到。
(4)spark数据源分为两类:1、hive metastore表;2、datasource,例如:
在这里插入图片描述当然,parquet,csv也都可以是datasource,它的信息是不存储在hive metastore中的。
(5)继续细分spark表或datasource数据模型,可以为:hive metastore表、hdfs 文件、本地文件、jdbc表、kafka topics、hbase表,local(程序内写死的数据),console print等。

(6)spark中表实体类:

// table id 只有表名+数据库名
case class TableIdentifier(table: String, database: Option[String]) extends IdentifierWithDatabase// catalog table 即从metastore或者内存临时创建的table
case class CatalogTable(identifier: TableIdentifier,tableType: CatalogTableType,storage: CatalogStorageFormat,schema: StructType, // 存储列信息provider: Option[String] = None,partitionColumnNames: Seq[String] = Seq.empty,bucketSpec: Option[BucketSpec] = None,owner: String = "",createTime: Long = System.currentTimeMillis,lastAccessTime: Long = -1,createVersion: String = "",properties: Map[String, String] = Map.empty,stats: Option[CatalogStatistics] = None,viewText: Option[String] = None,comment: Option[String] = None,unsupportedFeatures: Seq[String] = Seq.empty,tracksPartitionsInCatalog: Boolean = false,schemaPreservesCase: Boolean = true,ignoredProperties: Map[String, String] = Map.empty)

(6)获取sparkSession方式:
有时候执行获取到表名,还需列信息,此时就需要再使用sparkSession从metastore获取。

val sessionOption: Option[SparkSession] = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)

(7)spark博大精深,很难弄明白。

4.2 从QueryExecution收集执行过程

// qe:QueryExecution几乎可以获取到所有信息。var outNodes: Seq[SparkPlan] = qe.sparkPlan.collect {case p: UnionExec => p.childrencase p: DataWritingCommandExec => Seq(p)case p: WriteToDataSourceV2Exec => Seq(p)case p: LeafExecNode => Seq(p)}.flattenif (qd.sink.isDefined && !outNodes.exists(_.isInstanceOf[WriteToDataSourceV2Exec])) {val sink = qd.sink.getoutNodes ++= Seq(WriteToDataSourceV2Exec(new MicroBatchWriter(0, // MicroBatchWriter就是streamnew SinkDataSourceWriter(sink)), qd.qe.sparkPlan))}// 以下内容实际提取模型的代码量较大。省略。
outNodes.flatMap {case r: ExecutedCommandExec =>{// 主要和ddl相关的,LoadDataCommand、SaveIntoDataSourceCommand除外}case r: DataWritingCommandExec =>{// 数据写入相关,例如: insert into,create table as,save into datasource}case r: WriteToDataSourceV2Exec => {// stream相关,比如kafka。建议此处忽略`MicroBatchWriter`流相关的血缘。}case ignore => {}

以下具体分析:

4.3 执行过程之RunnableCommand子类解析

分析下ExecutedCommandExec的成员的RunnableCommand

case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode // 举例子
case class SaveIntoDataSourceCommand(query: LogicalPlan, // source信息从LogicalPlan获取dataSource: CreatableRelationProvider, // 判断类型options: Map[String, String], // sink信息从此map获取mode: SaveMode) extends RunnableCommand case class LoadDataCommand(table: TableIdentifier, // 表path: String, // 路径isLocal: Boolean,isOverwrite: Boolean,partition: Option[TablePartitionSpec]) extends RunnableCommandcase class TruncateTableCommand(tableName: TableIdentifier,partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand
// 剩下的都的大同小异。

4.4 执行过程之DataWritingCommand子类解析

分析下DataWritingCommandExec成员DataWritingCommand

case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)   extends SparkPlantrait DataWritingCommand extends Command case class CreateHiveTableAsSelectCommand(tableDesc: CatalogTable, // sink表query: LogicalPlan, // source信息从LogicalPlan获取outputColumnNames: Seq[String],mode: SaveMode)extends DataWritingCommandcase class CreateDataSourceTableAsSelectCommand(table: CatalogTable, // sink表mode: SaveMode,query: LogicalPlan, // source信息从LogicalPlan获取outputColumnNames: Seq[String])extends DataWritingCommandcase class InsertIntoHiveDirCommand(isLocal: Boolean,storage: CatalogStorageFormat, // location 信息query: LogicalPlan,  // source信息从LogicalPlan获取overwrite: Boolean,outputColumnNames: Seq[String]) extends SaveAsHiveFile
// 剩下的都的大同小异。

可以看到血缘信息都藏在query: LogicalPlan中。
重点来了,LogicalPlan分析:

4.4.1 执行过程之LogicalPlan解析血缘

随意找个LogicalPlan的子类InsertIntoTable看看。

case class InsertIntoTable(table: LogicalPlan,partition: Map[String, Option[String]],query: LogicalPlan,overwrite: Boolean,ifPartitionNotExists: Boolean)extends LogicalPlan

子类太多了分析不过来。

解决:分析血缘从logicalPlan入手采用collectLeaves方法即可,收集最末端的叶子节点。

    val children = logicalPlan.collectLeaves()val res: Seq[BaseEntityElement] = children.flatMap {case r: HiveTableRelation => {// hive metastore table// 只有一个单表}case v: View => {// hive metastore view// 只有一个单视图}case UnresolvedRelation(tblId) => {// case class UnresolvedRelation(tableIdentifier: TableIdentifier) extends LeafNode extends IdentifierWithDatabase// 就是表信息,但是不含列信息等 case class TableIdentifier(table: String, database: Option[String]),含表名称,可能含数据库名称// 如果需要表详细信息可从SparkSession获取。}case JDBCParser(jdbcs) => {// jdbc source // 可能多个jdbc表或者模糊匹配}case KafkaParser(kafkas) => {// kafka source // 可能多个topic或者模糊匹配}case LogicalRelation(relation, _, catalogTable, _) =>// must be at last!!!if (catalogTable.isDefined) {// metastore 表信息} else relation match {case fileRelation: FileRelation => // hdfs路径:文件或者文件夹信息case _ => Seq.empty}case l: LocalRelation =>{// 无法被识别到的认定是本地输入 ,即写死的数据。}case e =>LOG.warn(s"Missing unknown leaf node: $e")Seq.empty}res}

解析jdbc和kafka信息比较复杂,spark实体类可能是包级private,无法直接访问需要使用反射获取成员内容,具体参考spark-atlas-connector项目。
com.hortonworks.spark.atlas.sql.CommandsHarvester.JDBCEntities
com.hortonworks.spark.atlas.sql.CommandsHarvester.KafkaEntities

本例只是利用了scala的match case - unapply的模式匹配特性使代码更好看,unapply方法返回Some(实体类)则匹配成功,返回None则匹配失败,会继续下一个case情形。

object JDBCParser {private val JDBC_RELATION_CLASS_NAME ="org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation"private val JDBC_PROVIDER_CLASS_NAME ="org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider"def unapply(plan: LogicalPlan): Option[自定义Table实体类] = {// 具体参考altas}

4.5 执行过程之DataSourceWriter子类解析

分析下WriteToDataSourceV2Exec的成员DataSourceWriter

case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlanpublic interface DataSourceWriter{ ... }
// stream的写入类信息,
class MicroBatchWriter(batchId: Long, val writer: StreamWriter) extends DataSourceWriter
// StreamWriter的实现之一就是KafkaWriterCommitMessage,能获取到topic和boostrapServers
class KafkaStreamWriter(topic: Option[String], producerParams: Map[String, String], schema: StructType) StreamWriter// 可以获取到topic和boostrapServers。
class KafkaStreamWriter(topic: Option[String], producerParams: Map[String, String], schema: StructType) StreamWriter 
}
// 控制台输出,也是以stream方式
class ConsoleWriter(schema: StructType, options: DataSourceOptions)extends StreamWriter

4.6 流处理血缘实现

实现org.apache.spark.sql.streaming.StreamingQueryListener即可。
需要注意的是:
流执行过程中每隔一段时间onQueryProgress就会触发一次,所以需增加设计,记录首次触发,忽略后续所有触发。

// 只能使用onQueryProgress方法,onQueryStarted和onQueryTerminated获取不到血缘信息
def onQueryProgress(event: QueryProgressEvent): Unit={val query: StreamingQuery = SparkSession.active.streams.get(event.progress.id)if (query != null) {val qd = query match {case query: StreamingQueryWrapper =>... case query: StreamExecution =>... case _ =>LOG.warn(s"Unexpected type of streaming query: ${query.getClass}")None}}

4.7 spark血缘的debug方式

十分简单,本地debug即可,以idea为例:
因为spark driver运行在本地debug的jvm
如下内容即可:

    val builder = SparkSession.builderbuilder.appName("lk-spark-local").master("local[*]").config("spark.sql.queryExecutionListeners", "com.test.MySparkEventTracker")//  .config("spark.extraListeners", "com.test.MySparkEventTracker").config("spark.sql.streaming.streamingQueryListeners", "com.test.MySparkStreamingEventTracker").config("spark.sql.streaming.checkpointLocation","hdfs:///tmp/spark/chkp") // 本地debug需要设置此项否则会出现checkpoint找不到情况。.enableHiveSupport() //开启hive metastore支持val spark = builder.getOrCreate()spark.sql("your sql")// sql or dsl are all ok to run!val jdbcDataframe = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true").option("dbtable", "test.sometbl").option("user", "root").option("password", "123456").load()
// 注意以下内容!!!
spark.sql("some big query should follow your lineage test sql or code!!!")

需要注意的是:
因为本地调用spark在spark执行后会直接退出spark session主程序,为了延缓spark session退出,需要在测试血缘的sql或者code后,跟一个大sql,运行时间最好控制在2s以上。

4.8 集群部署

所有节点添加jar包,并修改spark-defaults.conf添加配置项

# spark.extraListeners com.test.MySparkEventTracker
spark.sql.queryExecutionListeners com.test.MySparkEventTracker
spark.sql.streaming.streamingQueryListeners com.test.MySparkStreamingEventTracker

4.8 spark血缘后语

org.apache.spark.scheduler.SparkListener也是监听器,但其主要监听任务信息,其onOtherEvent也能监听query等事件,稍麻烦,本例不采用。

override def onOtherEvent(event: SparkListenerEvent): Unit = { }

五、参考文章

数据血缘工具大PK

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 只强的Java学习之路8-5
  • 【L1.第二章】如何搭建 Appium 环境与配置
  • 【STM32 FreeRTOS】任务
  • 227还原实战(五)控制流专题
  • 抽象代数精解【6】
  • RabbitMQ使用Jackson进行消息队列的对象传输
  • CSP 2019 第四题: 加工零件
  • 量产工具——复习及改进(后附百问网课程视频链接)
  • 数字信号处理2: 离散信号与系统的频谱分析
  • 解决客户访问超时1s问题
  • C#如何对某个词在字符串中出现的次数进⾏计数(LINQ)
  • YOLOX修改检测框、标签文字的粗细大小
  • 产业链分析指南:产业链分析的七个步骤!
  • <数据集>电梯内人车识别数据集<目标检测>
  • 14. 计算机网络HTTPS协议(二)
  • 「译」Node.js Streams 基础
  • 【刷算法】从上往下打印二叉树
  • 2017-09-12 前端日报
  • C++回声服务器_9-epoll边缘触发模式版本服务器
  • css布局,左右固定中间自适应实现
  • es的写入过程
  • FastReport在线报表设计器工作原理
  • Hibernate最全面试题
  • If…else
  • JavaScript设计模式系列一:工厂模式
  • JAVA之继承和多态
  • js ES6 求数组的交集,并集,还有差集
  • js作用域和this的理解
  • Redis的resp协议
  • SQLServer插入数据
  • Traffic-Sign Detection and Classification in the Wild 论文笔记
  • 笨办法学C 练习34:动态数组
  • 多线程 start 和 run 方法到底有什么区别?
  • 关于 Linux 进程的 UID、EUID、GID 和 EGID
  • 后端_MYSQL
  • 基于组件的设计工作流与界面抽象
  • 利用jquery编写加法运算验证码
  • 世界上最简单的无等待算法(getAndIncrement)
  • 微信小程序填坑清单
  • 我的zsh配置, 2019最新方案
  • 智能网联汽车信息安全
  • 自定义函数
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • ​低代码平台的核心价值与优势
  • ​软考-高级-系统架构设计师教程(清华第2版)【第15章 面向服务架构设计理论与实践(P527~554)-思维导图】​
  • ​字​节​一​面​
  • #APPINVENTOR学习记录
  • (C#)if (this == null)?你在逗我,this 怎么可能为 null!用 IL 编译和反编译看穿一切
  • (补充)IDEA项目结构
  • (多级缓存)缓存同步
  • (生成器)yield与(迭代器)generator
  • (四)React组件、useState、组件样式
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)http-server应用
  • (转)Java socket中关闭IO流后,发生什么事?(以关闭输出流为例) .