当前位置: 首页 > news >正文

【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作

ClickHouse官网文档

Flink 读取 ClickHouse 数据两种驱动

  1. ClickHouse 官方提供Clickhouse JDBC.【建议使用
  2. 第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver

ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护

ClickHouse 官方提供Clickhouse JDBC的包名:com.clickhouse.jdbc.*

有些版本com.clickhouse.jdbc.* 包含了 ru.yandex.clickhouse.ClickHouseDriver.

因此加载包的时候一定要注意导入的包名

引入依赖

        <!-- clickhouse jdbc driver --><dependency><groupId>com.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId></dependency>

使用的是 0.3 这个版本,该版本就包含上述3方CH jdbc包

     <!-- CH JDBC版本推荐使用 0.3, 0.4的版本是要 JDK 17 --><clickhouse-jdbc.version>0.3.2-patch11</clickhouse-jdbc.version>

自定义Source

测试表映射实体类,该表仅有一个name字段

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CHTestPO {private String name;}

Flink Clickhouse Source

public class ClickHouseSource implements SourceFunction<CHTestPO> {private final String URL;private final String SQL;public ClickHouseSource(String URL, String SQL) {this.URL = URL;this.SQL = SQL;}@Overridepublic void run(SourceContext<CHTestPO> output) throws Exception {//  Properties是持久化的属性集 Properties的key和value都是字符串Properties properties = new Properties();ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(URL, properties);// 使用 try-with-resource 方式关闭JDBC连接 无需手动关闭try (ClickHouseConnection conn = clickHouseDataSource.getConnection()) {// clickhouse 通过游标的方式读取数据Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(SQL);while (rs.next()) {String name = rs.getString(1);output.collect(new CHTestPO(name));}}}@Overridepublic void cancel() {}
}

自定义Sink

需额外引入依赖

        <!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency>

Java 对sql语句处理的两个对象

