From 5b3074fdcd0e13db89a2d0d23762e2c51752145d Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 21 Jul 2024 22:28:34 +0800 Subject: [PATCH] =?UTF-8?q?BPServiceActor=E8=AF=A6=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hdfs/BPServiceActor详解.md | 106 +++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 hdfs/BPServiceActor详解.md diff --git a/hdfs/BPServiceActor详解.md b/hdfs/BPServiceActor详解.md new file mode 100644 index 0000000..d27ba5d --- /dev/null +++ b/hdfs/BPServiceActor详解.md @@ -0,0 +1,106 @@ + +# 简介 +BPServiceActor 主要在DataNode中用于和NameNode沟通的类。主要功能如下: +- 与 namenode 进行预注册握手。 +- 向 namenode 注册。 +- 定期向 namenode 发送心跳。 +- 处理从 namenode 收到的命令。 + + +# 核心功能 + +## 预注册 + + +## 注册DataNode + +### DataNode注册核心逻辑 + +注册DataNode的核心函数是register + +```java +void register(NamespaceInfo nsInfo) throws IOException { + // The handshake() phase loaded the block pool storage + // off disk - so update the bpRegistration object from that info + DatanodeRegistration newBpRegistration = bpos.createRegistration(); + + LOG.info(this + " beginning handshake with NN"); + + while (shouldRun()) { + try { + // 向NN注册DataNodde + newBpRegistration = bpNamenode.registerDatanode(newBpRegistration); + newBpRegistration.setNamespaceInfo(nsInfo); + bpRegistration = newBpRegistration; + break; + // ... 省略部分异常处理 + } + } + + if (bpRegistration == null) { + throw new IOException("DN shut down before block pool registered"); + } + + // .. 省略 .. + // reset lease id whenever registered to NN. + // ask for a new lease id at the next heartbeat. + fullBlockReportLeaseId = 0; + + // random short delay - helps scatter the BR from all DatanodeRegistration + // 定时汇报block块信息给NN + scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs, true); +} +``` + +### NameNode 处理注册信息 + +NameNode处理DataNode的注册信息,主要接口是在NameNodeRpcServer的ipc接口registerDatanode。 +其主要的实现实在DatanodeManager中的函数registerDatanode。 + + +主要实现如下: + +```java +// update cluster map +getNetworkTopology().remove(nodeS); +if(shouldCountVersion(nodeS)) { + decrementVersionCount(nodeS.getSoftwareVersion()); +} +nodeS.updateRegInfo(nodeReg); + +nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion()); +nodeS.setDisallowed(false); // Node is in the include list + +// resolve network location +if(this.rejectUnresolvedTopologyDN) { + nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); + nodeS.setDependentHostNames(getNetworkDependencies(nodeS)); +} else { + nodeS.setNetworkLocation( + resolveNetworkLocationWithFallBackToDefaultLocation(nodeS)); + nodeS.setDependentHostNames( + getNetworkDependenciesWithDefault(nodeS)); +} +getNetworkTopology().add(nodeS); +resolveUpgradeDomain(nodeS); + +// also treat the registration message as a heartbeat +heartbeatManager.register(nodeS); +incrementVersionCount(nodeS.getSoftwareVersion()); +startAdminOperationIfNecessary(nodeS); +success = true; + +``` + +主要做了下面两件事: +- 更新节点信息,比如网络拓扑信息。 +- 在心跳管理里面注册当前节点。只要将节点信息添加到节点列表里面即可。 + + +## 发送心跳 + + +## 处理NN的命令 + + +