From 85a20508bd04851d47c24b7562ec2927d5403446 Mon Sep 17 00:00:00 2001
From: Vinitha Reddy Gankidi
Date: Mon, 25 Jul 2016 16:56:45 -0700
Subject: [PATCH] HDFS-10301. Interleaving processing of storages from
 repeated block reports causes false zombie storage detection, removes valid
 blocks. Contributed by Vinitha Gankidi.

---
 .../hdfs/protocol/BlockListAsLongs.java       | 45 +++++++++++
 .../server/blockmanagement/BlockManager.java  | 52 ++++++------
 .../BlockReportLeaseManager.java              |  4 +-
 .../blockmanagement/DatanodeDescriptor.java   | 29 +------
 .../blockmanagement/DatanodeStorageInfo.java  | 11 ---
 .../hdfs/server/datanode/BPServiceActor.java  | 35 ++++++--
 .../server/namenode/NameNodeRpcServer.java    | 40 ++++++----
 .../blockmanagement/TestBlockManager.java     | 19 ++---
 .../TestNameNodePrunesMissingStorages.java    | 80 +++++++++++++++++--
 ...stDnRespectsBlockReportSplitThreshold.java | 33 +++++++-
 .../TestNNHandlesBlockReportPerStorage.java   | 34 ++++++--
 11 files changed, 274 insertions(+), 108 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 26c7ffb02c..26340a9a77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -63,6 +63,34 @@ public long[] getBlockListAsLongs() {
     public Iterator<BlockReportReplica> iterator() {
       return Collections.emptyIterator();
     }
+    @Override
+    public boolean isStorageReport() {
+      return false;
+    }
+  };
+
+  // STORAGE_REPORT is used to report all storages in the DN
+  public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() {
+    @Override
+    public int getNumberOfBlocks() {
+      return -1;
+    }
+    @Override
+    public ByteString getBlocksBuffer() {
+      return ByteString.EMPTY;
+    }
+    @Override
+    public long[] getBlockListAsLongs() {
+      return EMPTY_LONGS;
+    }
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return Collections.emptyIterator();
+    }
+    @Override
+    public boolean isStorageReport() {
+      return true;
+    }
   };
 
   /**
@@ -252,6 +280,13 @@ public List<ByteString> getBlocksBuffers() {
    */
   abstract public long[] getBlockListAsLongs();
 
