当前位置: 首页 > news >正文

RisingWave 1.9 发布!新增 Snowflake sink 连接器

我们非常高兴地宣布:RisingWave1.9 版本正式发布!此次我们带来了许多重要更新,例如:优化了许多上游和下游连接器、新增了 Snowflake sink 连接器、弃用此前的s3连接器,转为指定 AWS S3 source 连接器为s3_v2等。此外,本版本还新增了许多实用的 SQL 命令和函数,例如创建订阅功能等。一起来了解本次更新的主要亮点吧!

1. Recover 命令

我们自 1.9 版本开始支持RECOVER命令。该命令可以触发临时性的恢复,当存在较高延迟时可能需要使用。需要注意的是,只有超级用户才能使用RECOVER命令,以限制可以触发恢复的用户范围。此外,RisingWave 仅能从已提交的时间点进行恢复,且该命令不会等待恢复完成。执行此命令后,可以调整流速限制确保 RisingWave 持续稳定运行,并且任何DROPCANCEL命令都将立即生效。更多细节,请查看:

  • RECOVER命令

2. 配置全局参数

您可以使用ALTER SYSTEM命令在整个服务器上设置运行时参数或系统参数。使用此命令后,新参数值也将成为每个新会话的默认值。如果您喜欢使用不同的参数运行每个 RisingWave 会话,用此命令可以更轻松地设置每个会话。例如,以下 SQL 命令将运行时参数rw_enable_join_ordering设置为true

ALTER SYSTEM SET rw_enable_join_ordering TO true; 

您可以使用SHOW ALL命令查看所有运行时参数,如果想查看当前运行时某个特定运行时参数的值,可使用SHOW parameter_name命令。如果想查看所有系统参数及其当前值,则可使用SHOW PARAMETERS命令。更多细节,请查看:

  • ALTER SYSTEM 命令
  • 查看和配置运行时参数
  • 查看和配置系统参数

3. 增强时间连接

我们现在支持非仅追加模式的时间连接 (Non-append-only temporal joins), 意味着时间连接的外部或左侧不需要是一个仅追加模式的源或表。该功能在所有情况下使用相同的语法。此次改进可以让您更加灵活地连接数据。假设stream_source是一个非仅追加的源,而prod是一个表,则以下 SQL 命令使用了时间连接来连接两个表。

SELECT * 
FROM stream_source
JOIN prod FOR SYSTEM_TIME AS OF PROC_TIME()
ON source_id = prod_id;

更多细节,请查看:

  • 非仅追加模式的时间连接

4. 支持订阅和订阅游标

此版本更新后,您可以从表和物化视图创建订阅队列,并为订阅队列创建游标。这些新功能旨在使数据检索更加方便。请使用 CREATE SUBSCRIPTION命令创建订阅。您还可以删除、修改和显示现有的订阅。创建订阅时可以选择增量数据的保留时间以及订阅可以访问的时间。以下是 SQL 查询在物化视图上创建订阅,并将数据保留一天的示例。

CREATE SUBSCRIPTION sub1 FROM tbl1 WITH (retention = '1D' );

此外,您还可以修改订阅的名称、所有者、模式和并行性。您也可以为订阅创建游标,其功能是以更小的批次读取查询结果,一次检索几行,以避免内存溢出。要使用游标,需要先创建游标,然后从游标中提取结果。以下是一个示例,为上文创建的订阅创建游标。

DECLARE cursor1 SUBSCRIPTION CURSOR FOR sub1;

接下来,您可以从游标中提取结果。

FETCH NEXT FROM cursor1;----结果
col1 | col2 | col3 | op | rw_timestamp
----+----+----+----+-------------------1  |   2  |   3  |  1 |
(1 行)

col1col2col3是订阅的表中的列,op则是生成的特殊列。op的值可以是1234,分别对应于INSERTDELETEUPDATE_DELETEUPDATE_INSERTUPDATE语句在这里被转换为UPDATE_DELETEUPDATE_INSERT。请注意,每次从游标中提取时,只会返回表中的一行数据。要查看其他行的数据,必须再次调用FETCH命令。更多细节,请查看:

  • 订阅

5. 使用 Iceberg source 进行 Time travel

对于 Iceberg source,您现在可以使用 AS OF语法从特定时间段或版本中选择数据。RisingWave 目前仅支持从 Iceberg source 进行批量查询,拥有此功能后,随着时间推移跟踪数据集的变化会更容易。同时,从特定版本选择数据则使得数据可重现性增强,这对于调试非常重要:如果在数据中检测到错误,您可以轻松选择之前版本的数据。以下是选择历史版本的示例, 我们通过特定的快照 ID 中选择了 Iceberg source 的数据。

SELECT * FROM iceberg_source FOR system_version AS OF 1567123456789;

以下是选择时间段的示例, 我们从 Iceberg source 中选择了特定时间戳之后的数据。

SELECT * FROM iceberg_source FOR system_time AS OF '2006-09-21 00:00:00+00:00';

