当前位置: 首页 > news >正文

Spring Boot+Debezium:解决 MySQL Binlog监听

在当今的企业应用中,对数据变更进行实时捕获和处理是至关重要的。而Debezium作为一个强大的开源平台,能够实时捕获数据库的变化事件,为实时数据流处理提供了可靠的解决方案。在本文中,我们将探讨如何将Debezium与SpringBoot项目无缝集成,以便对MySQL Binlog进行监听和处理。

为什么选择Debezium?

  • 实时性:Debezium能够实时监控数据库的变更,捕获到变更事件并立即进行处理,使得应用能够及时响应数据的变化。

  • 可靠性:Debezium基于可靠的CDC(Change Data
    Capture)技术,能够保证数据的准确性和一致性,确保捕获到的变更事件能够被正确地处理。

  • 易用性:Debezium提供了丰富的API和文档,使得集成到SpringBoot项目中变得简单和容易。

如何整合Debezium到Spring Boot项目中?

1. 添加依赖

首先,我们需要在pom.xml中添加必要的依赖。包括SpringBoot的基础依赖和Debezium的MySQL连接器依赖。

<dependencies><!-- Spring Boot Starter Dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Debezium Dependencies --><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.8.0.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.8.0.Final</version></dependency>
</dependencies>

2. 配置Debezium

在SpringBoot项目的application.properties文件中,配置Debezium连接MySQL所需的参数。

# MySQL connection properties
debezium.mysql.hostname=localhost
debezium.mysql.port=3306
debezium.mysql.user=debezium
debezium.mysql.password=debezium
debezium.mysql.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.mysql.database.history.file.filename=/tmp/dbhistory.dat# Debezium connector properties
debezium.name=engine
debezium.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.database.hostname=${debezium.mysql.hostname}
debezium.database.port=${debezium.mysql.port}
debezium.database.user=${debezium.mysql.user}
debezium.database.password=${debezium.mysql.password}
debezium.database.server.id=85744
debezium.database.server.name=my-app-connector
debezium.database.include.list=mydatabase
debezium.table.include.list=mydatabase.mytable
debezium.database.history=${debezium.mysql.database.history}
debezium.database.history.file.filename=${debezium.mysql.database.history.file.filename}

3. 创建Debezium引擎配置类

创建一个Spring配置类,用于初始化和配置Debezium引擎。

