当前位置: 首页 > news >正文

HDFS 之 文件流

org.apache.hadoop.hdfs.DFSInputStream

read 接口的关键逻辑在以下 pread 接口。

  private int pread(long position, ByteBuffer buffer)throws IOException {// sanity checksdfsClient.checkOpen();if (closed.get()) {throw new IOException("Stream closed");}failures = 0;long filelen = getFileLength();if ((position < 0) || (position >= filelen)) {return -1;}int length = buffer.remaining();int realLen = length;if ((position + length) > filelen) {realLen = (int)(filelen - position);}// determine the block and byte range within the block// corresponding to position and realLenList<LocatedBlock> blockRange = getBlockRange(position, realLen);int remaining = realLen;CorruptedBlocks corruptedBlocks = new CorruptedBlocks();for (LocatedBlock blk : blockRange) {long targetStart = position - blk.getStartOffset();int bytesToRead = (int) Math.min(remaining,blk.getBlockSize() - targetStart);long targetEnd = targetStart + bytesToRead - 1;try {if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {hedgedFetchBlockByteRange(blk, targetStart,targetEnd, buffer, corruptedBlocks);} else {fetchBlockByteRange(blk, targetStart, targetEnd,buffer, corruptedBlocks);}} finally {// Check and report if any block replicas are corrupted.// BlockMissingException may be caught if all block replicas are// corrupted.reportCheckSumFailure(corruptedBlocks, blk.getLocations().length,false);}remaining -= bytesToRead;position += bytesToRead;}assert remaining == 0 : "Wrong number of bytes read.";return realLen;}

遇到临界位置, 这边起主要作用

    if ((position + length) > filelen) {realLen = (int)(filelen - position);}
hadoop 中 H1SeekableInputStream 和 H2SeekableInputStream
  • H1SeekableInputStream
/*** SeekableInputStream implementation that implements read(ByteBuffer) for* Hadoop 1 FSDataInputStream.*/
class H1SeekableInputStream extends DelegatingSeekableInputStream {

H1SeekableInputStream 直接使用父类 DelegatingSeekableInputStream 中的 readFully 方法。

@Overridepublic int read(ByteBuffer buf) throws IOException {if (buf.hasArray()) {return readHeapBuffer(stream, buf);} else {return readDirectBuffer(stream, buf, temp);}}

在这里插入图片描述

最后到这里:

  // Visible for testingstatic void readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {readFully(f, buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());buf.position(buf.limit());}

巧妙的转成了 bytes 数组,进行读写。

  • H2SeekableInputStream
/*** SeekableInputStream implementation for FSDataInputStream that implements* ByteBufferReadable in Hadoop 2.*/
class H2SeekableInputStream extends DelegatingSeekableInputStream {

H2SeekableInputStream 覆写了 父类 DelegatingSeekableInputStream 中的 readFully 方法。

@Overridepublic void readFully(ByteBuffer buf) throws IOException {readFully(reader, buf);}
org.apache.parquet.hadoop.util
  • org.apache.parquet.hadoop.util.HadoopInputFile
  public SeekableInputStream newStream() throws IOException {return HadoopStreams.wrap(fs.open(stat.getPath()));}
  • org.apache.parquet.hadoop.util.HadoopStreams

相关返回哪个 SeekableInputStream, 逻辑在这里。

  public static SeekableInputStream wrap(FSDataInputStream stream) {Objects.requireNonNull(stream, "Cannot wrap a null input stream");if (byteBufferReadableClass != null && h2SeekableConstructor != null &&byteBufferReadableClass.isInstance(stream.getWrappedStream())) {try {return h2SeekableConstructor.newInstance(stream);} catch (InstantiationException | IllegalAccessException e) {LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);return new H1SeekableInputStream(stream);} catch (InvocationTargetException e) {throw new ParquetDecodingException("Could not instantiate H2SeekableInputStream", e.getTargetException());}} else {return new H1SeekableInputStream(stream);}}

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • proteus仿真c51单片机(四)双机串口通信(电路设计及代码)
  • 八 信息系统基础知识(考点篇)试题
  • Obsidian插件安装与开发
  • Thinkphp框架漏洞(附修复方法)
  • 【QT】鼠标按键事件 - QMouseEvent QKeyEvent
  • 在psotgres中的gist和gin索引介绍
  • RM小陀螺技术经验与思考
  • 无法访问jakarta.servlet.http.HttpServletRequest
  • 基于QT实现的简易WPS(已开源)
  • HCIP笔记4-OSPF(2)
  • solidity 数学和密码学函数
  • HarmonyOS应用开发知识地图
  • 清空QWidget
  • 回归预测|基于雪消融优化极端梯度提升树的数据回归预测Matlab程序SAO-XGBoost多特征输入单输出 含基础模型
  • 常用API(三)
  • [译] 怎样写一个基础的编译器
  • ES6核心特性
  • exif信息对照
  • HashMap ConcurrentHashMap
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • If…else
  • RxJS 实现摩斯密码(Morse) 【内附脑图】
  • 彻底搞懂浏览器Event-loop
  • 大型网站性能监测、分析与优化常见问题QA
  • 适配mpvue平台的的微信小程序日历组件mpvue-calendar
  • 掌握面试——弹出框的实现(一道题中包含布局/js设计模式)
  • 智能网联汽车信息安全
  • 湖北分布式智能数据采集方法有哪些?
  • #162 (Div. 2)
  • ${ }的特别功能
  • (1)虚拟机的安装与使用,linux系统安装
  • (二)换源+apt-get基础配置+搜狗拼音
  • (分类)KNN算法- 参数调优
  • (附源码)计算机毕业设计SSM疫情居家隔离服务系统
  • (六)什么是Vite——热更新时vite、webpack做了什么
  • (论文阅读23/100)Hierarchical Convolutional Features for Visual Tracking
  • (四)事件系统
  • (转)人的集合论——移山之道
  • (自用)learnOpenGL学习总结-高级OpenGL-抗锯齿
  • .net core 管理用户机密
  • .Net MVC + EF搭建学生管理系统
  • .NET中的Exception处理(C#)
  • .sh 的运行
  • @Bean有哪些属性
  • @PreAuthorize与@Secured注解的区别是什么?
  • @RequestBody与@ResponseBody的使用
  • [2018/11/18] Java数据结构(2) 简单排序 冒泡排序 选择排序 插入排序
  • [ACP云计算]组件介绍
  • [AI]文心一言爆火的同时,ChatGPT带来了这么多的开源项目你了解吗
  • [Algorithm][动态规划][两个数组的DP][正则表达式匹配][交错字符串][两个字符串的最小ASCII删除和][最长重复子数组]详细讲解
  • [BUUCTF]-PWN:wustctf2020_number_game解析(补码,整数漏洞)
  • [C#]C# winform实现imagecaption图像生成描述图文描述生成
  • [C#]猫叫人醒老鼠跑 C#的委托及事件
  • [C][数据结构][树]详细讲解
  • [C++]C++类基本语法