HDFS 之 文件流
org.apache.hadoop.hdfs.DFSInputStream
read 接口的关键逻辑在下面的 pread 方法中。
/**
 * Positional read: reads up to {@code buffer.remaining()} bytes starting at file offset
 * {@code position} into {@code buffer}.
 *
 * <p>The requested length is clamped at end of file, split over the located blocks that cover
 * the range, and each block's byte range is fetched either with a hedged read (when hedged reads
 * are enabled on the client and the block is not striped) or with a plain fetch. Corrupted
 * replicas collected during each block fetch are reported in a {@code finally}, so reporting
 * also happens when the fetch throws.
 *
 * @param position absolute file offset to start reading from; this local copy is advanced as
 *     each block is consumed
 * @param buffer destination buffer; the number of bytes requested is its {@code remaining()}
 *     at entry
 * @return the number of bytes read (the EOF-clamped length), or -1 if {@code position} is
 *     negative or at/after the end of the file
 * @throws IOException if the stream has been closed ("Stream closed"), or if the client check
 *     or a block fetch fails
 */
private int pread(long position, ByteBuffer buffer) throws IOException {
  // sanity checks
  // NOTE(review): checkOpen() presumably throws if the DFS client itself is closed — confirm.
  dfsClient.checkOpen();
  if (closed.get()) {
    throw new IOException("Stream closed");
  }
  // NOTE(review): presumably resets a per-read failure/retry counter used by the fetch paths.
  failures = 0;
  long filelen = getFileLength();
  if ((position < 0) || (position >= filelen)) {
    return -1;
  }
  int length = buffer.remaining();
  int realLen = length;
  // Clamp the read so it never runs past the end of the file.
  if ((position + length) > filelen) {
    realLen = (int)(filelen - position);
  }
  // determine the block and byte range within the block
  // corresponding to position and realLen
  List<LocatedBlock> blockRange = getBlockRange(position, realLen);
  int remaining = realLen;
  CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
  for (LocatedBlock blk : blockRange) {
    // Offset of the current read position inside this block.
    long targetStart = position - blk.getStartOffset();
    // Take the rest of the request, capped at the end of this block.
    int bytesToRead = (int) Math.min(remaining,
        blk.getBlockSize() - targetStart);
    // Inclusive end offset inside the block.
    long targetEnd = targetStart + bytesToRead - 1;
    try {
      if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
        hedgedFetchBlockByteRange(blk, targetStart,
            targetEnd, buffer, corruptedBlocks);
      } else {
        fetchBlockByteRange(blk, targetStart, targetEnd,
            buffer, corruptedBlocks);
      }
    } finally {
      // Check and report if any block replicas are corrupted.
      // BlockMissingException may be caught if all block replicas are
      // corrupted.
      reportCheckSumFailure(corruptedBlocks, blk.getLocations().length,
          false);
    }
    remaining -= bytesToRead;
    position += bytesToRead;
  }
  // remaining is decremented by the computed bytesToRead, not by bytes actually transferred,
  // so this only checks that blockRange covered the whole clamped range (runs only with -ea).
  assert remaining == 0 : "Wrong number of bytes read.";
  return realLen;
}
遇到临界位置（读取范围越过文件末尾）时，下面这段代码起主要作用：把实际读取长度 realLen 截断为 filelen - position，保证不会读到文件末尾之外。
// If the requested range runs past end of file, shrink it to the bytes that actually remain.
if ((position + length) > filelen) {
  realLen = (int)(filelen - position);
}
Parquet（parquet-hadoop，包 org.apache.parquet.hadoop.util）中的 H1SeekableInputStream 和 H2SeekableInputStream
- H1SeekableInputStream
/*** SeekableInputStream implementation that implements read(ByteBuffer) for* Hadoop 1 FSDataInputStream.*/
class H1SeekableInputStream extends DelegatingSeekableInputStream {
H1SeekableInputStream 不覆写 ByteBuffer 相关的读取方法，直接沿用父类 DelegatingSeekableInputStream 的实现（如下面的 read 以及 readFully）。这些实现根据 buffer 是否带有底层数组（hasArray）分别走堆缓冲区和直接缓冲区两条路径：
/**
 * Reads from the wrapped stream into {@code buf}, choosing a strategy by buffer kind.
 *
 * <p>Array-backed (heap) buffers go to {@code readHeapBuffer}. Buffers without a backing array
 * go to {@code readDirectBuffer}, together with the {@code temp} scratch array.
 *
 * @param buf destination buffer
 * @return whatever the selected helper returns
 * @throws IOException if the underlying read fails
 */
@Override
public int read(ByteBuffer buf) throws IOException {
  return buf.hasArray()
      ? readHeapBuffer(stream, buf)
      : readDirectBuffer(stream, buf, temp);
}
readFully(ByteBuffer) 在堆缓冲区（heap buffer）情况下，调用最终落到这里：
// Visible for testingstatic void readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {readFully(f, buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());buf.position(buf.limit());}
巧妙地直接使用 ByteBuffer 的底层 byte 数组（array()，起始下标为 arrayOffset() + position()）进行读取，读完后再把 position 移到 limit，省去了额外的中间拷贝。
- H2SeekableInputStream
/*** SeekableInputStream implementation for FSDataInputStream that implements* ByteBufferReadable in Hadoop 2.*/
class H2SeekableInputStream extends DelegatingSeekableInputStream {
H2SeekableInputStream 覆写了父类 DelegatingSeekableInputStream 中的 readFully(ByteBuffer) 方法，把读取交给 reader 完成（Hadoop 2 中底层流实现了 ByteBufferReadable，可直接读入 ByteBuffer）：
/**
 * Fills {@code buf} by delegating to the {@code readFully(reader, buf)} helper.
 *
 * <p>Presumably reads until {@code buf} has no remaining space — confirm against the helper.
 *
 * @param buf destination buffer
 * @throws IOException if the helper's read fails
 */
@Override
public void readFully(ByteBuffer buf) throws IOException {
  readFully(this.reader, buf);
}
org.apache.parquet.hadoop.util
- org.apache.parquet.hadoop.util.HadoopInputFile
/**
 * Opens the file at {@code stat.getPath()} on {@code fs} and wraps the raw stream.
 *
 * <p>The concrete SeekableInputStream implementation is picked by {@code HadoopStreams.wrap}.
 *
 * @return a new seekable stream over the file
 * @throws IOException if the file system fails to open the file
 */
public SeekableInputStream newStream() throws IOException {
  SeekableInputStream seekable = HadoopStreams.wrap(fs.open(stat.getPath()));
  return seekable;
}
- org.apache.parquet.hadoop.util.HadoopStreams
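具体返回哪个 SeekableInputStream，逻辑在这里。当 ByteBufferReadable 类和 H2SeekableInputStream 构造器都可用，且被包装的流实现了 ByteBufferReadable 时，返回 H2SeekableInputStream；否则回退到 H1SeekableInputStream。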
/**
 * Wraps a Hadoop {@link FSDataInputStream} in the most capable {@link SeekableInputStream}.
 *
 * <p>An H2SeekableInputStream (which reads directly into ByteBuffers) is returned when all of
 * these hold:
 * <ul>
 *   <li>the ByteBufferReadable class was resolved;</li>
 *   <li>the H2SeekableInputStream constructor was resolved;</li>
 *   <li>the innermost stream underneath {@code stream} implements ByteBufferReadable.</li>
 * </ul>
 * Otherwise, or if the H2 constructor cannot be invoked reflectively, H1SeekableInputStream
 * (byte-array reads) is used.
 *
 * @param stream the stream to wrap; must not be null
 * @return a SeekableInputStream backed by {@code stream}
 * @throws NullPointerException if {@code stream} is null
 * @throws ParquetDecodingException if the H2SeekableInputStream constructor itself throws
 */
public static SeekableInputStream wrap(FSDataInputStream stream) {
  Objects.requireNonNull(stream, "Cannot wrap a null input stream");
  if (byteBufferReadableClass != null && h2SeekableConstructor != null
      && isWrappedStreamByteBufferReadable(stream)) {
    try {
      return h2SeekableConstructor.newInstance(stream);
    } catch (InstantiationException | IllegalAccessException e) {
      LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
      return new H1SeekableInputStream(stream);
    } catch (InvocationTargetException e) {
      throw new ParquetDecodingException(
          "Could not instantiate H2SeekableInputStream", e.getTargetException());
    }
  } else {
    return new H1SeekableInputStream(stream);
  }
}

/**
 * Reports whether the innermost stream beneath {@code stream} is ByteBufferReadable.
 *
 * <p>An FSDataInputStream may wrap another FSDataInputStream (e.g. when a file system decorates
 * its delegate's streams). In Hadoop 2, FSDataInputStream itself implements ByteBufferReadable
 * and merely forwards read(ByteBuffer) to the stream it wraps, failing when that stream cannot
 * read into a ByteBuffer. Checking only the first layer would therefore accept a nested
 * FSDataInputStream over an incapable stream, so the chain is unwrapped first.
 *
 * <p>Callers must have checked that {@code byteBufferReadableClass} is non-null.
 */
private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
  // Typed as Object so no extra imports are needed; only instanceof/isInstance are applied.
  Object inner = stream.getWrappedStream();
  while (inner instanceof FSDataInputStream) {
    inner = ((FSDataInputStream) inner).getWrappedStream();
  }
  return byteBufferReadableClass.isInstance(inner);
}