+  /**
+   * Return true for STORAGE_REPORT BlockListAsLongs.
+   * Otherwise return false.
+   * @return boolean
+   */
+  abstract public boolean isStorageReport();
+
   /**
    * Returns a singleton iterator over blocks in the block report.  Do not
    * add the returned blocks to a collection.
@@ -391,6 +426,11 @@ public long[] getBlockListAsLongs() {
       return longs;
     }
 
+    @Override
+    public boolean isStorageReport() {
+      return false;
+    }
+
     @Override
     public Iterator<BlockReportReplica> iterator() {
       return new Iterator<BlockReportReplica>() {
@@ -474,6 +514,11 @@ public long[] getBlockListAsLongs() {
       return longs;
     }
 
+    @Override
+    public boolean isStorageReport() {
+      return false;
+    }
+
     @Override
     public Iterator<BlockReportReplica> iterator() {
       return new Iterator<BlockReportReplica>() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 349b018505..d927b2a8a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2138,7 +2138,7 @@ private static class BlockInfoToAdd {
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
       final BlockListAsLongs newReport,
-      BlockReportContext context, boolean lastStorageInRpc) throws IOException {
+      BlockReportContext context) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -2189,30 +2189,14 @@ public boolean processReport(final DatanodeID nodeID,
       storageInfo.receivedBlockReport();
       if (context != null) {
-        storageInfo.setLastBlockReportId(context.getReportId());
-        if (lastStorageInRpc) {
-          int rpcsSeen = node.updateBlockReportContext(context);
-          if (rpcsSeen >= context.getTotalRpcs()) {
-            long leaseId = blockReportLeaseManager.removeLease(node);
-            BlockManagerFaultInjector.getInstance().
-                removeBlockReportLease(node, leaseId);
-            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
-            if (zombies.isEmpty()) {
-              LOG.debug("processReport 0x{}: no zombie storages found.",
-                  Long.toHexString(context.getReportId()));
-            } else {
-              for (DatanodeStorageInfo zombie : zombies) {
-                removeZombieReplicas(context, zombie);
-              }
-            }
-            node.clearBlockReportContext();
-          } else {
-            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
-                "report.", Long.toHexString(context.getReportId()),
-                (context.getTotalRpcs() - rpcsSeen)
-            );
-          }
+        if (context.getTotalRpcs() == context.getCurRpc() + 1) {
+          long leaseId = this.getBlockReportLeaseManager().removeLease(node);
+          BlockManagerFaultInjector.getInstance().
+              removeBlockReportLease(node, leaseId);
         }
+        LOG.debug("Processing RPC with index {} out of total {} RPCs in " +
+            "processReport 0x{}", context.getCurRpc(),
+            context.getTotalRpcs(), Long.toHexString(context.getReportId()));
       }
     } finally {
       endTime = Time.monotonicNow();
@@ -2238,6 +2222,26 @@ public boolean processReport(final DatanodeID nodeID,
     return !node.hasStaleStorages();
   }
 
+  public void removeZombieStorages(DatanodeRegistration nodeReg,
+      BlockReportContext context, Set<String> storageIDsInBlockReport)
+      throws UnregisteredNodeException {
+    namesystem.writeLock();
+    DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg);
+    if (node != null) {
+      List<DatanodeStorageInfo> zombies =
+          node.removeZombieStorages(storageIDsInBlockReport);
+      if (zombies.isEmpty()) {
+        LOG.debug("processReport 0x{}: no zombie storages found.",
+            Long.toHexString(context.getReportId()));
+      } else {
+        for (DatanodeStorageInfo zombie : zombies) {
+          this.removeZombieReplicas(context, zombie);
+        }
+      }
+    }
+    namesystem.writeUnlock();
+  }
+
   private void removeZombieReplicas(BlockReportContext context,
       DatanodeStorageInfo zombie) {
     LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
index 7db05c7aaa..34e094923f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
@@ -308,10 +308,10 @@ public synchronized boolean checkLease(DatanodeDescriptor dn,
       return false;
     }
     if (node.leaseId == 0) {
-      LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
+      LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " +
                "is not in the pending set.",
                Long.toHexString(id), dn.getDatanodeUuid());
-      return false;
+      return true;
     }
     if (pruneIfExpired(monotonicNowMs, node)) {
       LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 1646129680..d807ab6136 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +42,6 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -154,9 +152,6 @@ public Type getType() {
   public final DecommissioningStatus decommissioningStatus =
       new DecommissioningStatus();
 
-  private long curBlockReportId = 0;
-
-  private BitSet curBlockReportRpcsSeen = null;
 
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
@@ -257,20 +252,6 @@ public DatanodeDescriptor(DatanodeID nodeID,
     updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
   }
 
-  public int updateBlockReportContext(BlockReportContext context) {
-    if (curBlockReportId != context.getReportId()) {
-      curBlockReportId = context.getReportId();
-      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
-    }
-    curBlockReportRpcsSeen.set(context.getCurRpc());
-    return curBlockReportRpcsSeen.cardinality();
-  }
-
-  public void clearBlockReportContext() {
-    curBlockReportId = 0;
-    curBlockReportRpcsSeen = null;
-  }
-
   public CachedBlocksList getPendingCached() {
     return pendingCached;
   }
@@ -334,7 +315,8 @@ boolean hasStaleStorages() {
     }
   }
 
-  List<DatanodeStorageInfo> removeZombieStorages() {
+  List<DatanodeStorageInfo>
+      removeZombieStorages(Set<String> storageIDsInBlockReport) {
     List<DatanodeStorageInfo> zombies = null;
     synchronized (storageMap) {
       Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
@@ -342,18 +324,13 @@ List<DatanodeStorageInfo> removeZombieStorages() {
       while (iter.hasNext()) {
         Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
         DatanodeStorageInfo storageInfo = entry.getValue();
-        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
-          LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
-              storageInfo.getStorageID(),
-              Long.toHexString(storageInfo.getLastBlockReportId()),
-              Long.toHexString(curBlockReportId));
+        if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) {
           iter.remove();
           if (zombies == null) {
             zombies = new LinkedList<>();
           }
           zombies.add(storageInfo);
         }
-        storageInfo.setLastBlockReportId(0);
       }
     }
     return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 843a8d514b..1b7cd7c6fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -98,9 +98,6 @@ public void updateFromStorage(DatanodeStorage storage) {
   private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
 
-  // The ID of the last full block report which updated this storage.
-  private long lastBlockReportId = 0;
-
   /** The number of block reports received */
   private int blockReportCount = 0;
