当前位置: 首页 > news >正文

Canal单机部署

目录

一、前期准备

1、配置binlog日志

2、配置MQ服务

二、搭建canal

1、下载安装包

2、部署canal-admin的UI管理界面

2-1、创建&解压admin

2-2、配置UI管理界面

2-3、初始化元数据库

2-4、启动Canal Admin

3、部署canal-server服务

3-1、创建&解压deployer包

3-2、配置关联admin

3-3、启动canalServer&修改配置

三、集成springboot验证

1、必要依赖

2、配置文件

3、监听消息

4、验证


一、前期准备

1、配置binlog日志

前期检查:mysql是否开启了binlog日志

SHOW VARIABLES LIKE '%log_bin%';

如果为OFF,则需要配置开启。

进入mysql的配置文件,window版5.7.43,在安装目录找到my.ini,添加

[mysqld]

log-bin= H:\\binlog\mysql-bin
server-id=100
binlog_format = ROW

然后重启服务,重新验证查看。

2、配置MQ服务

自行搭建一套可用的MQ服务,可参考RocketMq单机部署,这里跳过

二、搭建canal

1、下载安装包

官网地址:https://github.com/alibaba/canal/releaseshttps://github.com/alibaba/canal/releaseshttps://github.com/alibaba/canal/releases

国内直接下载比较慢,建议使用加速器GitHub 文件加速 ,复制官网地址对应包的下载链接粘贴到输入框中,点击下载,可加速下载。建议下载 1.1.7

2、部署canal-admin的UI管理界面

2-1、创建&解压admin

# mkdir /usr/local/share/canalFolder/canalAdmin

# cd /usr/local/share/canalFolder/canalAdmin

# tar zxvf canal.admin-1.1.7-SNAPSHOT.tar.gz

2-2、配置UI管理界面

# cd /usr/local/share/canalFolder/canalAdmin/conf

# vim application.yml

2-3、初始化元数据库

