当前位置: 首页 > news >正文

大型文件数据读取并持久化到数据库

产品经理今天给了一个上亿数据的文本文件给我,让我把导入到mysql数据库。
文本的内容很简单,只有一个字段,但有1亿行。
在这里插入图片描述
我拿到文件后最开始直接用navicat工具直接导入,但发现效率极慢,跑了一分多钟,才导进去10W+数据进去,算下来要跑完至少需要20多个小时,时间不允许。
看来只能自己写代码来提升效率了。
常规的做法肯定是把文件内容按行读取出来,然后每N条拆分一批,再插入到数据库中。但这个文件太大,一次性全部读取到内存中,对机器有点压力。所以只能按批来读取,一边读一边写,已经持久化的数据就及时释放掉,避免一直占用内存。哎!LinkedBlockingQueue 就很适合干这个事。

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.LineIter;
import cn.hutool.core.io.FileUtil;
import com.yc.kfpt.oversea.dao.entity.SourceCode;
import com.yc.kfpt.oversea.dao.repository.SourceCodeRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.io.BufferedReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @author 敖癸* @formatter:on* @since 2024/3/6*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ImportDataService {private final SourceCodeRepository sourceCodeRepository;private final Executor asyncExecutor;@Asyncpublic void importData() {LinkedBlockingQueue<String> codeQueue = new LinkedBlockingQueue<>(500000);// 监听器线程queueListener(codeQueue).start();readFile("D:\\91-240305-j000.txt", codeQueue);}/*** 创建队列监听器** @param codeQueue* @return java.lang.Thread* @author 敖癸* @since 2024/3/6 - 16:41*/private Thread queueListener(LinkedBlockingQueue<String> codeQueue) {return new Thread(() -> {long index = 0;List<String> codes = new ArrayList<>();while (true) {try {String code = codeQueue.poll(5, TimeUnit.SECONDS);// 如果超过5秒从队列中还没获取到数据,就认为已经没有数据了if (code == null) {if (CollUtil.isNotEmpty(codes)) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}break;}index++;codes.add(code);// 5000一个批次if (codes.size() == 5000) {log.info("入库处理: {}", index);List<SourceCode> entities = convertToEntity(codes);// 持久化操作扔到线程池中异步去执行,可以多开点线程数量。asyncExecutor.execute(() -> sourceCodeRepository.saveBatch("GENERAL", entities));codes.clear();}} catch (InterruptedException e) {throw new RuntimeException(e);}}});}/*** 文件读取** @param codeQueue* @author 敖癸* @since 2024/3/6 - 16:41*/private static void readFile(String filePath, LinkedBlockingQueue<String> codeQueue) {BufferedReader reader = FileUtil.getReader(filePath, Charset.defaultCharset());int readCount = 0;  // 读取行数计数try (LineIter lineIter = new LineIter(reader)) {while (lineIter.hasNext()) {readCount++;// 如果codeQueue中的元素个数已达上限,这里会阻塞codeQueue.put(lineIter.next());if (readCount % 50000 == 0) {log.info("已读取{}行", readCount);}}} catch (Exception e) {log.error("文件读取异常", e);}log.info("读取完成,供{}行", readCount);}/*** 将行数据转换成数据库对象** @param codes* @return java.util.List<com.yc.kfpt.oversea.dao.entity.SourceCode>* @author 敖癸* @since 2024/3/6 - 16:43*/private static List<SourceCode> convertToEntity(List<String> codes) {return codes.stream().map(SourceCode::new).collect(Collectors.toList());}
}

实测,1亿数据量,大概花了20分钟导入完成。
这里需要注意的知识点:
LinkedBlockingQueue 的 put 方法,如果队列已满,会阻塞等待,直到队列中腾出空位。
LinkedBlockingQueue 的 poll 方法,可以设置超时时间,在等待超时后如果在队列中还是没有拿到数据,就返回null。
注意 take, add, offer, remove,poll,put的使用区别
注意 take, add, offer, remove,poll,put的使用区别

关于 LinkedBlockingQueue 的详解,可以参考一下这位博主的文章
深入理解Java系列 | LinkedBlockingQueue用法详解

相关文章:

  • 力扣--动态规划/回溯算法131.分割回文串
  • 【MacOS原版镜像下载】讲解
  • LaTex 笔记
  • 视频极速切割无损工具免费版,亲测好用!
  • Flutter APP下载更新
  • 新规正式发布 | 百度深度参编《生成式人工智能服务安全基本要求》
  • C++的萃取技术
  • 5个实用的PyCharm插件
  • 【Echarts】曲线图上方显示数字以及自定义值,标题和副标题居中,鼠标上显示信息以及自定义信息
  • 如何定义resultType和resultMap,它们之间的区别是什么?解释一下<parameterType>的作用和用法。
  • Python——读写属性
  • Apache Doris 2.1.0 版本发布:开箱盲测性能大幅优化,复杂查询性能提升 100%
  • 基于冠豪猪优化算法(Crested Porcupine Optimizer,CPO)的无人机三维路径规划(MATLAB)
  • centos安装hadoop启动问题解决方案
  • multipass基本操作
  • 【css3】浏览器内核及其兼容性
  • 【vuex入门系列02】mutation接收单个参数和多个参数
  • 【跃迁之路】【733天】程序员高效学习方法论探索系列(实验阶段490-2019.2.23)...
  • Go 语言编译器的 //go: 详解
  • Java,console输出实时的转向GUI textbox
  • Mac 鼠须管 Rime 输入法 安装五笔输入法 教程
  • Median of Two Sorted Arrays
  • Spring声明式事务管理之一:五大属性分析
  • Swift 中的尾递归和蹦床
  • webpack4 一点通
  • 短视频宝贝=慢?阿里巴巴工程师这样秒开短视频
  • 分享几个不错的工具
  • 好的网址,关于.net 4.0 ,vs 2010
  • 简单基于spring的redis配置(单机和集群模式)
  • 前端技术周刊 2018-12-10:前端自动化测试
  • 什么软件可以剪辑音乐?
  • 使用agvtool更改app version/build
  • 手机app有了短信验证码还有没必要有图片验证码?
  • 掌握面试——弹出框的实现(一道题中包含布局/js设计模式)
  • 组复制官方翻译九、Group Replication Technical Details
  • ​学习笔记——动态路由——IS-IS中间系统到中间系统(报文/TLV)​
  • !!【OpenCV学习】计算两幅图像的重叠区域
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • #162 (Div. 2)
  • #使用清华镜像源 安装/更新 指定版本tensorflow
  • #我与Java虚拟机的故事#连载12:一本书带我深入Java领域
  • (55)MOS管专题--->(10)MOS管的封装
  • (利用IDEA+Maven)定制属于自己的jar包
  • (六)DockerCompose安装与配置
  • (续)使用Django搭建一个完整的项目(Centos7+Nginx)
  • (转)VC++中ondraw在什么时候调用的
  • (转)母版页和相对路径
  • (转)使用VMware vSphere标准交换机设置网络连接
  • .bashrc在哪里,alias妙用
  • .NET / MSBuild 扩展编译时什么时候用 BeforeTargets / AfterTargets 什么时候用 DependsOnTargets?
  • .NET CORE 第一节 创建基本的 asp.net core
  • .NET 服务 ServiceController
  • .NET 实现 NTFS 文件系统的硬链接 mklink /J(Junction)
  • .NET建议使用的大小写命名原则
  • /dev下添加设备节点的方法步骤(通过device_create)