@@ -165,14 +162,6 @@ public void setUtilizationForTesting(long capacity, long dfsUsed,
     this.blockPoolUsed = blockPoolUsed;
   }
 
-  long getLastBlockReportId() {
-    return lastBlockReportId;
-  }
-
-  void setLastBlockReportId(long lastBlockReportId) {
-    this.lastBlockReportId = lastBlockReportId;
-  }
-
   State getState() {
     return this.state;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 69989fbbc6..f18cf0bf65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -367,11 +367,36 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
     } else {
       // Send one block report per message.
       for (int r = 0; r < reports.length; r++) {
-        StorageBlockReport singleReport[] = { reports[r] };
-        DatanodeCommand cmd = bpNamenode.blockReport(
-            bpRegistration, bpos.getBlockPoolId(), singleReport,
-            new BlockReportContext(reports.length, r, reportId,
-                fullBrLeaseId, true));
+        StorageBlockReport[] singleReport = {reports[r]};
+        DatanodeCommand cmd;
+        if (r != reports.length - 1) {
+          cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(),
+              singleReport, new BlockReportContext(reports.length, r,
+                  reportId, fullBrLeaseId, true));
+        } else {
+          StorageBlockReport[] lastSplitReport =
+              new StorageBlockReport[perVolumeBlockLists.size()];
+          // When block reports are split, the last RPC in the block report
+          // has the information about all storages in the block report.
+          // See HDFS-10301 for more details. To achieve this, the last RPC
+          // has 'n' storage reports, where 'n' is the number of storages in
+          // a DN. The actual block replicas are reported only for the
+          // last/n-th storage.
+          i = 0;
+          for (Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
+              perVolumeBlockLists.entrySet()) {
+            lastSplitReport[i++] = new StorageBlockReport(
+                kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT);
+            if (i == r) {
+              lastSplitReport[i] = reports[r];
+              break;
+            }
+          }
+          cmd = bpNamenode.blockReport(
+              bpRegistration, bpos.getBlockPoolId(), lastSplitReport,
+              new BlockReportContext(reports.length, r, reportId,
+                  fullBrLeaseId, true));
+        }
         numReportsSent++;
         numRPCs++;
         if (cmd != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 6b52949868..3f36fcceef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1435,24 +1435,36 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
     boolean noStaleStorages = false;
     for (int r = 0; r < reports.length; r++) {
       final BlockListAsLongs blocks = reports[r].getBlocks();
-      //
-      // BlockManager.processReport accumulates information of prior calls
-      // for the same node and storage, so the value returned by the last
-      // call of this loop is the final updated value for noStaleStorage.
-      //
-      final int index = r;
-      noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws IOException {
-          return bm.processReport(nodeReg, reports[index].getStorage(),
-              blocks, context, (index == reports.length - 1));
-        }
-      });
-      metrics.incrStorageBlockReportOps();
+      if (!blocks.isStorageReport()) {
+        //
+        // BlockManager.processReport accumulates information of prior calls
+        // for the same node and storage, so the value returned by the last
+        // call of this loop is the final updated value for noStaleStorage.
+        //
+        final int index = r;
+        noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+          @Override
+          public Boolean call()
+              throws IOException {
+            return bm.processReport(nodeReg, reports[index].getStorage(),
+                blocks, context);
+          }
+        });
+        metrics.incrStorageBlockReportOps();
+      }
     }
     BlockManagerFaultInjector.getInstance().
         incomingBlockReportRpc(nodeReg, context);
