当前位置: 首页 > news >正文

五.海量数据实时分析-FlinkCDC+DorisConnector实现数据的全量增量同步

前言

前面四篇文字都在学习Doris的理论知识,也是比较枯燥,当然Doris的理论知识还很多,我们后面慢慢学,本篇文章我们尝试使用SpringBoot来整合Doris完成基本的CRUD。

由于 Doris 高度兼容 Mysql 协议,两者在 SQL 语法方面有着比较强的一致性,另外 Mysql 客户端也是 Doris 官方选择的客户端。因此,如需对 Mysql 进行数据分析,使用 Doris 的迁移成本较低。但是对于数据规模特别大的情况下Mysql的方式还是不太建议的,所以这里还会介绍一种 方式就是通过CDC+DorisConnector

一.整合Mybatis操作Doris

1.准备数据库

CREATE TABLE `doris_test` (`id` int NULL COMMENT "id",`price` decimal(10,2) NULL COMMENT "价格",`title` varchar(20)NULL COMMENT "标题") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 1PROPERTIES ("replication_num" = "1");
Query OK, 0 rows affected (0.06 sec)

2.搭建SpringBoot项目

第一步:创建SpringBoot项目,导入依赖,主要是Mybatis相关的依赖

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--如果要用传统的xml或properties配置,则需要添加此依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId></dependency><!--mybatisplus持久层依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.70</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.23</version></dependency></dependencies>

3.整合Mybatis

创建好启动类,实体类,服务层,持久层,SQL映射文件等,这里和我们操作Mysql没有任何区别.

启动类如下,通过 @MapperScan 来扫描Mapper接口

@SpringBootApplication
@MapperScan(basePackages = "cn.whale.mapper")
public class DorisApplication {public static void main(String[] args) {SpringApplication.run(DorisApplication.class,args);}
}

服务层和持久层代码如下 , 省略了部分代码

@Service
public class DorisServiceImpl implements DorisService {@AutowiredDorisMapper dorisMapper;@Overridepublic List<Order> listDoris() {return dorisMapper.listDoris();}@Overridepublic int add(Order order) {return dorisMapper.add(order);}
}public interface DorisMapper {List<Order> listDoris();int add(Order order);}

下面是sql映射文件,和操作数据库没有任何区别

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.whale.mapper.DorisMapper"><select id="listDoris" resultType="cn.whale.domain.Order">select id,price,title from orders</select><insert id="add" parameterType="cn.whale.domain.Order">INSERT INTO orders(id,price,title) VALUES(#{id},#{price},#{title})</insert>
</mapper>

4.编写配置文件

server:port: 8080
spring:#Doris数据库连接配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.220.253:9030/app_db?characterEncoding=utf-8&useSSL=falseusername: rootpassword: 123456type: com.alibaba.druid.pool.DruidDataSourceinitial-size: 500min-idle: 500max-active: 500#mybatis的相关配置
mybatis:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.wudl.doris.domain.DorisTest

5.编写测试类

注入Service,完成查询和添加的测试

package cn.whale.service;import cn.whale.DorisApplication;
import cn.whale.domain.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.math.BigDecimal;
import java.util.List;@RunWith(SpringRunner.class)
@SpringBootTest(classes= DorisApplication.class)
public class DorisServiceTest {@Autowiredprivate DorisService dorisService;@Testpublic void listDoris() {List<Order> orders = dorisService.listDoris();orders.stream().forEach(System.out::println);}@Testpublic void addDoris() throws InterruptedException {for (int i = 100 ; i>0 ; i--){Order order = new Order();order.setId(Long.valueOf(i));order.setPrice(new BigDecimal(i));order.setTitle(i+"元");dorisService.add(order);}Thread.sleep(100000);}
}

二.SpringBoot整合FlinkCDC+doris-connector实时同步

上面通过Mysql客户端的方式来进行数据的同步方式虽然使用起来比较简单,但是不适合大量数据的实时同步,性能不是很好。Doris官方提供了doris-connector进行Doris读写,那么我们可以通过FlinkCDC监听Mysql数据变更,然后同步到SpringBoot项目中,对数据进行处理后,然后通过doris-connector同步到Doris。虽然我们之前也介绍过直接通过FlinkCDC同步Mysql到Doris,那种方式我们没办法对数据进行处理。

1.搭建项目导入依赖

版本兼容
在这里插入图片描述

  • flink-connector-mysql-cdc :flinkCDC,实时监听Mysql增量或者全量同步
  • flink-doris-connector :用来读写Doris的驱动
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency><!-- flink doris connector --><dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.13_2.12</artifactId><version>1.0.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency></dependencies>

2.定义消息序列化器

定义序列化器,当拿到Mysql的数据后我们可以通过虚拟化器对数据进行格式化


