当前位置: 首页 > news >正文

Hadoop之HDFS文件上传源码解析

HDFS文件上传源码解析

文件创建流程
添加依赖

为了在项目中使用 HDFS 的文件上传功能,我们需要添加以下依赖项到项目的构建配置文件中(例如 Maven 的 pom.xml 文件):

<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency>
</dependencies>
用户代码示例

接下来,我们通过一个简单的测试用例来演示如何在 HDFS 中创建并写入文件:

@Test
public void testPut2() throws IOException {FSDataOutputStream fos = fs.create(new Path("/input"));fos.write("hello world".getBytes());
}
创建过程详解

在这一节中,我们将详细探讨 HDFS 中文件创建的具体流程。从用户代码开始,逐步深入到 HDFS 客户端内部。

1. FileSystem 类

FileSystem 类提供了创建文件的基本接口。下面是 create 方法的定义:

public FSDataOutputStream create(Path f) throws IOException {return create(f, true);
}public FSDataOutputStream create(Path f, boolean overwrite)throws IOException {return create(f, overwrite,getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,IO_FILE_BUFFER_SIZE_DEFAULT),getDefaultReplication(f),getDefaultBlockSize(f));
}public FSDataOutputStream create(Path f,boolean overwrite,int bufferSize,short replication,long blockSize) throws IOException {return create(f, overwrite, bufferSize, replication, blockSize, null);
}public FSDataOutputStream create(Path f,boolean overwrite,int bufferSize,short replication,long blockSize,Progressable progress) throws IOException {return this.create(f, FsCreateModes.applyUMask(FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),overwrite, bufferSize, replication, blockSize, progress);
}public abstract FSDataOutputStream create(Path f,FsPermission permission,boolean overwrite,int bufferSize,short replication,long blockSize,Progressable progress) throws IOException;

2. DistributedFileSystem 类

DistributedFileSystem 类继承自 FileSystem 类,实现了具体的 HDFS 文件系统操作。下面是 create 方法的实现:

