当前位置: 首页 > news >正文

TiDB系列之:使用Flink TiDB CDC Connector采集数据

TiDB系列之:使用Flink TiDB CDC Connector采集数据

  • 一、依赖项
  • 二、Maven依赖
  • 三、SQL Client JAR
  • 四、如何创建 TiDB CDC 表
  • 五、连接器选项
  • 六、可用元数据
  • 七、特征
    • 一次性处理
    • 启动阅读位置
    • 多线程读取
    • DataStream Source
  • 八、数据类型映射

TiDB CDC 连接器允许从 TiDB 数据库读取快照数据和增量数据。本文档介绍如何设置 TiDB CDC 连接器以对 TiDB 数据库运行 SQL 查询。

  • TiDB系列之:使用TiCDC增量同步TiDB数据库数据
  • TiDB系列之:TiCDC同步数据到Kafka集群使用Debezium数据格式
  • TiDB系列之:TiCDC同步TiDB数据库数据到Kafka集群Topic

一、依赖项

为了设置 TiDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL Client 的依赖信息。

二、Maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-tidb-cdc</artifactId><version>3.0.1</version>
</dependency>

三、SQL Client JAR

下载链接仅适用于稳定版本。

下载 flink-sql-connector-tidb-cdc-3.0.1.jar 并将其放在 <FLINK_HOME>/lib/ 下。

四、如何创建 TiDB CDC 表

TiDB CDC 表可以定义如下:

-- checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   -- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (order_id INT,order_date TIMESTAMP(3),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY(order_id) NOT ENFORCED) WITH ('connector' = 'tidb-cdc','tikv.grpc.timeout_in_ms' = '20000', 'pd-addresses' = 'localhost:2379','database-name' = 'mydb','table-name' = 'orders'
);-- read snapshot and binlogs from orders table
Flink SQL> SELECT * FROM orders;

五、连接器选项

参数是否必须默认值类型描述
connectorrequired(none)String指定使用什么连接器,这里应该是“tidb-cdc”。
database-namerequired(none)String要监控的 TiDB 服务器的数据库名称。
table-namerequired(none)String要监控的 TiDB 数据库的表名。
scan.startup.modeoptionalinitialStringTiDB CDC Consumer 可选的启动模式,有效枚举为“initial”和“latest-offset”。
pd-addressesrequired(none)StringTiKV 集群的 PD 地址。
tikv.grpc.timeout_in_msoptional(none)LongTiKV GRPC 超时(以毫秒为单位)。
tikv.grpc.scan_timeout_in_msoptional(none)LongTiKV GRPC 扫描超时(以毫秒为单位)。
tikv.batch_get_concurrencyoptional20IntegerTiKV GRPC 批量获取并发。
tikv.*optional(none)String传递 TiDB 客户端的属性。

六、可用元数据

以下格式元数据可以在表定义中公开为只读(虚拟)列。

keyDataType描述
table_nameSTRING NOT NULL包含该行的表的名称。
database_nameSTRING NOT NULL包含该行的数据库的名称。
op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。
如果记录是从表的快照而不是binlog中读取的,则该值始终为0。

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

CREATE TABLE products (db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA  FROM 'table_name' VIRTUAL,operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'tidb-cdc','tikv.grpc.timeout_in_ms' = '20000','pd-addresses' = 'localhost:2379','database-name' = 'mydb','table-name' = 'orders'
);

七、特征

一次性处理

TiDB CDC 连接器是一个 Flink Source 连接器,它会先读取数据库快照,然后继续读取更改事件,即使发生故障也只处理一次。

启动阅读位置

配置选项 scan.startup.mode 指定 TiDB CDC Consumer 的启动模式。有效的枚举是:

  • initial(默认):拍摄捕获表的结构和数据的快照;如果您想从捕获的表中获取数据的完整表示,则很有用。
  • latest-offset:仅对捕获的表的结构进行快照;如果只需要获取从现在开始发生的更改,则很有用。

多线程读取

TiDB CDC 源可以并行读取工作,因为有多个任务可以接收更改事件。

DataStream Source

