当前位置: 首页 > news >正文

Flink中Table Api和SQL(二)

目录

11.2.5 输出表

11.2.6 表和流的转换

 11.3 流处理中的表

11.3.1 动态表和持续查询

 11.3.2 将流转换成动态表

 11.3.2 用 SQL 持续查询

11.3.3 将动态表转换为流


11.2.5 输出表

表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最 后一个步骤 Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。在代码上,输出一张表最直接的方法,就是调用 Table 的方法 executeInsert()方法将一个 Table 写入到注册过的表中,方法传入的参数就是注册的表名。

// 注册表,用于输出数据到外部系统 
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )"); 
 
// 经过查询转换,得到结果表 
Table result = ... 
 
// 将结果表写入已注册的输出表中 
result.executeInsert("OutputTable"); 

在底层,表的输出是通过将数据写入到 TableSink 来实现的。TableSink 是 Table API 中提 供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如 CSV、Parquet)、 存储数据库(比如 JDBC、HBase、Elasticsearch)和消息队列(比如 Kafka)。它有些类似于DataStream API 中调用 addSink()方法时传入的 SinkFunction,有不同的连接器对它进行了实现。

11.2.6 表和流的转换

从创建表环境开始,历经表的创建、查询转换和输出,我们已经可以使用 Table API 和 SQL进行完整的流处理了。不过在应用的开发过程中,我们测试业务逻辑一般不会直接将结果直接 写入到外部系统,而是在本地控制台打印输出。对于 DataStream 这非常容易,直接调用 print()方法就可以看到结果数据流的内容了;但对于 Table 就比较悲剧——它没有提供 print()方法。 这该怎么办呢?

在 Flink 中我们可以将 Table 再转换成 DataStream,然后进行打印输出。这就涉及了表和流的转换。

1. 将表(Table)转换成流(DataStream)

(1)调用 toDataStream()方法

将一个 Table 对象转换成 DataStream 非常简单,只要直接调用表环境的方法 toDataStream()就可以了。例如,我们可以将 11.2.4 小节经查询转换得到的表 maryClickTable 转换成流打印输 出,这代表了“Mary 点击的 url 列表”:

Table aliceVisitTable = tableEnv.sqlQuery( 
 "SELECT user, url " + 
 "FROM EventTable " + 
 "WHERE user = 'Alice' " 
 ); 
 
// 将表转换成数据流 
tableEnv.toDataStream(aliceVisitTable).print();

这里需要将要转换的 Table 对象作为参数传入。

(2)调用 toChangelogStream()方法

将 maryClickTable 转换成流打印输出是很简单的;然而,如果我们同样希望将“用户点击 次数统计”表 urlCountTable 进行打印输出,就会抛出一个 TableException 异常:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 
'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't 
support consuming update changes ... 

这表示当前的 TableSink 并不支持表的更新(update)操作。这是什么意思呢?

因为 print 本身也可以看作一个 Sink 操作,所以这个异常就是说打印输出的 Sink 操作不支持对数据进行更新。具体来说,urlCountTable 这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。也就是说,Alice 的第一个点击事件到来,表中会有一行(Alice, 1); 第二个点击事件到来,这一行就要更新为(Alice, 2)。但之前的(Alice, 1)已经打印输出了,“覆水难收”,我们怎么能对它进行更改呢?所以就会抛出异常。

解决的思路是,对于这样有更新操作的表,我们不要试图直接把它转换成 DataStream 打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作, 就变成了一条更新日志的流,我们就可以转换成流打印输出了

代码中需要调用的是表环境的 toChangelogStream()方法:

Table urlCountTable = tableEnv.sqlQuery( 
 "SELECT user, COUNT(url) " + 
 "FROM EventTable " + 
 "GROUP BY user " 
 ); 
 
// 将表转换成更新日志流 
tableEnv.toDataStream(urlCountTable).print(); 