更多细节,请查看:

  • 从 Apache Iceberg 中摄取数据

6. PostgreSQL 和 MySQL 连接器的新配置

本版本新增了一系列配置选项,大多数用于 MySQL 和 PostgreSQL CDC source。这为您在创建 CDC 表时提供了更多的控制和灵活性,以满足独特的需求和用例。以下是其中一些新配置选项。

6.1 配置 SSL/TLS

对于 MySQL 和 PostgreSQL CDC source,现在可以使用ssl.mode参数设置 SSL/TLS 加密级别。配置 SSL/TLS 可确保传输的数据的完整性和机密性,保护敏感信息免受威胁和攻击。许多监管标准和最佳实践也使用 SSL/TLS,这使得它成为许多用户关心的重要功能。ssl.mode参数接受disabledpreferredrequired这三个值,默认值为 disabled

CREATE SOURCE pg_source WITH (connector = 'postgres-cdc',hostname = '127.0.0.1',port = '5432',username = 'user',password = 'password',database.name = 'mydb',slot.name = 'mydb_slot',ssl.mode = 'required'
);

6.2 配置快照

除了为 CDC 表配置快照之外,您现在还可以配置屏障间隔和批次大小。在与 MySQL 或 PostgreSQL 数据库建立连接时,您可以创建 CDC source 或 CDC 表。CDC source 连接到整个数据库,而 CDC 表连接到单个 MySQL 或 PostgreSQL 表。快照相关的参数选项仅适用于创建表时。
新参数snapshot.interval可以设置开始快照读取时的屏障间隔;snapshot.batch_size可以配置快照读取的批次大小。

6.3 配置超时

最后,我们还新增了一项运行时参数来更改 CDC source 的超时配置。某些 CDC source 建立连接可能需要消耗一些时间,造成这一点的原因有很多,比如大量的数据或复杂的表结构。默认情况下,超时设置为 30 秒,但这个时间在某些极端情况下可能不够。如果您的 CDC source 创建由于控制流中的错误而失败,可以尝试使用运行时参数 cdc_source_wait_streaming_start_timeout来增加超时阈值。

SET cdc_source_wait_streaming_start_timeout to 90;

更多细节,请查看:

  • 从 MySQL CDC 中摄取数据
  • 从 PostgreSQL CDC 中摄取数据

7. 支持更多 Sink 连接器

本版本还新增了一项 Sink 连接器,并对现有 Sink 连接器进行增强。Sink 连接器在数据流水线中扮演着关键角色,助力与各种下游系统的无缝集成。如果您对某个连接器感兴趣,请查看我们的集成页面,了解我们目前支持的内容,并投票选择您想要的连接器。我们一直致力于改进数据集成功能,让 RisingWave 能够轻易与任何流处理管道集成。

7.1 支持 Snowflake sink 连接器

Snowflake 是一种云原生、多功能的数据仓库平台,具有极强的可扩展性。它提供了全面的托管服务,可以安全地存储和分析大量数据。由于 Snowflake 仅支持从外部存储中接收数据,因此数据将在传输到 Snowflake 之前被上传到外部存储系统。目前,RisingWave 仅支持 Amazon S3 作为外部存储。在 RisingWave 中创建 Snowflake sink 非常简单。如果您已经设置了 Amazon S3 bucket,只需使用CREATE SINKSQL 语句即可完成。

CREATE SINK snowflake_sink FROM mv WITH (connector = 'snowflake',type = 'append-only',snowflake.database = 'db',snowflake.schema = 'schema',snowflake.pipe = 'pipe',snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',snowflake.user = 'user',snowflake.rsa_public_key_fp = 'fp',snowflake.private_key = 'pk',snowflake.s3_bucket = 's3_bucket',snowflake.aws_access_key_id = 'aws_id',snowflake.aws_secret_access_key = 'secret_key',snowflake.aws_region = 'region',snowflake.max_batch_row_num = '1030',snowflake.s3_path = 's3_path',
);

7.2 支持 BigQuery sink 的 upsert 类型

此前我们只支持创建append-only类型的 BigQuery sink,这意味着只有INSERT操作会传递到 BigQuery 表中。现在,1.9 版本新增支持upsert类型的 Sink,允许传递UPDATEDELETE操作。创建upsertSink 时,需要在 BigQuery 中进行额外的配置,并且必须设置相应的权限和主键。

CREATE SINK big_query_sink
FROM mv
WITH (connector = 'bigquery',type = 'upsert',bigquery.s3.path= '${s3_service_account_json_path}',bigquery.project= '${project_id}',bigquery.dataset= '${dataset_id}',bigquery.table= '${table_id}',access_key = '${aws_access_key}',secret_access = '${aws_secret_access}',region = '${aws_region}',
);

7.3 Delta Lake sink 支持 GCS

