diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 54d09c2d7b..163baf5ecb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -281,23 +281,30 @@ public void testBasicFunctionality() throws Exception { public void testMissBlocksWhenReregister() throws Exception { BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); bpos.start(); + int totalTestBlocks = 4000; + Thread addNewBlockThread = null; + final AtomicInteger count = new AtomicInteger(0); + try { waitForBothActors(bpos); waitForInitialization(bpos); - DataNodeFaultInjector.set(new DataNodeFaultInjector() { public void blockUtilSendFullBlockReport() { try { - Thread.sleep(200); - } catch (InterruptedException e) { + GenericTestUtils.waitFor(() -> { + if(count.get() > 2000) { + return true; + } + return false; + }, 100, 1000); + } catch (Exception e) { e.printStackTrace(); } } }); countBlockReportItems(FAKE_BLOCK, mockNN1); - int totalTestBlocks = 4000; - Thread addNewBlockThread = new Thread(() -> { + addNewBlockThread = new Thread(() -> { for (int i = 0; i < totalTestBlocks; i++) { SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset; SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0); @@ -307,6 +314,7 @@ public void blockUtilSendFullBlockReport() { fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false); bpos.notifyNamenodeReceivingBlock(b, storageId); fsDataset.finalizeBlock(b, false); + count.addAndGet(1); Thread.sleep(1); } catch (Exception e) { e.printStackTrace(); @@ -316,7 +324,13 @@ public void blockUtilSendFullBlockReport() { addNewBlockThread.start(); // Make sure that generate blocks for DataNode and IBR not empty now. - Thread.sleep(200); + GenericTestUtils.waitFor(() -> { + if(count.get() > 0) { + return true; + } + return false; + }, 100, 1000); + // Trigger re-register using DataNode Command. datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER}; bpos.triggerHeartbeatForTests(); @@ -335,6 +349,7 @@ public void blockUtilSendFullBlockReport() { assertTrue(fullBlockReportCount == totalTestBlocks || incrBlockReportCount == totalTestBlocks); } finally { + addNewBlockThread.join(); bpos.stop(); bpos.join(); @@ -695,12 +710,17 @@ private void setTimeForSynchronousBPOSCalls() { } } + /** + * Record blocks counts of block report and total adding blocks count of IBR + * which assume no deleting blocks here. + */ private void countBlockReportItems(final ExtendedBlock fakeBlock, final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception { final String fakeBlockPoolId = fakeBlock.getBlockPoolId(); final ArgumentCaptor captor = ArgumentCaptor.forClass(StorageBlockReport[].class); + // Record blocks count about the last time block report. Mockito.doAnswer((Answer) invocation -> { Object[] arguments = invocation.getArguments(); StorageBlockReport[] list = (StorageBlockReport[])arguments[2]; @@ -713,6 +733,7 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock, Mockito.any() ); + // Record total adding blocks count and assume no deleting blocks here. Mockito.doAnswer((Answer) invocation -> { Object[] arguments = invocation.getArguments(); StorageReceivedDeletedBlocks[] list =