与“更新日志流”(Changelog Streams)对应的,是那些只做了简单转换、没有进行聚合统计的表,例如前面提到的 maryClickTable。它们的特点是数据只会插入、不会更新,所以也 被叫作“仅插入流”(Insert-Only Streams)。

2. 将流(DataStream)转换成表(Table)

(1)调用 fromDataStream()方法

想要将一个 DataStream 转换成表也很简单,可以通过调用表环境的 fromDataStream()方法 来实现,返回的就是一个 Table 对象。例如,我们可以直接将事件流 eventStream 转换成一个表:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
 
// 获取表环境 
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); 
 
// 读取数据源 
SingleOutputStreamOperator<Event> eventStream = env.addSource(...) 
 
// 将数据流转换成表 
Table eventTable = tableEnv.fromDataStream(eventStream); 

由于流中的数据本身就是定义好的 POJO 类型 Event,所以我们将流转换成表之后,每一 行数据就对应着一个 Event,而表中的列名就对应着 Event 中的属性。另外,我们还可以在 fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中 的字段名,并可以任意指定位置:

// 提取 Event 中的 timestamp 和 url 作为表中的列 
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"), $("url")); 

需要注意的是,timestamp 本身是 SQL 中的关键字,所以我们在定义表名、列名时要尽量 避免。这时可以通过表达式的 as()方法对字段进行重命名:

// 将 timestamp 字段重命名为 ts 
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url")); 

(2)调用 createTemporaryView()方法

调用 fromDataStream()方法简单直观,可以直接实现 DataStream 到 Table 的转换;不过如 果我们希望直接在 SQL 中引用这张表,就还需要调用表环境的 createTemporaryView()方法来 创建虚拟视图了。

对于这种场景,也有一种更简洁的调用方式。我们可以直接调用 createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("EventTable", eventStream, $("timestamp").as("ts"),$("url"));

这样,我们接下来就可以直接在 SQL 中引用表 EventTable 了。

(3)调用 fromChangelogStream ()方法

表环境还提供了一个方法 fromChangelogStream(),可以将一个更新日志流转换成表。这 个方法要求流中的数据类型只能是 Row,而且每一个数据都需要指定当前行的更新类型 (RowKind);所以一般是由连接器帮我们实现的,直接应用比较少见,感兴趣的读者可以查 看官网的文档说明。

3. 支持的数据类型

前面示例中的 DataStream,流中的数据类型都是定义好的 POJO 类。如果 DataStream 中 的类型是简单的基本类型,还可以直接转换成表吗?这就涉及了 Table 中支持的数据类型。

整体来看,DataStream 中支持的数据类型,Table 中也是都支持的,只不过在进行转换时 需要注意一些细节。

(1)原子类型

在 Flink 中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的 DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在 fromDataStream()方法里增加参数,用来重新命名列字段。

StreamTableEnvironment tableEnv = ...; 
 
DataStream<Long> stream = ...; 
 
// 将数据流转换成动态表,动态表只有一个字段,重命名为 myLong 
Table table = tableEnv.fromDataStream(stream, $("myLong")); 

(2)Tuple 类型

当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型 看作了一元组 Tuple1 的处理结果。 Table 支持 Flink 中定义的元组类型 Tuple,对应在表中字段名默认就是元组中元素的属性 名 f0、f1、f2...。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通 过调用表达式的 as()方法来进行重命名。

StreamTableEnvironment tableEnv = ...; 
 
DataStream<Tuple2<Long, Integer>> stream = ...; 
 
// 将数据流转换成只包含 f1 字段的表 
Table table = tableEnv.fromDataStream(stream, $("f1")); 
 
// 将数据流转换成包含 f0 和 f1 字段的表,在表中 f0 和 f1 位置交换 
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0")); 
 
// 将 f1 字段命名为 myInt,f0 命名为 myLong 
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), 
$("f0").as("myLong")); 

(3)POJO 类型

