当前位置: 首页 > news >正文

Spark SQL 血缘解析方案

背景

项目背景建设数据中台,往往数据开发人员首先需要能够通过有效的途径检索到所需要的数据,然后根据检索的数据模型进行业务加工然后得到一些中间模型,最后再通过数据抽取工具或者OLAP分析工具直接将数据仓库中加工好的公共模型输出到应用层。这里我不在去介绍数据仓库为何需要分层以及该如何分层,这个逻辑已经有很多厂商在业务中实践过,这里就不再赘述,本次主要需要解决的事数据链路加工血缘采集的方案。本着知识积累的原则记录一下方案。

Hive DDL采集和血缘

目前这个是最简单的,如果没有特殊的需求,可以直接对Apache Atlas中的hive hook进行裁剪,最终可以得到业务所需的血缘采集插件,一般可以到字段级别血缘。

Spark SQL血缘采集

目前针对Spark SQL血缘采集,首先DDL元数据采集依旧使用Apache Atlas中的hive Hook,因为即使使用Spark操作Hive也是最终链接的是hive的metastore数据库。现在主要解决的是Spark SQL计算中如何记录下血缘信息:

  • 方案1:
    如果用过Kyuubi的同学可能知道在该项目的源码中已经集成了Spark SQL血缘采集的板块,这块同样如果需要可以直接裁剪出来。但是这里小编不推荐,因为这个插件解析出来的信息不算是多么丰富,在某些场景下的血缘解析甚至无法正确解析出来。项目地址https://github.com/apache/kyuubi/tree/master/extensions/spark/kyuubi-spark-lineage
    Kyuubi Spark Lineage
  • 方案2:
    开源真的很强大,除了kyuubi产品之外,还有个比较强大的产品Apche Linkis,在这个产品里面也集成了Spark SQL血缘,这个工具解析比较全面给出的信息也比较多,解析的准确率很高。但是输出的json结构比较复杂,这点其实无所谓了,我们可以在了解完它的结构之后,可以对结果进行处理。项目地址https://github.com/AbsaOSS/spline-spark-agent,项目打包也很简单直接选择scala-2.12和spark-xxx即可打包。原生插件集成步骤很多,这里小编就介绍一下kafka的集成。
  • 拷贝kafka-clients-2.4.0.jar和spark-版本-spline-agent-bundle_2.12-2.0.0.jar到spark安装目录下的jar目录
  • 配置conf/spark-default.conf文件
