当前位置: 首页 > news >正文

Hadoop之DataNode启动源码解析

Hadoop之DataNode启动源码解析

添加依赖

为了能够编译和运行 Hadoop 的 DataNode 组件,我们需要在项目的 pom.xml 文件中添加以下依赖:

<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency>
</dependencies>
DataNode 类介绍

DataNode 类是 Hadoop 分布式文件系统 (HDFS) 中的一个核心组件,它负责存储文件系统的数据块。每个部署可以包含一个或多个 DataNode 实例。DataNodeNameNode 通信以报告其存储状态,并响应来自 NameNode 的指令,如删除或复制块等操作。此外,DataNode 还需要与客户端代码和其他 DataNode 进行交互。

DataNode 主程序入口

DataNode 的主程序入口点位于 main 方法中,该方法首先检查命令行参数是否存在帮助请求,然后调用 secureMain 方法初始化 DataNode 实例。

public static void main(String args[]) {if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {System.exit(0);}secureMain(args, null);
}public static void secureMain(String args[], SecureResources resources) {int errorCode = 0;try {StringUtils.startupShutdownMessage(DataNode.class, args, LOG);DataNode datanode = createDataNode(args, null, resources);… …} catch (Throwable e) {LOG.error("Exception in secureMain", e);terminate(1, e);} finally {LOG.warn("Exiting Datanode");terminate(errorCode);}
}
DataNode 实例化

createDataNode 方法用于实例化 DataNode 对象,并启动其守护进程。

public static DataNode createDataNode(String args[], Configuration conf,SecureResources resources) throws IOException {// 初始化DNDataNode dn = instantiateDataNode(args, conf, resources);if (dn != null) {// 启动DN进程dn.runDatanodeDaemon();}return dn;
}public static DataNode instantiateDataNode(String args[], Configuration conf,SecureResources resources) throws IOException {... ...return makeInstance(dataLocations, conf, resources);
}static DataNode makeInstance(Collection<StorageLocation> dataDirs,Configuration conf, SecureResources resources) throws IOException {... ...return new DataNode(conf, locations, storageLocationChecker, resources);
}
DataNode 构造函数

构造函数初始化了 DataNode 的主要属性,并启动了必要的组件。

DataNode(final Configuration conf,final List<StorageLocation> dataDirs,final StorageLocationChecker storageLocationChecker,final SecureResources resources) throws IOException {super(conf);... ...try {hostName = getHostName(conf);LOG.info("Configured hostname is {}", hostName);// 启动DNstartDataNode(dataDirs, resources);} catch (IOException ie) {shutdown();throw ie;}... ...
}
DataNode 启动过程

startDataNode 方法初始化了 DataNode 的关键组件,包括数据存储 (DataStorage)、MXBean 注册、DataXceiver 服务器、HTTP 服务器等。

void startDataNode(List<StorageLocation> dataDirectories,SecureResources resources) throws IOException {... ...// 创建数据存储对象storage = new DataStorage();// global DN settingsregisterMXBean();// 初始化DataXceiverinitDataXceiver();// 启动HttpServerstartInfoServer();pauseMonitor = new JvmPauseMonitor();pauseMonitor.init(getConf());pauseMonitor.start();// BlockPoolTokenSecretManager is required to create ipc server.this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();// Login is done by now. Set the DN user name.dnUserName = UserGroupInformation.getCurrentUser().getUserName();LOG.info("dnUserName = {}", dnUserName);LOG.info("supergroup = {}", supergroup);// 初始化RPC服务initIpcServer();metrics = DataNodeMetrics.create(getConf(), getDisplayName());peerMetrics = dnConf.peerStatsEnabled ?DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);ecWorker = new ErasureCodingWorker(getConf(), this);blockRecoveryWorker = new BlockRecoveryWorker(this);// 创建BlockPoolManagerblockPoolManager = new BlockPoolManager(this);// 心跳管理blockPoolManager.refreshNamenodes(getConf());// Create the ReadaheadPool from the DataNode context so we can// exit without having to explicitly shutdown its thread pool.readaheadPool = ReadaheadPool.getInstance();saslClient = new SaslDataTransferClient(dnConf.getConf(),dnConf.saslPropsResolver, dnConf.trustedChannelResolver);saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);startMetricsLogger();if (dnConf.diskStatsEnabled) {diskMetrics = new DataNodeDiskMetrics(this,dnConf.outliersReportIntervalMs);}
}
初始化DataXceiverServer

initDataXceiver 方法创建并启动了 DataXceiverServer,它是 DataNode 用来接收客户端和其他 DataNode 发送过来的数据的服务。

private void initDataXceiver() throws IOException {// dataXceiverServer是一个服务,DN用来接收客户端和其他DN发送过来的数据服务this.dataXceiverServer = new Daemon(threadGroup, xserver);this.threadGroup.setDaemon(true); // auto destroy when empty... ...
}
初始化HTTP服务

startInfoServer 方法初始化并启动了 HTTP 服务器,用于提供有关 DataNode 的信息和服务。

private void startInfoServer()throws IOException {// SecureDataNodeStarter will bind the privileged port to the channel if// the DN is started by JSVC, pass it along.ServerSocketChannel httpServerChannel = secureResources != null ?secureResources.getHttpServerChannel() : null;httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);httpServer.start();if (httpServer.getHttpAddress() != null) {infoPort = httpServer.getHttpAddress().getPort();}if (httpServer.getHttpsAddress() != null) {infoSecurePort = httpServer.getHttpsAddress().getPort();}
}
DatanodeHttpServer 构造函数

DatanodeHttpServer 构造函数用于创建 HTTP 服务器实例。

public DatanodeHttpServer(final Configuration conf,final DataNode datanode,final ServerSocketChannel externalHttpChannel)throws IOException {... ...HttpServer2.Builder builder = new HttpServer2.Builder().setName("datanode").setConf(confForInfoServer).setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))).hostName(getHostnameForSpnegoPrincipal(confForInfoServer)).addEndpoint(URI.create("http://localhost:" + proxyPort)).setFindPort(true);... ...
}
初始化DataNode的RPC服务端

