当前位置: 首页 > news >正文

canal: 连接kafka (docker)

一、确保mysql binlog开启并使用ROW作为日志格式
docker 启动mysql 5.7配置文件
my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server-id=1

在这里插入图片描述

在这里插入图片描述
一定要确保上述两个值一个为ROW,一个为ON

二、下载canal的run.sh

https://github.com/alibaba/canal/blob/master/docker/run.sh

三、启动canal容器,其中配置了mysql的信息和kafka的信息:

./run.sh -e canal.auto.scan=false -e canal.destinations=me -e canal.instance.master.address=xx.xx.xx.xx:3306  -e canal.instance.dbUsername=root  -e canal.instance.dbPassword=yourpassword  -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false -e canal.instance.topic=me -e canal.mq.partition=0  -e canal.mq.topic=me

四、通过docker exec -it canal-server /bin/bash进入canal容器,编辑容器中的

/home/admin/canal-server/conf/canal.properties

其中要修改两处,一是工作模式改为kafka(canal.serverMode),二是改MQ的地址(canal.mq.servers),完整配置类似如下:

#################################################
######### 		common argument		############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### 		     MQ 		     #############
##################################################
canal.mq.servers = xx.xx.xx.xx:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

五、重启canal容器
docker restart canal-server

这样,对于上面的mysql 数据库中的增删改,在kakfa都可以在me 这个topic 收到数据了。

在这里插入图片描述

相关文章:

  • 北京小蓝蜂科技有限公司 基本情况
  • SAP Fiori开发中的JavaScript基础知识9 - 代码注释,严格模式,JSON
  • Adobe Illustrator 2023 for Mac/Win:创意无限,设计无界
  • 【Qt】:坐标
  • 使用Docker Compose一键部署前后端分离项目(图文保姆级教程)
  • 基于Spring Boot的在线学习系统的设计与实现
  • 虚幻引擎资源加密方案解析
  • vue3+threejs新手从零开发卡牌游戏(十四):调整卡组位置,添加玩家生命值HP和法力值Mana信息
  • 在项目中缓存如何优化?SpringCache接口返回值的缓存【CachePut、CacheEvict、Cacheable】
  • 【Java八股面试系列】中间件-Redis
  • android 13 相册和拍照问题
  • css简单动画实现
  • 记录关于智能家居的路程的一个bug___Segmentation fault(段错误)
  • 基于springboot实现网页时装购物系统项目【项目源码+论文说明】
  • 深度学习pytorch——卷积神经网络(持续更新)
  • ES6指北【2】—— 箭头函数
  • 【EOS】Cleos基础
  • 【翻译】babel对TC39装饰器草案的实现
  • 4. 路由到控制器 - Laravel从零开始教程
  • css属性的继承、初识值、计算值、当前值、应用值
  • ES6--对象的扩展
  • Facebook AccountKit 接入的坑点
  • Git 使用集
  • MySQL用户中的%到底包不包括localhost?
  • Python socket服务器端、客户端传送信息
  • Redux系列x:源码分析
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • Spring Security中异常上抛机制及对于转型处理的一些感悟
  • Vim Clutch | 面向脚踏板编程……
  • 产品三维模型在线预览
  • 彻底搞懂浏览器Event-loop
  • 猫头鹰的深夜翻译:Java 2D Graphics, 简单的仿射变换
  • 使用Tinker来调试Laravel应用程序的数据以及使用Tinker一些总结
  • 通过git安装npm私有模块
  • 微信小程序开发问题汇总
  • 一份游戏开发学习路线
  • 运行时添加log4j2的appender
  • 如何用纯 CSS 创作一个货车 loader
  • ​你们这样子,耽误我的工作进度怎么办?
  • #FPGA(基础知识)
  • (4)(4.6) Triducer
  • (C语言)字符分类函数
  • (附源码)springboot人体健康检测微信小程序 毕业设计 012142
  • (解决办法)ASP.NET导出Excel,打开时提示“您尝试打开文件'XXX.xls'的格式与文件扩展名指定文件不一致
  • (全注解开发)学习Spring-MVC的第三天
  • (译)计算距离、方位和更多经纬度之间的点
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite
  • .NET Core 网络数据采集 -- 使用AngleSharp做html解析
  • .net core控制台应用程序初识
  • .NET Standard、.NET Framework 、.NET Core三者的关系与区别?
  • .net 开发怎么实现前后端分离_前后端分离:分离式开发和一体式发布
  • .NET/C# 反射的的性能数据,以及高性能开发建议(反射获取 Attribute 和反射调用方法)
  • /etc/apt/sources.list 和 /etc/apt/sources.list.d
  • @RequestMapping-占位符映射