当前位置: 首页 > news >正文

Apache Doris 系列: 基础篇-Routine Load

简介

Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。
目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CSV 或 Json 格式的数据。

接下来通过一个案例介绍 Routine Load 的使用。
案例内容:

  1. 部署单节点Kafka
  2. 准备测试数据并导入kafka
  3. 导入数据到 Doris

部署单节点Kafka

下载

https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz

部署

  1. 解压
tar xvf  kafka_2.11-2.4.1.tgz
# 创建软连接
ln -s kafka_2.11-2.4.1 kafka
cd kafka

2)启动zookeeper和kafka

# 启动 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties  

# 启动 kafka server
bin/kafka-server-start.sh -daemon config/server.properties
  1. 查看日志
tail -f logs/server.log

出现一下日志说明Kafka server已经启动成功了。

[2022-09-13 06:38:03,596] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

准备测试数据并导入kafka

{"order_date":"2022-09-07","order_id":"10001","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:00","update_time":"2022-09-07 09:00:00"}
{"order_date":"2022-09-07","order_id":"10002","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:01","update_time":"2022-09-07 09:00:01"}
{"order_date":"2022-09-07","order_id":"10003","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:02","update_time":"2022-09-07 09:00:02"}
{"order_date":"2022-09-07","order_id":"10004","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:03","update_time":"2022-09-07 09:00:03"}
{"order_date":"2022-09-07","order_id":"10005","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:04","update_time":"2022-09-07 09:00:04"}
{"order_date":"2022-09-07","order_id":"10006","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:05","update_time":"2022-09-07 09:00:05"}

创建kafka topic

创建名叫 order_info 的kafka topic, 3个分区,1个副本

 bin/kafka-topics.sh --create --topic order_info --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

导入数据到 Kafka topic

 bin/kafka-console-producer.sh --topic order_info --broker-list localhost:9092 < /tmp/data.txt 

查看导入的数据

bin/kafka-console-consumer.sh --topic order_info --bootstrap-server localhost:9092 --from-beginning

{"order_date":"2022-09-07","order_id":"10001","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:00","update_time":"2022-09-07 09:00:00"}
{"order_date":"2022-09-07","order_id":"10002","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:01","update_time":"2022-09-07 09:00:01"}
{"order_date":"2022-09-07","order_id":"10003","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:02","update_time":"2022-09-07 09:00:02"}
{"order_date":"2022-09-07","order_id":"10004","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:03","update_time":"2022-09-07 09:00:03"}
{"order_date":"2022-09-07","order_id":"10005","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:04","update_time":"2022-09-07 09:00:04"}
{"order_date":"2022-09-07","order_id":"10006","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:05","update_time":"2022-09-07 09:00:05"}

导入数据到 Doris

登录mysql客户端

创建 Doris 数据表

CREATE TABLE IF NOT EXISTS test.order_info_example
(  
    `order_date` DATE NOT NULL COMMENT "下单日期",  
    `order_id` INT NOT NULL COMMENT "订单id",  
    `buy_num` TINYINT COMMENT "购买件数",  
    `user_id` INT COMMENT "[-9223372036854775808, 9223372036854775807]",  
    `create_time` DATETIME COMMENT "创建时间",  
    `update_time` DATETIME COMMENT "更新时间"
)  
ENGINE=olap  
DUPLICATE KEY(`order_date`, `order_id`)  
DISTRIBUTED BY HASH(`order_id`) BUCKETS 32  
PROPERTIES (  
 "replication_num" = "1"); 

创建 Doris ROUTINE LOAD

语法:

CREATE ROUTINE LOAD [db.]job_name ON tbl_name  
[merge_type]  
[load_properties]  
[job_properties]  
FROM data_source [data_source_properties]  

CREATE ROUTINE LOAD test.order_info_routine_load_example ON order_info_example  
WITH APPEND
COLUMNS(order_date,order_id,buy_num,user_id,create_time,update_time)  
PROPERTIES  
(  
    "desired_concurrent_number"="3",    "max_batch_interval" = "20",    "max_batch_rows" = "300000",    "max_batch_size" = "209715200",    "strict_mode" = "false",    "format" = "json")  
FROM KAFKA  
(  
    "kafka_broker_list" = "localhost:9092",    "kafka_topic" = "order_info",   "property.group.id" = "order_info_routine_load_example",  "property.kafka_default_offsets" = "OFFSET_BEGINNING");  

merge_type: 数据合并类型,默认为 WITH APPEND,表示导入的数据都是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。
strict_mode: 是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤
format: 指定导入数据格式,默认是csv,支持json格式。
更多参数请参考ROUTINE LOAD

可通过show routine load查看任务的运行情况

