当前位置: 首页 > news >正文

Metacat实现原理解析

本文首发微信公众号:码上观世界

Metacat 是Netflix开源的元数据管理平台, 它的三个主要目标是:

  • 提供元数据系统的联合视图

  • 用于数据集元数据的统一 API

  • 支持业务和用户元数据存储

本文从系统特性与架构、Metacat元数据存取模型以及技术实现方面来讲述Metacat的实现原理。

Metacat特性与架构

743077dcc5dbb84509de706194d5a035.png

Metacat本身不存储数据源的元数据,只存储跟数据源相关的业务元数据和用户自定义元数据。从高层视角,可以将Metacat的特性归为以下几类:

  • 数据抽象和互操作性:通过引入通用的抽象层,提供统一的访问API,不同的引擎可以交互访问这些数据集。为便于与 Spark、Flink和 Trino等 集成,提供支持 Hive 的 Thrift 接口。

  • 存储业务和用户自定义的元数据:统一元存储只存储技术元数据,实际上,还会有部分业务元数据和用户自定义元数据,例如 RDS 数据源)、配置信息、度量指标(Hive/S3 分区和表)以及数据表的 TTL(生存时间)等。它们是一种自由格式的元数据,可由用户根据自己的用途进行定义。

业务元数据也可以大致分为逻辑元数据和物理元数据。有关逻辑结构(如表)的业务元数据被视为逻辑元数据。我们使用元数据进行数据分类和标准化我们的 ETL 处理流程。数据表的所有者可在业务元数据中提供数据表的审计信息。他们还可以为列提供默认值和验证规则,在写入数据时会用到这些。存储在表中或分区中的实际数据的元数据被视为物理元数据。我们的 ETL 处理在完成作业时会保存数据的度量标准, 在稍后用于验证。相同的度量可用来分析数据的成本和空间。因为两个表可以指向相同的位置(如 Hive), 所以要能够区分逻辑元数据与物理元数据。两个表可以具有相同的物理元数据,但应该具有不同的逻辑元数据。

  • 数据发现:作为数据的消费者,我们应该能够轻松发现和浏览各种数据集。为提升查询效率和能力,需要将Schema元数据和业务及用户定义的元数据发布到 Elasticsearch,以便进行全文搜索。SQL 编辑器因此能够实现 SQL 语句的自动建议和自动完成功能。将数据集组织为Catalog有助于消费者浏览信息,根据不同的主题使用标签对数据进行分类。我们还使用标签来识别表格,进行数据生命周期管理。

  • 数据变更审计和通知:作为数据存储的中央网关,统一元数据能够捕获所有元数据变更和数据更新,通过构建基于事件驱动的系统架构,将元数据变更通知发布到消息系统,不仅有助于上下游系统解耦,还有助于下游系统响应的及时性。

  • Schema和元数据的版本控制:用于提供数据表的历史记录。例如,跟踪特定列的元数据变更,或查看表的大小随时间变化的趋势。能够查看过去某个时刻元数据的信息对于审计、调试以及重新处理和回滚来说都非常有用。

  • Hive Metastore 优化:由 RDS 支持的 Hive Metastore 在高负载下表现不佳。我们已经注意到,在使用元数据存储 API 写入和读取分区方面存在很多问题。为此,我们不再使用这些 API。我们对 Hive 连接器(在读写分区时,该连接器直接与 RDS 通信)进行了改进。之前,添加数千个分区的 Hive Metastore 调用通常会超时,在重新实现后,这不再是个问题。另一个问题是,Hive Metastore Server配置项是静态的,无法支持动态添加配置,特别是在需要支持多种外部存储(如S3、HDFS等)的场景。我们通过绕过原有的Thrfit RPC接口访问方式,根据原生底层API跟Hive Metastore交互,不仅提升了访问速率,还可以实现动态传递任意原来不能传递的Configuration配置信息。

88ee85aabad89ca1c07c37e49de5e0cc.png

Metacat元数据存取模型

这里按照从Controller API 请求开始,到对底层数据源的元数据存取的流程来看看具体涉及到的系统组件和功能,在Controller API层Metacat

定义了一套专有的数据结构,比如CatalogDto、DatabaseDto、TableDto等。以创建数据表为例,首先创建Catalog,然后再创建Database,最后再创建表,当然数据库可以使用系统默认数据库。不管请求哪种元数据服务接口,系统都会按照下图右侧的逻辑流程处理:

86a15c92a5f7854c056c798750dbe643.png

这些组件的UML关系可表示为:

076d066cb21abde5cbe2a607d680d1ef.png

这个图表示了系统组件的宏观视图。

PluginManager

