当前位置: 首页 > news >正文

springboot集成canal

目录

      • 一、打开mysql的binlog
        • 1.1 打开 MySQL 配置文件 `my.cnf`(通常位于 `/etc/mysql/my.cnf` 或 `/etc/my.cnf`)并添加或修改以下设置:
        • 1.2 重启mysql服务
        • 1.3 验证是否生效
      • 二、 部署canal 服务端(docker)
        • 2.1 下载启动脚本(可能需要梯子)
        • 2.2 启动服务
        • 2.3 验证服务启动成功
      • 三、springboot端集成canal客户端
        • 3.1 添加依赖 /配置
        • 3.2 客户端代码
        • 3.3 数据同步效果

项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求

原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。

具体步骤

一、打开mysql的binlog

1.1 打开 MySQL 配置文件 my.cnf(通常位于 /etc/mysql/my.cnf/etc/my.cnf)并添加或修改以下设置:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row

注意 :确保binlog-format是 row模式

1.2 重启mysql服务

具体命令根据你的服务器类型决定

1.3 验证是否生效
SHOW MASTER STATUS;

二、 部署canal 服务端(docker)

2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 
2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \-e canal.destinations=test \-e canal.instance.master.address=127.0.0.1:3306  \-e canal.instance.dbUsername=canal  \-e canal.instance.dbPassword=canal  \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false  \-e canal.instance.filter.regex=.*\\..* 

参数解释:

-e canal.auto.scan=false:关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-e canal.destinations=test:设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address=127.0.0.1:3306:指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-e canal.instance.dbUsername=canal:设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPassword=canal:设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-e canal.instance.connectionCharset=UTF-8:设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enable=true:启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-e canal.instance.gtidon=false:关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-e canal.instance.filter.regex=.*\\..*:设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>

可以看到这样的打印:
image.png

三、springboot端集成canal客户端

3.1 添加依赖 /配置
<!--  canal begin-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.0</version>
</dependency>
<!--  canal end-->
canal:host: 127.0.0.1 #自己的canal服务器ipport: 11111  #canal默认端口destination: test #配置文件配置的名称username: rootpassword: 214365batch:size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Component
public class CanalClient implements InitializingBean, DisposableBean {@Value("${canal.host}")private String canalHost;@Value("${canal.port}")private int canalPort;@Value("${canal.destination}")private String canalDestination;@Value("${canal.username}")private String canalUsername;@Value("${canal.password}")private String canalPassword;@Value("${canal.batch.size}")private int batchSize;private final RecordService recordService;private CanalConnector canalConnector;private ExecutorService executorService;public CanalClient(RecordService recordService) {this.recordService = recordService;}@Overridepublic void afterPropertiesSet() throws Exception {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),canalDestination,canalUsername,canalPassword);this.executorService = Executors.newSingleThreadExecutor();this.executorService.execute(new Task());}@Overridepublic void destroy() throws Exception {if (executorService != null) {executorService.shutdown();}}private class Task implements Runnable {@Overridepublic void run() {while (true) {try {//连接canalConnector.connect();//订阅canalConnector.subscribe();while (true) {Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小long batchId = message.getId();//获取批量的数量int size = message.getEntries().size();try {//如果没有数据if (batchId == -1 || size == 0) {// log.info("无数据");// 线程休眠2秒Thread.sleep(2000);} else {// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成canalConnector.ack(batchId);}} catch (Exception e) {log.error(e.getMessage());// 程序错误,也直接确认,跳过这次偏移canalConnector.ack(batchId);}} catch (Exception e) {log.error("Error occurred when running Canal Client", e);} finally {canalConnector.disconnect();}}}}private void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (isTransactionEntry(entry)){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChange.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChange.getSql());}log.info(rowChange.getSql());//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");printColumnAndExecute(rowData.getBeforeColumnsList(), null);//变更后的数据log.info("------->; after");printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");}}}}/*** 执行数据同步* @param columns* @param type*/private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {if(type == null){return;}JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns) {jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean = jsonObject.toBean(Record.class);if(type.equals("INSERT")){// 执行新增recordService.save(bean);log.info("新增成功->{}", jsonObject.toJSONString(0));}else if (type.equals("UPDATE")){// 执行编辑recordService.updateById(bean);log.info("编辑成功->{}", jsonObject.toJSONString(0));}else if (type.equals("DELETE")){// 执行删除recordService.removeById(bean.getRecordId());log.info("删除成功->{}", jsonObject.toJSONString(0));}}/*** 判断当前entry是否为事务日志*/private boolean isTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else {return false;}}}
3.3 数据同步效果

image.png
有点感叹需求就是最好的老师,但是完不成需求就不好玩了

相关文章:

