当前位置: 首页 > news >正文

详解 Flink CDC 的介绍和入门案例

一、Flink CDC 简介

1. CDC 介绍

​ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2. CDC 种类

基于查询的 CDC基于 Binlog 的 CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3. Flink CDC 介绍

​ Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors

二、Flink CDC 案例实操

1. DataStream 实现

1.1 导入依赖
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
1.2 编写程序代码
public class FlinkCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序//1.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//1.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//1.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//1.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//1.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//1.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "lgb");//2. 创建 FlinkCDC Source/*StartupOptions 有 5 种类型:1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog3. latest:从 binlog 的最近位置监控读取4. specificOffset:从 binlog 的指定位置读取5. timestamp:从 binlog 的指定时间戳读取*/DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder().hostname("hadoop102") //Mysql所在主机名.port(3306) //mysql端口号.username("root") //登录mysql用户名.password("123456") //登录mysql密码.databaseList("cdc_test") //监控的数据库列表,可变参数.tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表.deserializer(new StringDebeziumDeserializationSchema()) //反序列化器.startupOptions(StartupOptions.initial()) //指定读取策略.build();//3. 通过 FlinkCDC Source 创建 DataStreamDataStream<String> dataStream = env.addSource(mysqlSource);//4. 打印输出流dataStream.print();//5. 启动任务env.execute("FlinkCDC");}
}
1.3 测试
1.3.1 本地测试
  • 开启 MySQL Binlog 并重启 MySQL
  • 在 Mysql 中创建对应的数据库和数据表并插入一条数据
  • 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
  • 在数据表中进行增删改操作,查看程序控制台输出结果
1.3.2 集群测试
  • 将 FlinkCDC 程序进行打包并上传到集群

  • 启动 Hadoop、zookeeper 和 Flink 集群

  • 运行 FlinkCDC 程序

    bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    
  • 给当前的 Flink 程序创建 Savepoint

    bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
  • 停止 FlinkCDC 程序

  • 在Mysql数据表中进行增删改操作

  • 从 Savepoint 重启程序查看程序输出结果

    bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    

2. Flink SQL 实现

2.0.0 版本的 FlinkCDC 通过 FlinkSQL 实现需要 1.13+ 版本的 Flink 支持

public class FlinkSQLCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2. 创建 FlinkSQL 表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offsettableEnv.executeSql("create table user_info (" +"id String primary key, name String, sex String) with (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'initial'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'cdc_test'," +" 'table-name' = 'user_info'" +")");//4. 查询输出表中数据Table table = tableEnv.sqlQuery("select * from user_info");DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);dataStream.print();//5. 启动任务env.execute("FlinkSqlCDC");}
}

3. 自定义反序列化器

规范化数据输出格式,方便后续解析

/**自定义反序列化器:实现 DebeziumDeserializationSchema<T> 接口并实现 deserialize 和 getProducedType 方法 
*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {/*想要展示的数据格式:{"dbName":"","tableName":"","before":{"field1":"value1",...},"after":{"field1":"value1",...},"op":""}*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {JSONObject result = new JSONObject();//1.获取库名和表名String topic = sourceRecord.topic();String[] fields = topic.split("\\.");//2. 获取 before 数据Struct value = (Struct) sourceRecord.value();Struct before = value.getStruct("before");JSONObject beforeJSON = new JSONObject();if(before != null) {Schema schema = before.schema();List<Field> fields = schema.fields();for(Field field : fields) {beforeJSON.put(field.name(), before.get(field));}}//3. 获取 after 数据Struct after = value.getStruct("after");JSONObject afterJSON = new JSONObject();if(after != null) {Schema schema = after.schema();List<Field> fields = schema.fields();for(Field field : fields) {afterJSON.put(field.name(), after.get(field));}}//4. 获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);result.put("dbName", fields[1]);result.put("tableName", fields[2]);result.put("before", beforeJSON);result.put("after", afterJSON);result.put("op", operation);collcetor.collect(result.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}	
}

相关文章:

  • 微服务与分布式面试题
  • 高效文件传输攻略:利用局域网共享实现极速数据同步
  • 【计算机视觉】人脸算法之图像处理基础知识(二)
  • Allegro X PCB设计小诀窍--如何在Allegro X中为PCB标注尺寸
  • redis序列化
  • 掌握机器学习基础:Scikit-Learn(sklearn)入门指南
  • 05-5.1.3 树的性质
  • 用 C 语言实现求补码的运算
  • 基于单片机的数字频率计的设计和仿真
  • 使用宝塔面板 将vue+node+mysql部署至云服务器
  • 前端 JS 经典:package.json 属性详解
  • 安装前端依赖node-sass报错
  • 【LLM之RAG】Adaptive-RAG论文阅读笔记
  • C语言 | Leetcode C语言题解之第151题反转字符串中的单词
  • 学习java第一百天
  • CSS 提示工具(Tooltip)
  • CSS进阶篇--用CSS开启硬件加速来提高网站性能
  • es6(二):字符串的扩展
  • GDB 调试 Mysql 实战(三)优先队列排序算法中的行记录长度统计是怎么来的(上)...
  • Git学习与使用心得(1)—— 初始化
  • gops —— Go 程序诊断分析工具
  • nodejs调试方法
  • oldjun 检测网站的经验
  • PAT A1017 优先队列
  • Python socket服务器端、客户端传送信息
  • Shadow DOM 内部构造及如何构建独立组件
  • Spring-boot 启动时碰到的错误
  • 电商搜索引擎的架构设计和性能优化
  • 给第三方使用接口的 URL 签名实现
  • 构建工具 - 收藏集 - 掘金
  • 基于组件的设计工作流与界面抽象
  • 记一次删除Git记录中的大文件的过程
  • 深入 Nginx 之配置篇
  • 事件委托的小应用
  • 通过git安装npm私有模块
  • 移动互联网+智能运营体系搭建=你家有金矿啊!
  • 智能情侣枕Pillow Talk,倾听彼此的心跳
  • ​如何防止网络攻击?
  • # .NET Framework中使用命名管道进行进程间通信
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #define
  • (补)B+树一些思想
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (分布式缓存)Redis持久化
  • (附源码)springboot人体健康检测微信小程序 毕业设计 012142
  • (六)DockerCompose安装与配置
  • (六)vue-router+UI组件库
  • (三) diretfbrc详解
  • (四)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (一)80c52学习之旅-起始篇
  • (转)http协议
  • (转)树状数组
  • .NET Compact Framework 多线程环境下的UI异步刷新
  • .net 简单实现MD5
  • .NET 中 GetHashCode 的哈希值有多大概率会相同(哈希碰撞)