当前位置: 首页 > news >正文

Oracle-OracleConnector

提示:OracleConnector 类是 Debezium 中用于与 Oracle 数据库交互的一个连接器组件

文章目录

  • 前言
  • 一、核心功能
  • 二、代码分析
  • 总结

前言

提示:OracleConnector 类负责配置、启动、管理和验证与 Oracle 数据库的连接,并为后续的数据捕获任务准备必要的配置。


提示:以下是本篇文章正文内容

一、核心功能

核心功能详细说明

  1. 配置管理:

    • 接收配置: 在 start 方法中接收配置属性 (props) 并将其转换为不可变的映射 (properties)。
    • 配置验证: 通过 config 方法返回配置定义,用于定义连接器所需的配置项。
    • 连接验证: 在 validateConnection 方法中测试与 Oracle 数据库的连接是否有效。如果连接失败,会记录错误日志并添加错误消息到配置值中。
  2. 任务管理:

    • 任务类指定: 在 taskClass 方法中返回 OracleConnectorTask 类,这是执行具体复制任务的类。
    • 任务配置: 在 taskConfigs 方法中返回任务配置列表,通常只包含一个配置映射,因为该连接器默认只支持单个任务。
  3. 版本信息:

    • 版本报告: 在 version 方法中返回连接器的版本信息。
  4. 表筛选:

    • 获取匹配表: 在 getMatchingCollections 方法中获取与配置过滤器匹配的 Oracle 表列表。这涉及读取数据库中的表信息,并根据配置过滤器筛选出目标表。
  5. 生命周期管理:

    • 启动: 在 start 方法中初始化连接器。
    • 停止: stop 方法为空,表示没有额外的清理工作需要执行。

二、代码分析

package io.debezium.connector.oracle;import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;// 定义 OracleConnector 类,这是一个扩展了 RelationalBaseSourceConnector 的类,
// 用于从 Oracle 数据库捕获变更数据。
public class OracleConnector extends RelationalBaseSourceConnector {private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnector.class);// 创建一个静态 Logger 实例,用于记录日志信息。private Map<String, String> properties;// 定义一个私有成员变量 properties,用于存储配置属性。@Overridepublic String version() {return Module.version();}// 实现 version() 方法,返回连接器的版本信息。@Overridepublic void start(Map<String, String> props) {this.properties = Collections.unmodifiableMap(new HashMap<>(props));}// 实现 start() 方法,接收配置属性并将其转换为不可变的映射。// 这个映射将在后续方法中使用。@Overridepublic Class<? extends Task> taskClass() {return OracleConnectorTask.class;}// 实现 taskClass() 方法,返回执行具体复制任务的类。@Overridepublic List<Map<String, String>> taskConfigs(int maxTasks) {if (maxTasks > 1) {throw new IllegalArgumentException("Only a single connector task may be started");}// 如果尝试启动的任务数大于 1,则抛出异常,因为该连接器只支持单个任务。return Collections.singletonList(properties);}// 实现 taskConfigs() 方法,返回任务配置列表。对于 OracleConnector,列表中只有一个配置映射。@Overridepublic void stop() {}// 实现 stop() 方法,用于执行清理工作。在这个实现中,没有额外的清理工作。@Overridepublic ConfigDef config() {return OracleConnectorConfig.configDef();}// 实现 config() 方法,返回配置定义,用于定义连接器所需的配置项。@Overrideprotected void validateConnection(Map<String, ConfigValue> configValues, Configuration config) {final ConfigValue databaseValue = configValues.get(RelationalDatabaseConnectorConfig.DATABASE_NAME.name());if (!databaseValue.errorMessages().isEmpty()) {return;}// 验证数据库名称配置项是否有效。final ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());final ConfigValue userValue = configValues.get(RelationalDatabaseConnectorConfig.USER.name());// 获取主机名和用户名配置项。OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig())) {// 使用提供的配置创建 OracleConnection 实例,并尝试建立连接。LOGGER.debug("Successfully tested connection for {} with user '{}'", OracleConnection.connectionString(connectorConfig.getJdbcConfig()),connection.username());}catch (SQLException | RuntimeException e) {// 如果连接失败,记录错误日志并添加错误消息到配置值中。LOGGER.error("Failed testing connection for {} with user '{}'", config.withMaskedPasswords(), userValue, e);hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());}}// 实现 validateConnection() 方法,用于测试与 Oracle 数据库的连接是否有效。@Overrideprotected Map<String, ConfigValue> validateAllFields(Configuration config) {return config.validate(OracleConnectorConfig.ALL_FIELDS);}// 实现 validateAllFields() 方法,验证所有配置字段的有效性。@SuppressWarnings("unchecked")@Overridepublic List<TableId> getMatchingCollections(Configuration config) {final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);final String databaseName = connectorConfig.getCatalogName();try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), false)) {if (!Strings.isNullOrBlank(connectorConfig.getPdbName())) {connection.setSessionToPdb(connectorConfig.getPdbName());}// @TODO: we need to expose a better method from the connector, particularly getAllTableIds// the following's performance is acceptable when using PDBs but not as ideal with non-PDBreturn connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" }).stream().filter(tableId -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)).collect(Collectors.toList());}catch (SQLException e) {throw new DebeziumException(e);}}// 实现 getMatchingCollections() 方法,获取与配置过滤器匹配的 Oracle 表列表。// 这涉及读取数据库中的表信息,并根据配置过滤器筛选出目标表。}
  1. 封装:

    • OracleConnectorConfig: 封装了连接器的配置信息,使得配置管理更加清晰和易于维护。
    • OracleConnection: 封装了与 Oracle 数据库的连接逻辑,包括连接的建立、关闭以及执行特定的操作(如读取表名)。
  2. 继承与多态:

    • readTableNames 方法可能是一个多态方法,不同的数据库连接类可以有不同的实现方式。
  3. 依赖注入:

    • OracleConnectorConfigOracleConnection 的实例都是通过构造函数传递进来的,这符合依赖注入的原则,提高了代码的可测试性和灵活性。
  4. 异常处理:

    • 使用 try-with-resources 语句确保资源(如数据库连接)被正确关闭,即使发生异常也能释放资源。
    • 异常处理通过抛出 DebeziumException 来简化调用者的错误处理逻辑。
  5. 流式编程:

    • 使用 Java 8 的 Stream API 来处理表名的筛选过程,使得代码更加简洁和易于阅读。

