Hadoop HDFS: Source Code Analysis of File Upload
HDFS File Upload Source Code Analysis
File Creation Flow
Adding Dependencies
To use HDFS file upload in a project, add the following dependencies to the project's build configuration file (for example, Maven's pom.xml):
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
User Code Example
Next, a simple test case shows how to create a file in HDFS and write to it:
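@Test
public void testPut2() throws IOException {
    // fs: a FileSystem field initialized in a @Before method (not shown)
    try (FSDataOutputStream fos = fs.create(new Path("/input"))) {
        fos.write("hello world".getBytes());
    } // close() flushes the remaining data and completes the file
}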
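The Creation Process in Detail
This section traces how HDFS creates a file. It starts from the user code and follows the calls step by step into the HDFS client internals.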
1. The FileSystem Class
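The FileSystem class provides the basic interface for creating files. Below are its create overloads. Each one fills in a default value and delegates to the next, ending at an abstract method: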
public FSDataOutputStream create(Path f) throws IOException {
  return create(f, true);
}

public FSDataOutputStream create(Path f, boolean overwrite)
    throws IOException {
  return create(f, overwrite,
      getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
          IO_FILE_BUFFER_SIZE_DEFAULT),
      getDefaultReplication(f),
      getDefaultBlockSize(f));
}

public FSDataOutputStream create(Path f,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize) throws IOException {
  return create(f, overwrite, bufferSize, replication, blockSize, null);
}

public FSDataOutputStream create(Path f,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException {
  return this.create(f, FsCreateModes.applyUMask(
      FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
      overwrite, bufferSize, replication, blockSize, progress);
}

public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;
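The overload chain above is a telescoping pattern. Each convenience overload supplies one more default and delegates, until the abstract seven-argument method is reached. The Hadoop-free sketch below models that chain. The class names `CreateChainModel` and `EchoFileSystem` are hypothetical, and the progress step is folded into the umask step. The defaults are assumed typical values, not values read from a `Configuration` as the real code does: a 4096-byte buffer, replication 3, 128 MB blocks, and file permission 0666 with umask 022.

```java
// Hypothetical, Hadoop-free model of the FileSystem.create(...) overload chain.
// Each convenience overload fills in one more default and delegates onward.
abstract class CreateChainModel {
    // Assumed typical defaults; the real code reads these from Configuration.
    static final int DEFAULT_BUFFER_SIZE = 4096;                // io.file.buffer.size
    static final short DEFAULT_REPLICATION = 3;                 // dfs.replication
    static final long DEFAULT_BLOCK_SIZE = 128L * 1024 * 1024;  // dfs.blocksize
    static final int FILE_DEFAULT_PERMISSION = 0666;            // rw-rw-rw-
    static final int DEFAULT_UMASK = 0022;

    // Clears the umask bits from the requested permission: 0666 & ~022 == 0644.
    static int applyUMask(int permission, int umask) {
        return permission & ~umask;
    }

    String create(String path) {
        return create(path, true); // overwrite defaults to true
    }

    String create(String path, boolean overwrite) {
        return create(path, overwrite, DEFAULT_BUFFER_SIZE,
                DEFAULT_REPLICATION, DEFAULT_BLOCK_SIZE);
    }

    String create(String path, boolean overwrite, int bufferSize,
                  short replication, long blockSize) {
        // Progress step folded in here: compute the masked permission and delegate.
        return create(path, applyUMask(FILE_DEFAULT_PERMISSION, DEFAULT_UMASK),
                overwrite, bufferSize, replication, blockSize);
    }

    // The single method a concrete file system has to implement.
    abstract String create(String path, int permission, boolean overwrite,
                           int bufferSize, short replication, long blockSize);
}

// Toy subclass that reports the arguments the abstract method finally receives.
class EchoFileSystem extends CreateChainModel {
    @Override
    String create(String path, int permission, boolean overwrite,
                  int bufferSize, short replication, long blockSize) {
        return String.format("%s perm=%o overwrite=%b buf=%d repl=%d block=%d",
                path, permission, overwrite, bufferSize, replication, blockSize);
    }
}
```

Under these assumptions, `new EchoFileSystem().create("/input")` reaches the abstract method with permission 0644, which is 0666 with the umask bits cleared.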
2. The DistributedFileSystem Class
DistributedFileSystem extends FileSystem and implements the concrete HDFS file system operations. Below are its create implementations:
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  return this.create(f, permission,
      overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
          : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
      blockSize, progress, null);
}

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  statistics.incrementWriteOps(1);
  storageStatistics.incrementOpCounter(OpType.CREATE);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException {
      // Create the output stream (a DFSOutputStream) through DFSClient
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      // Wrap the dfsos created above and return the wrapper
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }

    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}
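`DistributedFileSystem.create` does two things worth isolating. First, it turns the `overwrite` boolean into an `EnumSet` of create flags. Second, it wraps the real work in a `FileSystemLinkResolver`: `doCall` runs against this file system, and `next` is used when the path turns out to cross a symlink. The sketch below models both in plain Java. `CreateFlagModel`, `UnresolvedLinkModelException` and `LinkResolverModel` are hypothetical stand-ins. Unlike the real resolver, this one follows only a single link hop and never switches to another file system.

```java
import java.io.IOException;
import java.util.EnumSet;

// Hypothetical stand-in for org.apache.hadoop.fs.CreateFlag (two values only).
enum CreateFlagModel { CREATE, OVERWRITE }

class DfsCreateModel {
    // Mirrors the ternary in DistributedFileSystem.create(...):
    // overwrite=true -> {CREATE, OVERWRITE}, overwrite=false -> {CREATE}.
    static EnumSet<CreateFlagModel> toFlags(boolean overwrite) {
        return overwrite
                ? EnumSet.of(CreateFlagModel.CREATE, CreateFlagModel.OVERWRITE)
                : EnumSet.of(CreateFlagModel.CREATE);
    }
}

// Hypothetical exception signalling that the path crossed a symlink.
class UnresolvedLinkModelException extends IOException {
    final String target;

    UnresolvedLinkModelException(String target) {
        super("unresolved link -> " + target);
        this.target = target;
    }
}

// Simplified model of the FileSystemLinkResolver idea: try doCall() first;
// if a symlink is hit, retry once via next() on the link target.
abstract class LinkResolverModel<T> {
    abstract T doCall(String path) throws IOException;

    abstract T next(String resolvedPath) throws IOException;

    T resolve(String path) throws IOException {
        try {
            return doCall(path);
        } catch (UnresolvedLinkModelException e) {
            return next(e.target);
        }
    }
}
```

The anonymous class in the original plays the role of `LinkResolverModel`. The common path only ever reaches `doCall`, so `next` matters only for symlinked paths.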
3. The DFSClient Class
DFSClient handles communication with the NameNode and the DataNodes. Below are its create overloads:
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, short replication, long blockSize,
    Progressable progress, int buffersize, ChecksumOpt checksumOpt)
    throws IOException {
  return create(src, permission, flag, true,
      replication, blockSize, progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  return create(src, permission, flag, createParent, replication, blockSize,
      progress, buffersize, checksumOpt, favoredNodes, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
    String ecPolicyName) throws IOException {
  checkOpen();
  final FsPermission masked = applyUMask(permission);
  LOG.debug("{}: masked={}", src, masked);
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes), ecPolicyName);
  beginFileLease(result.getFileId(), result);
  return result;
}
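The final `DFSClient.create` overload runs three steps in order:

1. `checkOpen()` rejects calls on a closed client.
2. The permission is masked with the umask.
3. Once the stream exists, `beginFileLease` registers it under its file id so the lease can be kept alive while the file is being written.

The simplified model below follows those steps. `ClientModel`, `StreamModel`, the `filesBeingWritten` map and the locally generated ids are hypothetical; the real ids come from the NameNode. The periodic lease renewal, which a separate renewal thread handles in the real client, is left out.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Hadoop-free model of the steps in DFSClient.create(...).
class ClientModel {
    // Stand-in for a DFSOutputStream: only remembers what it was created with.
    static final class StreamModel {
        final long fileId;
        final String src;
        final int permission;

        StreamModel(long fileId, String src, int permission) {
            this.fileId = fileId;
            this.src = src;
            this.permission = permission;
        }
    }

    private final int umask;
    private boolean clientRunning = true;
    private long nextFileId = 1; // hypothetical local id source
    // fileId -> open stream: the files whose lease this client must keep renewing
    final Map<Long, StreamModel> filesBeingWritten = new HashMap<>();

    ClientModel(int umask) {
        this.umask = umask;
    }

    // Like checkOpen(): refuse to work once the client has been closed.
    private void checkOpen() throws IOException {
        if (!clientRunning) {
            throw new IOException("client is closed");
        }
    }

    synchronized StreamModel create(String src, int permission) throws IOException {
        checkOpen();
        int masked = permission & ~umask;                 // applyUMask(permission)
        StreamModel out = new StreamModel(nextFileId++, src, masked);
        beginFileLease(out.fileId, out);                  // register before returning
        return out;
    }

    private void beginFileLease(long fileId, StreamModel out) {
        filesBeingWritten.put(fileId, out);
    }

    synchronized void close() {
        clientRunning = false;
    }
}
```

The lease is registered inside `create` rather than left to the caller, so every returned stream is already tracked.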
4. The DFSOutputStream Class
Finally, consider newStreamForCreate in the DFSOutputStream class. This method sends the create RPC to the NameNode and builds the output stream:
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
    throws IOException {
  try (TraceScope ignored =
           dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        // The client sends the create request to the NameNode (RPC)
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
        break;
      } catch (RemoteException re) {
        … …
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out;
    if (stat.getErasureCodingPolicy() != null) {
      out = new DFSStripedOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
    } else {
      out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes, true);
    }
    // Start the DataStreamer thread (DataStreamer extends Daemon extends Thread)
    out.start();
    return out;
  }
}
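The control flow of `newStreamForCreate` reduces to two parts. There is a bounded retry around the create RPC. Then the returned file status decides between a striped (erasure-coded) and a replicated output stream. The sketch below models that flow without Hadoop. `NameNodeModel`, `FileStatusModel`, `RetryStartFileModelException` and the retry limit of 10 are assumptions for illustration, since the body of the original catch block is elided. Under this scheme the RPC is attempted at most `CREATE_RETRY_COUNT + 1` times before the call fails.

```java
import java.io.IOException;

// Hypothetical stand-in for the RetryStartFileException case.
class RetryStartFileModelException extends IOException {
    RetryStartFileModelException() {
        super("retry start file");
    }
}

// Minimal file status: only the erasure-coding bit matters for this sketch.
final class FileStatusModel {
    final boolean erasureCoded;

    FileStatusModel(boolean erasureCoded) {
        this.erasureCoded = erasureCoded;
    }
}

// The one NameNode RPC the sketch needs.
interface NameNodeModel {
    FileStatusModel create(String src) throws IOException;
}

final class NewStreamForCreateModel {
    static final int CREATE_RETRY_COUNT = 10; // assumed limit for the sketch

    // Bounded retry around the create RPC: only the retry exception is retried;
    // any other IOException propagates immediately.
    static FileStatusModel createWithRetry(NameNodeModel nn, String src)
            throws IOException {
        int retryCount = CREATE_RETRY_COUNT;
        while (true) {
            try {
                return nn.create(src);
            } catch (RetryStartFileModelException e) {
                if (retryCount <= 0) {
                    throw new IOException("too many retries creating " + src, e);
                }
                retryCount--; // use up one retry and loop again
            }
        }
    }

    // Same decision as newStreamForCreate: an EC policy means a striped stream.
    static String chooseStream(FileStatusModel stat) {
        return stat.erasureCoded ? "DFSStripedOutputStream" : "DFSOutputStream";
    }
}
```

Any other failure surfaces at once, and only the specific retry condition is retried. This keeps a real error, such as a missing parent directory, from being retried pointlessly.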
NameNode Handles the Client's Create Request
1. The create method in ClientProtocol.java
The client issues the file creation request by calling the create method of the ClientProtocol interface:
HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException;
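To make two of the `create` parameters concrete, here is a toy in-memory implementation of a cut-down version of the contract. `createParent` decides whether a missing parent directory is created or causes an error. An existing file is replaced only when the `OVERWRITE` flag is present. `ClientProtocolSim`, `InMemoryNameNode` and `CreateFlagSim` are hypothetical. The real NameNode does far more, including permission checks, lease management and edit logging, and it uses Hadoop's own exception types instead of the `java.io` and `java.nio.file` ones used here.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical flags; the real ones live in org.apache.hadoop.fs.CreateFlag.
enum CreateFlagSim { CREATE, OVERWRITE }

// Cut-down, hypothetical version of the ClientProtocol.create contract.
interface ClientProtocolSim {
    long create(String src, String clientName, EnumSet<CreateFlagSim> flag,
                boolean createParent) throws IOException;
}

// Toy in-memory "NameNode": a set of directories plus file -> lease holder.
class InMemoryNameNode implements ClientProtocolSim {
    final Set<String> dirs = new HashSet<>();
    final Map<String, String> files = new HashMap<>();
    private long nextId = 1;

    InMemoryNameNode() {
        dirs.add("/");
    }

    @Override
    public synchronized long create(String src, String clientName,
                                    EnumSet<CreateFlagSim> flag,
                                    boolean createParent) throws IOException {
        String parent = parentOf(src);
        if (!dirs.contains(parent)) {
            if (!createParent) {
                throw new FileNotFoundException("parent does not exist: " + parent);
            }
            mkdirs(parent);
        }
        if (files.containsKey(src) && !flag.contains(CreateFlagSim.OVERWRITE)) {
            throw new FileAlreadyExistsException(src);
        }
        files.put(src, clientName); // the caller becomes the lease holder
        return nextId++;            // a fresh id, like a newly allocated inode
    }

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }

    // Adds dir and any missing ancestors.
    private void mkdirs(String dir) {
        for (String d = dir; !dirs.contains(d); d = parentOf(d)) {
            dirs.add(d);
        }
    }
}
```

The user code earlier calls `fs.create(new Path("/input"))`, and overwrite defaults to true, so that call arrives here with both `CREATE` and `OVERWRITE` set.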
2. The create implementation in NameNodeRpcServer.java
The NameNodeRpcServer class implements create. This method checks that the NameNode has finished starting up and then calls namesystem.startFile to handle the file creation:
public HdfsFileStatus create(String src, FsPermission masked,String clientName, EnumSetWritable<CreateFlag> flag,boolean createParent, short replication, long blockSize,Cr