当前位置: 首页 > news >正文

hdfs源码解析之DFSClient

1、DFSClient类简介

    DFSClient 是 Hadoop 分布式文件系统(HDFS)中的一个核心类,用于客户端与 HDFS 之间的交互。它提供了一组方法,使客户端应用程序可以方便地与 HDFS 进行通信,包括文件的读取、写入、创建、删除、重命名等操作。DFSClient 封装了与 NameNode 和 DataNode 的通信细节,使得客户端开发者可以通过高级 API 进行文件系统操作,而不必关心底层的实现细节。

2、DFSClient主要功能

2.1、文件读取和写入

  • 提供方法用于读取和写入 HDFS 上的文件。
  • 例如,open 方法用于打开文件以读取,create 方法用于创建新文件以写入。

2.2、文件操作

  • 支持文件的创建、删除、重命名、追加等操作。
  • 例如,delete 方法用于删除文件或目录,rename 方法用于重命名文件或目录。

2.3、目录操作

  • 支持创建、删除和列出目录。
  • 例如,mkdirs 方法用于创建目录,listPaths 方法用于列出目录内容。

2.4、获取文件和目录信息

  • 提供方法获取文件和目录的元数据信息。
  • 例如,getFileInfo 方法用于获取文件或目录的详细信息,getLocatedBlocks 方法用于获取文件的块位置。

2.5、与NN、DN通信

  • 管理与 NameNode 的通信,用于获取文件的元数据和块位置信息。
  • 管理与 DataNode 的通信,用于读取和写入实际的数据块。

3、DFSClient核心源码

    DFSClient源码主要包括:创建客户端连接(配置获取、令牌处理、连接地址解析)

3.1、构造方法

3.1.1、代码概述

该构造函数已废弃,接受一个Configuration对象,并调用另一个构造函数获取NameNode地址

  @Deprecatedpublic DFSClient(Configuration conf) throws IOException {this(DFSUtilClient.getNNAddress(conf), conf);}

该构造函数接受一个InetSocketAddress对象和一个Configuration对象,并将InetSocketAddress 转换为URI然后调用另一个基于URI的构造函数

  public DFSClient(InetSocketAddress address, Configuration conf)throws IOException {this(DFSUtilClient.getNNUri(address), conf);}

该构造函数接受一个URI对象和一个Configuration对象,并将FileSystem.Statistics参数设置为 null,然后调用另一个更完整的构造函数

  public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {this(nameNodeUri, conf, null);}