/*** @desc mysql消息读取自定义序列化**/
public class MysqlDeserialization implements DebeziumDeserializationSchema<String> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/**** 反序列化数据,转为变更JSON对象*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) {//拿到数据Struct struct = (Struct) sourceRecord.value();//输出数据collector.collect(getJsonObject(struct, AFTER).toJSONString());}/**** 从原数据获取出变更之前或之后的数据*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}

3.Mysql监听器

监听器的作用主要是监听Mysql的数据变更后,通过序列化器把数据进行处理后,然后同步到Doris中


/*** @desc mysql变更监听**/
@Component
public class MysqlEventListener implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//设置Mysql数据源DebeziumSourceFunction<String> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<String> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);//streamSource.addSink(dataChangeSink);Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("strip_outer_array", "true");//配置Doris信息streamSource.addSink(DorisSink.sink(DorisExecutionOptions.builder().setBatchSize(3).setBatchIntervalMs(10L).setMaxRetries(3).setStreamLoadProp(pro).setEnableDelete(true).build(),DorisOptions.builder().setFenodes("192.168.220.253:8030").setTableIdentifier("app_db.orders").setUsername("root").setPassword("123456").build()));env.execute("mysql-stream-cdc");}/*** 构造变更数据源*/private DebeziumSourceFunction<String> buildDataChangeSource() {return MySqlSource.<String>builder().hostname("192.168.220.253").port(3307).databaseList("app_db").tableList("app_db.orders").username("root").password("123456")/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)* timestamp:指定时间戳进行数据导入(大于等于指定时间读取数据)*/.startupOptions(StartupOptions.latest())//序列化.deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();}
}

4.最后编写一个启动类

@SpringBootApplication
public class FlinkCdcApplication {public static void main(String[] args) {SpringApplication.run(FlinkCdcApplication.class, args);}}

5.准备好数据库

我的Mysql使用的是 :app_db.orders 对应的是Doris中的 app_db.orders ,字段类型,字段名需要一致哦。然后启动项目,修改Mysql的数据,Doris会自动同步过去

三.其他同步方式

Doris还支持其他的数据读写方式,具体的可以根据官网案例去尝试地址:https://doris.apache.org/zh-CN/docs/1.2/ecosystem/spark-doris-connector

相关文章:

  • 成功使用DDNS动态域名访问我的群晖NAS(TP-link路由器)
  • 【鸿蒙学习】深入了解UIAbility组件
  • 关系型数据库和非关系型数据库的区别
  • JavaScript 反射(Reflect)和代理(Proxy)简单介绍
  • OLED移植
  • expressjs 中的mysql.createConnection,execute 怎么使用
  • MacOS配置python环境
  • Linux安装RabbitMQ安装
  • Ubuntu如何如何安装tcpdump
  • 微信小程序操作蓝牙
  • vue3 环境配置vue-i8n国际化
  • 当人工智能拥抱餐饮业,传统与创新的交融
  • 每天五分钟深度学习pytorch:基于pytorch搭建一元线性回归模型
  • 【EXCEL数据处理】000010 案列 EXCEL文本型和常规型转换。使用的软件是微软的Excel操作的。处理数据的目的是让数据更直观的显示出来,方便查看。
  • 【AIGC】ChatGPT提示词解析:如何打造个人IP、CSDN爆款技术文案与高效教案设计
  • 10个最佳ES6特性 ES7与ES8的特性
  • Docker 1.12实践:Docker Service、Stack与分布式应用捆绑包
  • ES6 学习笔记(一)let,const和解构赋值
  • java正则表式的使用
  • MySQL QA
  • Promise初体验
  • spring学习第二天
  • SQLServer之创建显式事务
  • text-decoration与color属性
  • ⭐ Unity + OpenCV 实现实时图像识别与叠加效果
  • Vue 2.3、2.4 知识点小结
  • 警报:线上事故之CountDownLatch的威力
  • 看完九篇字体系列的文章,你还觉得我是在说字体?
  • 使用Maven插件构建SpringBoot项目,生成Docker镜像push到DockerHub上
  • 我的zsh配置, 2019最新方案
  • 写给高年级小学生看的《Bash 指南》
  • 在GitHub多个账号上使用不同的SSH的配置方法
  • 通过调用文摘列表API获取文摘
  • #if 1...#endif
  • #window11设置系统变量#
  • #我与Java虚拟机的故事#连载13:有这本书就够了
  • (1)Map集合 (2)异常机制 (3)File类 (4)I/O流
  • (k8s中)docker netty OOM问题记录
  • (笔记)第三期书生·浦语大模型实战营(十一卷王场)--书生入门岛通关第1关Linux 基础知识
  • (超简单)使用vuepress搭建自己的博客并部署到github pages上
  • (二)Optional
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552
  • (附源码)springboot太原学院贫困生申请管理系统 毕业设计 101517
  • (附源码)ssm跨平台教学系统 毕业设计 280843
  • (附源码)计算机毕业设计SSM智慧停车系统
  • (六)Hibernate的二级缓存
  • (十五)devops持续集成开发——jenkins流水线构建策略配置及触发器的使用
  • (一)utf8mb4_general_ci 和 utf8mb4_unicode_ci 适用排序和比较规则场景
  • (转)Sql Server 保留几位小数的两种做法
  • (转)全文检索技术学习(三)——Lucene支持中文分词
  • **CI中自动类加载的用法总结
  • ./configure,make,make install的作用(转)
  • .bat批处理(十):从路径字符串中截取盘符、文件名、后缀名等信息
  • .net core控制台应用程序初识
  • .net framework 4.8 开发windows系统服务