当前位置: 首页 > news >正文

实时数据同步工具<Maxwell 操作案例>

文章目录

  • 案例一:监控MySQL中的数据并输出到控制台
  • 案例二:Maxwell监控mysql的数据输出到kafka
  • 案例三:监控MySQL指定表的数据并输出到kafka

案例一:监控MySQL中的数据并输出到控制台

  1. 运行Maxwell来监控mysql数据的更新

    [root@bigdata01 maxwell-1.29.2]# bin/maxwell --user='maxwell' --password='123456' --host='bigdata01' --producer=stdout
    
  2. 向mysql的test_maxwell数据库中创建表test

    mysql> CREATE TABLE test(`name` VARCHAR(30),sex VARCHAR(10),score INT);
    

    Maxwell控制台的输出内容如下:

    11:20:31,509 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000002:67898], lastHeartbeat=1664594424396] after applying 
    "create table test(
    `name` varchar(30),
    sex varchar(10),
    score int)" 
    to test_maxwell, new schema id is 2
    
  3. 向表test中插入一条数据

    mysql> INSERT INTO test VALUES('ZhangSan','male',28); 
    

    向test表中插入数据时在maxwell控制台输出的数据如下:

    {
    	"database":"test_maxwell", //数据库名字
    	"table":"test", //表名
    	"type":"insert", //操作类型
    	"ts":1664594529, //操作时间
    	"xid":1403, //操作id
    	"commit":true, //提交状态
    	"data":   //数据
    		{"name":"ZhangSan",
    		"sex":"male",
    		"score":28
    		}
    }
    

    当插入2条数据时Maxwell控制台的输出内容:
    在这里插入图片描述
    说明:当我们对表test插入两条数据,控制台就会有两个json日志,说明Maxwell是以数据行为为单位进行进行日志的采集

  4. 当我们修改test表中的一条数据时,

    mysql> UPDATE test SET score=26 WHERE  NAME='lisi';
    

    Maxwell控制台的输出如下:

    {
    	"database":"test_maxwell",
    	"table":"test",
    	"type":"update",  //对数据的操作类型
    	"ts":1664594697, //操作的时间
    	"xid":1855,   //操作id
    	"commit":true,  //任务提交状态true表示成功
    	"data":  //修改后的数据
    		{
    		"name":"lisi",
    		"sex":"male",
    		"score":26
    		},
    	"old":    //原来数据
    	{
    		"score":19
    	}
    }
    
  5. 删除test表中一条数据

    mysql> delete from test where name='lisi';
    

    maxwell 的控制台输出:

    {
        "database":"test_maxwell",
        "table":"test",
        "type":"delete",
        "ts":1664595306,
        "xid":3458,
        "commit":true,
        "data":
        {
            "name":"lisi",
            "sex":"male",
            "score":26
        }
    }
    

案例二:Maxwell监控mysql的数据输出到kafka

  1. 普通输出到kafka
    (1)此案例需要先启动zookeeper和kafka

    启动zk,启动kafka
    [root@bigdata01 maxwell-1.29.2]# xcall.sh jps
    ---------------------bigdata01----------------
    3648 Jps
    2947 QuorumPeerMain
    3331 Kafka
    ---------------------bigdata02----------------
    2378 QuorumPeerMain
    2939 Jps
    2764 Kafka
    ---------------------bigdata03----------------
    2965 Jps
    2760 Kafka
    2381 QuorumPeerMain
    

    (2)启动Maxwell监控binlog

    bin/maxwell --user='maxwell' --password='123456' --host='hadoop102'--producer=kafka --kafka.bootstrap.servers=bigdata01:9092 --kafka_topic=maxwell
    
    [root@bigdata01 ~]# xcall.sh jps
    	---------------------bigdata01----------------
    	4688 Maxwell
    	2947 QuorumPeerMain
    	3331 Kafka
    	4781 Jps
    	---------------------bigdata02----------------
    

    (3)打开 kafka的控制台的消费者消费maxwell主题

    bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic maxwell
    
    [root@bigdata01 ~]# jps
    4688 Maxwell
    2947 QuorumPeerMain
    3331 Kafka
    5195 Jps
    4812 ConsoleConsumer
    

    (4)对test_maxwell库中的test表进行操作
    插入一条数据

    mysql> INSERT INTO test VALUES('sisi','famale',100);
    

    (5)kafka消费者端的输出

    {
    	"database":"test_maxwell",
    	"table":"test",
    	"type":"insert",
    	"ts":1664595971,
    	"xid":4331,
    	"commit":true,
    	"data":{
    		"name":"wangwu",
    		"sex":"male",
    		"score":80
    	}
    }
    

    注意:kafka 消费者端输出结果与我们使用maxwell监控mysql时输出到maxwell控制台的json数据:格式相同

  2. Maxwell配置监控实现kafka分区控制

    在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这
    些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。
    那么如何控制这些数据的分区问题,就变得至关重要,实现步骤如下:

    (1)创建一个具有三个分区的topic

    [root@bigdata01 kafka]# bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 3 --replication-factor 3 --topic maxwell3
    

    (2)修改Maxwell的配置文件config.properties
    生产者环境设置为kafka,本文选择的分区模式是按数据库进行分区,控制分区模式包括 库名,表名,列名,主键

    producer=kafka
    kafka.bootstrap.servers=bigdata01:9092
    
    # mysql login info
    host=bigdata01
    user=maxwell
    password=123456
    
    kafka_topic=maxwell3
    producer_partition_by=database
    

    (3) 使用Maxwell的配置文件config.properties启动Maxwell进程

    [root@bigdata01 maxwell-1.29.2]# bin/maxwell --config ./config.properties
    

    (4)向Maxwell监控的数据库test_maxwell和test_maxwell3中分别插入一条数据,我们观察kafka的变化

    maxwell监控哪些数据库可以再my.cnf文件中进行配置
    [mysqld]
    server_id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog-do-db=test_maxwell
    binlog-do-db=test_maxwell2

    • 向test_maxwell库中的test表插入一条数据

    INSERT INTO test VALUES(‘wangwu’,‘male’,80);

    在kafka tool中查看kafka的状态
    在这里插入图片描述

    • 向test_maxwell2库中的test表插入一条数据
    mysql> INSERT INTO test VALUES('sisi','famale',100);
    

    在kafka tool查看kafka状态
    在这里插入图片描述

    注意:我们可以看见对test_maxwell库的操作传输到了kafka中主题maxwell3的1号分区,而对test_maxwell2中表的操作传输到了2号分区,证实了我们定制的安装数据库进行分区的规则。