PluginManager通过SPI动态加载系统中注册的ConnectorPlugin,每种ConnectorPlugin代表一种数据源。比如系统中注册的Hive和Mysql:

com.netflix.metacat.connector.hive.HiveConnectorPlugin
com.netflix.metacat.connector.mysql.MySqlConnectorPlugin

3023f1346a6e63a4654c5ba319577254.png

public class PluginManager {
    private final ConnectorManager connectorManager;
    
    public void loadPlugins() throws Exception {
        final ServiceLoader<ConnectorPlugin> serviceLoader =
            ServiceLoader.load(ConnectorPlugin.class, this.getClass().getClassLoader());
        final List<ConnectorPlugin> connectorPlugins = ImmutableList.copyOf(serviceLoader);
        for (ConnectorPlugin connectorPlugin : connectorPlugins) {
            log.info("Installing {}", connectorPlugin.getClass().getName());
            this.installPlugin(connectorPlugin);
            log.info("-- Finished loading plugin {} --", connectorPlugin.getClass().getName());
        }
    }
    private void installPlugin(final ConnectorPlugin connectorPlugin) {
        this.connectorManager.addPlugin(connectorPlugin);
    }
}

PluginManager将注册的数据源注册到ConnectorManager:

public class ConnectorManager {
    // Map of connector plugins registered.
    private final ConcurrentMap<String, ConnectorPlugin> plugins = new ConcurrentHashMap<>();
    public void addPlugin(final ConnectorPlugin connectorPlugin) {
    plugins.put(connectorPlugin.getType(), connectorPlugin);
    }
}

有了每种数据源相应的ConnectorPlugin,就能够创建对应数据源的各种元数据请求服务,比如catalogService,databaseService,tableServices等:

public synchronized void createConnection(final ConnectorContext connectorContext) {
    final String connectorType = connectorContext.getConnectorType();
    final String catalogName = connectorContext.getCatalogName();
    final ConnectorPlugin connectorPlugin = plugins.get(connectorType);
    if (connectorPlugin != null) {
        final ConnectorFactory connectorFactory = connectorPlugin.create(connectorContext);
        catalogServices.add(connectorFactory.getCatalogService());
        databaseServices.add(connectorFactory.getDatabaseService());
        tableServices.add(connectorFactory.getTableService());
        partitionServices.add(connectorFactory.getPartitionService());
    }
}

ConnectorFactory

ConnectorFactory是一个声明了获取各种元数据服务的接口:

ce6765524223edf964c0bf8146d851a1.png

主要有两种实现:

HiveConnectorFactory 
MySqlConnectorFactory

两种ConnectorFactory实现方式大不相同,前者通过HMS RPC方式获取元数据,后者通过JDBC获取元数据。但对外暴露接口一样,以Hive数据源为例说明:

4a77ce1462872d1aee2286552dfda843.png

对Hive,除了通过Thrift RPC方式,Metacat还实现了另外一套快速访问元数据的方式。

HiveConnectorFactory通过Spring bean注册和获取相应的服务:

public class HiveConnectorFactory extends SpringConnectorFactory {
    @Override
    public ConnectorCatalogService getCatalogService() {
        return this.ctx.getBean(HiveConnectorCatalogService.class);
    }
    @Override
    public ConnectorDatabaseService getDatabaseService() {
        return this.ctx.getBean(HiveConnectorDatabaseService.class);
    }
    @Override
    public ConnectorTableService getTableService() {
        return this.ctx.getBean(HiveConnectorTableService.class);
    }
}

而MySqlConnectorFactory通过Guice注册和获取服务。

ConnectorCatalogService

目前只要Hive3 数据源支持实现ConnectorCatalogService,比如:

public class HiveConnectorCatalogService implements ConnectorCatalogService {
    private final IMetacatHiveClient metacatHiveClient;
    @Override
    public void create(final ConnectorRequestContext requestContext, final CatalogInfo catalogInfo) {
        final QualifiedName catalogName = catalogInfo.getName();
        metacatHiveClient.createCatalog(hiveMetacatConverters.fromCatalogInfo(catalogInfo));
}

ConnectorDatabaseService

Hive实现的创建数据库方法:

public class HiveConnectorDatabaseService implements ConnectorDatabaseService {
    private final IMetacatHiveClient metacatHiveClient;
   
