当前位置: 首页 > news >正文

centos7部署Canal与Canal集成使用

在这里插入图片描述

1、简介

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x 、5.5.x 、5.6.x 、 5.7.x 、8.0.x

2、工作原理
2.1、MySQL主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2.2、canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
3、部署
3.1、准备

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.2、启动

下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz

解压缩

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

配置修改

vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1

如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

启动

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

关闭

sh bin/stop.sh
4、安装canal-admin

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

4.1、准备

canal-admin的限定依赖:

MySQL,用于存储配置和节点等相关数据
canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

4.2、部署

下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解压缩

mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz  -C /tmp/canal-admin

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x   6 agapple  staff   204B  8 31 15:37 bin
drwxr-xr-x   8 agapple  staff   272B  8 31 15:37 conf
drwxr-xr-x  90 agapple  staff   3.0K  8 31 15:37 lib
drwxr-xr-x   2 agapple  staff    68B  8 31 15:26 logs

配置修改

vi conf/application.yml
server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: canalpassword: canaldriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin

初始化元数据库

mysql -h127.1 -uroot -p

导入初始化SQL

> source conf/canal_manager.sql

a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql

启动

sh bin/startup.sh

查看 admin 日志

vi logs/admin.log
2019-08-31 15:43:38.162 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2019-08-31 15:43:38.180 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2019-08-31 15:43:38.191 [main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
2019-08-31 15:43:38.194 [main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2019-08-31 15:43:39.789 [main] INFO  o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2019-08-31 15:43:39.825 [main] INFO  o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456
在这里插入图片描述

关闭

sh bin/stop.sh

canal-server端配置
使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

启动admin-server即可。

或在启动命令中使用参数:sh bin/startup.sh local 指定配置

5、springboot项目集成使用

依赖配置:

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
5.1、创建mvn标准工程:
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample

maven3.0.5以上版本舍弃了create,使用generate生成项目

mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample
5.2、修改pom.xml,添加依赖
5.3、ClientSample代码
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}
}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}
}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}
}}
5.4、运行Client

首先启动Canal Server,

启动Canal Client后,可以从控制台从看到类似消息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

此时代表当前数据库无变更数据

  1. 触发数据库变更
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (->   `ID` int(11) NOT NULL AUTO_INCREMENT,->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,->   PRIMARY KEY (`ID`)-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

可以从控制台中看到:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4    update=true
X : 2013-02-05 23:29:46    update=true

相关文章:

  • Python:词法分析(行结构与显式、隐式行拼接)
  • Leetcode2834. 找出美丽数组的最小和
  • 3.30每日一题(多元函数微分学)
  • Spring Boot自动配置原理、实战、手撕自动装配源码
  • linux入门---信号量
  • docker创建并访问本地前端
  • [java后端研发]——文件上传与下载(2种方式)
  • 代码随想录算法训练营Day 49 || 123.买卖股票的最佳时机III 、188.买卖股票的最佳时机IV
  • 49.批处理命令(1/2)
  • 《Redis实战》笔记
  • 企业计算机中了mkp勒索病毒怎么办,服务器中了勒索病毒如何处理
  • tomcat下载与使用教程
  • 每日一题(LeetCode)----数组--移除元素(五)
  • 无人驾驶智能:两车居然可以“交流”
  • 原型制作神器ProtoPie的使用Unity与网页跨端交互
  • electron原来这么简单----打包你的react、VUE桌面应用程序
  • express + mock 让前后台并行开发
  • express如何解决request entity too large问题
  • flask接收请求并推入栈
  • input的行数自动增减
  • ucore操作系统实验笔记 - 重新理解中断
  • ViewService——一种保证客户端与服务端同步的方法
  • 测试如何在敏捷团队中工作?
  • 批量截取pdf文件
  • 如何实现 font-size 的响应式
  • 数据科学 第 3 章 11 字符串处理
  • 微服务入门【系列视频课程】
  • AI算硅基生命吗,为什么?
  • #绘制圆心_R语言——绘制一个诚意满满的圆 祝你2021圆圆满满
  • (2)(2.10) LTM telemetry
  • (6)设计一个TimeMap
  • (附源码)ssm航空客运订票系统 毕业设计 141612
  • (未解决)jmeter报错之“请在微信客户端打开链接”
  • (一)插入排序
  • (转)Android学习系列(31)--App自动化之使用Ant编译项目多渠道打包
  • (转)PlayerPrefs在Windows下存到哪里去了?
  • .net MySql
  • .NET 分布式技术比较
  • .Net的DataSet直接与SQL2005交互
  • @GetMapping和@RequestMapping的区别
  • [202209]mysql8.0 双主集群搭建 亲测可用
  • [c#基础]值类型和引用类型的Equals,==的区别
  • [DM复习]Apriori算法-国会投票记录关联规则挖掘(上)
  • [E链表] lc83. 删除排序链表中的重复元素(单链表+模拟)
  • [Flutter]WindowsPlatform上运行遇到的问题总结
  • [GN] Vue3.2 快速上手 ---- 核心语法2
  • [HTML]Web前端开发技术12(HTML5、CSS3、JavaScript )——喵喵画网页
  • [IE 技巧] 显示/隐藏IE 的菜单/工具栏
  • [IE技巧] 如何让IE 启动的时候不加载任何插件
  • [JDK工具-2] javap 类文件解析工具-帮助理解class文件,了解Java编译器机制
  • [LeetCode][面试算法]逻辑闭环的二分查找代码思路
  • [LeetCode]Pow(x,n)
  • [LeetCode]—Simplify Path 简化路径表达式
  • [MySQL]日期和时间函数
  • [oeasy]python0002_终端_CLI_GUI_编程环境_游戏_真实_元宇宙