  • Windows系统使用内网穿透配置Mysql公网地址实现IDEA远程连接
  • 【持续监控与反馈】DevOps中的监控与反馈机制
  • TypeScript 装饰器详解
  • 大模型是否潜在地进行多跳推理?
  • 51 for 循环与 while 循环
  • OpenShift 4 - 用 oc-mirror 为离线 OpenShift 集群的 Mirror Registry 同步容器镜像
  • 使用npm全局安装typescript
  • Java 并发编程:一文了解 synchronized 的使用
  • JMeter接口测试-5.JMeter高级使用
  • 大模型学习笔记 - InstructGPT中的微调与对齐
  • AI测试入门:认识RAG(检索增强生成)
  • pip‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件。
  • Unity 资源之 Break Items - Toon VFX破碎物品与卡通硬币动画分享
  • 码蹄集部分题目(2024OJ赛7.31-8.4;树状数组+并查集)
  • 8.3,8.4总结
  • [分享]iOS开发 - 实现UITableView Plain SectionView和table不停留一起滑动
  • EOS是什么
  • ES10 特性的完整指南
  • Git的一些常用操作
  • iOS 系统授权开发
  • Less 日常用法
  • nodejs实现webservice问题总结
  • Redis 懒删除(lazy free)简史
  • vue 配置sass、scss全局变量
  • XForms - 更强大的Form
  • 动态规划入门(以爬楼梯为例)
  • 分享自己折腾多时的一套 vue 组件 --we-vue
  • 关于Flux,Vuex,Redux的思考
  • 力扣(LeetCode)965
  • 免费小说阅读小程序
  • 深入体验bash on windows,在windows上搭建原生的linux开发环境,酷!
  • 数据科学 第 3 章 11 字符串处理
  • 我建了一个叫Hello World的项目
  • 线上 python http server profile 实践
  • 智能网联汽车信息安全
  • 格斗健身潮牌24KiCK获近千万Pre-A轮融资,用户留存高达9个月 ...
  • 如何正确理解,内页权重高于首页?
  • 支付宝花15年解决的这个问题,顶得上做出十个支付宝 ...
  • ​Linux·i2c驱动架构​
  • # 详解 JS 中的事件循环、宏/微任务、Primise对象、定时器函数,以及其在工作中的应用和注意事项
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • #{}和${}的区别是什么 -- java面试
  • #VERDI# 关于如何查看FSM状态机的方法
  • #Z2294. 打印树的直径
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • (2)关于RabbitMq 的 Topic Exchange 主题交换机
  • (十五)Flask覆写wsgi_app函数实现自定义中间件
  • (太强大了) - Linux 性能监控、测试、优化工具
  • (图)IntelliTrace Tools 跟踪云端程序
  • (一)、软硬件全开源智能手表,与手机互联,标配多表盘,功能丰富(ZSWatch-Zephyr)
  • (原)本想说脏话,奈何已放下
  • (转) Face-Resources
  • (转)Unity3DUnity3D在android下调试
  • .NET 漏洞分析 | 某ERP系统存在SQL注入
  • .NetCore实践篇:分布式监控Zipkin持久化之殇