Flink 也支持多种数据类型组合成的“复合类型”,最典型的就是简单 Java 对象(POJO类型)。由于 POJO 中已经定义好了可读性强的字段名,这种类型的数据流转换成 Table 就显得 无比顺畅了。

将 POJO 类型的 DataStream 转换成 Table,如果不指定字段名称,就会直接使用原始 POJO类型中的字段名称。POJO 中的字段同样可以被重新排序、提却和重命名,这在之前的例子中 已经有过体现。

StreamTableEnvironment tableEnv = ...; 
 
DataStream<Event> stream = ...; 
 
Table table = tableEnv.fromDataStream(stream); 
Table table = tableEnv.fromDataStream(stream, $("user")); 
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"),$("url").as("myUrl")); 

(4)Row 类型

Flink 中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是 Table 中数据的基本组织形式。Row 类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建 Table 时调用的 CREATE

语句就会将所有的字段名称和类型指定,这在 Flink 中被称为表的“模式结构”(Schema)。除 此之外,Row 类型还附加了一个属性 RowKind,用来表示当前行在更新操作中的类型。这样,Row 就可以用来表示更新日志流(changelog stream)中的数据,从而架起了 Flink 中流和表的 转换桥梁。

所以在更新日志流中,元素的类型必须是 Row,而且需要调用 ofKind()方法来指定更新类型。

下面是一个具体的例子:

DataStream<Row> dataStream = 
 env.fromElements( 
 Row.ofKind(RowKind.INSERT, "Alice", 12), 
 Row.ofKind(RowKind.INSERT, "Bob", 5), 
 Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), 
 Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)); 
 
// 将更新日志流转换为表 
Table table = tableEnv.fromChangelogStream(dataStream); 

        用户 Alice 的点击 url 列表只需要一个简单的条件查询就可以得到,对应的表中只有插入 操作,所以我们可以直接调用 toDataStream()将它转换成数据流,然后打印输出。控制台输出 的结果如下:

这里每条数据前缀的+I 就是 RowKind,表示 INSERT(插入)。而由于统计点击次数时用到了分组聚合,造成结果表中数据会有更新操作,所以在打印输 出时需要将表 urlCountTable 转换成更新日志流(changelog stream)。控制台输出的结果如下:

count> +I[Alice, 1] 
count> +I[Bob, 1] 
count> -U[Alice, 1] 
count> +U[Alice, 2] 
count> +I[Cary, 1] 
count> -U[Bob, 1] 
count> +U[Bob, 2] 
count> -U[Alice, 2] 
count> +U[Alice, 3] 

这里数据的前缀出现了+I、-U 和+U 三种 RowKind,分别表示 INSERT(插入)、UPDATE_BEFORE(更新前)和 UPDATE_AFTER(更新后)。当收到每个用户的第一次点击 事件时,会在表中插入一条数据,例如+I[Alice, 1]、+I[Bob, 1]。而之后每当用户增加一次点击 事件,就会带来一次更新操作,更新日志流(changelog stream)中对应会出现两条数据,分 别表示之前数据的失效和新数据的生效;例如当 Alice 的第二条点击数据到来时,会出现一个

-U[Alice, 1]和一个+U[Alice, 2],表示 Alice 的点击个数从 1 变成了 2。

这种表示更新日志的方式,有点像是声明“撤回”了之前的一条数据再插入一条更新后的数据,所以也叫作“撤回流”(Retract Stream)。

 11.3 流处理中的表

 其实关系型表和 SQL,主要就是针对批处理设计的,这和流处理有着天生的 隔阂。既然“八字不合”,那 Flink 中的 Table API 和 SQL 又是怎样做流处理的呢?接下来我 们就来深入探讨一下流处理中表的概念。

11.3.1 动态表和持续查询

流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库 中的表完全不同;而基于表执行的查询操作,也就有了新的含义。

如果我们希望把流数据转换成表的形式,那么这表中的数据就会不断增长;如果进一步基 于表执行 SQL 查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。

1. 动态表(Dynamic Tables)

当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的 SQL 查询,就应该 在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。

