当前位置: 首页 > news >正文

DataX迁移数据到StarRocks超大表报too many version问题记录

目录

1 背景说明

2 问题描述

3 解决思路

3.1 磁盘问题

3.2 DataX配置

3.3 分桶设置

3.4 增量迁移


1 背景说明

       项目上有两张大表,数据量在300w左右,每天凌晨通过datax将前一天最新的全量户数迁移到StarRocks对应的分区表中。分区表设置的动态分区,保存最近3天的分区数据,并自动创建未来2天的分区。 每个分区存的都是对应源表截止到分区当天的全量数据。 

       通过DataX将数据迁移到StarRocks的任务每天凌晨有几百个,非常多,而且时间基本上都集中在凌晨0点到4点之间。 随着系统的运行,时间的推移,任务越来越多,每天要迁移到数仓的数据也越来越多。 

       StarRocks版本:2.2.13 RELEASE

2 问题描述

最近一个月发现这两张超过300w数据的大表,频繁出现任务迁移失败的问题,问题报错如下:

2024-08-05 16:30:09.286 [Thread-1] WARN StarRocksWriterManager - Failed to flush batch data to StarRocks, retry times = 0
java.io.IOException: Failed to flush data to StarRocks.
Too many versions. tablet_id: 7716375, version_count: 1001, limit: 1000: be:172.16.*.*
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"Too many versions. tablet_id: 7716375, version_count: 1001, limit: 1000: be:172.16.*.*","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"4ef91eac-3e95-4313-9a36-979f23fdbf1e","LoadBytes":5246126,"StreamLoadPutTimeMs":1,"NumberTotalRows":0,"WriteDataTimeMs":53,"TxnId":5530963,"LoadTimeMs":55,"ReadDataTimeMs":5,"NumberLoadedRows":0,"NumberFilteredRows":0}at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:95) ~[starrockswriter-1.1.0.jar:na]
at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager.asyncFlush(StarRocksWriterManager.java:174) [starrockswriter-1.1.0.jar:na]
at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager.access$000(StarRocksWriterManager.java:21) [starrockswriter-1.1.0.jar:na]
at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager$1.run(StarRocksWriterManager.java:143) [starrockswriter-1.1.0.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
[INFO] 2024-08-05 16:30:11.097 +0800 [taskAppId=TASK-20240805-12935239844672_7-1874900-7057918] TaskLogLogger-class org.apache.dolphinscheduler.plugin.task.datax.DataxTask:[63] - -> 2024-08-05 16:30:10.288 [Thread-1] INFO StarRocksStreamLoadVisitor - Executing stream load to: 'http://172.16.*.*:8040/api/ods_zm/ods_QLSHZT_ms0_y_uoc_plan_diversion/_stream_load', size: '5246126'

       同样的配置,其他表都是正常,但是这两张大表失败的频率非常高,基本上平均没两天一次,但是如果在白天没有定时任务的时候,手动执行DataX任务的话,任务也可以成功,问题就比较诡异。 定时任务的失败,影响第二天报表的展示,在每天一早手工跑任务担惊受怕客户投诉的背景下,我们不得不想办法彻底解决这个问题。 

3 解决思路

       最直观的解决就是查这个报错,然后对应去修改,这个报错是StarRocks很常见的报错,官方针对这个问题也有解决方案,如下:

我们按照这个做过调整,但是问题并没有根治,还是会出现这个问题。 

针对这个问题,结合实际的问题情况,我们从以下几个角度去考虑和解决问题。

3.1 磁盘问题

       因为这两张大表的数据在白天是可以正常迁移成功的,只有在凌晨任务量比较多的时候才会频繁失败,考虑有没有可能是磁盘性能问题,通过安装sar命令,对磁盘做压测,如下:

这里面每个参数信息如下:

  • DEV:设备名称,表示监控的磁盘或分区,例如 /dev/sda
  • tps:每秒传输次数,表示每秒完成的读写操作次数。
  • rd_sec/s:每秒读扇区数,表示每秒从磁盘读取的扇区数。
  • wr_sec/s:每秒写扇区数,表示每秒向磁盘写入的扇区数。
  • avgrq-sz:平均请求队列大小,表示平均每次 I/O 请求等待的平均队列长度。
  • avgqu-sz:平均队列长度,表示平均 I/O 请求队列的长度。
  • await:平均等待时间,表示 I/O 请求的平均等待时间(毫秒)。
  • svctm:平均服务时间,表示平均每次 I/O 请求的服务时间(毫秒)。
  • %util:利用率,表示磁盘繁忙程度的百分比。

       从这个参数看,await值有些偏高,初步怀疑磁盘性能可能是瓶颈。 给项目上的硬件组反馈该问题,硬件侧的同学不认可。考虑到我们压测时候的数据写入的量级和实际凌晨的任务执行时候有差异,所以我们尝试写一个监控脚本,监控每天凌晨0点到4点这期间的磁盘使用情况,每隔一分钟输出对应的Average信息。

监控使用Python开发,默认主机的python版本是2.7的版本,参考源码如下:

# -*- coding: utf-8 -*-
import time
import subprocess
import datetime# 定义输出文件的路径
output_file_path = "/middleware/output.log"
# 定义 sar 命令
sar_command = "sar -d | head -3; sar -d | grep dev252"# 定义一个函数来执行 sar 命令并写入文件
def execute_sar_command():# 格式化当前时间formatted_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")# 创建一个子进程process = subprocess.Popen(sar_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)# 获取 sar 命令的输出output, error = process.communicate()# 检查命令是否成功执行if process.returncode == 0 and output:# 分割输出文本为行lines = output.split('\n')# 获取最后一行的索引last_line_index = len(lines) - 2# 获取 Average 行average_line = lines[last_line_index]# 加上时间戳average_line_with_timestamp = "{} - {}".format(formatted_time, average_line)# 将结果写入文件with open(output_file_path, 'a') as file:file.write(average_line_with_timestamp + "\n")slog = "{} - {}".format(formatted_time,'Data saved to ' + output_file_path)print(slog)else:slog = "{} - {}".format(formatted_time, 'Failed to execute sar command: ' + error)print(slog)if __name__ == "__main__":# 主循环while True:# 获取当前时间current_time = datetime.datetime.now()# 定义开始时间start_time = current_time.replace(hour=0, minute=0, second=0, microsecond=0)# 定义结束时间end_time = current_time.replace(hour=5, minute=0, second=0, microsecond=0)# 判断是否在指定区间内if start_time <= current_time <= end_time:execute_sar_command()time.sleep(60)  # 每 1 分钟执行一次else:print("{} - {}".format(current_time, ' Current time is out of the desired range. Skipping execution.'))time.sleep(300)  # 每 5 分钟执行一次

       将这个脚本放到StarRocks集群的每天节点上,然后后台启动脚本常驻进程即可。

nohup python sarMonitor.py > nohup.log 2>&1 &

       通过这个可以监控到凌晨任务非常多时候,磁盘的使用情况。

通过监控,发现磁盘的瓶颈问题并不是核心问题,毕竟每天凌晨95%的任务都是正常的,数据量也不小,只有这一两个大表数据不行,所以还是从其他方面再考虑。

3.2 DataX配置

       首先要确认下DataX是否已经使用了starrockswriter,尽管mysqlwriter也可以正常写入数据,但是效率要差得远。

       官网关于datax导入有专门的章节,参考这里。这里,贴一段项目上的datax配置信息,如下所示:

{"job":{"content":[{"reader":{"parameter":{"password":"xxxxxxx","connection":[{"querySql":["select `plan_id`,replace(`source_code`,\"'\",\"\"),`header_id`,`line_id`,replace(`project_no`,\"'\",\"\"),replace(`project_name`,\"'\",\"\"),replace(`schedule_no`,\"'\",\"\"),replace(`schedule_type_code`,\"'\",\"\"),replace(`schedule_type_name`,\"'\",\"\"),`schedule_date`,replace(`schedule_section`,\"'\",\"\"),replace(`attribute_category`,\"'\",\"\"),replace(`single_project`,\"'\",\"\"),replace(`header_comment`,\"'\",\"\"),replace(`line_num`,\"'\",\"\"),replace(`line_comment`,\"'\",\"\"),replace(`plan_status`,\"'\",\"\"),replace(`match_status`,\"'\",\"\"),replace(`execution_way`,\"'\",\"\"),replace(`source`,\"'\",\"\"),`erp_org_id`,replace(`erp_organization_code`,\"'\",\"\"),replace(`erp_organization_name`,\"'\",\"\"),replace(`erp_org_level`,\"'\",\"\"),replace(`org_tree_path`,\"'\",\"\"),`org_id`,replace(`org_name`,\"'\",\"\"),`erp_buyer_person_id`,replace(`erp_buyer_emp_num`,\"'\",\"\"),replace(`erp_buyer_person_num`,\"'\",\"\"),replace(`erp_buyer_full_name`,\"'\",\"\"),`buyer_person_id`,replace(`buyer_person_num`,\"'\",\"\"),replace(`buyer_full_name`,\"'\",\"\"),`planer_erp_id`,replace(`use_department`,\"'\",\"\"),replace(`use_location`,\"'\",\"\"),replace(`centralized_purchasing_flag`,\"'\",\"\"),replace(`centralized_purchasing_list`,\"'\",\"\"),`item_id`,replace(`item_no`,\"'\",\"\"),replace(`item_brand`,\"'\",\"\"),replace(`category_id`,\"'\",\"\"),replace(`item_category`,\"'\",\"\"),replace(`item_category1`,\"'\",\"\"),replace(`item_category2`,\"'\",\"\"),replace(`item_category3`,\"'\",\"\"),replace(`item_channels`,\"'\",\"\"),replace(`item_desc`,\"'\",\"\"),replace(`item_specification`,\"'\",\"\"),replace(`fasch_flag`,\"'\",\"\"),replace(`tech_parameter`,\"'\",\"\"),replace(`mpn_vendor`,\"'\",\"\"),`quantity`,`scheme_qty`,`source_qty`,`order_qty`,`package_qty`,`result_qty`,`commodity_qty`,`agreement_qty`,`budget_price`,`unit_price`,`budget_amount`,`total_amount`,replace(`expense_category`,\"'\",\"\"),replace(`currency_code`,\"'\",\"\"),`need_by_date`,replace(`delivery_location`,\"'\",\"\"),replace(`material`,\"'\",\"\"),replace(`uom_code`,\"'\",\"\"),replace(`mpn_num`,\"'\",\"\"),replace(`executive_std`,\"'\",\"\"),replace(`flow_code`,\"'\",\"\"),`created_userid`,replace(`created_emp_num`,\"'\",\"\"),replace(`created_full_name`,\"'\",\"\"),`created_date`,`approved_date`,`lead_time`,replace(`procure_type`,\"'\",\"\"),replace(`status`,\"'\",\"\"),`rate`,replace(`todo`,\"'\",\"\"),replace(`cux_status`,\"'\",\"\"),replace(`erpsource`,\"'\",\"\"),`last_updated_by`,`last_update_date`,replace(`return_status`,\"'\",\"\"),replace(`return_msg`,\"'\",\"\"),replace(`in_Implement_catalog`,\"'\",\"\"),replace(`in_controls_catalog`,\"'\",\"\"),replace(`last_match_status`,\"'\",\"\"),replace(`transfer_good_status`,\"'\",\"\"),`non_recruit_order_qty`,replace(`ext1`,\"'\",\"\"),replace(`ext2`,\"'\",\"\"),replace(`ext3`,\"'\",\"\"),replace(`UPDATE_OPER_ID`,\"'\",\"\"),replace(`UPDATE_OPER_NAME`,\"'\",\"\"),`UPDATE_TIME`,replace(`province_id`,\"'\",\"\"),replace(`city_id`,\"'\",\"\"),replace(`area_id`,\"'\",\"\"),replace(`address`,\"'\",\"\"),replace(`province_name`,\"'\",\"\"),replace(`city_name`,\"'\",\"\"),replace(`area_name`,\"'\",\"\"),replace(`address_name`,\"'\",\"\"),replace(`detail_address`,\"'\",\"\"),replace(`addr_company`,\"'\",\"\"),replace(`addr_user`,\"'\",\"\"),replace(`addr_phone`,\"'\",\"\"),replace(`addr_mobile`,\"'\",\"\"),replace(`step_id`,\"'\",\"\"),`cancel_match_approval_state`,`contract_qty`,`ssc_scheme_qty`,`crc_result_qty`,`crc_source_qty`,`project_qty`, now() as sync_time, date_sub(CURDATE(), interval 1 day) as sync_date from uoc_plan_diversion"],"jdbcUrl":["jdbc:mysql://172.16.*.*:3306/database_name?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull"]}],"username":"uname"},"name":"mysqlreader"},"writer":{"parameter":{"password":"xxxxx","loadProps":{"format":"json","strip_outer_array":true},"column":["`plan_id`","`source_code`","`header_id`","`line_id`","`project_no`","`project_name`","`schedule_no`","`schedule_type_code`","`schedule_type_name`","`schedule_date`","`schedule_section`","`attribute_category`","`single_project`","`header_comment`","`line_num`","`line_comment`","`plan_status`","`match_status`","`execution_way`","`source`","`erp_org_id`","`erp_organization_code`","`erp_organization_name`","`erp_org_level`","`org_tree_path`","`org_id`","`org_name`","`erp_buyer_person_id`","`erp_buyer_emp_num`","`erp_buyer_person_num`","`erp_buyer_full_name`","`buyer_person_id`","`buyer_person_num`","`buyer_full_name`","`planer_erp_id`","`use_department`","`use_location`","`centralized_purchasing_flag`","`centralized_purchasing_list`","`item_id`","`item_no`","`item_brand`","`category_id`","`item_category`","`item_category1`","`item_category2`","`item_category3`","`item_channels`","`item_desc`","`item_specification`","`fasch_flag`","`tech_parameter`","`mpn_vendor`","`quantity`","`scheme_qty`","`source_qty`","`order_qty`","`package_qty`","`result_qty`","`commodity_qty`","`agreement_qty`","`budget_price`","`unit_price`","`budget_amount`","`total_amount`","`expense_category`","`currency_code`","`need_by_date`","`delivery_location`","`material`","`uom_code`","`mpn_num`","`executive_std`","`flow_code`","`created_userid`","`created_emp_num`","`created_full_name`","`created_date`","`approved_date`","`lead_time`","`procure_type`","`status`","`rate`","`todo`","`cux_status`","`erpsource`","`last_updated_by`","`last_update_date`","`return_status`","`return_msg`","`in_Implement_catalog`","`in_controls_catalog`","`last_match_status`","`transfer_good_status`","`non_recruit_order_qty`","`ext1`","`ext2`","`ext3`","`UPDATE_OPER_ID`","`UPDATE_OPER_NAME`","`UPDATE_TIME`","`province_id`","`city_id`","`area_id`","`address`","`province_name`","`city_name`","`area_name`","`address_name`","`detail_address`","`addr_company`","`addr_user`","`addr_phone`","`addr_mobile`","`step_id`","`cancel_match_approval_state`","`contract_qty`","`ssc_scheme_qty`","`crc_result_qty`","`crc_source_qty`","`project_qty`","`sync_time`","`sync_date`"],"loadUrl":["172.16.0.1:8040","172.16.0.2:8040","172.16.0.3:8040"],"connection":[{"selectedDatabase":"ods_zm","jdbcUrl":"jdbc:mysql:loadbalance://172.16.0.1:9030,172.16.0.2:9030,172.16.0.3:9030/","table":["ods_QLSHZT_ms0_y_uoc_plan_diversion"]}],"writeMode":"insert","username":"uname"},"name":"starrockswriter"}}],"setting":{"speed":{"channel":"4"}}}
}

       可以看到writer已经是starrockswriter了。

       考虑到凌晨任务非常多,磁盘繁忙并且无法调整为硬件,我们可以尝试调整datax的配置,参考官方对于writer中的参数说明,我们做了一下调整:

① 修改speed配置如下

② 显示的设置maxBatchSize

设置"maxBatchSize":100857600。 通过显示的设置maxBatchSize,发现starrockswriter攒批写入频次变低了、写入速度更快了,300w的数据,原来迁移需要5分钟左右,调整后,2分钟多就可以迁移完。

效率有了明显提升。

3.3 分桶设置

       对于分区一般都有一个基本的认知,基于分区键,将数据进行不同分区的数据路由,分桶是SR独有的概念,在分区下面又有分桶,基于分桶再对数据做二次的分布,并提供副本机制。其实也比较简单,就是确定分区的情况下,在分区下面查询又有分区(分桶),主要还是为了优化查询效率。

       300w的数据量对于数仓StarRocks不算是很大的数据量,对于这种小数据量级场景的,通过设置分桶靠SR并行机制带来的查询效率提升,微乎其微,反倒是tablet细碎 有大量元数据要维护 消耗内存更多,所以我们尝统一将分桶数都改成1。

       这里其实有一个实际实施时候的困难点,业务人员对数仓的特性不了解,了解数仓特性的技术人员对业务不了解,然后基于数仓又开发了更上层的数据中台等应用,让业务人员直接使用,这就造成实际实施的时候,很难讲数仓的威力发挥出来,因为业务人员没办法明确知道那些列或者那些字段作为分桶键效果更好。 或者说什么时候需要走分桶,什么时候不需要分桶,从技术的角度来说,关于分桶的概念很好理解,但是要使用好它,前提还是要和业务数据结合起来。 业务和技术的结合,是项目实施的难点。

       回到这个问题中,对于平台提供方的我们,对业务也不懂,所以也无法给出很好的分桶设置建议,但是考虑到官方关于分桶数据量设置的建议如下:

       结合实际业务场景,我们就没必要设置多个分桶了,直接设置分桶数为1即可。

注:这块的调整,暂未在业务上做验证,任务都是凌晨跑的,没有即席查询场景,所以对于查询效率的影响没有做验证,调整分桶数主要是考虑减少tablet数量。

3.4 增量迁移

       还有一种方案就是做增量迁移,就是DataX的reader中数据查询带上条件即可。这样每天凌晨迁移数据的时候,都是只迁移增量的数据。 但是这种方案有一些限制因素,如果源表没有增量字段,或者删除是物理删除,这种场景下增量迁移可能就无法实现了。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 深度学习入门(二):常见概念(重点:泛化误差)
  • 电销机器人有哪些优势?
  • 【python基础】python基础习题练习(一)
  • C++ 依赖倒置
  • 【C++】STL | string 详解及重要函数的实现
  • 项目中万能使用tailwindcss,无版本冲突、报错
  • 中小微企业必看:税贷票贷融资策略与实战技巧
  • 最新消息:Sedex 供应商会员年费调整
  • Pytest测试报告生成专题
  • 客户三要素和五要素
  • dp+容斥原理,LeetCode 3130. 找出所有稳定的二进制数组 II
  • 【学习总结】MySQL篇
  • 我的API定义规范(未完待续,欢迎指正)
  • 关于缓存的一些心得
  • unity对象缓存技术ObjectPool
  • Docker: 容器互访的三种方式
  • ES6 学习笔记(一)let,const和解构赋值
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • JavaScript标准库系列——Math对象和Date对象(二)
  • JavaScript实现分页效果
  • jQuery(一)
  • laravel 用artisan创建自己的模板
  • Selenium实战教程系列(二)---元素定位
  • springboot_database项目介绍
  • VUE es6技巧写法(持续更新中~~~)
  • vue学习系列(二)vue-cli
  • webgl (原生)基础入门指南【一】
  • 百度小程序遇到的问题
  • 翻译 | 老司机带你秒懂内存管理 - 第一部(共三部)
  • 算法---两个栈实现一个队列
  • 译自由幺半群
  • 在electron中实现跨域请求,无需更改服务器端设置
  • 正则学习笔记
  • ​DB-Engines 12月数据库排名: PostgreSQL有望获得「2020年度数据库」荣誉?
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • $.extend({},旧的,新的);合并对象,后面的覆盖前面的
  • (173)FPGA约束:单周期时序分析或默认时序分析
  • (C++二叉树05) 合并二叉树 二叉搜索树中的搜索 验证二叉搜索树
  • (pojstep1.1.2)2654(直叙式模拟)
  • (剑指Offer)面试题41:和为s的连续正数序列
  • (十八)devops持续集成开发——使用docker安装部署jenkins流水线服务
  • (算法)Game
  • **CI中自动类加载的用法总结
  • .java 指数平滑_转载:二次指数平滑法求预测值的Java代码
  • .Net Attribute详解(上)-Attribute本质以及一个简单示例
  • .NET Core中的去虚
  • .NET 表达式计算:Expression Evaluator
  • .NET 设计模式初探
  • .net8.0与halcon编程环境构建
  • .Net--CLS,CTS,CLI,BCL,FCL
  • .NetCore发布到IIS
  • .NET教程 - 字符串 编码 正则表达式(String Encoding Regular Express)
  • .NET实现之(自动更新)
  • /使用匿名内部类来复写Handler当中的handlerMessage()方法
  • ??eclipse的安装配置问题!??