当前位置: 首页 > news >正文

Fink CDC数据同步(四)Mysql数据同步到Kafka

依赖项

将下列依赖包放在flink/lib

flink-sql-connector-kafka-1.16.2

创建映射表

创建MySQL映射表

CREATE TABLE if not exists mysql_user (id     int,name   STRING,birth  STRING,gender    STRING,PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('connector'= 'mysql-cdc','hostname'= '192.168.0.1','port'= '3306','username'= 'user','password'='password','server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'bigdata','table-name'= 'user'
); select * from mysql_user;

创建upsert-kafka 表

CREATE TABLE kafka_user_upsert(id     int,name   string,birth  string,gender    string,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'flink-cdc-user','properties.bootstrap.servers' = '192.168.0.4:6668','properties.group.id' = 'flink-cdc-kafka-group','key.format' = 'json','value.format' = 'json'
);

这里指定的Kafka topic会自动创建,也可以预先自行创建

生成作业

insert into kafka_user_upsert select * from mysql_user;select * from kafka_user_upsert;


 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

相关文章:

  • python+flask+django农产品供销展销电子商务系统lkw43
  • C++引用(内含和指针的对比)
  • Ubuntu22.04安装黑屏(进入U盘安装引导时 和 安装完成后)
  • 爬虫练习——动态网页的爬取(股票和百度翻译)
  • Netty应用(五) 之 Netty引入 EventLoop
  • 基于Vue的移动端UI框架整理
  • 内网安全-内网穿透
  • Stable Diffusion 模型下载:Disney Pixar Cartoon Type B(迪士尼皮克斯动画片B类)
  • vue+springboot前后端视频文件等的上传与展示(基于七牛云)
  • Elasticsearch:混合搜索是 GenAI 应用的未来
  • Leetcode 300 最长递增子序列
  • 【Java万花筒】加速Java应用程序:探索性能优化的利器
  • 基于华为云欧拉操作系统(HCE OS)构建HCE OS基础镜像
  • 【Redis笔记】分布式锁及4种常见实现方法
  • vue-生命周期+工程化开发(三)
  • 自己简单写的 事件订阅机制
  • [译]如何构建服务器端web组件,为何要构建?
  • 【翻译】Mashape是如何管理15000个API和微服务的(三)
  • CentOS 7 修改主机名
  • Invalidate和postInvalidate的区别
  • JavaScript设计模式系列一:工厂模式
  • Java新版本的开发已正式进入轨道,版本号18.3
  • Phpstorm怎样批量删除空行?
  • PHP面试之三:MySQL数据库
  • python_bomb----数据类型总结
  • React-redux的原理以及使用
  • Vue 2.3、2.4 知识点小结
  • vue的全局变量和全局拦截请求器
  • 机器学习 vs. 深度学习
  • 前端技术周刊 2018-12-10:前端自动化测试
  • 学习笔记:对象,原型和继承(1)
  • 用Visual Studio开发以太坊智能合约
  • 如何通过报表单元格右键控制报表跳转到不同链接地址 ...
  • # Apache SeaTunnel 究竟是什么?
  • # 安徽锐锋科技IDMS系统简介
  • # 达梦数据库知识点
  • #HarmonyOS:基础语法
  • #pragma data_seg 共享数据区(转)
  • $.ajax()方法详解
  • (poj1.3.2)1791(构造法模拟)
  • (备忘)Java Map 遍历
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (附源码)ssm考试题库管理系统 毕业设计 069043
  • (附源码)ssm跨平台教学系统 毕业设计 280843
  • (六)库存超卖案例实战——使用mysql分布式锁解决“超卖”问题
  • (三)mysql_MYSQL(三)
  • (十七)Flask之大型项目目录结构示例【二扣蓝图】
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • (一)Dubbo快速入门、介绍、使用
  • **PyTorch月学习计划 - 第一周;第6-7天: 自动梯度(Autograd)**
  • .Net CF下精确的计时器
  • .NET Core 网络数据采集 -- 使用AngleSharp做html解析
  • .NET MVC第三章、三种传值方式
  • .Net mvc总结
  • .NET 反射 Reflect