动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。 我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化

其实动态表的概念,我们在传统的关系型数据库中已经有所接触。数据库中的表,其实是 一系列 INSERT、UPDATE 和 DELETE 语句执行的结果;在关系型数据库中,我们一般把它 称为更新日志流(changelog stream)。如果我们保存了表在某一时刻的快照(snapshot),那么 接下来只要读取更新日志流,就可以得到表之后的变化过程和最终结果了。在很多高级关系型数据库(比如 Oracle、DB2)中都有“物化视图”(Materialized Views)的概念,可以用来缓存 SQL 查询的结果;它的更新其实就是不停地处理更新日志流的过程。 Flink 中的动态表,就借鉴了物化视图的思想。

2. 持续查询(Continuous Query)

动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义 的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。

这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。

由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输 入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有 限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就 构成了“持续查询”。

持续查询的步骤如下:

(1)流(stream)被转换为动态表(dynamic table);

(2)对动态表进行持续查询(continuous query),生成新的动态表;

(3)生成的动态表被转换成流。

这样,只要 API 将流和动态表的转换封装起来,我们就可以直接在数据流上执行 SQL 查 询,用处理表的方式来做流处理了。

 11.3.2 将流转换成动态表

为了能够使用 SQL 来做流处理,我们必须先把流(stream)转换成动态表。当然,之前 在讲解基本 API 时,已经介绍过代码中的 DataStream 和 Table 如何转换;现在我们则要抛开 具体的数据类型,从原理上理解流和动态表的转换过程。

如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert) 操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只 能在后面追加;所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog) 流,来构建一个表。

为了更好地说明流转换成动态表的过程,我们还是用 11.2 节中举的例子来做分析说明。

我们现在的输入数据,就是用户在网站上的点击访问行为,数据类型被包装为 POJO 类型Event。我们将它转换成一个动态表,注册为 EventTable。表中的字段定义如下:

[ 
 user: VARCHAR, // 用户名 
 url: VARCHAR, // 用户访问的 URL 
 ts: BIGINT // 时间戳 
] 

当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作, 每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。

 11.3.2 用 SQL 持续查询

1. 更新(Update)查询

我们在代码中定义了一个 SQL 查询。

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user"); 

这个查询很简单,主要是分组聚合统计每个用户的点击次数。我们把原始的动态表注册为EventTable,经过查询转换后得到 urlCountTable;这个结果动态表中包含两个字段,具体定义 如下:

[ 
 user: VARCHAR, // 用户名 
 cnt: BIGINT // 用户访问 url 的次数 
]

当原始动态表不停地插入新的数据时,查询得到的 urlCountTable 会持续地进行更改。由于 count 数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert), 也可以是对之前数据的更新(Update)。换句话说,用来定义结果表的更新日志(changelog) 流中,包含了 INSERT 和 UPDATE 两种操作。

这种持续查询被称为更新查询(Update Query), 更新查询得到的结果表如果想要转换成 DataStream,必须调用 toChangelogStream()方法

具体步骤解释如下:

(1)当查询启动时,原始动态表 EventTable 为空;

(2)当第一行 Alice 的点击数据插入 EventTable 表时,查询开始计算结果表,urlCountTable中插入一行数据[Alice,1]。

(3)当第二行 Bob 点击数据插入 EventTable 表时,查询将更新结果表并插入新行[Bob,1]。

(4)第三行数据到来,同样是 Alice 的点击事件,这时不会插入新行,而是生成一个针 对已有行的更新操作。这样,结果表中第一行[Alice,1]就更新为[Alice,2]。

(5)当第四行 Cary 的点击数据插入到 EventTable 表时,查询将第三行[Cary,1]插入到 结果表中。

 2. 追加(Append)查询

上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一 个简单的条件查询,结果表中就会像原始表 EventTable 一样,只有插入(Insert)操作了。

Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE 
user = 'Cary'"); 