案例三:监控MySQL指定表的数据并输出到kafka

(1)运行zookeeper和kafka

[root@bigdata01 maxwell-1.29.2]# xcall.sh jps
	---------------------bigdata01----------------
	3648 Jps
	2947 QuorumPeerMain
	3331 Kafka
	---------------------bigdata02----------------
	2378 QuorumPeerMain
	2939 Jps
	2764 Kafka
	---------------------bigdata03----------------
	2965 Jps
	2760 Kafka
	2381 QuorumPeerMain

(2)通过配置config.properties,定制化启动Maxwell并指定监控test_maxwell下的test表,并将数据传给kafka的maxwell3主题

  • 配置config.properties文件中的filter参数,指定监控某个表
filter = exclude: *.*, include: test_maxwell.test

注意:还可以设置 include: test_maxwell.*,通过此种方式来监控 mysql 某个库的所有
表,也就是说过滤整个库。读者可以自行测试。

  • 启动Maxwell,在Maxwell的目录下
    bin/maxwell --config ./config.properties
    

(3)向test表中插入一条数据,看kafka tool的变化和kafka消费者端控制台的输出情况

mysql> INSERT INTO test VALUES('lili','famale',100);
  • 在kafka tool查看
    在这里插入图片描述

  • 在kafka消费者端控制台查看
    在这里插入图片描述

相关文章:

  • 【设计模式】-创建型模式-第2章第3讲-【建造者模式】
  • CS231n Module2: CNN part1:Architecture
  • 模电学习1. 三极管基础知识及常用电路
  • 优化APK体积
  • 【初学者入门C语言】之函数(八)
  • 《Linux基本常识的介绍》
  • 【云原生】Kubernetes介绍
  • C语言自定义类型【结构体】
  • springboot请求映射原理,springboot版本2.3.4.RELEASE
  • 【数值分析+python】python生成稀疏对称正定矩阵
  • jave web开发(IDEA中配置maven)
  • 保存滚动位置的实现方法
  • 什么是数据库事务
  • 异步FIFO的原理及verilog实现(循环队列、读写域数据同步、Gray Code、空满标志、读写域元素计数)
  • 大数据_YARN的工作原理
  • #Java异常处理
  • Apache Spark Streaming 使用实例
  • js ES6 求数组的交集,并集,还有差集
  • SpiderData 2019年2月25日 DApp数据排行榜
  • SpingCloudBus整合RabbitMQ
  • 使用common-codec进行md5加密
  • 验证码识别技术——15分钟带你突破各种复杂不定长验证码
  • 关于Android全面屏虚拟导航栏的适配总结
  • ​渐进式Web应用PWA的未来
  • # 学号 2017-2018-20172309 《程序设计与数据结构》实验三报告
  • #gStore-weekly | gStore最新版本1.0之三角形计数函数的使用
  • $.ajax()
  • (02)Hive SQL编译成MapReduce任务的过程
  • (42)STM32——LCD显示屏实验笔记
  • (java版)排序算法----【冒泡,选择,插入,希尔,快速排序,归并排序,基数排序】超详细~~
  • (非本人原创)我们工作到底是为了什么?​——HP大中华区总裁孙振耀退休感言(r4笔记第60天)...
  • (三)模仿学习-Action数据的模仿
  • (四)c52学习之旅-流水LED灯
  • **PHP分步表单提交思路(分页表单提交)
  • .net core开源商城系统源码,支持可视化布局小程序
  • .net 无限分类
  • .NET开发者必备的11款免费工具
  • [ solr入门 ] - 利用solrJ进行检索
  • [ vulhub漏洞复现篇 ] Hadoop-yarn-RPC 未授权访问漏洞复现
  • [ 隧道技术 ] cpolar 工具详解之将内网端口映射到公网
  • [].shift.call( arguments ) 和 [].slice.call( arguments )
  • [20161101]rman备份与数据文件变化7.txt
  • [AIGC codze] Kafka 的 rebalance 机制
  • [ERROR] ocp-server-ce-py_script_start_check-4.2.1 RuntimeError: ‘tenant_name‘
  • [JavaWeb学习] Spring Ioc和DI概念思想
  • [luogu P1527]矩阵乘法(矩形k小)
  • [MQ]常用的mq产品图形管理web界面或客户端
  • [python]python筛选excel表格信息并保存到另一个excel
  • [svc]ssh+gg二步认证
  • [word] word艺术字体如何设置? #知识分享#职场发展#媒体
  • [ZT]Dev-C++中编译C语言报错
  • [ZT]互联网网站应该如何存储密码?
  • [zz]Linux性能测试工具Lmbench介绍和使用说明
  • [初学C语言]个人易错总结
  • [计算机网络] 高手常用的几个抓包工具(上)