启发

  1. 模块化设计:

    • 将复杂的逻辑分解成多个小的、独立的类,每个类负责一部分功能,这样可以提高代码的可读性和可维护性。
  2. 关注点分离:

    • 分离配置管理、连接管理和业务逻辑,每个部分都有明确的责任范围,这有助于减少耦合度和提高代码质量。
  3. 异常处理:

    • 明确异常处理策略,避免让异常“沉默”,确保程序的健壮性和稳定性。
  4. 资源管理:

    • 使用 try-with-resources 语句来自动管理资源,减少内存泄漏的风险。
  5. 代码复用:

    • 通过继承和多态等机制实现代码复用,减少重复代码,提高开发效率。 


总结

  1. private Map<String, String> properties:

    • 作用:存储配置属性的私有成员变量。
    • 说明:在 start 方法中被初始化为不可变的映射,用于后续方法中访问配置信息。

方法的含义

  1. public String version():

    • 作用:返回连接器的版本信息。
    • 说明:通过调用 Module.version() 返回连接器的具体版本号。
  2. public void start(Map<String, String> props):

    • 作用:初始化连接器,接收配置属性并将其转换为不可变的映射。
    • 说明:将传入的配置属性 props 转换为 HashMap,然后进一步转换为不可变映射 properties,供后续方法使用。
  3. public Class<? extends Task> taskClass():

    • 作用:返回执行具体复制任务的类。
    • 说明:返回 OracleConnectorTask.class,这是执行具体复制任务的类。
  4. public List<Map<String, String>> taskConfigs(int maxTasks):

    • 作用:返回任务配置列表。
    • 说明:对于 OracleConnector,列表中只有一个配置映射,即 properties。如果 maxTasks 大于 1,则抛出异常,因为该连接器只支持单个任务。
  5. public void stop():

    • 作用:执行清理工作。
    • 说明:在这个实现中,没有额外的清理工作。
  6. public ConfigDef config():

    • 作用:返回配置定义,用于定义连接器所需的配置项。
    • 说明:返回 OracleConnectorConfig.configDef(),定义了连接器所需的配置项。
  7. protected void validateConnection(Map<String, ConfigValue> configValues, Configuration config):

    • 作用:测试与 Oracle 数据库的连接是否有效。
    • 说明:使用提供的配置创建 OracleConnection 实例,并尝试建立连接。如果连接失败,记录错误日志并添加错误消息到配置值中。
  8. protected Map<String, ConfigValue> validateAllFields(Configuration config):

    • 作用:验证所有配置字段的有效性。
    • 说明:通过调用 config.validate(OracleConnectorConfig.ALL_FIELDS) 验证所有配置字段的有效性。
  9. public List<TableId> getMatchingCollections(Configuration config):

    • 作用:获取与配置过滤器匹配的 Oracle 表列表。
    • 说明:涉及读取数据库中的表信息,并根据配置过滤器筛选出目标表。使用 OracleConnection 类读取表名,并通过 Stream API 进行筛选。