这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志 (changelog)流中只有 INSERT 操作。追加查询得到的结果表,转换成 DataStream 调用方法 没有限制,可以直接用 toDataStream(),也可以像更新查询一样调用 toChangelogStream()。

这样看来,我们似乎可以总结一个规律:只要用到了聚合,在之前的结果上有叠加,就会产生更新操作,就是一个更新查询。但事实上,更新查询的判断标准是结果表中的数据是否会 有 UPDATE 操作,如果聚合的结果不再改变,那么同样也不是更新查询。

什么时候聚合的结果会保持不变呢?一个典型的例子就是窗口聚合。我们考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在结果表中增加一个endT 字段,表示当前统计窗口的结束时间。这时结果表的字段定义如下:

[ 
 user: VARCHAR, // 用户名 
 endT: TIMESTAMP, // 窗口结束时间 
 cnt: BIGINT // 用户访问 url 的次数 
] 

与之前的分组聚合一样,当原始动态表不停地插入新的数据时,查询得到的结果 result 会持续地进行更改。比如时间戳在 12:00:00 到 12:59:59 之间的有四条数据,其 中 Alice 三次点击、Bob 一次点击;所以当水位线达到 13:00:00 时窗口关闭,输出到结果表中的就是新增两条数据[Alice, 13:00:00, 3]和[Bob, 13:00:00, 1]。同理,当下一小时的窗口关闭时, 也会将统计结果追加到 result 表后面,而不会更新之前的数据。

 所以我们发现,由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中 只会包含插入 INSERT 操作,而没有更新 UPDATE 操作。所以这里的持续查询,依然是一个 追加(Append)查询。结果表 result 如果转换成 DataStream,可以直接调用 toDataStream()方法。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; 
import org.apache.flink.api.common.eventtime.WatermarkStrategy; 
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;  
import static org.apache.flink.table.api.Expressions.$; 
 
public class AppendQueryExample { 
 public static void main(String[] args) throws Exception { 
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
 env.setParallelism(1); 
 
// 读取数据源,并分配时间戳、生成水位线 
 SingleOutputStreamOperator<Event> eventStream = env 
 .fromElements( 
 new Event("Alice", "./home", 1000L), 
 new Event("Bob", "./cart", 1000L), 
 new Event("Alice", "./prod?id=1", 25 * 60 * 1000L), 
 new Event("Alice", "./prod?id=4", 55 * 60 * 1000L), 
 new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L), 
 new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L), 
 new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) 
 ) 
 .assignTimestampsAndWatermarks( 
 WatermarkStrategy.<Event>forMonotonousTimestamps() 
 .withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() { 
 @Override 
 public long extractTimestamp(Event element, long 
recordTimestamp) { 
 return element.timestamp; 
 } 
 }) 
 ); 
 
// 创建表环境 
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); 
 
// 将数据流转换成表,并指定时间属性 
 Table eventTable = tableEnv.fromDataStream( 
 eventStream, 
 $("user"), 
 $("url"), 
 $("timestamp").rowtime().as("ts") 
// 将 timestamp 指定为事件时间,并命名为 ts 
 ); 
 
// 为方便在 SQL 中引用,在环境中注册表 EventTable 
 tableEnv.createTemporaryView("EventTable", eventTable); 
 
// 设置 1 小时滚动窗口,执行 SQL 统计查询 
 Table result = tableEnv 
 .sqlQuery( 
 "SELECT " + 
 "user, " + 
 "window_end AS endT, " + // 窗口结束时间 
 "COUNT(url) AS cnt " + // 统计 url 访问次数 
 "FROM TABLE( " + 
 "TUMBLE( TABLE EventTable, " + // 1 小时滚动窗口 
 "DESCRIPTOR(ts), " + 
 "INTERVAL '1' HOUR)) " + 
 "GROUP BY user, window_start, window_end " 
 ); 
 
 tableEnv.toDataStream(result).print(); 
 env.execute(); 
 } 
} 

运行结果如下:

