当前位置: 首页 > news >正文

MySQL数据流转集成,从快速测试Maxwell开始

这是学习笔记的第 2142 篇文章


640?wx_fmt=gif

  在Binlog解析方向和数据流转方向上,经常会提到比较有名的几类工具,阿里的Canal,Zendesk的Maxwell和Yelp的mysql_streamer,我们来简单说一下Maxwell。

  

   在功能完善性和生态建设上,Canal和Zendesk整体的表现要好一些,它们都是基于Java开发,支持多种模式的数据上下游集成,如果是想快速上手,Maxwell是一个不错的选择,而mysql_streamer的维护时间在2017年左右,在行业里看到的案例相对要少。

     Maxwell相对比较精巧,它能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,这一点是我优先考虑Maxwell的首要原因,当然它也可以作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。如果说使用场景,它的常见应用场景有ETL、维护缓存、收集表级别的DML指标、增量到搜索引擎、数据分区迁移等。

    我们可以快速通过一个部署测试的过程来快速熟悉Maxwell,整个测试基于云主机环境。 

    首先要安装JDK和maven

   可以快速验证:

  

  # java -version	
    openjdk version "1.8.0_232"	
 	
   # mvn -v	
    Apache Maven 3.0.5 (Red Hat 3.0.5-17)

其中对于maven源,可以考虑云服务的配置,如果是默认的maven源在后续编译安装的时候会有些麻烦。 

   在此演示的是通过获得Maxwell的源码编译得到安装文件来进行说明的,因为GitHub的下载文件较大,下载网速较慢,我们使用Maxwell的源码文件编译安装,这个项目代码量相对比较小,所以比较便捷,也方便后续做一些改动和调试。

 下载源码包的链接是:,

   wget https://github.com/zendesk/maxwell/archive/v1.23.2.tar.gz

  解压进入安装目录后,执行make package命令,即可开始整个工具的编译安装。

  maven源如果使用默认的,整个编译过程会比较长。

[INFO] BUILD SUCCESS	
[INFO] Total time: 2:21.275s	
[INFO] Finished at: Thu Oct 24 18:38:30 CST 2019	
[INFO] Final Memory: 48M/182M

而如果使用云主机服务范围内的maven源,速度就快了好多。

[INFO] Total time: 48.574s	
[INFO] Finished at: Thu Oct 24 19:08:50 CST 2019	
[INFO] Final Memory: 49M/186M

编译完成之后在母target下面就有编译生成的

我们来做下数据库的初始化, 可以参考如下的文章快速完成数据库环境的部署。

然后创建数据库相关用户和权限配置

主要有复制相关的权限,在解析的过程中,Maxwell会把自己包装成一个Slave,然后进行数据通信,当然这个过程

CREATE USER 'maxwell'@'%' identified by 'XXXXXX';	
GRANT ALL on maxwell.* to 'maxwell'@'%' ;	
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';	
FLUSH PRIVILEGES;

为了方便测试和降低复杂度,我们先不做Kafka的数据流转测试,而是基于标准JSON输出做一些简单的分析。

接下来就可以直接调用Maxwell来做一些测试了,在此我没有使用配置文件,而是使用了大部分默认选项,使用标准输出模式。

bin/maxwell --user='maxwell' --password='XXXXXX' --host=127.0.0.1 --port=33071 --producer=stdout

创建表输出的JSON信息如下,它是完全按照SQL类型定义的不同的JSON结构。

18:45:35,946 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[binlog.000006:32499], lastHeartbeat=1571913934970] after applying "create table test_data(id int,name varchar(30))" to test, new schema id is 2	
{"database":"test","table":"test_data","type":"insert","ts":1571913951,"xid":811,"commit":true,"data":{"id":1,"name":"aa"}}

我们测试下手工开启事务后,是否在解析中能够正常解析出事务的信息。

mysql> begin;	
Query OK, 0 rows affected (0.00 sec)	

	
mysql> insert into test_data values(2,'bb');	
Query OK, 1 row affected (0.00 sec)	

	
mysql> insert into test_data values(3,'cc'); 	
Query OK, 1 row affected (0.00 sec)	

	
mysql> commit;	
Query OK, 0 rows affected (0.03 sec)

解析的结果如下,可以看到xid的基本信息。

{"database":"test","table":"test_data","type":"insert","ts":1571914005,"xid":835,"xoffset":0,"data":{"id":2,"name":"bb"}}	
{"database":"test","table":"test_data","type":"insert","ts":1571914009,"xid":835,"commit":true,"data":{"id":3,"name":"cc"}}

删除数据,如果影响行数是多行,会基于行模式解析

mysql> delete from test_data where id>1;	
Query OK, 2 rows affected (0.01 sec)

