当前位置: 首页 > news >正文

FlinkSQL系列04-CDC连接器

目录

  • CDC
    • 使用场景
    • flinkSQL 对 cdc 的支持
  • mysql-cdc-connector

CDC

CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改,并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT, DELETE, UPDATE 等。

使用场景

  • 使用 flink sql 进行数据同步,将数据从一个地方同步到其他地方,比如从 mysql 到 doris
  • 可以在源数据库上实时的物化一个聚合视图
  • 因为只是增量同步,所以可以实时的低延迟的同步数据

flinkSQL 对 cdc 的支持

在 flinksql 中,cdc 数据几乎等价于 changelog,核心就在对 record 的 rowkind(+I/-U/+U/-D)进行适配。
flinksql 中操作 cdc 数据,通过 cdc 连接器即可。flink 的 cdc connector,在核心包中是没有集成的, 需要额外引入依赖。

mysql-cdc-connector

示例:用 flink-mysql-cdc 连接器,映射源表,并进行查询计算写回mysql表

public static void main(String[] args) {

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
	env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint");

	StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

	// 建表
	tenv.executeSql("CREATE TABLE flink_score (" +
			"      id INT," +
			"      name string," +
			"      gender string," +
			"      score double," +
			"     PRIMARY KEY(id) NOT ENFORCED" +
			"     ) WITH (" +
			"     'connector' = 'mysql-cdc'," +
			"     'hostname' = 'hdp01'," +
			"     'port' = '3306'," +
			"     'username' = 'root'," +
			"     'password' = 'root'," +
			"     'database-name' = 'flinktest'," +
			"     'table-name' = 'score'" +
			")");

	// 查询
	tenv.executeSql("select * from flink_score")/*.print()*/;

	tenv.executeSql("select  gender,avg(score) as avg_score  from  flink_score group by gender")/*.print()*/;

	// 建一个目标表,用来存放查询结果:每种性别中,总分最高的前2个人
	tenv.executeSql(
			"create table flink_rank(" +
					"   gender    string," +
					"   name      string," +
					"   score_amt double," +
					"   rn        bigint," +
					"   primary key(gender,rn) not enforced" +
					") with (" +
					"  'connector' = 'jdbc'," +
					"  'url' = 'jdbc:mysql://hdp01:3306/flinktest'," +
					"  'table-name' = 'score_rank'," +
					"  'username' = 'root'," +
					"  'password' = 'root' " +
					")"
	);

	tenv.executeSql("insert into flink_rank" +
			"SELECT" +
			"  gender," +
			"  name," +
			"  score_amt," +
			"  rn" +
			"from(" +
			"SELECT" +
			"  gender," +
			"  name," +
			"  score_amt," +
			"  row_number() over(partition by gender order by score_amt desc) as rn" +
			"from" +
			"(" +
			"SELECT" +
			"gender," +
			"name," +
			"sum(score) as score_amt" +
			"from flink_score" +
			"group by gender,name" +
			") o1" +
			") o2" +
			"where rn <= 2");
}

相关文章:

  • 包-node.js中的第三方模块
  • vscode 个人实用插件(资源集合)
  • GTOT-Toolkit模板参考
  • [贪心]Min-Max Array Transformation Codeforces1721C
  • 猿创征文|【算法入门必刷】数据结构-栈(二)
  • 数据结构-压缩软件核心(利用哈夫曼树进行编码,对文件进行压缩与解压缩)
  • 月薪12.8K,零基础转行软件测试5月斩获3份过万offer,分享一些我的秘招~
  • 推荐一款新式开源的反向代理工具(FRP)
  • 复习一:基本概念和术语
  • Vue基础自学系列 | webpack中的插件
  • 稻盛和夫:让年轻人脱胎换骨的6条自我提升原则
  • HTML5新特性 day_02(8.8)
  • springboot2.0 配置ssl证书详解
  • 客群画像|解决分群与特征分类问题,试一下这个处理方法
  • 【cmake实战六】如何使用编译的库(动态库dll)——windows系统
  • C语言笔记(第一章:C语言编程)
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • Java 23种设计模式 之单例模式 7种实现方式
  • Redis的resp协议
  • vuex 学习笔记 01
  • 动态魔术使用DBMS_SQL
  • 记一次删除Git记录中的大文件的过程
  • 简单实现一个textarea自适应高度
  • 深度学习入门:10门免费线上课程推荐
  • 手写双向链表LinkedList的几个常用功能
  • ​【原创】基于SSM的酒店预约管理系统(酒店管理系统毕业设计)
  • # 日期待t_最值得等的SUV奥迪Q9:空间比MPV还大,或搭4.0T,香
  • #NOIP 2014# day.1 生活大爆炸版 石头剪刀布
  • (java版)排序算法----【冒泡,选择,插入,希尔,快速排序,归并排序,基数排序】超详细~~
  • (Redis使用系列) SpirngBoot中关于Redis的值的各种方式的存储与取出 三
  • (Redis使用系列) Springboot 使用redis实现接口幂等性拦截 十一
  • (分享)一个图片添加水印的小demo的页面,可自定义样式
  • (附表设计)不是我吹!超级全面的权限系统设计方案面世了
  • (接口封装)
  • (六)Hibernate的二级缓存
  • (已解决)什么是vue导航守卫
  • (原創) 未来三学期想要修的课 (日記)
  • (转)ObjectiveC 深浅拷贝学习
  • ***汇编语言 实验16 编写包含多个功能子程序的中断例程
  • .net 微服务 服务保护 自动重试 Polly
  • .NET是什么
  • .Net语言中的StringBuilder:入门到精通
  • .sdf和.msp文件读取
  • ??myeclipse+tomcat
  • [APIO2015]巴厘岛的雕塑
  • [C++]类和对象【上篇】
  • [ERROR] ocp-server-ce-py_script_start_check-4.2.1 RuntimeError: ‘tenant_name‘
  • [IE技巧] 如何关闭Windows Server版IE的安全限制
  • [LeetCode] Max Points on a Line
  • [Lua实战]整理Lua中忽略的问题
  • [oeasy]python001_先跑起来_python_三大系统选择_windows_mac_linux
  • [one_demo_9]判断数组是否递增
  • [Oracle][Metadata]如何查找与某一个功能相关的数据字典名
  • [python]基于opencv实现的车道线检测
  • [QT] TCP协议演示