+    if (nn.getFSImage().isUpgradeFinalized() &&
+        context.getTotalRpcs() == context.getCurRpc() + 1) {
+      Set<String> storageIDsInBlockReport = new HashSet<>();
+      for (StorageBlockReport report : reports) {
+        storageIDsInBlockReport.add(report.getStorage().getStorageID());
+      }
+      bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport);
+    }
+
     if (nn.getFSImage().isUpgradeFinalized() &&
         !namesystem.isRollingUpgrade() &&
         !nn.isStandbyState() &&
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 394fae9655..8c231d1e2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -713,12 +713,12 @@ public void testSafeModeIBR() throws Exception {
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -729,7 +729,7 @@ public void testSafeModeIBR() throws Exception {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -758,7 +758,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
   }
 
@@ -832,7 +832,7 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         builder.build(),
-        new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct
@@ -871,8 +871,7 @@ public void testFullBR() throws Exception {
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         generateReport(blocks),
-        new BlockReportContext(1, 0, System.nanoTime(), 0, false),
-        false);
+        new BlockReportContext(1, 0, System.nanoTime(), 0, false));
     assertEquals(1, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
@@ -882,8 +881,7 @@ public void testFullBR() throws Exception {
     // Send unsorted report
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         generateReport(blocks),
-        new BlockReportContext(1, 0, System.nanoTime(), 0, false),
-        false);
+        new BlockReportContext(1, 0, System.nanoTime(), 0, false));
     assertEquals(2, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
@@ -894,8 +892,7 @@ public void testFullBR() throws Exception {
     Collections.sort(blocks);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         generateReport(blocks),
-        new BlockReportContext(1, 0, System.nanoTime(), 0, true),
-        false);
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
     assertEquals(3, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index b11b48aed7..be38afeadc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -19,34 +19,40 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -55,8 +61,6 @@
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -368,4 +372,68 @@ public Boolean get() {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=300000)
+  public void testInterleavedFullBlockReports() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        36000000L);
+    int numStoragesPerDatanode = 6;
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1)
+        .storagesPerDatanode(numStoragesPerDatanode)
+        .build();
+    try {
+      LOG.info("waiting for cluster to become active...");
+      cluster.waitActive();
+      // Get the datanode registration and the block reports
+      DataNode dn = cluster.getDataNodes().get(0);
+      final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
+      LOG.info("Block pool id: " + blockPoolId);
+      final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId);
+      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+          dn.getFSDataset().getBlockReports(blockPoolId);
+      final StorageBlockReport[] reports =
+          new StorageBlockReport[perVolumeBlockLists.size()];
+      int reportIndex = 0;
+      for (Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
+          perVolumeBlockLists.entrySet()) {
+        DatanodeStorage dnStorage = kvPair.getKey();
+        BlockListAsLongs blockList = kvPair.getValue();
+        reports[reportIndex++] =
+            new StorageBlockReport(dnStorage, blockList);
+      }
+      // Get the list of storage ids associated with the datanode
+      // before the test
+      BlockManager bm =
+          cluster.getNameNode().getNamesystem().getBlockManager();
+      final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
+          getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
+      DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
+      // Send the full block report concurrently using
+      // numThreads=numStoragesPerDatanode
+      ExecutorService executorService = Executors.
+          newFixedThreadPool(numStoragesPerDatanode);
+      List<Future<DatanodeCommand>> futureList =
+          new ArrayList<>(numStoragesPerDatanode);
+      for (int i = 0; i < numStoragesPerDatanode; i++) {
+        futureList.add(executorService.submit(new Callable<DatanodeCommand>() {
+          @Override
+          public DatanodeCommand call() throws IOException {
+            return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId,
+                reports, new BlockReportContext(1, 0, System.nanoTime(),
+                    0L, true));
+          }
+        }));
+      }
+      for (Future<DatanodeCommand> future: futureList) {
+        future.get();
+      }
+      executorService.shutdown();
+      // Verify that the storages match before and after the test
+      Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index bf0e3c11bd..f41c546674 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -41,6 +41,7 @@
 import org.mockito.Mockito;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.times;