初始化 DataNode 的 RPC 服务端涉及到配置和启动相关的服务,以供客户端连接。

private void initIpcServer() throws IOException {InetSocketAddress ipcAddr = NetUtils.createSocketAddr(getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));... ...ipcServer = new RPC.Builder(getConf()).setProtocol(ClientDatanodeProtocolPB.class).setInstance(service).setBindAddress(ipcAddr.getHostName()).setPort(ipcAddr.getPort()).setNumHandlers(getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false).setSecretManager(blockPoolTokenSecretManager).build();... ...
}
DataNode 向 NameNode 注册

DataNode 需要向 NameNode 注册自身,以便 NameNode 可以跟踪和管理集群中的所有 DataNode。

void refreshNamenodes(Configuration conf)throws IOException {... ...synchronized (refreshNamenodesLock) {doRefreshNamenodes(newAddressMap, newLifelineAddressMap);}
}private void doRefreshNamenodes(Map<String, Map<String, InetSocketAddress>> addrMap,Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)throws IOException {… …synchronized (this) {… …// Step 3. Start new nameservicesif (!toAdd.isEmpty()) {for (String nsToAdd : toAdd) {… …BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);bpByNameserviceId.put(nsToAdd, bpos);offerServices.add(bpos);}}startAll();}… …
}protected BPOfferService createBPOS(final String nameserviceId,List<InetSocketAddress> nnAddrs,List<InetSocketAddress> lifelineNnAddrs) {// 根据NameNode个数创建对应的服务return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}
启动所有的服务

启动所有的服务涉及到创建服务实例并开始它们的生命周期。

synchronized void startAll() throws IOException {try {UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Object>() {@Overridepublic Object run() throws Exception {for (BPOfferService bpos : offerServices) {// 启动服务bpos.start();}return null;}});} catch (InterruptedException ex) {... ...}
}void start() {for (BPServiceActor actor : bpServices) {actor.start();}
}void start() {... ...bpThread = new Thread(this);bpThread.setDaemon(true); // needed for JUnit testing// 表示开启一个线程,所有查找该线程的 run 方法bpThread.start();if (lifelineSender != null) {lifelineSender.start();}
}
DataNode 服务线程运行

DataNode 服务线程运行涉及初始化和心跳发送。

public void run() {LOG.info(this + " starting to offer service");try {while (true) {// init stufftry {// setup storage// 向NN 注册connectToNNAndHandshake();break;} catch (IOException ioe) {// Initial handshake, storage recovery or registration failedrunningState = RunningState.INIT_FAILED;if (shouldRetryInit()) {// Retry until all namenode's of BPOS failed initializationLOG.error("Initialization failed for " + this + " "+ ioe.getLocalizedMessage());// 注册失败,5s后重试sleepAndLogInterrupts(5000, "initializing");} else {runningState = RunningState.FAILED;LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);return;}}}… …while (shouldRun()) {try {// 发送心跳offerService();} catch (Exception ex) {... ...}}
}private void connectToNNAndHandshake() throws IOException {// get NN proxy 获取NN的RPC客户端对象bpNamenode = dn.connectToNN(nnAddr);// First phase of the handshake with NN - get the namespace// info.NamespaceInfo nsInfo = retrieveNamespaceInfo();// Verify that this matches the other NN in this HA pair.// This also initializes our block pool in the DN if we are// the first NN connection for this BP.bpos.verifyAndSetNamespaceInfo(this, nsInfo);/* set thread name again to include NamespaceInfo when it's available. */this.bpThread.setName(formatThreadName("heartbeating", nnAddr));// 注册register(nsInfo);
}
DataNode 注册至 NameNode

DataNode 注册至 NameNode 包括创建注册信息以及实际的注册过程。

void register(NamespaceInfo nsInfo) throws IOException {// 创建注册信息DatanodeRegistration newBpRegistration = bpos.createRegistration();LOG.info(this + " beginning handshake with NN");while (shouldRun()) {try {// Use returned registration from namenode with updated fields// 把注册信息发送给NN(DN调用接口方法,执行在NN)newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);newBpRegistration.setNamespaceInfo(nsInfo);bpRegistration = newBpRegistration;break;} catch(EOFException e) {  // namenode might have just restartedLOG.info("Problem connecting to server: " + nnAddr + " :"+ e.getLocalizedMessage());sleepAndLogInterrupts(1000, "connecting to server");} catch(SocketTimeoutException e) {  // namenode is busyLOG.info("Problem connecting to server: " + nnAddr);sleepAndLogInterrupts(1000, "connecting to server");}}… …
}
NameNode 处理注册

NameNode 接收到 DataNode 的注册请求后,会进行处理。

public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)throws IOException {checkNNStartup();verifySoftwareVersion(nodeReg);// 注册DNnamesystem.registerDatanode(nodeReg);return nodeReg;
}void registerDatanode(DatanodeRegistration nodeReg) throws IOException {writeLock();try {blockManager.registerDatanode(nodeReg);} finally {writeUnlock("registerDatanode");}
}public void registerDatanode(DatanodeRegistration nodeReg)throws DisallowedDatanodeException, UnresolvedTopologyException {... ...// register new datanode 注册DNaddDatanode(nodeDescr);blockManager.getBlockReportLeaseManager().register(nodeDescr);// also treat the registration message as a heartbeat// no need to update its timestamp// because its is done when the descriptor is created// 将DN添加到心跳管理heartbeatManager.addDatanode(nodeDescr);heartbeatManager.updateDnStat(nodeDescr);incrementVersionCount(nodeReg.getSoftwareVersion());startAdminOperationIfNecessary(nodeDescr);success = true;... ...
}void addDatanode(final DatanodeDescriptor node) {// To keep host2DatanodeMap consistent with datanodeMap,// remove  from host2DatanodeMap the datanodeDescriptor removed// from datanodeMap before adding node to host2DatanodeMap.synchronized(this) {host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));}networktopology.add(node); // may throw InvalidTopologyExceptionhost2DatanodeMap.add(node);checkIfClusterIsNowMultiRack(node);resolveUpgradeDomain(node);… …
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Mybatis XML 数据源为 Oracle 之批量插入或更新 Merge Into 的具体介绍与使用
  • Android MediaRecorder 视频录制及报错解决
  • 【ubuntu24.04】docker安装
  • 7za解压缩工具
  • 常见漏洞描述及修复建议
  • SSv2数据集
  • QML基础学习
  • C++ 126类和对象_面像对像_继承
  • matlab 音频音量处理(音量大小按照dB调节)
  • 硬件工程师必须掌握的MOS管详细知识
  • QT stackwidget控件支持上下,左右手势滑动,触摸屏
  • Rust 面向对象编程
  • HTML静态网页成品作业(HTML+CSS)——花主题介绍网页设计制作(1个页面)
  • 解密键盘输入:探索设备控制器的奥秘
  • 基于STM32开发的智能家居照明控制系统
  • 03Go 类型总结
  • ECMAScript6(0):ES6简明参考手册
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • Linux快速复制或删除大量小文件
  • nodejs调试方法
  • Webpack 4 学习01(基础配置)
  • 分享自己折腾多时的一套 vue 组件 --we-vue
  • 基于MaxCompute打造轻盈的人人车移动端数据平台
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 前端攻城师
  • 如何在 Tornado 中实现 Middleware
  • 使用agvtool更改app version/build
  • 通过几道题目学习二叉搜索树
  • 无服务器化是企业 IT 架构的未来吗?
  • 线性表及其算法(java实现)
  • 想晋级高级工程师只知道表面是不够的!Git内部原理介绍
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • “十年磨一剑”--有赞的HBase平台实践和应用之路 ...
  • ​LeetCode解法汇总2808. 使循环数组所有元素相等的最少秒数
  • ​中南建设2022年半年报“韧”字当头,经营性现金流持续为正​
  • #include<初见C语言之指针(5)>
  • #pragma 指令
  • #每日一题合集#牛客JZ23-JZ33
  • $$$$GB2312-80区位编码表$$$$
  • (4)Elastix图像配准:3D图像
  • (论文阅读11/100)Fast R-CNN
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (算法)N皇后问题
  • .net core 6 集成和使用 mongodb
  • .NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划
  • .NET Framework Client Profile - a Subset of the .NET Framework Redistribution
  • .net 重复调用webservice_Java RMI 远程调用详解,优劣势说明
  • .NET/C# 检测电脑上安装的 .NET Framework 的版本
  • .one4-V-XXXXXXXX勒索病毒数据怎么处理|数据解密恢复
  • .php结尾的域名,【php】php正则截取url中域名后的内容
  • /3GB和/USERVA开关
  • @NestedConfigurationProperty 注解用法
  • [240621] Anthropic 发布了 Claude 3.5 Sonnet AI 助手 | Socket.IO 拒绝服务漏洞
  • [acwing周赛复盘] 第 94 场周赛20230311