diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index f22a5862fc..b0a721f970 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -107,6 +107,7 @@ enum RunningState { private final DataNode dn; private final DNConf dnConf; private long prevBlockReportId; + private long fullBlockReportLeaseId; private final SortedSet blockReportSizes = Collections.synchronizedSortedSet(new TreeSet<>()); private final int maxDataLength; @@ -131,6 +132,7 @@ enum RunningState { dnConf.ibrInterval, dn.getMetrics()); prevBlockReportId = ThreadLocalRandom.current().nextLong(); + fullBlockReportLeaseId = 0; scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval, dnConf.outliersReportIntervalMs); @@ -638,7 +640,6 @@ private void offerService() throws Exception { + "; heartBeatInterval=" + dnConf.heartBeatInterval + (lifelineSender != null ? "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : "")); - long fullBlockReportLeaseId = 0; // // Now loop for a long time.... @@ -806,6 +807,10 @@ void register(NamespaceInfo nsInfo) throws IOException { LOG.info("Block pool " + this + " successfully registered with NN"); bpos.registrationSucceeded(this, bpRegistration); + // reset lease id whenever registered to NN. + // ask for a new lease id at the next heartbeat. + fullBlockReportLeaseId = 0; + // random short delay - helps scatter the BR from all DNs scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 87bbd738dd..d34654edf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -27,12 +27,12 @@ import java.io.File; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -61,6 +61,8 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; @@ -90,6 +92,9 @@ public class TestBPOfferService { private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class); private long firstCallTime = 0; private long secondCallTime = 0; + private long firstLeaseId = 0; + private long secondLeaseId = 0; + private long nextFullBlockReportLeaseId = 1L; static { GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); @@ -169,16 +174,24 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) private class HeartbeatAnswer implements Answer { private final int nnIdx; - public HeartbeatAnswer(int nnIdx) { + HeartbeatAnswer(int nnIdx) { this.nnIdx = nnIdx; } @Override - public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable { + public HeartbeatResponse answer(InvocationOnMock invocation) + throws Throwable { heartbeatCounts[nnIdx]++; + Boolean requestFullBlockReportLease = + (Boolean) invocation.getArguments()[8]; + long fullBlockReportLeaseId = 0; + if (requestFullBlockReportLease) { + fullBlockReportLeaseId = nextFullBlockReportLeaseId++; + } + LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId); HeartbeatResponse heartbeatResponse = new HeartbeatResponse( datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null, - ThreadLocalRandom.current().nextLong() | 1L); + fullBlockReportLeaseId); //reset the command datanodeCommands[nnIdx] = new DatanodeCommand[0]; return heartbeatResponse; @@ -186,6 +199,24 @@ public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable { } + private class HeartbeatRegisterAnswer implements Answer { + private final int nnIdx; + + HeartbeatRegisterAnswer(int nnIdx) { + this.nnIdx = nnIdx; + } + + @Override + public HeartbeatResponse answer(InvocationOnMock invocation) + throws Throwable { + heartbeatCounts[nnIdx]++; + DatanodeCommand[] cmds = new DatanodeCommand[1]; + cmds[0] = new RegisterCommand(); + return new HeartbeatResponse(cmds, mockHaStatuses[nnIdx], + null, 0L); + } + } + /** * Test that the BPOS can register to talk to two different NNs, * sends block reports to both, etc. @@ -521,6 +552,26 @@ private Boolean get(DatanodeProtocolClientSideTranslatorPB mockNN) { }, 500, 10000); } + private void waitForRegistration( + final DatanodeProtocolClientSideTranslatorPB mockNN, int times) + throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + // The DN should have register to both NNs. + // first called by connectToNNAndHandshake, then called by reRegister. + Mockito.verify(mockNN, Mockito.times(2)) + .registerDatanode(Mockito.any()); + return true; + } catch (Throwable t) { + LOG.info("waiting on block registerDatanode: " + t.getMessage()); + return false; + } + } + }, 500, 10000); + } + private ReceivedDeletedBlockInfo[] waitForBlockReceived( final ExtendedBlock fakeBlock, final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception { @@ -857,7 +908,7 @@ public void testNNHAStateUpdateFromVersionRequest() throws Exception { } - @Test + @Test(timeout = 30000) public void testRefreshNameNodes() throws Exception { BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2); @@ -930,4 +981,68 @@ public void testRefreshNameNodes() throws Exception { bpos.join(); } } + + @Test(timeout = 15000) + public void testRefreshLeaseId() throws Exception { + Mockito.when(mockNN1.sendHeartbeat( + Mockito.any(DatanodeRegistration.class), + Mockito.any(StorageReport[].class), + Mockito.anyLong(), + Mockito.anyLong(), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.any(VolumeFailureSummary.class), + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class), + Mockito.any(SlowDiskReports.class))) + //heartbeat to old NN instance + .thenAnswer(new HeartbeatAnswer(0)) + //heartbeat to new NN instance with Register Command + .thenAnswer(new HeartbeatRegisterAnswer(0)) + .thenAnswer(new HeartbeatAnswer(0)); + + Mockito.when(mockNN1.blockReport( + Mockito.any(DatanodeRegistration.class), + Mockito.anyString(), + Mockito.any(StorageBlockReport[].class), + Mockito.any(BlockReportContext.class))) + .thenAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) + throws Throwable { + BlockReportContext context = + (BlockReportContext) invocation.getArguments()[3]; + long leaseId = context.getLeaseId(); + LOG.info("leaseId = "+leaseId); + + // leaseId == 1 means DN make block report with old leaseId + // just reject and wait until DN request for a new leaseId + if(leaseId == 1) { + firstLeaseId = leaseId; + throw new ConnectException( + "network is not reachable for test. "); + } else { + secondLeaseId = leaseId; + return null; + } + } + }); + + BPOfferService bpos = setupBPOSForNNs(mockNN1); + bpos.start(); + + try { + waitForInitialization(bpos); + // Should call registration 2 times + waitForRegistration(mockNN1, 2); + assertEquals(1L, firstLeaseId); + while(secondLeaseId != 2L) { + Thread.sleep(1000); + } + } finally { + bpos.stop(); + } + } }