当前位置: 首页 > news >正文

Iceberg与SparkSQL整合DDL操作

前言

使用SparkSql操作Iceberg表之前我们得先配置好catalog,配置方式参考这篇博客。

创建非分区表

Spark3使用USING iceberg来创建表:

CREATE TABLE prod.db.sample (id bigint NOT NULL COMMENT 'unique id',data string)
USING iceberg;

这里的数据类型,我们就用Spark的数据类型,iceberg会自动转成对应的iceberg类型。其实基本上一模一样,可以参考官网查看。

参数:

  • PARTITIONED BY (partition-expressions) :配置分区
  • LOCATION ‘(fully-qualified-uri)’ :指定表路径
  • COMMENT ‘table documentation’ :配置表备注
  • TBLPROPERTIES (‘key’=‘value’, …) :配置表属性

对 Iceberg 表的每次更改都会生成一个新的元数据文件(json 文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。如 果 要 自 动 清 除 元 数 据 文 件 , 在 表 属 性 中 设 置
write.metadata.delete-after-commit.enabled=true 。 这 将 保 留 一 些 元 数 据 文 件 ( 直 到
write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。

创建分区表

分区使用PARTITIONED BY来指定。

  1. 非隐藏分区
CREATE TABLE prod.db.sample (id bigint,data string,category string)
USING iceberg
PARTITIONED BY (category);
  1. 隐藏分区
CREATE TABLE prod.db.sample (id bigint,data string,category string,ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

支持的隐藏分区转换函数有:

  • year(ts):按年划分
  • month(ts):按月划分
  • day(ts)或 date(ts):等效于 dateint 分区
  • hour(ts)或 date_hour(ts):等效于 dateint 和 hour 分区
  • bucket(N, col):按哈希值划分 mod N 个桶
  • truncate(L, col):按截断为 L 的值划分,字符串被截断为给定的长度;整型和长型截断为 bin: truncate(10, i)生成分区 0,10,20,30,…。

老的函数years(ts), months(ts), days(ts) and hours(ts)也支持。

CATS(CREATE TABLE … AS SELECT)建表

当使用SparkCatalog时,Iceberg支持将CTAS作为原子操作。但在使用SparkSessionCatalog时不是原子的。所以我们一般最好用SparkCatalog。否则会有一些潜在的风险。

CREATE TABLE prod.db.sample
USING iceberg
AS SELECT ...

新创建的表不会继承源表的分区和属性,可以使用CTAS中的PARTITIONED BY和TBLPROPERTIES来声明新表的分区和属性。

CREATE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...

RTAS(REPLACE TABLE … AS SELECT)建表

当使用SparkCatalog时,Iceberg支持将RTAS作为原子操作。但在使用SparkSessionCatalog时不是原子性的。所以我们一般最好用SparkCatalog。否则会有一些潜在的风险。

替换的表会根据select查询的结果创建新的快照,但是会保留原表的历史记录。

REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
REPLACE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...
CREATE OR REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...

如果我们仅仅是替换表中的数据,而不改变表的结构或属性,那么用INSERT OVERWRITE来替换REPLACE

删除表

  1. 删除表
DROP TABLE prod.db.sample;
  1. 删除表和数据
DROP TABLE prod.db.sample PURGE;

在 0.14 之前,运行 DROP TABLE 将从 catalog 中删除表并删除表内容。
从 0.14 开始,DROP TABLE 只会从 catalog 中删除表,不会删除数据。为了删除表内容,应该使用 DROP table PURGE

修改表

Iceberg 在 Spark 3 中完全支持 ALTER TABLE,包括:

  • 重命名表
  • 设置或删除表属性
  • 添加、删除和重命名列
  • 添加、删除和重命名嵌套字段
  • 重新排序顶级列和嵌套结构字段
  • 扩大 int、float 和 decimal 字段的类型
  • 将必选列变为可选

此外,还可以使用 SQL 扩展来添加对分区演变的支持和设置表的写顺序。

  1. 修改表名(ALTER TABLE … RENAME TO)
ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;
  1. 修改表属性(ALTER TABLE … SET(UNSET) TBLPROPERTIES)
ALTER TABLE prod.db.sample SET TBLPROPERTIES ('read.split.target-size'='268435456'
);

包括修改comment

ALTER TABLE prod.db.sample SET TBLPROPERTIES ('comment' = 'A table comment.'
);

USET可以移除属性

ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
  1. 添加列(ALTER TABLE … ADD COLUMN)
ALTER TABLE hadoop_prod.default.sample
ADD COLUMNS (
category string comment 'new_column'
)
-- 添加 struct 类型的列
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN point struct<x: double, y: double>;
-- 往 struct 类型的列中添加字段
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN point.z double
-- 创建 struct 的嵌套数组列
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN points array<struct<x: double, y: double>>;
-- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN points.element.z double
-- 创建一个包含 Map 类型的列,key 和 value 都为 struct 类型
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;
-- 在 Map 类型的 value 的 struct 中添加一个字段。
ALTER TABLE hadoop_prod.default.sample
ADD COLUMN pointsm.value.b int

在 Spark 2.4.4 及以后版本中,可以通过添加 FIRST 或 AFTER 子句在任何位置添加

ALTER TABLE hadoop_prod.default.sample
ADD COLUMN new_column bigint AFTER idALTER TABLE hadoop_prod.default.sample
ADD COLUMN new_column bigint FIRST
  1. 修改列名(ALTER TABLE … RENAME COLUMN)
ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude;
  1. 修改类型(ALTER TABLE … ALTER COLUMN)

注意:只允许安全的转换

ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;
  1. 修改注释(ALTER TABLE … ALTER COLUMN)
ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';
  1. 修改列顺序
ALTER TABLE prod.db.sample ALTER COLUMN col FIRST;
ALTER TABLE prod.db.sample ALTER COLUMN nested.col AFTER other_col;
  1. 修改列是否允许为NULL
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;

ALTER COLUMN 不用于更新 struct 类型。使用 ADD COLUMN 和 DROP COLUMN 添加或删除 struct 类型的字段。

  1. 删除列(ALTER TABLE … DROP COLUMN)
ALTER TABLE prod.db.sample DROP COLUMN id;
ALTER TABLE prod.db.sample DROP COLUMN point.z;
  1. 添加分区(Spark3,需要配置扩展,ALTER TABLE … ADD PARTITION FIELD)
    扩展配置,我们这篇博客也做了简单介绍,一般我们都会一次性修改Spark配置:
vim spark-default.conf
spark.sql.extensions =org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog; -- identity transform

修改分区转换:

ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);
-- use optional AS keyword to specify a custom name for the partition field 
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard;

注意添加分区字段是元数据操作,不会更改任何现有表数据。新数据将使用新分区写入,但现有数据将保留在旧分区布局中。对于元数据表中的新分区字段,旧数据文件将具有空值。

当表的分区发生变化时,动态分区覆盖行为也会发生变化,因为动态覆盖会隐式地替换分区。要显式覆盖,请使用新的DataFrameWriterV2 API。

重要
在这里插入图片描述

当你想要改变数据的分区粒度时,比如从每天的数据分区细化到每小时的数据分区,你可以利用transforms(转换)来实现这一点,而不需要删除原有的按天分区的字段。这么做的好处就是,历史的任务可能很多都是通过天粒度进行查看的,后面任务才会用小时查看,因此天分区不要删除了。这个很有用。

危险
在这里插入图片描述

当分区发生变化时,动态分区覆盖行为将发生变化。例如,如果你按天划分分区并改为按小时划分分区,覆盖将覆盖每小时分区,而不再覆盖按天分区。

  1. 删除分区(Spark3,需要配置扩展,ALTER TABLE … DROP PARTITION FIELD)
ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog;
ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample DROP PARTITION FIELD year(ts);
ALTER TABLE prod.db.sample DROP PARTITION FIELD shard;

注意,尽管删除了分区,但列仍然存在于表结构中。
删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。

当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。

删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。

  1. 修改分区(Spark3,需要配置扩展,ALTER TABLE … REPLACE PARTITION FIELD)
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts);
-- use optional AS keyword to specify a custom name for the new partition field 
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;
  1. 修改表的写入顺序(ALTER TABLE … WRITE ORDERED BY)
ALTER TABLE prod.db.sample WRITE ORDERED BY category, id
-- use optional ASC/DEC keyword to specify sort order of each field (default ASC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC
-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST

表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。
WRITE ORDERED BY 设置了一个全局排序,即跨任务的行排序,就像在 INSERT 命令中使用 ORDER BY 一样

INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category

要在每个任务内排序,而不是跨任务排序,使用 local ORDERED BY:

ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id
  1. 按分区并行写入(ALTER TABLE … WRITE DISTRIBUTED BY PARTITION)
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION
ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id

参考文献

Spark DDL

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • el-table使用type=“expand”根据数据条件隐藏展开按钮
  • Ceph集群维护相关操作
  • 图特征工程实践指南:从节点中心性到全局拓扑的多尺度特征提取
  • 《系统架构设计师教程(第2版)》第17章-通信系统架构设计理论与实践-02-广域网网络架构
  • 解决MongoDB创建用户报错command createUser requires authentication
  • 设计模式-行为型模式-迭代器模式
  • 【秋招笔试】9.07美团秋招改编题(研发岗)
  • 【2024高教社杯国赛A题】数学建模国赛建模过程+完整代码论文全解全析
  • 纳米材料咋设计?蛋白质模块咋用?看这里就知道啦!
  • 数学建模_缺失值处理_拉格朗日、牛顿插值(全)
  • Android流式接口请求实践
  • 设计模式学习-责任链模式
  • JVM系列(十) -垃圾收集器介绍
  • 【阿里云】个人认证与公司认证
  • 目标检测-YOLOv10
  • Android 初级面试者拾遗(前台界面篇)之 Activity 和 Fragment
  • IDEA 插件开发入门教程
  • Linux编程学习笔记 | Linux IO学习[1] - 文件IO
  • Rancher如何对接Ceph-RBD块存储
  • webpack项目中使用grunt监听文件变动自动打包编译
  • WordPress 获取当前文章下的所有附件/获取指定ID文章的附件(图片、文件、视频)...
  • Zsh 开发指南(第十四篇 文件读写)
  • 给自己的博客网站加上酷炫的初音未来音乐游戏?
  • 精益 React 学习指南 (Lean React)- 1.5 React 与 DOM
  • 前端技术周刊 2019-01-14:客户端存储
  • 前端每日实战 2018 年 7 月份项目汇总(共 29 个项目)
  • 软件开发学习的5大技巧,你知道吗?
  • -- 数据结构 顺序表 --Java
  • 算法-插入排序
  • 推荐一个React的管理后台框架
  • 微信小程序实战练习(仿五洲到家微信版)
  • 我从编程教室毕业
  • 我的面试准备过程--容器(更新中)
  • Play Store发现SimBad恶意软件,1.5亿Android用户成受害者 ...
  • #vue3 实现前端下载excel文件模板功能
  • $.extend({},旧的,新的);合并对象,后面的覆盖前面的
  • (二)Pytorch快速搭建神经网络模型实现气温预测回归(代码+详细注解)
  • (一)项目实践-利用Appdesigner制作目标跟踪仿真软件
  • (转)c++ std::pair 与 std::make
  • (转)Google的Objective-C编码规范
  • (转)大道至简,职场上做人做事做管理
  • (转)母版页和相对路径
  • (轉貼) VS2005 快捷键 (初級) (.NET) (Visual Studio)
  • .360、.halo勒索病毒的最新威胁:如何恢复您的数据?
  • .babyk勒索病毒解析:恶意更新如何威胁您的数据安全
  • .bat批处理(七):PC端从手机内复制文件到本地
  • .Mobi域名介绍
  • .php结尾的域名,【php】php正则截取url中域名后的内容
  • .考试倒计时43天!来提分啦!
  • @Query中countQuery的介绍
  • [1127]图形打印 sdutOJ
  • [20171106]配置客户端连接注意.txt
  • [2018/11/18] Java数据结构(2) 简单排序 冒泡排序 选择排序 插入排序
  • [ACP云计算]组件介绍
  • [Android] Amazon 的 android 音视频开发文档