+I[Alice, 1970-01-01T01:00, 3] 
+I[Bob, 1970-01-01T01:00, 1] 
+I[Cary, 1970-01-01T02:00, 2] 
+I[Bob, 1970-01-01T02:00, 1] 

可以看到,所有输出结果都以+I 为前缀,表示都是以 INSERT 操作追加到结果表中的; 这是一个追加查询,所以我们直接使用 toDataStream()转换成流是没有问题的。这里输出的window_end 是一个 TIMESTAMP 类型;由于我们直接以一个长整型数作为事件发生的时间戳, 所以可以看到对应的都是 1970 年1月1日的时间。

3. 查询限制

在实际应用中,有些持续查询会因为计算代价太高而受到限制。所谓的“代价太高”,可能 是由于需要维护的状态持续增长,也可能是由于更新数据的计算太复杂。

⚫ 状态大小

用持续查询做流处理,往往会运行至少几周到几个月;所以持续查询处理的数据总量可能非常大。例如我们之前举的更新查询的例子,需要记录每个用户访问 url 的次数。如果随着时间的推移用户数越来越大,那么要维护的状态也将逐渐增长,最终可能会耗尽存储空间导致查询失败

SELECT user, COUNT(url) 
FROM clicks 
GROUP BY user;

⚫ 更新计算

对于有些查询来说,更新计算的复杂度可能很高。每来一条新的数据,更新结果的时候可能需要全部重新计算,并且对很多已经输出的行进行更新。一个典型的例子就是 RANK()函数, 它会基于一组数据计算当前值的排名。例如下面的 SQL 查询,会根据用户最后一次点击的时 间为每个用户计算一个排名。当我们收到一个新的数据,用户的最后一次点击时间(lastAction) 就会更新,进而所有用户必须重新排序计算一个新的排名。当一个用户的排名发生改变时,被他超过的那些用户的排名也会改变;这样的更新操作无疑代价巨大,而且还会随着用户的增多 越来越严重。

SELECT user, RANK() OVER (ORDER BY lastAction) 
FROM ( 
 SELECT user, MAX(ts) AS lastAction FROM EventTable GROUP BY user 
);

这样的查询操作,就不太适合作为连续查询在流处理中执行。这里 RANK()的使用要配合 一个 OVER 子句,这是所谓的“开窗聚合”。

11.3.3 将动态表转换为流

与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete) 操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作

在 Flink 中,Table API 和 SQL支持三种编码方式:

⚫ 仅追加(Append-only)流

仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发 出的数据,其实就是动态表中新增的每一行。

⚫ 撤回(Retract)流

撤回流是包含两类消息的流,添加(add)消息撤回(retract)消息

具体的编码规则是:INSERT 插入操作编码为 add 消息;DELETE 删除操作编码为 retract消息;而 UPDATE 更新操作则编码为被更改行的 retract 消息,和更新后行(新行)的 add 消 息。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。

可以看到,更新操作对于撤回流来说,对应着两个消息:之前数据的撤回(删除)和新数据的插入。

如图所示,显示了将动态表转换为撤回流的过程。

这里我们用+代表 add 消息(对应插入 INSERT 操作),用-代表 retract 消息(对应删除DELETE 操作);当 Alice 的第一个点击事件到来时,结果表新增一条数据[Alice, 1];而当 Alice的第二个点击事件到来时,结果表会将[Alice, 1]更新为[Alice, 2],对应的编码就是删除[Alice, 1]、 插入[Alice, 2]。这样当一个外部系统收到这样的两条消息时,就知道是要对 Alice 的点击统计次数进行更新了。

⚫ 更新插入(Upsert)流

更新插入流中只包含两种类型的消息:更新插入(upsert)消息删除(delete)消息

所谓的“upsert”其实是“update”和“insert”的合成词,所以对于更新插入流来说,INSERT 插 入操作和UPDATE更新操作,统一被编码为upsert消息;而DELETE删除操作则被编码为delete消息。

