diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 49f64c2e5d..39f821972a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -798,6 +798,11 @@ void reRegister() throws IOException { // and re-register register(nsInfo); scheduler.scheduleHeartbeat(); + // HDFS-9917,Standby NN IBR can be very huge if standby namenode is down + // for sometime. + if (state == HAServiceState.STANDBY) { + ibrManager.clearIBRs(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java index b9b348a7de..e95142db87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java @@ -258,4 +258,13 @@ synchronized void triggerDeletionReportForTests() { } } } + + void clearIBRs() { + pendingIBRs.clear(); + } + + @VisibleForTesting + int getPendingIBRSize() { + return pendingIBRs.size(); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 95a103e2e8..29db702cf2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -48,10 +49,12 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -92,7 +95,10 @@ public class TestBPOfferService { private DatanodeProtocolClientSideTranslatorPB mockNN1; private DatanodeProtocolClientSideTranslatorPB mockNN2; - private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2]; + private final NNHAStatusHeartbeat[] mockHaStatuses = + new NNHAStatusHeartbeat[2]; + private final DatanodeCommand[][] datanodeCommands = + new DatanodeCommand[2][0]; private final int[] heartbeatCounts = new int[2]; private DataNode mockDn; private FsDatasetSpi mockFSDataset; @@ -147,6 +153,7 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean()); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); + datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; } @@ -165,9 +172,12 @@ public HeartbeatAnswer(int nnIdx) { @Override public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable { heartbeatCounts[nnIdx]++; - return new HeartbeatResponse(new DatanodeCommand[0], - mockHaStatuses[nnIdx], null, + HeartbeatResponse heartbeatResponse = new HeartbeatResponse( + datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null, ThreadLocalRandom.current().nextLong() | 1L); + //reset the command + datanodeCommands[nnIdx] = new DatanodeCommand[0]; + return heartbeatResponse; } } @@ -709,4 +719,84 @@ public void testReportBadBlocksWhenNNThrowsStandbyException() bpos.join(); } } + + /* + * HDFS-9917 : Standby IBR accumulation when Standby was down. + */ + @Test + public void testIBRClearanceForStandbyOnReRegister() throws Exception { + final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); + bpos.start(); + try { + waitForInitialization(bpos); + // Should start with neither NN as active. + assertNull(bpos.getActiveNN()); + // Have NN1 claim active at txid 1 + mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1); + bpos.triggerHeartbeatForTests(); + // Now mockNN1 is acting like active namenode and mockNN2 as Standby + assertSame(mockNN1, bpos.getActiveNN()); + // Return nothing when active Active Namenode gets IBRs + Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted( + Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito + .any(StorageReceivedDeletedBlocks[].class)); + + final IOException re = new IOException( + "Standby NN is currently not able to process IBR"); + + final AtomicBoolean ibrReported = new AtomicBoolean(false); + // throw exception for standby when first IBR is receieved + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ibrReported.set(true); + throw re; + } + }).when(mockNN2).blockReceivedAndDeleted( + Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito + .any(StorageReceivedDeletedBlocks[].class)); + + DatanodeStorage storage = Mockito.mock(DatanodeStorage.class); + Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0"); + // Add IBRs + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false); + // Send heartbeat so that the BpServiceActor can send IBR to + // namenode + bpos.triggerHeartbeatForTests(); + // Wait till first IBR is received at standbyNN. Just for confirmation. + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return ibrReported.get(); + } + }, 100, 1000); + + // Send register command back to Datanode to reRegister(). + // After reRegister IBRs should be cleared. + datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() }; + assertEquals( + "IBR size before reRegister should be non-0", 1, getStandbyIBRSize( + bpos)); + bpos.triggerHeartbeatForTests(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return getStandbyIBRSize(bpos) == 0; + } + }, 100, 1000); + } finally { + bpos.stop(); + bpos.join(); + } + } + + private int getStandbyIBRSize(BPOfferService bpos) { + List bpServiceActors = bpos.getBPServiceActors(); + for (BPServiceActor bpServiceActor : bpServiceActors) { + if (bpServiceActor.state == HAServiceState.STANDBY) { + return bpServiceActor.getIbrManager().getPendingIBRSize(); + } + } + return -1; + } }