查看数据

mysql> select * from order_info_example;
±-----------±---------±--------±--------±--------------------±--------------------+
| order_date | order_id | buy_num | user_id | create_time | update_time |
±-----------±---------±--------±--------±--------------------±--------------------+
| 2022-09-07 | 10006 | 2 | 30001 | 2022-09-07 09:00:05 | 2022-09-07 09:00:05 |
| 2022-09-07 | 10005 | 2 | 30001 | 2022-09-07 09:00:04 | 2022-09-07 09:00:04 |
| 2022-09-07 | 10004 | 2 | 30001 | 2022-09-07 09:00:03 | 2022-09-07 09:00:03 |
| 2022-09-07 | 10002 | 2 | 30001 | 2022-09-07 09:00:01 | 2022-09-07 09:00:01 |
| 2022-09-07 | 10003 | 2 | 30001 | 2022-09-07 09:00:02 | 2022-09-07 09:00:02 |
| 2022-09-07 | 10001 | 2 | 30001 | 2022-09-07 09:00:00 | 2022-09-07 09:00:00 |
±-----------±---------±--------±--------±--------------------±--------------------+
6 rows in set (0.05 sec)

至此,数据已经成功写入到Doris。

相关文章:

  • 机器学习笔记 - CRAFT(文本检测的字符区域感知)论文解读
  • 【云原生-Docker】Docker 安装 Python
  • ESP8266-Arduino编程实例-TLV493D磁传感器驱动
  • Hue在大数据生态圈的集成
  • AtCoder Beginner Contest 267 (A~D)
  • 羊了个羊游戏源码搭建开发过程
  • 基于人工蜂群算法的新型概率密度模型无人机路径规划(Matlab代码实现)
  • File Inclusion 全级别
  • 微信小程序——云开发|计费方式调整大家怎么看?
  • Github 最新霸榜,号称架构师修炼之路的“葵花宝典”限时开源
  • RFSoC应用笔记 - RF数据转换器 -07- RFSoC关键配置之RF-DAC内部解析(一)
  • 【老生谈算法】matlab实现霍夫变换算法源码——霍夫变换算法
  • 赶紧进来看看!!!你一定要会做的八道经典指针笔试题!!!
  • 力扣刷题流程--记录用
  • bp神经网络优化算法对比,bp神经网络的优化算法
  • 【React系列】如何构建React应用程序
  • Apache的80端口被占用以及访问时报错403
  • codis proxy处理流程
  • echarts的各种常用效果展示
  • ES6核心特性
  • Golang-长连接-状态推送
  • JS+CSS实现数字滚动
  • Quartz初级教程
  • scala基础语法(二)
  • WordPress 获取当前文章下的所有附件/获取指定ID文章的附件(图片、文件、视频)...
  • 测试开发系类之接口自动化测试
  • 基于Mobx的多页面小程序的全局共享状态管理实践
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 猫头鹰的深夜翻译:JDK9 NotNullOrElse方法
  • 转载:[译] 内容加速黑科技趣谈
  • ( 用例图)定义了系统的功能需求,它是从系统的外部看系统功能,并不描述系统内部对功能的具体实现
  • (04)Hive的相关概念——order by 、sort by、distribute by 、cluster by
  • (超详细)2-YOLOV5改进-添加SimAM注意力机制
  • (蓝桥杯每日一题)平方末尾及补充(常用的字符串函数功能)
  • .net 中viewstate的原理和使用
  • .NET/C# 使用 #if 和 Conditional 特性来按条件编译代码的不同原理和适用场景
  • .net开发时的诡异问题,button的onclick事件无效
  • @JoinTable会自动删除关联表的数据
  • @staticmethod和@classmethod的作用与区别
  • [<事务专题>]
  • [Android Studio 权威教程]断点调试和高级调试
  • [Angular] 笔记 9:list/detail 页面以及@Output
  • [ASP.NET MVC]Ajax与CustomErrors的尴尬
  • [bzoj1912]异象石(set)
  • [C++] sqlite3_get_table 的使用
  • [HAOI2016]食物链
  • [iHooya]2023年1月30日作业解析
  • [Oh My C++ Diary]一元作用域运算符::的使用
  • [python] dataclass 快速创建数据类
  • [SpringBoot系列]进阶配置
  • [SpringCloud | Linux] CentOS7 部署 SpringCloud 微服务
  • [uniapp] uni-ui+vue3.2小程序评论列表组件 回复评论 点赞和删除
  • [Unity] 基于迭代器的协程底层原理详解
  • [Unity][VR]透视开发系列4-解决只看得到Passthrough但看不到Unity对象的问题
  • [WCF安全系列]从两种安全模式谈起