您现在可以使用 Google Cloud Storage 作为 Delta Lake 文件的存储位置来创建 Delta Lake sink。此前我们只支持 AWS S3 和 MinIO 对象存储。Google Cloud Storage 被普遍用于数据存储,此功能会使 Delta Lake sink 连接器更加便捷。创建 Delta Lake sink 时若使用 GCS,请使用匹配参数的CREATE SINK命令。

CREATE SINK s1_sink FROM s1_source
WITH (connector = 'deltalake',type = 'append-only',location = 'gs://bucket-name/path/to/file',gcs.service.account = '{type: service_acct,project_id: id,private_key:key,client_email: email}'
);

更多信息,请查看:

  • 将数据从 RisingWave 导出到 Snowflake
  • 将数据从 RisingWave 导出到 BigQuery
  • 将数据从 RisingWave 导出到 Delta Lake

8. 总结

以上只是 RisingWave 1.9 版本新增的部分功能,如果您想了解本次更新的完整列表,请查看更详细的发布说明。

9. 关于 RisingWave

RisingWave 是一款开源的分布式流处理数据库,旨在帮助用户降低实时应用的开发成本。RisingWave 采用存算分离架构,提供 Postgres-style 使用体验,具备比 Flink 高出 10 倍的性能以及更低的成本。
👨‍🔬加入 RW 社区,欢迎关注公众号:RisingWave 中文开源社区

🧑‍💻快速上手 RisingWave,欢迎体验入门教程:github.com/risingwave

💻深入使用 RisingWave,欢迎阅读用户文档:zh-cn.risingwave.com/docs

🔍更多常见问题及答案,欢迎搜索留言: risingwavelabs/discussions

相关文章:

  • 【安全函数】常用的安全函数的使用
  • WPF学习(2)--类与类的继承2-在窗口的实现
  • opencv 打开图片后,cv::mat存入共享内存的代码,如何设置队列?
  • C 语言通用MySQL 功能增删查改功能.
  • [Mysql] 的基础知识和sql 语句.教你速成(上)——逻辑清晰,涵盖完整
  • SwaggerSpy:一款针对SwaggerHub的自动化OSINT安全工具
  • NetSuite Inventory Transfer Export Saved Search
  • 安装TensorFlow2.12.0
  • Electron+Vue开源软件:洛雪音乐助手V2.8畅享海量免费歌曲
  • 【Python】从0开始的Django基础
  • 轻量级二维码生成器
  • 银河麒麟4.0.2安装带有opengl的Qt5.12.9
  • 解锁Flutter中的ProcessResult:让外部命令执行变得轻松
  • 机器人----控制方式
  • 深度学习 --- stanford cs231学习笔记五(训练神经网络的几个重要组成部分 二)
  • 【跃迁之路】【735天】程序员高效学习方法论探索系列(实验阶段492-2019.2.25)...
  • flask接收请求并推入栈
  • Intervention/image 图片处理扩展包的安装和使用
  • Java 最常见的 200+ 面试题:面试必备
  • JavaScript 一些 DOM 的知识点
  • Javascript设计模式学习之Observer(观察者)模式
  • Python3爬取英雄联盟英雄皮肤大图
  • Quartz实现数据同步 | 从0开始构建SpringCloud微服务(3)
  • Solarized Scheme
  • vue2.0一起在懵逼的海洋里越陷越深(四)
  • WordPress 获取当前文章下的所有附件/获取指定ID文章的附件(图片、文件、视频)...
  • 更好理解的面向对象的Javascript 1 —— 动态类型和多态
  • 猫头鹰的深夜翻译:JDK9 NotNullOrElse方法
  • 名企6年Java程序员的工作总结,写给在迷茫中的你!
  • 浅谈Kotlin实战篇之自定义View图片圆角简单应用(一)
  • 如何选择开源的机器学习框架?
  • 提醒我喝水chrome插件开发指南
  • Hibernate主键生成策略及选择
  • # 数论-逆元
  • #define 用法
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (delphi11最新学习资料) Object Pascal 学习笔记---第2章第五节(日期和时间)
  • (LeetCode 49)Anagrams
  • (动态规划)5. 最长回文子串 java解决
  • (附源码)ssm码农论坛 毕业设计 231126
  • (紀錄)[ASP.NET MVC][jQuery]-2 純手工打造屬於自己的 jQuery GridView (含完整程式碼下載)...
  • (十七)Flink 容错机制
  • (算法)硬币问题
  • (学习日记)2024.04.10:UCOSIII第三十八节:事件实验
  • *p++,*(p++),*++p,(*p)++区别?
  • .net 8 发布了,试下微软最近强推的MAUI
  • .NET Core中的去虚
  • .NET LINQ 通常分 Syntax Query 和Syntax Method
  • .net mvc actionresult 返回字符串_.NET架构师知识普及
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
  • .NET单元测试
  • .net和php怎么连接,php和apache之间如何连接
  • .NET微信公众号开发-2.0创建自定义菜单
  • .skip() 和 .only() 的使用
  • ??eclipse的安装配置问题!??