     @Override
    public void create(final ConnectorRequestContext requestContext, final DatabaseInfo databaseInfo) {
    final QualifiedName databaseName = databaseInfo.getName();
        this.metacatHiveClient.createDatabase(hiveMetacatConverters.fromDatabaseInfo(databaseInfo));
    }
}

Mysql实现的创建数据库方法:

public class JdbcConnectorDatabaseService implements ConnectorDatabaseService {
    @Override
    public void create(@Nonnull final ConnectorRequestContext context, @Nonnull final DatabaseInfo resource) {
        final String databaseName = resource.getName().getDatabaseName();
        log.debug("Beginning to create database {} for request {}", databaseName, context);
        try (final Connection connection = this.dataSource.getConnection()) {
            JdbcConnectorUtils.executeUpdate(connection, "CREATE DATABASE " + databaseName);
            log.debug("Finished creating database {} for request {}", databaseName, context);
        } catch (final SQLException se) {
            throw this.exceptionMapper.toConnectorException(se, resource.getName());
        }
}

ConnectorTableService

Hive实现的创建表方法:

public class HiveConnectorTableService implements ConnectorTableService {
    @Override
    public void create(final ConnectorRequestContext requestContext, final TableInfo tableInfo) {
        final QualifiedName tableName = tableInfo.getName();
        final Table table = hiveMetacatConverters.fromTableInfo(tableInfo);
        updateTable(requestContext, table, tableInfo);
        metacatHiveClient.createTable(table);
    }
}

以上是ConnectorManager层的实现方式,实际上,在Controller层和ConnectorManager之间还有一个Service层,实现跟具体的Connector无关的业务逻辑,比如从Controller层 DTO对象到Connector对象类型的转换,业务元数据和用户元数据的校验和存取,以及元数据的版本管理和消息订阅等功能。

相关文章:

  • MTK Camera Senor Bring up 复盘总结
  • map有关的运算符重载
  • Java项目硅谷课堂学习笔记-P9-整合网关与实现订单和营销管理模块
  • 我的周刊(第055期)
  • 【GraphSAGE实践】YelpChi评论图数据集上的反欺诈检测
  • 基于单目和低成本GPS的车道定位方法
  • MyBatisPlus——多表查询——多条件查询——分页查询
  • 面向跨模态匹配的噪声关联学习
  • Java中的反射
  • java毕业设计旅游网站设计mybatis+源码+调试部署+系统+数据库+lw
  • Java基础进阶-序列化
  • 02.3 线性代数
  • java毕业设计木材产销系统的生产管理模块mybatis+源码+调试部署+系统+数据库+lw
  • Nuxt - 自定义页面布局,<Nuxt /> 个性化多套模板(一个项目内既要有用户正常浏览的普通页面,又要存在后台管理系统布局的页面)
  • java毕业设计拉萨旅游自助民宿平台mybatis+源码+调试部署+系统+数据库+lw
  • 【140天】尚学堂高淇Java300集视频精华笔记(86-87)
  • Angular数据绑定机制
  • CAP 一致性协议及应用解析
  • css的样式优先级
  • EOS是什么
  • extract-text-webpack-plugin用法
  • Go 语言编译器的 //go: 详解
  • java8-模拟hadoop
  • Java应用性能调优
  • js中forEach回调同异步问题
  • KMP算法及优化
  • Node.js 新计划:使用 V8 snapshot 将启动速度提升 8 倍
  • react-native 安卓真机环境搭建
  • React的组件模式
  • Webpack4 学习笔记 - 01:webpack的安装和简单配置
  • 分享自己折腾多时的一套 vue 组件 --we-vue
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 配置 PM2 实现代码自动发布
  • 实战:基于Spring Boot快速开发RESTful风格API接口
  • 使用 Xcode 的 Target 区分开发和生产环境
  • 想写好前端,先练好内功
  • 小李飞刀:SQL题目刷起来!
  • 译有关态射的一切
  • TPG领衔财团投资轻奢珠宝品牌APM Monaco
  • ​如何在iOS手机上查看应用日志
  • # 数据结构
  • #Linux(make工具和makefile文件以及makefile语法)
  • $.type 怎么精确判断对象类型的 --(源码学习2)
  • (02)Cartographer源码无死角解析-(03) 新数据运行与地图保存、加载地图启动仅定位模式
  • (04)Hive的相关概念——order by 、sort by、distribute by 、cluster by
  • (3)(3.2) MAVLink2数据包签名(安全)
  • (31)对象的克隆
  • (4)事件处理——(7)简单事件(Simple events)
  • (C#)Windows Shell 外壳编程系列9 - QueryInfo 扩展提示
  • (HAL库版)freeRTOS移植STMF103
  • (八)Flask之app.route装饰器函数的参数
  • (二)Pytorch快速搭建神经网络模型实现气温预测回归(代码+详细注解)
  • (附源码)spring boot北京冬奥会志愿者报名系统 毕业设计 150947
  • (三维重建学习)已有位姿放入colmap和3D Gaussian Splatting训练
  • (一)SpringBoot3---尚硅谷总结