TiDB CDC 连接器也可以是 DataStream 源。您可以创建一个 SourceFunction,如下所示:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import org.apache.flink.cdc.connectors.tidb.TDBSourceOptions;
import org.apache.flink.cdc.connectors.tidb.TiDBSource;
import org.apache.flink.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import org.apache.flink.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;import java.util.HashMap;public class TiDBSourceExample {public static void main(String[] args) throws Exception {SourceFunction<String> tidbSource =TiDBSource.<String>builder().database("mydb") // set captured database.tableName("products") // set captured table.tiConf(TDBSourceOptions.getTiConfiguration("localhost:2399", new HashMap<>())).snapshotEventDeserializer(new TiKVSnapshotEventDeserializationSchema<String>() {@Overridepublic void deserialize(Kvrpcpb.KvPair record, Collector<String> out)throws Exception {out.collect(record.toString());}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).changeEventDeserializer(new TiKVChangeEventDeserializationSchema<String>() {@Overridepublic void deserialize(Cdcpb.Event.Row record, Collector<String> out)throws Exception {out.collect(record.toString());}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.addSource(tidbSource).print().setParallelism(1);env.execute("Print TiDB Snapshot + Binlog");}
}

八、数据类型映射

TiDB typeFlink SQL typeNOTE
TINYINTTINYINT
SMALLINT、TINYINT UNSIGNEDSMALLINT
INT、MEDIUMINT、SMALLINT UNSIGNEDINT
BIGINT、INT UNSIGNEDBIGINT
BIGINT UNSIGNEDDECIMAL(20, 0)
FLOATFLOAT
REAL、DOUBLEDOUBLE
NUMERIC(p, s) DECIMAL(p, s) where p <= 38DECIMAL(p, s)
NUMERIC(p, s) DECIMAL(p, s) where 38 < p <= 65STRING在 TiDB 中 DECIMAL 数据类型的精度最高为 65,但在 Flink 中 DECIMAL 的精度限制为 38。因此,如果定义精度大于 38 的十进制列,则应将其映射到 STRING 以避免精度损失。
BOOLEAN、TINYINT(1)、BIT(1)BOOLEAN
DATEDATE
TIME [§]TIME [§]
TIMESTAMP [§]TIMESTAMP_LTZ [§]
DATETIME [§]TIMESTAMP [§]
CHAR(n)CHAR(n)
VARCHAR(n)VARCHAR(n)
BIT(n)BINARY(⌈n/8⌉)
BINARY(n)BINARY(n)
TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXTSTRING
TINYBLOB、BLOB、MEDIUMBLOB、LONGBLOBBYTES目前,TiDB 中的 BLOB 数据类型仅支持长度不大于 2,147,483,647(2 ** 31 - 1) 的 Blob。
YEARINT
ENUMSTRING
JSONSTRINGJSON 数据类型在 Flink 中会被转换为 JSON 格式的 STRING。
SETARRAY由于 TiDB 中的 SET 数据类型是一个字符串对象,可以有零个或多个值,因此它应该始终映射到字符串数组

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Java中的Map(如果想知道Java中有关Map的知识点,那么只看这一篇就足够了!)
  • 【过题记录】 8.3
  • CTFHUB-SSRF-DNS重绑定 Bypass
  • [米联客-安路飞龙DR1-FPSOC] UDP通信篇连载-01 以太网协议介绍
  • ai web 1.0靶机漏洞渗透详解
  • 搭建个人的金融系统-----第一章,数据库设计
  • Arch Linux - 2-安装中文输入法
  • 解析 C# Dictionary 代码
  • Comfyui实例容器运行横向扩展
  • 【ROS 最简单教程 003/300】ROS 快速体验:Hello World
  • C# Where关键字
  • 数学建模--蒙特卡罗随机模拟
  • 嵌入式Linux系统中pinictrl框架基本实现
  • 数学建模--禁忌搜索
  • Kafka操作
  • 【译】JS基础算法脚本:字符串结尾
  • [ 一起学React系列 -- 8 ] React中的文件上传
  • java多线程
  • leetcode378. Kth Smallest Element in a Sorted Matrix
  • nginx 负载服务器优化
  • React as a UI Runtime(五、列表)
  • Redux 中间件分析
  • SpiderData 2019年2月23日 DApp数据排行榜
  • vue的全局变量和全局拦截请求器
  • Xmanager 远程桌面 CentOS 7
  • 不发不行!Netty集成文字图片聊天室外加TCP/IP软硬件通信
  • 类orAPI - 收藏集 - 掘金
  • 前端技术周刊 2018-12-10:前端自动化测试
  • 前端知识点整理(待续)
  • 全栈开发——Linux
  • 提醒我喝水chrome插件开发指南
  • 走向全栈之MongoDB的使用
  • ​RecSys 2022 | 面向人岗匹配的双向选择偏好建模
  • ​力扣解法汇总946-验证栈序列
  • ### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTr
  • #NOIP 2014# day.2 T2 寻找道路
  • (06)金属布线——为半导体注入生命的连接
  • (1)Android开发优化---------UI优化
  • (HAL库版)freeRTOS移植STMF103
  • (LLM) 很笨
  • (八)Spring源码解析:Spring MVC
  • (二)【Jmeter】专栏实战项目靶场drupal部署
  • (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  • (深入.Net平台的软件系统分层开发).第一章.上机练习.20170424
  • (一)基于IDEA的JAVA基础1
  • (转) RFS+AutoItLibrary测试web对话框
  • (转)【Hibernate总结系列】使用举例
  • (转)Sql Server 保留几位小数的两种做法
  • (转载)Google Chrome调试JS
  • ***监测系统的构建(chkrootkit )
  • .NET “底层”异步编程模式——异步编程模型(Asynchronous Programming Model,APM)...
  • .NET 5种线程安全集合
  • .net core + vue 搭建前后端分离的框架
  • .Net core 6.0 升8.0
  • .net core 管理用户机密