@Override
public FSDataOutputStream create(Path f, FsPermission permission,boolean overwrite, int bufferSize, short replication, long blockSize,Progressable progress) throws IOException {return this.create(f, permission,overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE): EnumSet.of(CreateFlag.CREATE), bufferSize, replication,blockSize, progress, null);
}@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,final EnumSet<CreateFlag> cflags, final int bufferSize,final short replication, final long blockSize,final Progressable progress, final ChecksumOpt checksumOpt)throws IOException {statistics.incrementWriteOps(1);storageStatistics.incrementOpCounter(OpType.CREATE);Path absF = fixRelativePart(f);return new FileSystemLinkResolver<FSDataOutputStream>() {@Overridepublic FSDataOutputStream doCall(final Path p) throws IOException {// 创建获取了一个输出流对象final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize,checksumOpt);// 这里将上面创建的dfsos进行包装并返回return dfs.createWrappedOutputStream(dfsos, statistics);}@Overridepublic FSDataOutputStream next(final FileSystem fs, final Path p)throws IOException {return fs.create(p, permission, cflags, bufferSize,replication, blockSize, progress, checksumOpt);}}.resolve(this, absF);
}

3. DFSClient 类

DFSClient 类负责与 NameNode 和 DataNode 的通信。下面是 create 方法的实现:

public DFSOutputStream create(String src, FsPermission permission,EnumSet<CreateFlag> flag, short replication, long blockSize,Progressable progress, int buffersize, ChecksumOpt checksumOpt)throws IOException {return create(src, permission, flag, true,replication, blockSize, progress, buffersize, checksumOpt, null);
}public DFSOutputStream create(String src, FsPermission permission,EnumSet<CreateFlag> flag, boolean createParent, short replication,long blockSize, Progressable progress, int buffersize,ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)throws IOException {return create(src, permission, flag, createParent, replication, blockSize,progress, buffersize, checksumOpt, favoredNodes, null);
}public DFSOutputStream create(String src, FsPermission permission,EnumSet<CreateFlag> flag, boolean createParent, short replication,long blockSize, Progressable progress, int buffersize,ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,String ecPolicyName) throws IOException {checkOpen();final FsPermission masked = applyUMask(permission);LOG.debug("{}: masked={}", src, masked);final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,src, masked, flag, createParent, replication, blockSize, progress,dfsClientConf.createChecksum(checksumOpt),getFavoredNodesStr(favoredNodes), ecPolicyName);beginFileLease(result.getFileId(), result);return result;
}

4. DFSOutputStream 类

最后,我们来看看 DFSOutputStream 类中的 newStreamForCreate 方法,该方法负责实际的文件创建逻辑:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,short replication, long blockSize, Progressable progress,DataChecksum checksum, String[] favoredNodes, String ecPolicyName)throws IOException {try (TraceScope ignored =dfsClient.newPathTraceScope("newStreamForCreate", src)) {HdfsFileStatus stat = null;// Retry the create if we get a RetryStartFileException up to a maximum// number of timesboolean shouldRetry = true;int retryCount = CREATE_RETRY_COUNT;while (shouldRetry) {shouldRetry = false;try {// DN将创建请求发送给NN(RPC)stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,new EnumSetWritable<>(flag), createParent, replication,blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);break;} catch (RemoteException re) {… ….}}Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");final DFSOutputStream out;if (stat.getErasureCodingPolicy() != null) {out = new DFSStripedOutputStream(dfsClient, src, stat,flag, progress, checksum, favoredNodes);} else {out = new DFSOutputStream(dfsClient, src, stat,flag, progress, checksum, favoredNodes, true);}// 开启线程run,DataStreamer extends Daemon extends Threadout.start();return out;}
}
NameNode 处理 DataNode 创建请求
1. ClientProtocol.java 中的 create 方法

客户端通过调用 ClientProtocol 接口中的 create 方法来发起文件创建请求:

HdfsFileStatus create(String src, FsPermission masked,String clientName, EnumSetWritable<CreateFlag> flag,boolean createParent, short replication, long blockSize,CryptoProtocolVersion[] supportedVersions, String ecPolicyName)throws IOException;
2. NameNodeRpcServer.java 中的 create 实现

NameNodeRpcServer 类中实现了 create 方法,该方法检查 NameNode 是否已启动,并调用 namesystem.startFile 方法来处理文件创建:

public HdfsFileStatus create(String src, FsPermission masked,String clientName, EnumSetWritable<CreateFlag> flag,boolean createParent, short replication, long blockSize,Cr

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • StackStorm自动化平台
  • 【通天星主动安全监控云平台信息泄露漏洞】
  • MySQL数据库入门,pycharm连接数据库—详细讲解
  • 算法的学习笔记—调整数组顺序使奇数位于偶数前面(牛客JZ21)
  • CSS的:valid和:invalid伪类:增强表单验证的视觉反馈
  • PyTorch 基础学习(7)- 自动微分
  • 【计算机人接私活】手把手教你上手挖到第一个漏洞,从底薪3k到月入过万,只有一步之遥!
  • C语言 ——— 枚举类型的定义及其优点
  • Qt-多种方式实现helloworld(6)
  • 技术周总结08.12-08.18周日(C#开发环境搭建 Linux命令)
  • 蓝图中结构体改变后,要重新创建widget
  • 系统开发之禁止卸载应用名单
  • 图卷积(GCN)
  • 第一章——数组基础(概念篇python版)
  • Android+Jacoco+code-diff全量、增量覆盖率生成实战
  • canvas绘制圆角头像
  • Centos6.8 使用rpm安装mysql5.7
  • docker-consul
  • express + mock 让前后台并行开发
  • Java多态
  • jquery cookie
  • laravel 用artisan创建自己的模板
  • mysql_config not found
  • nodejs:开发并发布一个nodejs包
  • Otto开发初探——微服务依赖管理新利器
  • vue-cli3搭建项目
  • 阿里研究院入选中国企业智库系统影响力榜
  • 案例分享〡三拾众筹持续交付开发流程支撑创新业务
  • 给自己的博客网站加上酷炫的初音未来音乐游戏?
  • 京东美团研发面经
  • 跨域
  • 浏览器缓存机制分析
  • 前端_面试
  • 区块链共识机制优缺点对比都是什么
  • 一加3T解锁OEM、刷入TWRP、第三方ROM以及ROOT
  • elasticsearch-head插件安装
  • Linux权限管理(week1_day5)--技术流ken
  • 如何正确理解,内页权重高于首页?
  • (8)STL算法之替换
  • (floyd+补集) poj 3275
  • (ISPRS,2021)具有遥感知识图谱的鲁棒深度对齐网络用于零样本和广义零样本遥感图像场景分类
  • (LLM) 很笨
  • (二十九)STL map容器(映射)与STL pair容器(值对)
  • (计算机网络)物理层
  • (七)Knockout 创建自定义绑定
  • (三)c52学习之旅-点亮LED灯
  • (深入.Net平台的软件系统分层开发).第一章.上机练习.20170424
  • (十六)Flask之蓝图
  • (转) 深度模型优化性能 调参
  • (转)scrum常见工具列表
  • *++p:p先自+,然后*p,最终为3 ++*p:先*p,即arr[0]=1,然后再++,最终为2 *p++:值为arr[0],即1,该语句执行完毕后,p指向arr[1]
  • .htaccess 强制https 单独排除某个目录
  • .NET CF命令行调试器MDbg入门(二) 设备模拟器
  • .NET Entity FrameWork 总结 ,在项目中用处个人感觉不大。适合初级用用,不涉及到与数据库通信。
  • .net framework 4.0中如何 输出 form 的name属性。