diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 45355d4f1b..bf919e16fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -44,6 +44,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -55,6 +57,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; @@ -109,8 +112,6 @@ public class TestBPOfferService { private long firstLeaseId = 0; private long secondLeaseId = 0; private long nextFullBlockReportLeaseId = 1L; - private int fullBlockReportCount = 0; - private int incrBlockReportCount = 0; static { GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); @@ -233,14 +234,6 @@ public HeartbeatResponse answer(InvocationOnMock invocation) } } - private void setBlockReportCount(int count) { - fullBlockReportCount = count; - } - - private void setIncreaseBlockReportCount(int count) { - incrBlockReportCount += count; - } - /** * Test that the BPOS can register to talk to two different NNs, * sends block reports to both, etc. @@ -288,6 +281,7 @@ public void testMissBlocksWhenReregister() throws Exception { Thread addNewBlockThread = null; final AtomicInteger count = new AtomicInteger(0); DataNodeFaultInjector prevDNFaultInjector = null; + Set blocks = new TreeSet<>(); try { waitForBothActors(bpos); waitForInitialization(bpos); @@ -303,7 +297,7 @@ public void blockUtilSendFullBlockReport() { } }); - countBlockReportItems(FAKE_BLOCK, mockNN1); + countBlockReportItems(FAKE_BLOCK, mockNN1, blocks); addNewBlockThread = new Thread(() -> { for (int i = 0; i < totalTestBlocks; i++) { SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset; @@ -334,14 +328,12 @@ public void blockUtilSendFullBlockReport() { addNewBlockThread = null; // Verify FBR/IBR count is equal to generate number. try { - GenericTestUtils.waitFor(() -> - (fullBlockReportCount == totalTestBlocks || - incrBlockReportCount == totalTestBlocks), 1000, 15000); + GenericTestUtils.waitFor(() -> blocks.size() == totalTestBlocks, + 1000, 15000); } catch (Exception e) { - fail(String.format("Timed out wait for IBR counts FBRCount = %d," - + " IBRCount = %d; expected = %d. Exception: %s", - fullBlockReportCount, incrBlockReportCount, totalTestBlocks, - e.getMessage())); + fail(String.format("Timed out waiting for blocks count. " + + "reported = %d, expected = %d. Exception: %s", + blocks.size(), totalTestBlocks, e.getMessage())); } } finally { @@ -711,7 +703,8 @@ private void setTimeForSynchronousBPOSCalls() { * which assume no deleting blocks here. */ private void countBlockReportItems(final ExtendedBlock fakeBlock, - final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception { + final DatanodeProtocolClientSideTranslatorPB mockNN, + final Set blocks) throws Exception { final String fakeBlockPoolId = fakeBlock.getBlockPoolId(); final ArgumentCaptor captor = ArgumentCaptor.forClass(StorageBlockReport[].class); @@ -720,7 +713,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock, Mockito.doAnswer((Answer) invocation -> { Object[] arguments = invocation.getArguments(); StorageBlockReport[] list = (StorageBlockReport[])arguments[2]; - setBlockReportCount(list[0].getBlocks().getNumberOfBlocks()); + for (BlockReportReplica brr : list[0].getBlocks()) { + blocks.add(brr.getBlockId()); + } return null; }).when(mockNN).blockReport( Mockito.any(), @@ -734,7 +729,9 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock, Object[] arguments = invocation.getArguments(); StorageReceivedDeletedBlocks[] list = (StorageReceivedDeletedBlocks[])arguments[2]; - setIncreaseBlockReportCount(list[0].getBlocks().length); + for (ReceivedDeletedBlockInfo rdbi : list[0].getBlocks()) { + blocks.add(rdbi.getBlock().getBlockId()); + } return null; }).when(mockNN).blockReceivedAndDeleted( Mockito.any(), @@ -1233,4 +1230,4 @@ public void testCommandProcessingThreadExit() throws Exception { } } } -} \ No newline at end of file +}