@@ -88,6 +89,34 @@ private void createFile(String filenamePrefix, int blockCount)
         blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
   }
 
+  private void verifyCapturedArgumentsSplit(
+      ArgumentCaptor<StorageBlockReport[]> captor,
+      int expectedReportsPerCall,
+      int expectedTotalBlockCount) {
+    List<StorageBlockReport[]> listOfReports = captor.getAllValues();
+    int numBlocksReported = 0;
+    int storageIndex = 0;
+    int listOfReportsSize = listOfReports.size();
+    for (StorageBlockReport[] reports : listOfReports) {
+      if (storageIndex < (listOfReportsSize - 1)) {
+        assertThat(reports.length, is(expectedReportsPerCall));
+      } else {
+        assertThat(reports.length, is(listOfReportsSize));
+      }
+      for (StorageBlockReport report : reports) {
+        BlockListAsLongs blockList = report.getBlocks();
+        if (!blockList.isStorageReport()) {
+          numBlocksReported += blockList.getNumberOfBlocks();
+        } else {
+          assertEquals(blockList.getNumberOfBlocks(), -1);
+        }
+      }
+      storageIndex++;
+    }
+
+    assert(numBlocksReported >= expectedTotalBlockCount);
+  }
+
   private void verifyCapturedArguments(
       ArgumentCaptor<StorageBlockReport[]> captor,
       int expectedReportsPerCall,
@@ -136,7 +165,7 @@ public void testAlwaysSplit() throws IOException, InterruptedException {
         anyString(),
         captor.capture(),
         Mockito.anyObject());
 
-    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+    verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
   }
 
   /**
@@ -200,7 +229,7 @@ public void testCornerCaseAtThreshold() throws IOException, InterruptedException
         anyString(),
         captor.capture(),
         Mockito.anyObject());
 
-    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+    verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 791ee20190..524243bbd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -34,13 +35,32 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
   @Override
   protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
       StorageBlockReport[] reports) throws IOException {
-    int i = 0;
-    for (StorageBlockReport report : reports) {
-      LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
-      StorageBlockReport[] singletonReport = { report };
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-          new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
-      i++;
+    for (int r = 0; r < reports.length; r++) {
+      LOG.info("Sending block report for storage " +
+          reports[r].getStorage().getStorageID());
+      StorageBlockReport[] singletonReport = {reports[r]};
+      if (r != reports.length - 1) {
+        cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
+            new BlockReportContext(reports.length, r, System.nanoTime(),
+                0L, true));
+      } else {
+        StorageBlockReport[] lastSplitReport =
+            new StorageBlockReport[reports.length];
+        // When block reports are split, send a dummy storage report for all
+        // other storages in the blockreport along with the last storage report
+        for (int i = 0; i <= r; i++) {
+          if (i == r) {
+            lastSplitReport[i] = reports[r];
+            break;
+          }
+          lastSplitReport[i] =
+              new StorageBlockReport(reports[i].getStorage(),
+                  BlockListAsLongs.STORAGE_REPORT);
+        }
+        cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport,
+            new BlockReportContext(reports.length, r, System.nanoTime(),
+                0L, true));
+      }
     }
   }
 }