OracleConnector 类是 Debezium 项目中的一个核心组件,用于与 Oracle 数据库交互。它通过封装配置管理、连接验证、任务管理等功能,实现了从 Oracle 数据库捕获变更数据的能力。每个成员变量和方法都有其特定的作用,共同构成了一个功能完整的 Oracle 数据库连接器。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Linux应用层开发(7):网络编程
  • html+css+js网页设计 找法网2个页面(带js)ui还原度百分之90
  • C语言实现UDP广播
  • 力扣227题基本计算器II(Python实现)
  • Kali Linux——网络安全的瑞士军刀
  • 登录页滑块验证图
  • Windows下编译安装PETSc
  • 简单介绍BTC的Layer2项目RGB
  • Java面试篇(JVM相关专题)
  • C#使用NPOI进行Excel和Word文件处理(二)
  • 房产中介小程序
  • C语言——结构体与共用体
  • Docker 网络代理配置及防火墙设置指南
  • 【云原生之kubernetes实战】在k8s环境下部署Note Mark笔记工具
  • 不仅能防沉迷游戏的防沉迷软件(Python)
  • 【刷算法】求1+2+3+...+n
  • Android系统模拟器绘制实现概述
  • axios 和 cookie 的那些事
  • Javascript 原型链
  • MobX
  • Rancher-k8s加速安装文档
  • react-native 安卓真机环境搭建
  • React系列之 Redux 架构模式
  • select2 取值 遍历 设置默认值
  • Spring Security中异常上抛机制及对于转型处理的一些感悟
  • SQLServer之创建显式事务
  • TypeScript迭代器
  • Vue UI框架库开发介绍
  • 纯 javascript 半自动式下滑一定高度,导航栏固定
  • 多线程 start 和 run 方法到底有什么区别?
  • 二维平面内的碰撞检测【一】
  • 服务器之间,相同帐号,实现免密钥登录
  • 官方新出的 Kotlin 扩展库 KTX,到底帮你干了什么?
  • 前端 CSS : 5# 纯 CSS 实现24小时超市
  • 深入浅出webpack学习(1)--核心概念
  • Play Store发现SimBad恶意软件,1.5亿Android用户成受害者 ...
  • ​ArcGIS Pro 如何批量删除字段
  • # 消息中间件 RocketMQ 高级功能和源码分析(七)
  • #QT(智能家居界面-界面切换)
  • #常见电池型号介绍 常见电池尺寸是多少【详解】
  • #微信小程序:微信小程序常见的配置传值
  • (11)MSP430F5529 定时器B
  • (2022 CVPR) Unbiased Teacher v2
  • (C++20) consteval立即函数
  • (Java)【深基9.例1】选举学生会
  • (Matalb回归预测)PSO-BP粒子群算法优化BP神经网络的多维回归预测
  • (Mirage系列之二)VMware Horizon Mirage的经典用户用例及真实案例分析
  • (react踩过的坑)antd 如何同时获取一个select 的value和 label值
  • (Repost) Getting Genode with TrustZone on the i.MX
  • (回溯) LeetCode 40. 组合总和II
  • (区间dp) (经典例题) 石子合并
  • (五)activiti-modeler 编辑器初步优化
  • (一)80c52学习之旅-起始篇
  • (一)插入排序
  • (转)IOS中获取各种文件的目录路径的方法