From c88640c4ad71dc6b7508d3a1c27ab059b92c02cf Mon Sep 17 00:00:00 2001 From: Symious Date: Mon, 22 Nov 2021 18:04:32 +0800 Subject: [PATCH] HDFS-16320. Datanode retrieve slownode information from NameNode (#3654) --- ...atanodeProtocolClientSideTranslatorPB.java | 3 +- .../hdfs/server/datanode/BPOfferService.java | 8 +++ .../hdfs/server/datanode/BPServiceActor.java | 7 +++ .../server/datanode/BlockPoolManager.java | 23 ++++++++ .../hadoop/hdfs/server/datanode/DataNode.java | 12 ++++ .../hdfs/server/namenode/FSNamesystem.java | 5 +- .../server/protocol/HeartbeatResponse.java | 15 ++++- .../src/main/proto/DatanodeProtocol.proto | 1 + .../server/datanode/TestBPOfferService.java | 59 ++++++++++++++++++- 9 files changed, 129 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 695a945ec7..fd58c0c7ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -183,7 +183,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements rollingUpdateStatus = PBHelperClient.convert(resp.getRollingUpgradeStatus()); } return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()), - rollingUpdateStatus, resp.getFullBlockReportLeaseId()); + rollingUpdateStatus, resp.getFullBlockReportLeaseId(), + resp.getIsSlownode()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 3c63160f28..51a6f115cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -853,4 +853,12 @@ class BPOfferService { return isAlive(); } + boolean isSlownode() { + for (BPServiceActor actor : bpServices) { + if (actor.isSlownode()) { + return true; + } + } + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 718520f727..57f6159093 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -112,6 +112,7 @@ class BPServiceActor implements Runnable { private String nnId = null; private volatile RunningState runningState = RunningState.CONNECTING; private volatile boolean shouldServiceRun = true; + private volatile boolean isSlownode = false; private final DataNode dn; private final DNConf dnConf; private long prevBlockReportId; @@ -205,6 +206,7 @@ class BPServiceActor implements Runnable { String.valueOf(getScheduler().getLastBlockReportTime())); info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize())); info.put("maxDataLength", String.valueOf(maxDataLength)); + info.put("isSlownode", String.valueOf(isSlownode)); return info; } @@ -729,6 +731,7 @@ class BPServiceActor implements Runnable { handleRollingUpgradeStatus(resp); } commandProcessingThread.enqueue(resp.getCommands()); + isSlownode = resp.getIsSlownode(); } } @@ -1474,4 +1477,8 @@ class BPServiceActor implements Runnable { commandProcessingThread.interrupt(); } } + + boolean isSlownode() { + return isSlownode; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java index 1139500c22..2ce81f593e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java @@ -307,4 +307,27 @@ class BlockPoolManager { Map getBpByNameserviceId() { return bpByNameserviceId; } + + boolean isSlownodeByNameserviceId(String nsId) { + if (bpByNameserviceId.containsKey(nsId)) { + return bpByNameserviceId.get(nsId).isSlownode(); + } + return false; + } + + boolean isSlownodeByBlockPoolId(String bpId) { + if (bpByBlockPoolId.containsKey(bpId)) { + return bpByBlockPoolId.get(bpId).isSlownode(); + } + return false; + } + + boolean isSlownode() { + for (BPOfferService bpOfferService : bpByBlockPoolId.values()) { + if (bpOfferService.isSlownode()) { + return true; + } + } + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 0bb59de95a..3a4d4e9f27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3814,4 +3814,16 @@ public class DataNode extends ReconfigurableBase return (stage == PIPELINE_SETUP_STREAMING_RECOVERY || stage == PIPELINE_SETUP_APPEND_RECOVERY); } + + boolean isSlownodeByNameserviceId(String nsId) { + return blockPoolManager.isSlownodeByNameserviceId(nsId); + } + + boolean isSlownodeByBlockPoolId(String bpId) { + return blockPoolManager.isSlownodeByBlockPoolId(bpId); + } + + boolean isSlownode() { + return blockPoolManager.isSlownode(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 3d9e9bb934..e278292659 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4421,8 +4421,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, haContext.getState().getServiceState(), getFSImage().getCorrectLastAppliedOrWrittenTxId()); + Set slownodes = DatanodeManager.getSlowNodesUuidSet(); + boolean isSlownode = slownodes.contains(nodeReg.getDatanodeUuid()); + return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo, - blockReportLeaseId); + blockReportLeaseId, isSlownode); } finally { readUnlock("handleHeartbeat"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java index 8d6384e700..4ee930e67b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java @@ -36,14 +36,23 @@ public class HeartbeatResponse { private final RollingUpgradeStatus rollingUpdateStatus; private final long fullBlockReportLeaseId; - + + private final boolean isSlownode; + public HeartbeatResponse(DatanodeCommand[] cmds, NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus, long fullBlockReportLeaseId) { + this(cmds, haStatus, rollingUpdateStatus, fullBlockReportLeaseId, false); + } + + public HeartbeatResponse(DatanodeCommand[] cmds, + NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus, + long fullBlockReportLeaseId, boolean isSlownode) { commands = cmds; this.haStatus = haStatus; this.rollingUpdateStatus = rollingUpdateStatus; this.fullBlockReportLeaseId = fullBlockReportLeaseId; + this.isSlownode = isSlownode; } public DatanodeCommand[] getCommands() { @@ -61,4 +70,8 @@ public class HeartbeatResponse { public long getFullBlockReportLeaseId() { return fullBlockReportLeaseId; } + + public boolean getIsSlownode() { + return isSlownode; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 4a98f2d01e..3f89e951d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -223,6 +223,7 @@ message HeartbeatResponseProto { optional RollingUpgradeStatusProto rollingUpgradeStatus = 3; optional RollingUpgradeStatusProto rollingUpgradeStatusV2 = 4; optional uint64 fullBlockReportLeaseId = 5 [ default = 0 ]; + optional bool isSlownode = 6 [ default = false ]; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index fc2a998acb..057dd6459f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -30,6 +30,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; @@ -127,7 +128,8 @@ public class TestBPOfferService { private final int[] heartbeatCounts = new int[3]; private DataNode mockDn; private FsDatasetSpi mockFSDataset; - + private boolean isSlownode; + @Before public void setupMocks() throws Exception { mockNN1 = setupNNMock(0); @@ -216,6 +218,23 @@ public class TestBPOfferService { } } + private class HeartbeatIsSlownodeAnswer implements Answer { + private final int nnIdx; + + HeartbeatIsSlownodeAnswer(int nnIdx) { + this.nnIdx = nnIdx; + } + + @Override + public HeartbeatResponse answer(InvocationOnMock invocation) + throws Throwable { + HeartbeatResponse heartbeatResponse = new HeartbeatResponse( + datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null, + 0, isSlownode); + + return heartbeatResponse; + } + } private class HeartbeatRegisterAnswer implements Answer { private final int nnIdx; @@ -1182,6 +1201,44 @@ public class TestBPOfferService { } } + @Test(timeout = 15000) + public void testSetIsSlownode() throws Exception { + assertEquals(mockDn.isSlownode(), false); + Mockito.when(mockNN1.sendHeartbeat( + Mockito.any(DatanodeRegistration.class), + Mockito.any(StorageReport[].class), + Mockito.anyLong(), + Mockito.anyLong(), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.any(VolumeFailureSummary.class), + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class), + Mockito.any(SlowDiskReports.class))) + .thenAnswer(new HeartbeatIsSlownodeAnswer(0)); + + BPOfferService bpos = setupBPOSForNNs(mockNN1); + bpos.start(); + + try { + waitForInitialization(bpos); + + bpos.triggerHeartbeatForTests(); + assertFalse(bpos.isSlownode()); + + isSlownode = true; + bpos.triggerHeartbeatForTests(); + assertTrue(bpos.isSlownode()); + + isSlownode = false; + bpos.triggerHeartbeatForTests(); + assertFalse(bpos.isSlownode()); + } finally { + bpos.stop(); + } + } + @Test(timeout = 15000) public void testCommandProcessingThread() throws Exception { Configuration conf = new HdfsConfiguration();