spark.sql.queryExecutionListeners	za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.lineageDispatcher	kafka
spark.spline.lineageDispatcher.kafka.topic	linkis_spark_lineage
spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers	localhost:9092
# 添加额外属性,适合多租户场景下的血缘采集
spark.spline.postProcessingFilter	userExtraMeta
spark.spline.postProcessingFilter.userExtraMeta.className	za.co.absa.spline.harvester.postprocessing.metadata.MetadataCollectingFilter
spark.spline.postProcessingFilter.userExtraMeta.rules	{\"executionPlan\":{\"extra\":{\"companyCode\":\"1200202023020320\"\\,\"originQuery\":{\"$js\":\"session.conf().get('sql'\\,'')\"}}}}

到这里就可以启动Spark SQL客户端查看效果,例如小编执行如下sql

CREATE TABLE test.t_order (id INT,uid INT,amount INT,price DOUBLE,c_time TIMESTAMP);CREATE TABLE test.t_user (uid INT,name STRING,age INT
);CREATE TABLE test.t_order_detail (id INT,name STRING,cost DOUBLE,c_time TIMESTAMP
);
set sql= insert into t_order_detail select o.id,u.name,(o.amount * o.price) as cost,o.c_time from t_user u left join t_order o on o.uid=u.uid;insert into t_order_detail select o.id,u.name,(o.amount * o.price) as cost,o.c_time from t_user u left join t_order o on o.uid=u.uid;

消费kafka的topiclinkis_spark_lineage可以消费到如下数据:

{"id": "49a81e8e-51f2-5a05-96c3-bc22a1bc3f81","name": "SparkSQL::10.253.30.205","operations": {"write": {"outputSource": "file://ZBMac-C02CW08SM:8020/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_order_detail","append": true,"id": "op-0","name": "InsertIntoHiveTable","childIds": ["op-1"],"params": {"table": {"identifier": {"table": "t_order_detail","database": "test"},"storage": "Storage(Location: file:/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_order_detail, Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, Storage Properties: [serialization.format=1])"}},"extra": {"destinationType": "hive"}},"reads": [{"inputSources": ["file://ZBMac-C02CW08SM:8020/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_user"],"id": "op-5",

相关文章:

  • 【Apache Doris】周FAQ集锦:第 7 期
  • python,ipython 和 jupyter notebook 之间的关系
  • 什么是N卡和A卡?有什么区别?
  • Python设计模式 - 简单工厂模式
  • Linux驱动开发笔记(十一)tty子系统及其驱动
  • AMSR/ADEOS-II L1A Raw Observation Counts V003地球表面和大气微波辐射的详细观测数据
  • 计算机组成原理笔记-第1章 计算机系统概论
  • 大疆无人机航点飞行KMZ文件提取航点坐标
  • 保存和调取得分的简易方法
  • Github 2024-06-19 C开源项目日报 Top9
  • C#面: 能够将非静态的方法覆写成静态方法吗?
  • Jenkins macos 下 failed to create dmg 操作不被允许hdiutil: create failed - 操作不被允许?
  • 使用Redis优化Java应用的性能
  • 如何将 ChatGPT 集成到你的应用中
  • lua中的lfs库介绍
  • Google 是如何开发 Web 框架的
  • 《Java编程思想》读书笔记-对象导论
  • 「前端早读君006」移动开发必备:那些玩转H5的小技巧
  • 【EOS】Cleos基础
  • 【知识碎片】第三方登录弹窗效果
  • 0基础学习移动端适配
  • dva中组件的懒加载
  • Git初体验
  • If…else
  • JavaScript标准库系列——Math对象和Date对象(二)
  • Rancher如何对接Ceph-RBD块存储
  • Redis 懒删除(lazy free)简史
  • Shadow DOM 内部构造及如何构建独立组件
  • uva 10370 Above Average
  • vue.js框架原理浅析
  • 阿里云应用高可用服务公测发布
  • 大型网站性能监测、分析与优化常见问题QA
  • 力扣(LeetCode)357
  • 深度学习在携程攻略社区的应用
  • 微信支付JSAPI,实测!终极方案
  • scrapy中间件源码分析及常用中间件大全
  • # Python csv、xlsx、json、二进制(MP3) 文件读写基本使用
  • ###STL(标准模板库)
  • ( 10 )MySQL中的外键
  • (22)C#传智:复习,多态虚方法抽象类接口,静态类,String与StringBuilder,集合泛型List与Dictionary,文件类,结构与类的区别
  • (Oracle)SQL优化技巧(一):分页查询
  • (Ruby)Ubuntu12.04安装Rails环境
  • (附源码)ssm基于微信小程序的疫苗管理系统 毕业设计 092354
  • (十三)Flink SQL
  • (转)Android学习系列(31)--App自动化之使用Ant编译项目多渠道打包
  • (转)h264中avc和flv数据的解析
  • (转)程序员疫苗:代码注入
  • (轉貼) UML中文FAQ (OO) (UML)
  • (总结)Linux下的暴力密码在线破解工具Hydra详解
  • .bat批处理(十一):替换字符串中包含百分号%的子串
  • .NET Compact Framework 多线程环境下的UI异步刷新
  • .net core webapi 部署iis_一键部署VS插件:让.NET开发者更幸福
  • .Net Core与存储过程(一)
  • .NET Framework 3.5安装教程
  • .NET 简介:跨平台、开源、高性能的开发平台