该构造函数接受一个URI对象、一个Configuration对象和一个FileSystem.Statistics对象,然后调用最完整的构造函数

  public DFSClient(URI nameNodeUri, Configuration conf,FileSystem.Statistics stats) throws IOException {this(nameNodeUri, null, conf, stats);}

 最底层构造函数,该方法不建议直接调用。

  @VisibleForTestingpublic DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,Configuration conf, FileSystem.Statistics stats) throws IOException {// Copy only the required DFSClient configurationthis.tracer = FsTracer.get(conf);this.dfsClientConf = new DfsClientConf(conf);this.conf = conf;this.stats = stats;this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);this.dtpReplaceDatanodeOnFailureReplication = (short) conf.getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION_DEFAULT);LOG.debug("Sets {} to {}",HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, dtpReplaceDatanodeOnFailureReplication);this.ugi = UserGroupInformation.getCurrentUser();this.namenodeUri = nameNodeUri;this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +ThreadLocalRandom.current().nextInt()  + "_" +Thread.currentThread().getId();int numResponseToDrop = conf.getInt(DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);ProxyAndInfo<ClientProtocol> proxyInfo = null;AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);if (numResponseToDrop > 0) {// This case is used for testing.LOG.warn("{} is set to {} , this hacked client will proactively drop responses",DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, numResponseToDrop);proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,nameNodeUri, ClientProtocol.class, numResponseToDrop,nnFallbackToSimpleAuth);}if (proxyInfo != null) {this.dtService = proxyInfo.getDelegationTokenService();this.namenode = proxyInfo.getProxy();} else if (rpcNamenode != null) {// This case is used for testing.Preconditions.checkArgument(nameNodeUri == null);this.namenode = rpcNamenode;dtService = null;} else {Preconditions.checkArgument(nameNodeUri != null,"null URI");proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,nameNodeUri, nnFallbackToSimpleAuth);this.dtService = proxyInfo.getDelegationTokenService();this.namenode = proxyInfo.getProxy();}String localInterfaces[] =conf.getTrimmedStrings(DFS_CLIENT_LOCAL_INTERFACES);localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {LOG.debug("Using local interfaces [{}] with addresses [{}]",Joiner.on(',').join(localInterfaces),Joiner.on(',').join(localInterfaceAddrs));}Boolean readDropBehind =(conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?null : conf.getLongBytes(DFS_CLIENT_CACHE_READAHEAD, 0);this.serverDefaultsValidityPeriod = conf.getTimeDuration(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT,TimeUnit.MILLISECONDS);Boolean writeDropBehind =(conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);this.defaultReadCachingStrategy =new CachingStrategy(readDropBehind, readahead);this.defaultWriteCachingStrategy =new CachingStrategy(writeDropBehind, readahead);this.clientContext = ClientContext.get(conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),dfsClientConf, conf);if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());}this.initThreadsNumForStripedReads(dfsClientConf.getStripedReadThreadpoolSize());this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);}

3.1.2、重点剖析

DFSClient的核心构建方式是传入namenode节点对应的URI以及配置信息,也是我们构建DFSClient通常使用的方法

public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {this(nameNodeUri, conf, null);
}

3.2、委托令牌处理      

        这段源码是一个用于续约和取消 HDFS 委托令牌(Delegation Token)的 Renewer 类,它继承自 TokenRenewer 类。主要功能是通过与 NameNode 通信,维护和管理委托令牌的生命周期。

3.2.1、代码概述

3.2.2、重点剖析

  • static静态代码块为初始化hdfs配置文件;
  • handleKind方法用于判断是否处理指定类型的委托令牌,在当前源码中会默认判定是否为HDFS的委托令牌类型;
  • renew 方法用于续约委托令牌。它通过 getNNProxy 方法获取到与委托令牌对应的 NameNode 代理,然后调用 renewDelegationToken 方法进行委托令牌的续约操作;
  • cancel 方法用于取消委托令牌。它也通过 getNNProxy 方法获取 NameNode 代理,然后调用 cancelDelegationToken 方法执行委托令牌的取消操作;
  • getNNProxy 方法根据委托令牌获取对应的 NameNode 代理。它首先根据委托令牌的信息构建 URI,然后通过 NameNodeProxiesClient 类的静态方法创建 NameNode 的代理对象,并返回该代理对象。

3.3、getLocalInterfaceAddrs

3.3.1、代码概述

       这个方法的作用是接受一个接口名称的数组,并根据每个接口名称解析成对应的本地地址(可以是 IP 地址、子网或域名)。它首先尝试将接口名称视为一个 IP 地址,如果不是,则检查它是否是一个有效的子网,如果仍然不是,则假定它是一个域名,并通过 DNS 解析。最终,所有解析出的地址都被封装为 InetSocketAddress 对象,并返回一个包含这些地址的数组。

private static SocketAddress[] getLocalInterfaceAddrs(String interfaceNames[]) throws UnknownHostException {List<SocketAddress> localAddrs = new ArrayList<>();for (String interfaceName : interfaceNames) {if (InetAddresses.isInetAddress(interfaceName)) {localAddrs.add(new InetSocketAddress(interfaceName, 0));} else if (NetUtils.isValidSubnet(interfaceName)) {for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {localAddrs.add(new InetSocketAddress(addr, 0));}} else {for (String ip : DNS.getIPs(interfaceName, false)) {localAddrs.add(new InetSocketAddress(ip, 0));}}}return localAddrs.toArray(new SocketAddress[localAddrs.size()]);}

3.3.2、重点剖析

  1. 该方法首先检查interfaceName是否是一个有效的IP地址:
  2. 如果不是IP地址,检查interfaceName是否是一个有效的子网:
  3. 如果是有效的子网,获取该子网中的所有IP地址,并将每个IP地址封装为InetSocketAddress对象,添加到localAddrs列表中。
  4. 如果既不是IP地址也不是子网,假定它是一个域名:
  5. 通过DNS解析获取该域名的所有IP地址,并将每个IP地址封装为InetSocketAddress对象,添加到localAddrs列表中。

3.4、getRandomLocalInterfaceAddr

3.4.1、代码概述

        这个方法的作用是从一组预先配置的本地接口地址 (localInterfaceAddrs 数组) 中随机选择一个地址并返回。

SocketAddress getRandomLocalInterfaceAddr() {if (localInterfaceAddrs.length == 0) {return null;}final int idx = r.nextInt(localInterfaceAddrs.length);final SocketAddress addr = localInterfaceAddrs[idx];LOG.debug("Using local interface {}", addr);return addr;}

3.4.2、重点剖析

  1. 检查 localInterfaceAddrs 数组是否为空,如果为空则返回 null
  2. 使用随机数生成器 r 生成一个随机索引 idx
  3. 获取并返回 localInterfaceAddrs 数组中对应索引 idxSocketAddress 对象。
  4. 在返回之前,记录调试日志以便于跟踪选中的本地接口地址。

3.5、读写超时时间判定

3.5.1、代码概述

        这段代码包含两个方法:getDatanodeWriteTimeoutgetDatanodeReadTimeout,它们用于计算数据节点写入和读取的超时时间。每个方法都接收一个参数 numNodes,表示数据节点的数量。

int getDatanodeWriteTimeout(int numNodes) {final int t = dfsClientConf.getDatanodeSocketWriteTimeout();return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
}int getDatanodeReadTimeout(int numNodes) {final int t = dfsClientConf.getSocketTimeout();return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
}

3.5.2、重点剖析

  1. 通过dfsclientconf获取写入\读取超时时间t;
  2. 如果t大于0则返回 t 加上一个扩展超时时间,这个扩展超时时间是常量 HdfsConstants.WRITE_TIMEOUT_EXTENSION 乘以 numNodes(数据节点数量)
  3. 如果t<=0,则返回0

3.6、租约管理

3.6.1、代码概述

        这段代码定义了三个方法:getLeaseRenewerbeginFileLeaseendFileLease,用于管理HDFS中的文件租约。文件租约机制确保文件在写入过程中不会被其他客户端修改或删除。

public LeaseRenewer getLeaseRenewer() {return LeaseRenewer.getInstance(namenodeUri != null ? namenodeUri.getAuthority() : "null", ugi, this);}/** Get a lease and start automatic renewal */private void beginFileLease(final String key, final DFSOutputStream out) {synchronized (filesBeingWritten) {putFileBeingWritten(key, out);LeaseRenewer renewer = getLeaseRenewer();boolean result = renewer.put(this);if (!result) {// Existing LeaseRenewer cannot add another Daemon, so remove existing// and add new one.LeaseRenewer.remove(renewer);renewer = getLeaseRenewer();renewer.put(this);}}}/** Stop renewal of lease for the file. */void endFileLease(final String renewLeaseKey) {synchronized (filesBeingWritten) {removeFileBeingWritten(renewLeaseKey);// remove client from renewer if no files are openif (filesBeingWritten.isEmpty()) {getLeaseRenewer().closeClient(this);}}}

3.6.2、重点剖析

  • 获取租约续约器getLeaseRenewer 方法返回一个 LeaseRenewer 实例,用于管理租约的续约。        
    • 获取租约续约器
    • 调用 LeaseRenewer.getInstance 方法获取 LeaseRenewer 实例。
    • 如果 namenodeUri 不为空,则使用其权限部分(authority),否则使用 "null"。ugi(用户组信息)和当前 DFSClient 实例(this)作为参数传递给 LeaseRenewer.getInstance
  • 开始文件租约beginFileLease 方法将文件添加到写入记录中,并确保当前客户端的租约续约器能够处理该文件的续约。
    • 使用 key 和 out(DFSOutputStream 实例)调用 putFileBeingWritten 方法,记录正在写入的文件;
    • 获取 LeaseRenewer 实例;
    • 调用 renewer.put(this) 方法将当前客户端添加到租约续约器中;
    • 如果返回结果为 false(表示现有的 LeaseRenewer 不能添加新的守护线程),则移除现有的 LeaseRenewer,获取新的 LeaseRenewer 实例,并将当前客户端添加到新的 LeaseRenewer 中;
  • 结束文件租约endFileLease 方法移除文件写入记录,并在没有文件写入时关闭客户端的租约续约
    • 使用 renewLeaseKey 调用 removeFileBeingWritten 方法,从记录中移除正在写入的文件
    • 如果没有文件在写入(filesBeingWritten 为空),则获取 LeaseRenewer 实例,调用 renewer.closeClient(this) 方法,关闭当前客户端的租约续约。

todo,未完待续

相关文章:

  • 【ARM Cache 及 MMU 系列文章 6.5 -- 如何进行 Cache miss 统计?】
  • 利用CUDA加速卷积计算:原理、实践与示例代码
  • 深入理解网络传输协议——TCP/IP协议的可靠交付服务的特征
  • 面向对象进阶--继承(Java继承(超详解))
  • 关于QTcreator,19年大学时写的文章了,之前写在印象笔记现在拉过来,往事如烟呐
  • C#面:详细阐述什么是 DTO
  • 什么是数字化,什么是数智化?数字化与数智化的区别和联系
  • BT音频方案
  • 央国企财务专家的“专家课”——中国总会计师协会联合实在智能举办RPA专项培训
  • web标准与浏览器前缀
  • GANs网络在图像和视频技术中的应用前景
  • springboot中maven的使用教程
  • Qt 实战(4)信号与槽 | 4.1、信号与槽机制
  • 切换到root用户的方法和区别
  • Linux 编写脚本自动清理旧的日志文件,释放磁盘空间
  • IOS评论框不贴底(ios12新bug)
  • Java精华积累:初学者都应该搞懂的问题
  • node-sass 安装卡在 node scripts/install.js 解决办法
  • PAT A1120
  • Redis 中的布隆过滤器
  • yii2权限控制rbac之rule详细讲解
  • 阿里云爬虫风险管理产品商业化,为云端流量保驾护航
  • 对JS继承的一点思考
  • 排序算法学习笔记
  • 入手阿里云新服务器的部署NODE
  • 数组的操作
  • 一些基于React、Vue、Node.js、MongoDB技术栈的实践项目
  • 译有关态射的一切
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 怎样选择前端框架
  • 函数计算新功能-----支持C#函数
  • ​ 无限可能性的探索:Amazon Lightsail轻量应用服务器引领数字化时代创新发展
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #、%和$符号在OGNL表达式中经常出现
  • #{}和${}的区别是什么 -- java面试
  • #Spring-boot高级
  • $.type 怎么精确判断对象类型的 --(源码学习2)
  • ()、[]、{}、(())、[[]]等各种括号的使用
  • (10)工业界推荐系统-小红书推荐场景及内部实践【排序模型的特征】
  • (3)llvm ir转换过程
  • (4)Elastix图像配准:3D图像
  • (Matalb时序预测)WOA-BP鲸鱼算法优化BP神经网络的多维时序回归预测
  • (二)fiber的基本认识
  • (二十四)Flask之flask-session组件
  • (分布式缓存)Redis哨兵
  • (九)信息融合方式简介
  • (论文阅读40-45)图像描述1
  • (三)elasticsearch 源码之启动流程分析
  • (三)模仿学习-Action数据的模仿
  • (十六)串口UART
  • (十三)Flink SQL
  • (五)网络优化与超参数选择--九五小庞
  • . NET自动找可写目录
  • .net core 连接数据库,通过数据库生成Modell
  • .NET Core中的时区转换问题