当前位置: 首页 > news >正文

JAVA通过debezium实时采集mysql数据

前期准备

需要提前安装mysql并且开启binlog,需要准备kafka和zookeeper环境

示例采用debezium1.9.0版本

Maven配置

<version.debezium>1.9.0.Final</version.debezium>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-api</artifactId>

    <version>${version.debezium}</version>

</dependency>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-embedded</artifactId>

    <version>${version.debezium}</version>

</dependency>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-connector-mysql</artifactId>

    <version>${version.debezium}</version>

</dependency>

Java代码

import io.debezium.engine.ChangeEvent;

import io.debezium.engine.DebeziumEngine;

import io.debezium.engine.format.Json;

import java.io.IOException;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {

        final Properties props = new Properties();

        props.setProperty("name", "dbz-engine");

        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        //offset config begin - 使用文件来存储已处理的binlog偏移量

        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");

//        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");

        props.setProperty("offset.storage.file.filename", "D:/tmp/dbz/storage/mysql_offsets.dat");

        props.setProperty("offset.flush.interval.ms", "0");

        //offset config end

        props.setProperty("database.server.name", "mysql-connector");

        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");

        props.setProperty("database.history.file.filename", "D:/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122110"); //随机设置

        props.setProperty("database.hostname", "localhost");

        props.setProperty("database.port", "3306");

        props.setProperty("database.user", "root");

        props.setProperty("database.password", "123456");

        props.setProperty("database.include.list", "test");//要捕获的数据库名

        props.setProperty("topic.prefix", "mysql-");

//        props.setProperty("table.include.list", "inventory.a");//要捕获的数据表

        props.setProperty("snapshot.mode", "initial");//全量+增量

        props.setProperty("bootstrap.servers", "localhost:9092");

        props.setProperty("topic", "test");

//        props.setProperty("offset.storage.topic", "log-t");

//        props.setProperty("offset.storage.partitions", "1");

//        props.setProperty("offset.storage.replication.factor", "1");

//        KafkaProducerTest test = new KafkaProducerTest("test1841");

        // 使用上述配置创建Debezium引擎,输出样式为Json字符串格式

        engine = DebeziumEngine.create(Json.class)

                .using(props)

                .notifying(record -> {

                    //test.sendMsg(record.value());

                    System.out.println(record.value());//输出到控制台

                })

                .using((success, message, error) -> {

                    if (error != null) {

                        error.printStackTrace();

                        // 报错回调

                        System.out.println("------------error, message:" + message);

                        System.out.println( "exception:" + error);

                    }

                    closeEngine(engine);

                })

                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(engine);

        addShutdownHook(engine);

        awaitTermination(executor);

        System.out.println("------------main finished.");

    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {

        try {

            engine.close();

        } catch (IOException ignored) {

        }

    }

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {

        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));

    }

    private static void awaitTermination(ExecutorService executor) {

        if (executor != null) {

            try {

                executor.shutdown();

                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {

                }

            } catch (InterruptedException e) {

                Thread.currentThread().interrupt();

            }

        }

    }

}

运行效果

随意修改一条mysql表中的数据

修改后

代码日志

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"create_time"}],"optional":true,"name":"mysql_connector.di_test.t_user.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"create_time"}],"optional":true,"name":"mysql_connector.di_test.t_user.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.di_test.t_user.Envelope"},"payload":{"before":{"id":2,"name":"lisi2","age":19,"phone":"18444444","address":"北京海淀1","create_time":"2023-12-28T06:47:07Z"},"after":{"id":2,"name":"lisi2","age":19,"phone":"18444444","address":"北京海淀","create_time":"2023-12-28T06:47:07Z"},"source":{"version":"1.9.0.Final","connector":"mysql","name":"mysql-connector","ts_ms":1722423711000,"snapshot":"false","db":"di_test","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"binlog.000008","pos":3134,"row":0,"thread":17,"query":null},"op":"u","ts_ms":1722423711663,"transaction":null}}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Unity UnityWebRequest封装类
  • Java学习Day20:基础篇10
  • 二进制与进制转换与原码、反码、补码详解--内含许多超详细图片讲解!!!
  • React(四):DOCX文件在线预览
  • 2024杭电多校(5) 1008. 猫咪们狂欢【带权最大独立集】
  • 宅家也能高效办公?试试这四款款远程控制神器!
  • 【2024年华数杯全国大学生数学建模竞赛】C题:老外游中国 问题思路分析及Python代码实现
  • C语言初阶(12)
  • 周鸿祎回应将成三六零第一大股东:会和公司一起走下去
  • 学习硬件测试04:触摸按键+PWM 驱动蜂鸣器+数码管(P62~P67、P71、P72)
  • mysql介绍
  • 1、.Net UI框架:WPF - .Net宣传系列文章
  • 反转链表(LeetCode)
  • 重燃代码之光:在PyCharm中恢复自动高亮的秘籍
  • Linux系统中的高级内核模块调试技术
  • 收藏网友的 源程序下载网
  • android图片蒙层
  • C++入门教程(10):for 语句
  • Centos6.8 使用rpm安装mysql5.7
  • IIS 10 PHP CGI 设置 PHP_INI_SCAN_DIR
  • Phpstorm怎样批量删除空行?
  • Python中eval与exec的使用及区别
  • SegmentFault 技术周刊 Vol.27 - Git 学习宝典:程序员走江湖必备
  • tensorflow学习笔记3——MNIST应用篇
  • 不发不行!Netty集成文字图片聊天室外加TCP/IP软硬件通信
  • 纯 javascript 半自动式下滑一定高度,导航栏固定
  • ------- 计算机网络基础
  • 如何打造100亿SDK累计覆盖量的大数据系统
  • 使用Gradle第一次构建Java程序
  • 如何正确理解,内页权重高于首页?
  • 昨天1024程序员节,我故意写了个死循环~
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • !! 2.对十份论文和报告中的关于OpenCV和Android NDK开发的总结
  • #HarmonyOS:软件安装window和mac预览Hello World
  • #includecmath
  • #设计模式#4.6 Flyweight(享元) 对象结构型模式
  • #职场发展#其他
  • (~_~)
  • (13)[Xamarin.Android] 不同分辨率下的图片使用概论
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (iPhone/iPad开发)在UIWebView中自定义菜单栏
  • (LNMP) How To Install Linux, nginx, MySQL, PHP
  • (二十六)Java 数据结构
  • (附源码)spring boot火车票售卖系统 毕业设计 211004
  • (附源码)springboot助农电商系统 毕业设计 081919
  • (九)One-Wire总线-DS18B20
  • (六)vue-router+UI组件库
  • (七)MySQL是如何将LRU链表的使用性能优化到极致的?
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (学习总结16)C++模版2
  • (原創) 如何安裝Linux版本的Quartus II? (SOC) (Quartus II) (Linux) (RedHat) (VirtualBox)
  • (转)Java socket中关闭IO流后,发生什么事?(以关闭输出流为例) .
  • *算法训练(leetcode)第四十五天 | 101. 孤岛的总面积、102. 沉没孤岛、103. 水流问题、104. 建造最大岛屿
  • .net core + vue 搭建前后端分离的框架
  • .NET MVC第五章、模型绑定获取表单数据