FlinkSQL Series 04: CDC Connectors
Contents
- CDC
- Use cases
- CDC support in Flink SQL
- mysql-cdc-connector
CDC
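CDC, short for Change Data Capture, reads the changes that have been committed in a database and sends them downstream for other systems to consume. These changes include operations such as INSERT, UPDATE, and DELETE.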
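To make the idea concrete, here is a hypothetical, simplified Java model of a change event (an operation plus the before/after image of a row) and a downstream consumer that replays committed changes in order to keep its own copy in sync. The names ChangeEvent and Replica are illustrative only; real CDC records carry more metadata, such as the source position and timestamps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified CDC change event: operation type plus the
// before/after values of one row, identified by its primary key.
class ChangeEvent {
    enum Op { INSERT, UPDATE, DELETE }

    final Op op;
    final int id;          // primary key of the changed row
    final Double before;   // old value, null for INSERT
    final Double after;    // new value, null for DELETE

    ChangeEvent(Op op, int id, Double before, Double after) {
        this.op = op;
        this.id = id;
        this.before = before;
        this.after = after;
    }
}

// Hypothetical downstream consumer: applies committed changes in order
// so that its map mirrors the source table.
class Replica {
    final Map<Integer, Double> rows = new HashMap<>();

    void apply(ChangeEvent e) {
        switch (e.op) {
            case INSERT:
            case UPDATE:
                rows.put(e.id, e.after); // write the new row image
                break;
            case DELETE:
                rows.remove(e.id);       // drop the row
                break;
        }
    }
}
```

Because the consumer only sees changes, it never has to rescan the source table to stay current.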
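Use cases
- Data synchronization with Flink SQL: replicating data from one system to another, for example from MySQL to Doris
- Materializing an aggregate view over the source database in real time
- Real-time, low-latency synchronization, because after the initial snapshot only incremental changes are transferred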
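The aggregate-view use case works because an aggregate can be updated incrementally from each change instead of being recomputed from scratch. Below is a minimal Java sketch, assuming changes arrive tagged "+I"/"+U" (a row is added) or "-U"/"-D" (a row is retracted); it maintains the per-gender AVG(score) that the mysql-cdc example queries. AvgScoreView is a hypothetical name, not a Flink class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keeps "SELECT gender, AVG(score) ... GROUP BY gender"
// up to date from a stream of changes, without rescanning the source.
class AvgScoreView {
    private final Map<String, double[]> acc = new HashMap<>(); // gender -> {sum, count}

    void onChange(String kind, String gender, double score) {
        double[] a = acc.computeIfAbsent(gender, g -> new double[2]);
        switch (kind) {
            case "+I":
            case "+U":
                a[0] += score; // accumulate the added row
                a[1] += 1;
                break;
            case "-U":
            case "-D":
                a[0] -= score; // retract the removed row
                a[1] -= 1;
                break;
            default:
                throw new IllegalArgumentException("unknown row kind: " + kind);
        }
        if (a[1] == 0) {
            acc.remove(gender); // the group is now empty
        }
    }

    // Current average for a gender, or null if the group is empty.
    Double avg(String gender) {
        double[] a = acc.get(gender);
        return a == null ? null : a[0] / a[1];
    }
}
```

Keeping sum and count (rather than the average itself) is what makes retractions possible: a removed row can be subtracted exactly.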
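CDC support in Flink SQL
In Flink SQL, CDC data is essentially a changelog stream: every record carries a RowKind (+I insert, -U update-before, +U update-after, -D delete), and supporting CDC comes down to mapping source changes correctly onto these row kinds.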
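The mapping from database operations to row kinds can be sketched in plain Java. Kind is a local stand-in that mirrors the four values of Flink's org.apache.flink.types.RowKind and their short strings (it is not the Flink class), and ChangelogMapper is a hypothetical helper: an INSERT becomes +I, a DELETE becomes -D carrying the old row, and an UPDATE is split into -U (old image) followed by +U (new image).

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring Flink's RowKind values; not the Flink class itself.
enum Kind {
    INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

    final String shortString;

    Kind(String shortString) {
        this.shortString = shortString;
    }
}

// Hypothetical helper: turns one database change into changelog rows.
class ChangelogMapper {
    static List<String> toChangelog(String op, String beforeRow, String afterRow) {
        List<String> out = new ArrayList<>();
        switch (op) {
            case "INSERT":
                out.add(Kind.INSERT.shortString + afterRow);
                break;
            case "UPDATE":
                // An update is a retraction of the old row plus the new row
                out.add(Kind.UPDATE_BEFORE.shortString + beforeRow);
                out.add(Kind.UPDATE_AFTER.shortString + afterRow);
                break;
            case "DELETE":
                out.add(Kind.DELETE.shortString + beforeRow);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return out;
    }
}
```

Emitting the old image as -U lets downstream aggregates retract its contribution before adding the new value.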
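Flink SQL reads CDC data through a CDC connector. These connectors are not bundled with Flink's core distribution, so an extra dependency has to be added to the project (for MySQL, the flink-connector-mysql-cdc artifact from the Flink CDC project, or its shaded flink-sql-connector-mysql-cdc jar).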
mysql-cdc-connector
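Example: use the mysql-cdc connector to map a MySQL source table, run queries on it, and write the results back to a MySQL table. The sink side uses the JDBC connector, which also needs its own dependency (flink-connector-jdbc) plus a MySQL JDBC driver.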
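import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The CDC source keeps its read position (binlog offset) in checkpointed state
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint");
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Source table: maps MySQL table flinktest.score through the mysql-cdc connector
        tenv.executeSql("CREATE TABLE flink_score (" +
                "  id INT," +
                "  name STRING," +
                "  gender STRING," +
                "  score DOUBLE," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hdp01'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'root'," +
                "  'database-name' = 'flinktest'," +
                "  'table-name' = 'score'" +
                ")");

        // Queries: uncomment .print() to watch the changelog (+I/-U/+U/-D) in the console.
        // On an unbounded CDC source print() never returns, so enable only one at a time.
        tenv.executeSql("SELECT * FROM flink_score")/*.print()*/;
        tenv.executeSql("SELECT gender, AVG(score) AS avg_score FROM flink_score GROUP BY gender")/*.print()*/;

        // Sink table for the result: the top 2 people by total score within each gender.
        // With a primary key declared, the JDBC sink writes in upsert mode;
        // the MySQL table score_rank must already exist with a matching key.
        tenv.executeSql(
                "CREATE TABLE flink_rank (" +
                "  gender STRING," +
                "  name STRING," +
                "  score_amt DOUBLE," +
                "  rn BIGINT," +
                "  PRIMARY KEY (gender, rn) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://hdp01:3306/flinktest'," +
                "  'table-name' = 'score_rank'," +
                "  'username' = 'root'," +
                "  'password' = 'root'" +
                ")");

        // Top-N per group. Each fragment ends with a space so the concatenated SQL
        // keeps its token boundaries (e.g. "flink_rank SELECT", not "flink_rankSELECT").
        tenv.executeSql(
                "INSERT INTO flink_rank " +
                "SELECT gender, name, score_amt, rn " +
                "FROM ( " +
                "  SELECT gender, name, score_amt, " +
                "    ROW_NUMBER() OVER (PARTITION BY gender ORDER BY score_amt DESC) AS rn " +
                "  FROM ( " +
                "    SELECT gender, name, SUM(score) AS score_amt " +
                "    FROM flink_score " +
                "    GROUP BY gender, name " +
                "  ) o1 " +
                ") o2 " +
                "WHERE rn <= 2");
    }
}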
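To see what the INSERT query computes, here is a hypothetical plain-Java recomputation over a snapshot of score rows: SUM(score) per (gender, name), then a row number per gender ordered by total descending, keeping rn <= 2. Results are keyed by "gender,rn" to match the sink's primary key. TopNSketch is an illustrative name; Flink itself maintains this result incrementally and emits updates, whereas this batch sketch only shows the logic. Ties are broken by name here, while ROW_NUMBER() leaves the order of ties unspecified.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical batch version of the Top-N query; each input row is {gender, name, score}.
class TopNSketch {
    static Map<String, String> top2(List<String[]> rows) {
        // Step 1: SUM(score) GROUP BY gender, name
        Map<String, Map<String, Double>> totals = new TreeMap<>();
        for (String[] r : rows) {
            totals.computeIfAbsent(r[0], k -> new HashMap<>())
                  .merge(r[1], Double.parseDouble(r[2]), Double::sum);
        }
        // Steps 2 and 3: number rows per gender by total (descending), keep rn <= 2
        Map<String, String> result = new TreeMap<>(); // "gender,rn" -> "name,score_amt"
        for (Map.Entry<String, Map<String, Double>> g : totals.entrySet()) {
            List<Map.Entry<String, Double>> people = new ArrayList<>(g.getValue().entrySet());
            people.sort(Map.Entry.<String, Double>comparingByValue(Comparator.reverseOrder())
                    .thenComparing(Map.Entry.<String, Double>comparingByKey()));
            for (int rn = 1; rn <= Math.min(2, people.size()); rn++) {
                Map.Entry<String, Double> p = people.get(rn - 1);
                result.put(g.getKey() + "," + rn, p.getKey() + "," + p.getValue());
            }
        }
        return result;
    }
}
```

Because the result is keyed by (gender, rn), a later change in someone's total should only rewrite the affected rank rows in score_rank (for example both M,1 and M,2 when two people swap places), which is why the sink declares PRIMARY KEY (gender, rn).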