既然更新插入流中不区分插入(insert)和更新(update),那我们自然会想到一个问题: 如果希望更新一行数据时,怎么保证最后做的操作不是插入呢?

这就需要动态表中必须有唯一的键(key)。通过这个 key 进行查询,如果存在对应的数据 就做更新(update),如果不存在就直接插入(insert)。这是一个动态表可以转换为更新插入流 的必要条件。当然,收到这条流中数据的外部系统,也需要知道这唯一的键(key),这样才能 正确地处理消息。

可以看到,更新插入流跟撤回流的主要区别在于,更新(update)操作由于有 key 的存在, 只需要用单条消息编码就可以,因此效率更高。

需要注意的是,在代码里将动态表转换为 DataStream 时,只支持仅追加(append-only) 和撤回(retract)流,我们调用 toChangelogStream()得到的其实就是撤回流;这也很好理解,DataStream 中并没有 key 的定义,所以只能通过两条消息一减一增来表示更新操作。而连接到 外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性。

相关文章:

  • hook函数之useEffect的使用——自定义hook函数网络请求——
  • Windows 窗体显示的“模式方式”与“非模式方式”
  • JDBC详讲Connection与 jdbc-Statement
  • 外部 SRAM 实验
  • Redis从入门到精通(二)
  • 2021.09青少年软件编程(Python)等级考试试卷(四级)
  • JAVA计算机毕业设计毕业论文管理系统Mybatis+系统+数据库+调试部署
  • Redis实战 - 01 Redis 和 SpringSecurity Oauth2 实现认证授权中心
  • 数据结构:堆
  • 基于机器学习的搜索推荐系统
  • MATLAB | 分段赋色折线图及其图例绘制
  • C#面向对象程序设计课程实验三:实验名称:C#数组和集合
  • 数据结构--(栈、队列实现及3个OJ题)
  • 实时数据同步工具<Maxwell 操作案例>
  • 【设计模式】-创建型模式-第2章第3讲-【建造者模式】
  • __proto__ 和 prototype的关系
  • iOS仿今日头条、壁纸应用、筛选分类、三方微博、颜色填充等源码
  • Java 网络编程(2):UDP 的使用
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • KMP算法及优化
  • Linux gpio口使用方法
  • NLPIR语义挖掘平台推动行业大数据应用服务
  • Protobuf3语言指南
  • 编写符合Python风格的对象
  • 从地狱到天堂,Node 回调向 async/await 转变
  • 浮动相关
  • 机器学习学习笔记一
  • 力扣(LeetCode)21
  • 译米田引理
  • 译有关态射的一切
  • ​如何在iOS手机上查看应用日志
  • #NOIP 2014# day.2 T2 寻找道路
  • (1/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (ZT)出版业改革:该死的死,该生的生
  • (二)pulsar安装在独立的docker中,python测试
  • (附源码)SSM环卫人员管理平台 计算机毕设36412
  • (附源码)ssm失物招领系统 毕业设计 182317
  • (接口封装)
  • (学习日记)2024.01.19
  • (轉貼) VS2005 快捷键 (初級) (.NET) (Visual Studio)
  • .libPaths()设置包加载目录
  • .naturalWidth 和naturalHeight属性,
  • .net core控制台应用程序初识
  • .Net Redis的秒杀Dome和异步执行
  • .NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件
  • .NetCore 如何动态路由
  • .net遍历html中全部的中文,ASP.NET中遍历页面的所有button控件
  • .net流程开发平台的一些难点(1)
  • @Autowired和@Resource装配
  • [ 云计算 | AWS 实践 ] 基于 Amazon S3 协议搭建个人云存储服务
  • [AMQP Connection 127.0.0.1:5672] An unexpected connection driver error occured
  • [Android]Android P(9) WIFI学习笔记 - 扫描 (1)
  • [BZOJ] 3262: 陌上花开
  • [C/C++]数据结构----顺序表的实现(增删查改)
  • [echarts] y轴不显示0