如果服务器没有mysql客户端,可以将canal_manager.sql内容拷贝出来单独执行(即把建表语句直接复制到本地mysql客户端如navicat执行即可

2-4、启动Canal Admin

# sh startup.sh

查看端口 8089 是否有启动

# nc -zv 192.168.152.128 8089

如下表示成功

!!若出现下方这种很长串的提示。。端口又没启动成功,详细细节进入logs目录查看

# cat /usr/local/share/canalFolder/canalAdmin/logs/admin.log

错误内容是所要链接的数据权限不支持

a.原因:因为虚拟机部署把canal服务和数据库分开进行,所以有2台服务器,直接修改连接池ip后,出现了java.sql.SQLException: null, message from server: “Host ‘xxx’ is not allowed to connect这样的错误,它的意思就是安装了数据库的服务器不允许部署项目的服务器进行远程连接。也就是权限问题,修改权限就可以了,

b修改方法:

登录mysql : mysql -uroot -p;    并输入密码
b-1.打开mysql控制台,输入:进入mysql库
use mysql;
b-2.输入:查询mysql库的表
show tables;
b-3.输入:查看user表的host
select host from user;
b-4.输入:修改可以远程访问
update user set host ='%' where user ='root';
b-5.输入:
flush privileges; 

flush privileges; 命令本质上的作用是将当前user和privilige表中的用户信息/权限设置从mysql库(MySQL数据库的内置库)中提取到内存里。

MySQL用户数据和权限有修改后,希望在"不重启MySQL服务"的情况下直接生效,那么就需要执行这个命令。

通常是在修改ROOT帐号的设置后,怕重启后无法再登录进来,那么直接flush之后就可以看权限设置是否生效。而不必冒太大风险。

成功后访问页面地址信息:http://192.168.152.128:8089/

登录一开始设置的 账号密码: admin/123456

这里只做单机版,所以不进行集群配置,有需要可安装ZK自行配置

进入“Server管理”配置Server方式有2种:

第一种是直接手动在管理界面配置

第二种是在通过设定配置文件,启动后运行,因为都要监听,建议直接使用第二种方式(配置如第3项所列)

3、部署canal-server服务

3-1、创建&解压deployer包

# mkdir /usr/local/share/canalFolder/canalDeployer

# cd /usr/local/share/canalFolder/canalDeployer

# tar zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz

3-2、配置关联admin

# cd conf

# mv canal.properties canal.properties_bak

# mv canal_local.properties canal.properties

# vim canal.properties

# register ip
canal.register.ip = 192.168.152.128# canal admin config   canalAdmin 的链接、端口、用户名和MD5密码
canal.admin.manager = 192.168.152.128:8089
canal.admin.port = 11110
canal.admin.user = admin 
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register 
canal.admin.register.auto = true
# 需要注册到canalAdmin的集群名,对应集群管理里面创建的集群名
canal.admin.register.cluster =
# 当前canalServer在Server管理里面创建的Server名称
canal.admin.register.name = canalServer

canal.admin.passwd 密码可参考一开始导入的表结构,也可修改

3-3、启动canalServer&修改配置

# sh ../bin/startup.sh

启动后,通过canal-admin管理界面,进入server管理,修改server配置

里面的配置内容修改只改几个地方

29行: canal.serverMode = rocketMQ (这里设置了rocketMQ所以需要配置162行有关的内容,不同选择修改不同地方的配置)

86行: canal.instance.tsdb.enable = false

127行: canal.mq.flatMessage = true(flatMessage 为true 生产到mq的消息就是json的, 否则就是protobuf二进制的)

PS:如果server管理中server中的状态显示“断开”,应该是密码验证问题:

canal server 与 canal admin 的认证机制

canal admin 的 conf/application.yml 里面定义了账号密码明文
canal.adminUser: admin
canal.adminPasswd:admin
canal server 的 conf/canal_local.properties 里面定义了账号密码密文
canal.admin.user: admin
canal.admin.passwd:6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

密文的生成方式: select password(‘123456’);

双向认证: canal server 向 canal admin 注册的时候会以密码密文做认证, canal admin 对 canal server 做连通性测试的时候也会将密码明文加密之后做认证 (连通性测试失败的时候,canal admin web 会显示对应的canal server 处于 “断开” 状态)

接下来通过UI界面到 Instance管理 新增配置,内容如下

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false 是否开启全局事务id
canal.instance.gtidon=false
canal.instance.master.gtid=# position info
# 需要同步binlog的数据库地址及端
canal.instance.master.address=172.16.12.68:3306 
# 需要读取的起始的binlog文件
canal.instance.master.journal.name=
# 需要读取的起始的binlog文件的偏移量
canal.instance.master.position=
# 需要读取的起始的binlog的时间戳
canal.instance.master.timestamp=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
# 是否开启table meta的时间序列版本记录功能
canal.instance.tsdb.enable=true
# 存储canal记录表结构日志的数据库jdbc
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password 需要同步binlog的数据库的用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex # mysql 数据解析关注的表,Perl正则表达式, .*\\..*默认所有库所有表
canal.instance.filter.regex=test\\.exc_user
# table black regex # 过滤那些不符合要求的table,这些table的数据将不会被解析和传送
canal.instance.filter.black.regex=mysql\\..*,sys\\..*,performance_schema\\..*,information_schema\\..*,seata\\..*,.*\\.undo_log
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config, 要监听的 topic 
canal.mq.topic=test-top
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

截止到此,canal,部署完成。

三、集成springboot验证

1、必要依赖

        <!-- 这里要考虑版本兼容问题,这是与jdk8匹配的jar版本 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2021.0.4.0</version></dependency><!-- Canal 相关 --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version><exclusions><exclusion><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId></exclusion><exclusion><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version></dependency>

2、配置文件

# rocket配置 
spring:cloud:stream:function:definition: twoConsumer # 方法定义bindings:# 发送必须配置此处producerInfo-out-0:destination: test_topic # topic消息主题# 配置channel消息通道 (接收必须配置此处)twoConsumer-in-0:destination: test-top # topic消息主题group: consumer-group # 消费者组rocketmq:binder:name-server: 192.168.152.130:9876 # rocketmq服务地址# 配置消息通道独特属性(仅适用于rocketmq)bindings:# 发送必须配置此处# 配置channel消息通道(生产者:[functionName]-out-[index],消费者:[functionName]-in-[index])producerInfo-out-0:producer:group: consumer-group #生产者的组名称(一般与消费者组相同),用于负载均衡sync: true # 是否开启同步发送

3、监听消息

消息消费者类 TwoConsumer

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;import java.util.Collection;
import java.util.function.Consumer;/*** 消息消费者类*/
@Configuration
@Slf4j
public class TwoConsumer implements Consumer<Message<Object>> {/*** 本方法处理监听 canal 服务的操作变更 (对二进制进行转化含protobuf和JSON)* @param message*/@Overridepublic void accept(Message< Object > message) {log.info("收到消息:{}", message);// 获取消息 IDString msgId = message.getHeaders().get("ROCKET_MQ_MESSAGE_ID", String.class);// 获取消息体Object payload = message.getPayload();System.out.println(JSONObject.toJSON(payload));// 这里进行业务处理}
}

4、验证

在数据库进行修改操作

接收结果如下:

OVER!

参考文献:

异常message from server: “Host ‘xx.xx.xx.xx’

CanalAdmin部署文档

协议转化处理

Canal配置

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Python模拟退火算法
  • 一个Android下载网络图片显示并保存到系统相册的完整案例
  • 关于k8s的pvc存储卷
  • haproxy七层代理总结
  • SpringBoot参数校验详解
  • PyTorch 基础学习(6)- 自动微分
  • Android Studio设置 offline 模式
  • 解决 Windows 任务栏图标不更新问题:深入解析与解决方案
  • C++初阶_2:引用
  • docker中调用GPU算力(debain12系统)
  • Spring 事务机制
  • Linux中针对文件权限的解析
  • FairyGUI-egret 优化ui资源加载
  • Linux--C语言之输入输出函数及格式控制输出
  • 如何在Shopify开发中高度还原Figma设计稿
  • 【Leetcode】101. 对称二叉树
  • (三)从jvm层面了解线程的启动和停止
  • Android 控件背景颜色处理
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • Git的一些常用操作
  • HTTP中的ETag在移动客户端的应用
  • Js基础——数据类型之Null和Undefined
  • Python实现BT种子转化为磁力链接【实战】
  • Rancher-k8s加速安装文档
  • 阿里云购买磁盘后挂载
  • 产品三维模型在线预览
  • 浮现式设计
  • ​zookeeper集群配置与启动
  • ​比特币大跌的 2 个原因
  • ​如何在iOS手机上查看应用日志
  • # 透过事物看本质的能力怎么培养?
  • #C++ 智能指针 std::unique_ptr 、std::shared_ptr 和 std::weak_ptr
  • #免费 苹果M系芯片Macbook电脑MacOS使用Bash脚本写入(读写)NTFS硬盘教程
  • (2)Java 简介
  • (LeetCode C++)盛最多水的容器
  • (笔试题)分解质因式
  • (附源码)计算机毕业设计SSM智能化管理的仓库管理
  • (六)软件测试分工
  • (万字长文)Spring的核心知识尽揽其中
  • (微服务实战)预付卡平台支付交易系统卡充值业务流程设计
  • (一)Dubbo快速入门、介绍、使用
  • (转)机器学习的数学基础(1)--Dirichlet分布
  • .NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法
  • .NET Entity FrameWork 总结 ,在项目中用处个人感觉不大。适合初级用用,不涉及到与数据库通信。
  • .net framework profiles /.net framework 配置
  • .NET Standard 支持的 .NET Framework 和 .NET Core
  • .NET 发展历程
  • .Net 访问电子邮箱-LumiSoft.Net,好用
  • .Net程序猿乐Android发展---(10)框架布局FrameLayout
  • .NET设计模式(8):适配器模式(Adapter Pattern)
  • .NET性能优化(文摘)
  • .net中的Queue和Stack
  • .net中调用windows performance记录性能信息
  • .vimrc php,修改home目录下的.vimrc文件,vim配置php高亮显示
  • /etc/skel 目录作用