  1. PreparedStatement对象:能够对预编译之后的sql语句进行处理【SQL 语句预编译:通过占位符'?'实现,可以防止sql注入】
  2. Statement对象:只能对静态的sql语句进行处理

核心代码

/*** 使用 Flink-jdbc-connector + 批量写入 + sql语句的预编译 写入 Clickhouse*/
public class ClickHouseJdbcSink<T> {private final SinkFunction<T> sink;private final static String NA = "null";public ClickHouseJdbcSink(String sql, int batchSize, String url) {sink = JdbcSink.sink(sql,// 对sql语句进行预编译new ClickHouseJdbcStatementBuilder<T>(),// 设置批量插入数据new JdbcExecutionOptions.Builder().withBatchSize(batchSize).build(),// 设置ClickHouse连接配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(url).build());}public SinkFunction<T> getSink() {return this.sink;}/*** 对预编译之后的sql语句进行占位符替换** @param ps:     PreparedStatement对象 下标从 1 开始* @param fields: clickhouse表PO对象的属性字段* @param object: clickhouse表PO对象的属性字段所对应的数据类型*/public static void setPreparedStatement(PreparedStatement ps,Field[] fields,Object object) throws IllegalAccessException, SQLException {// 遍历 Field[]for (int i = 1; i <= fields.length; i++) {// 取出每个Field实例Field field = fields[i - 1];// 指示反射的对象在使用时应该取消 Java 语言访问检查field.setAccessible(true);// 通过Field实例的get方法返回指定的对象Object o = field.get(object);if (o == null) {ps.setNull(i, 0);continue;}// 这里统一设为字符型String fieldValue = o.toString();// 变量和常量的比较,通常将常量放前,可以避免空指针if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {// 替换对应位置的占位符ps.setObject(i, fieldValue);} else {ps.setNull(i, 0);}}}}

对sql语句进行预编译

@Slf4j
public class ClickHouseJdbcStatementBuilder<T> implements JdbcStatementBuilder<T> {@Overridepublic void accept(PreparedStatement preparedStatement, T t) throws SQLException {/* *********************** Java通过反射获取类的字段:** 1. getDeclaredFields():获取所有的字段,不会获取父类的字段* 2. getFields(): 只能会public字段,获取包含父类的字段** *********************/Field[] fields = t.getClass().getDeclaredFields();// 将获取到的字段替换sql预编译之后的占位符。try {ClickHouseJdbcSink.setPreparedStatement(preparedStatement, fields, t);} catch (IllegalAccessException e) {log.error("sql 预编译失败", e);e.printStackTrace();}}
}

ClickHouse读写工具类

image-20231209233006017

public class ClickHouseUtil {private static final String URL;static {ParameterTool parameterTool = ParameterUtil.getParameters();URL = parameterTool.get("clickhouse.url");}/*** 读取clickhouse*/public static DataStream<CHTestPO> read(StreamExecutionEnvironment env, String sql) {return env.addSource(new ClickHouseSource(URL, sql));}/*** 批量写入ClickHouse*/public static <T> DataStreamSink<T> batchWrite(DataStream<T> dataStream,String sql,int batchSize) {//生成 SinkFunctionClickHouseJdbcSink<T> clickHouseJdbcSink =new ClickHouseJdbcSink<T>(sql, batchSize, URL);return dataStream.addSink(clickHouseJdbcSink.getSink());}}

测试一下

public class ClickHouseUtilTest {@DisplayName("测试Flink+jdbc+游标读取Clickhouse")@Testvoid testRead() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 从default数据库的user表中读取数据String sql = "select * from default.user";DataStream<CHTestPO> ds = ClickHouseUtil.read(env, sql);// 打印数据流中的元素ds.print("clickhouse");// 执行程序env.execute();}@DisplayName("测试Flink-Connector-jdbc+预编译批量写入Clickhouse")@Testvoid testBatchWrite() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);CHTestPO po = new CHTestPO();po.setName("Lucy");CHTestPO po1 = new CHTestPO();po1.setName("Jack");DataStream<CHTestPO> ds = env.fromCollection(Arrays.asList(po, po1));// 定义将数据写入ClickHouse数据库的SQL语句String sql = "insert into default.user(name) values(?)";// 调用ClickHouseUtil的batchWrite方法将数据流ds中的数据批量写入ClickHouse数据库ClickHouseUtil.batchWrite(ds, sql, 2);// 执行程序env.execute();}
}

此时表中仅一行记录

image-20231209232619959

读取没有问题!

image-20231209232741522

写入没有问题!

image-20231209232902469

相关文章:

  • 【模块化】 js 模块化(CommonJS, AMD, UMD, CMD, ES6)
  • linux系统命令
  • 基于OHTPPS实现网站HTTPS访问
  • 使用国内镜像源安装opencv
  • 计算机组成原理-选择语句和循环语句的汇编表示
  • 【数据结构】第二章——线性表(1)
  • linux(centos7)离线安装mysql-5.7.35-1.el7.x86_64.rpm-bundle.tar
  • 一文速览字节最新分布式操作系统KubeWharf
  • vue+react题集整理
  • 设计模式之结构型设计模式(二):工厂模式 抽象工厂模式 建造者模式
  • Oracle 数据库 control file的备份
  • TensortRT:sample.py:DeprecationWarning:
  • Linux shell编程学习笔记35:seq
  • 33.搜索旋转排序数组
  • Certbot实现 HTTPS 免费证书(Let‘s Encrypt)自动续期
  • __proto__ 和 prototype的关系
  • CSS选择器——伪元素选择器之处理父元素高度及外边距溢出
  • ES6语法详解(一)
  • Java IO学习笔记一
  • MySQL Access denied for user 'root'@'localhost' 解决方法
  • Protobuf3语言指南
  • PV统计优化设计
  • Python socket服务器端、客户端传送信息
  • Redis 中的布隆过滤器
  • 程序员最讨厌的9句话,你可有补充?
  • 解析 Webpack中import、require、按需加载的执行过程
  • 前端临床手札——文件上传
  • 前嗅ForeSpider中数据浏览界面介绍
  • 什么软件可以剪辑音乐?
  • 适配iPhoneX、iPhoneXs、iPhoneXs Max、iPhoneXr 屏幕尺寸及安全区域
  • 微服务核心架构梳理
  • 学习HTTP相关知识笔记
  • HanLP分词命名实体提取详解
  • hi-nginx-1.3.4编译安装
  • JavaScript 新语法详解:Class 的私有属性与私有方法 ...
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • 东超科技获得千万级Pre-A轮融资,投资方为中科创星 ...
  • ​​​​​​​GitLab 之 GitLab-Runner 安装,配置与问题汇总
  • #mysql 8.0 踩坑日记
  • (02)Cartographer源码无死角解析-(03) 新数据运行与地图保存、加载地图启动仅定位模式
  • (2020)Java后端开发----(面试题和笔试题)
  • (3)Dubbo启动时qos-server can not bind localhost22222错误解决
  • (pojstep1.3.1)1017(构造法模拟)
  • (笔记)Kotlin——Android封装ViewBinding之二 优化
  • (附源码)springboot“微印象”在线打印预约系统 毕业设计 061642
  • (十六)串口UART
  • 、写入Shellcode到注册表上线
  • .bat批处理(一):@echo off
  • .net framework4与其client profile版本的区别
  • .NET Micro Framework初体验(二)
  • .net 发送邮件
  • .net 设置默认首页
  • .NET 应用架构指导 V2 学习笔记(一) 软件架构的关键原则
  • .NET命令行(CLI)常用命令
  • .Net下C#针对Excel开发控件汇总(ClosedXML,EPPlus,NPOI)