package com.example.debezium.config;import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.format.Json;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Properties;@Configuration
public class DebeziumConfig {@Value("${debezium.name}")private String name;@Value("${debezium.connector.class}")private String connectorClass;@Value("${debezium.database.hostname}")private String hostname;@Value("${debezium.database.port}")private String port;@Value("${debezium.database.user}")private String user;@Value("${debezium.database.password}")private String password;@Value("${debezium.database.server.id}")private String serverId;@Value("${debezium.database.server.name}")private String serverName;@Value("${debezium.database.include.list}")private String includeList;@Value("${debezium.table.include.list}")private String tableIncludeList;@Value("${debezium.database.history}")private String databaseHistory;@Value("${debezium.database.history.file.filename}")private String databaseHistoryFile;@Beanpublic Configuration debeziumConfiguration() {Properties props = new Properties();props.setProperty("name", name);props.setProperty("connector.class", connectorClass);props.setProperty("database.hostname", hostname);props.setProperty("database.port", port);props.setProperty("database.user", user);props.setProperty("database.password", password);props.setProperty("database.server.id", serverId);props.setProperty("database.server.name", serverName);props.setProperty("database.include.list", includeList);props.setProperty("table.include.list", tableIncludeList);props.setProperty("database.history", databaseHistory);props.setProperty("database.history.file.filename", databaseHistoryFile);return Configuration.from(props);}@Beanpublic EmbeddedEngine debeziumEngine(Configuration config) {return EmbeddedEngine.create().using(config).notifying(record -> {// Handle the change event hereSystem.out.println(record);}).build();}
}

4. 创建监听器

编写一个监听器类,用于处理捕获到的数据库变化事件。

package com.example.debezium.listener;import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.Json;
import org.springframework.stereotype.Component;@Component
public class DebeziumListener implements EmbeddedEngine.CompletionCallback {@Overridepublic void handle(RecordChangeEvent<SourceRecord> event, EmbeddedEngine.Context context) {// 解析并处理事件String value = event.record().value().toString();System.out.println("Received event: " + value);}@Overridepublic void handle(Throwable throwable, EmbeddedEngine.Context context) {// 错误处理throwable.printStackTrace();}
}

5. 启动Debezium引擎

在SpringBoot的主类中启动Debezium引擎。

package com.example.debezium;import io.debezium.embedded.EmbeddedEngine;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DebeziumApplication implements CommandLineRunner {@Autowiredprivate EmbeddedEngine debeziumEngine;public static void main(String[] args) {SpringApplication.run(DebeziumApplication.class, args);}@Overridepublic void run(String... args) throws Exception {new Thread(debeziumEngine).start();}
}

启动SpringBoot应用程序,Debezium引擎将开始监听MySQL的Binlog。当指定数据库或表的数据发生变化时,监听器会接收到变化事件,并打印或处理这些事件。
结论

通过上述步骤,我们成功地在SpringBoot项目中整合了Debezium,实现了对MySQL Binlog的监听。这样,我们可以实时捕获和处理数据库的变化事件,为数据同步、监控和分析等场景提供了强大的支持。希望本文对你有所帮助,在实际开发中能够灵活运用这一技术。

相关文章:

  • 出书,是「盖你自己的房子」你知道吗?
  • 清华新突破||新研究揭示多智能体协作的秘密武器
  • springboot + Vue前后端项目(第十一记)
  • ArcGIS中离线发布路径分析服务,并实现小车根据路径进行运动
  • 【Spring Boot】在项目中使用Spring AI
  • Vue.js功能实现博客
  • Golang使用HTTP框架zdpgo_resty实现文件下载
  • [Linux打怪升级之路]-进程和线程
  • Web基础考点
  • vue中axios的使用
  • faster_whisper语音识别
  • jvm的类加载
  • 『USB3.0Cypress』FPGA开发(3)GPIF II短包零包时序分析
  • next.js 服务端组件 -客户端组件
  • 游戏子弹类python设计与实现详解
  • 【干货分享】SpringCloud微服务架构分布式组件如何共享session对象
  • android高仿小视频、应用锁、3种存储库、QQ小红点动画、仿支付宝图表等源码...
  • Asm.js的简单介绍
  • es6
  • input实现文字超出省略号功能
  • Less 日常用法
  • node学习系列之简单文件上传
  • Spring技术内幕笔记(2):Spring MVC 与 Web
  • WebSocket使用
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 技术:超级实用的电脑小技巧
  • 前端每日实战:70# 视频演示如何用纯 CSS 创作一只徘徊的果冻怪兽
  • 世界上最简单的无等待算法(getAndIncrement)
  • 试着探索高并发下的系统架构面貌
  • 通信类
  • 小试R空间处理新库sf
  • 优化 Vue 项目编译文件大小
  • 在weex里面使用chart图表
  • 曜石科技宣布获得千万级天使轮投资,全方面布局电竞产业链 ...
  • ​Java基础复习笔记 第16章:网络编程
  • ​ubuntu下安装kvm虚拟机
  • ​虚拟化系列介绍(十)
  • #FPGA(基础知识)
  • #WEB前端(HTML属性)
  • #如何使用 Qt 5.6 在 Android 上启用 NFC
  • #设计模式#4.6 Flyweight(享元) 对象结构型模式
  • (23)mysql中mysqldump备份数据库
  • (4)STL算法之比较
  • (arch)linux 转换文件编码格式
  • (ISPRS,2021)具有遥感知识图谱的鲁棒深度对齐网络用于零样本和广义零样本遥感图像场景分类
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (MATLAB)第五章-矩阵运算
  • (编程语言界的丐帮 C#).NET MD5 HASH 哈希 加密 与JAVA 互通
  • (九)信息融合方式简介
  • (四)Android布局类型(线性布局LinearLayout)
  • (转)如何上传第三方jar包至Maven私服让maven项目可以使用第三方jar包
  • (转贴)用VML开发工作流设计器 UCML.NET工作流管理系统
  • * 论文笔记 【Wide Deep Learning for Recommender Systems】
  • .NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版
  • .NET CORE 2.0发布后没有 VIEWS视图页面文件