解析日志如下:

{"database":"test","table":"test_data","type":"delete","ts":1571914621,"xid":1021,"xoffset":0,"data":{"id":2,"name":"bb"}}	
{"database":"test","table":"test_data","type":"delete","ts":1571914621,"xid":1021,"commit":true,"data":{"id":3,"name":"cc"}}

update数据,会显示变更前和变更后的数据情况。

mysql> update test_data set name='aaa' where id=1;	
Query OK, 1 row affected (0.01 sec)	
Rows matched: 1  Changed: 1  Warnings: 0

显示日志如下:

{"database":"test","table":"test_data","type":"update","ts":1571914754,"xid":1067,"commit":true,"data":{"id":1,"name":"aaa"},"old":{"name":"aa"}}

在Maxwell的实现中,有很多定制化的配置,比如默认会创建一个maxwell命名的数据库,当然也可以指定多种选型进行配置管理,然后在这个数据库下面配置一些表,这方面的内容我们随后的文章会展开来进行分析。

mysql> show tables;	
+-------------------+	
| Tables_in_maxwell |	
+-------------------+	
| bootstrap         |	
| columns           |	
| databases         |	
| heartbeats        |	
| positions         |	
| schemas           |	
| tables            |	
+-------------------+	
7 rows in set (0.00 sec)

从昨天下班前快速测试Maxwell的方案,从部署到基本验证,整个过程不超过30分钟,整体来说已经比较快捷了。

近期热文:

QQ群号:763628645

QQ群二维码如下,个人微信号:jeanron100, 添加请注明:姓名+地区+职位,否则不予通过

640?wx_fmt=png640?wx_fmt=png

在看,让更多人看到

相关文章:

  • 从故障处理流程看结构化思维
  • dbaplus-爱可生社区-北京站沙龙归来
  • 唯一ID生成算法剖析,看看这篇就够了
  • 关于学习,很有必要看看这张图
  • 迁移到MySQL的业务架构演进实战
  • 一个看似纠结的MySQL标签需求的梳理
  • 技术学习中的三个有趣的数字
  • 数据库修改密码风险高,如何保证业务持续,这几种密码双活方案可以参考
  • MySQL业务双活的初步设计方案
  • 美女主持直播,被突发意外打断!湾区网友却高喊: 我懂!超甜
  • MySQL周期表管理太繁琐,通过Python自定义工具方法优雅解决
  • 千万不要养大型犬,多么痛的领悟
  • 《吊打面试官》系列-Redis基础
  • 为什么我们的数据科学团队无法产生价值
  • 在繁杂的业务需求中,如何找到API设计的平衡点
  • __proto__ 和 prototype的关系
  • 【399天】跃迁之路——程序员高效学习方法论探索系列(实验阶段156-2018.03.11)...
  • 【刷算法】从上往下打印二叉树
  • chrome扩展demo1-小时钟
  • docker容器内的网络抓包
  • EventListener原理
  • Python学习之路16-使用API
  • spring cloud gateway 源码解析(4)跨域问题处理
  • Xmanager 远程桌面 CentOS 7
  • Zsh 开发指南(第十四篇 文件读写)
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 从零开始学习部署
  • 前端存储 - localStorage
  • 数据可视化之 Sankey 桑基图的实现
  • 思否第一天
  • 通过几道题目学习二叉搜索树
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 自定义函数
  • 自制字幕遮挡器
  • 浅谈sql中的in与not in,exists与not exists的区别
  • #、%和$符号在OGNL表达式中经常出现
  • #【QT 5 调试软件后,发布相关:软件生成exe文件 + 文件打包】
  • ( 用例图)定义了系统的功能需求,它是从系统的外部看系统功能,并不描述系统内部对功能的具体实现
  • (1)SpringCloud 整合Python
  • (4.10~4.16)
  • (AtCoder Beginner Contest 340) -- F - S = 1 -- 题解
  • (介绍与使用)物联网NodeMCUESP8266(ESP-12F)连接新版onenet mqtt协议实现上传数据(温湿度)和下发指令(控制LED灯)
  • (论文阅读23/100)Hierarchical Convolutional Features for Visual Tracking
  • (五)关系数据库标准语言SQL
  • (已更新)关于Visual Studio 2019安装时VS installer无法下载文件,进度条为0,显示网络有问题的解决办法
  • (转)Linux下编译安装log4cxx
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在
  • (转)ORM
  • ***通过什么方式***网吧
  • .java 9 找不到符号_java找不到符号
  • .NET 4.0网络开发入门之旅-- 我在“网” 中央(下)
  • .NET HttpWebRequest、WebClient、HttpClient
  • .NET 的程序集加载上下文
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地定义和使用弱事件
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?