From b7923a356e9f111619375b94d12749d634069347 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Tue, 16 Dec 2014 10:30:22 -0600 Subject: [PATCH] HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport latency. Contributed by Ming Ma. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../server/blockmanagement/BlockManager.java | 113 +++++++++++------- .../blockmanagement/DatanodeManager.java | 14 ++- .../src/main/resources/hdfs-default.xml | 8 ++ .../blockmanagement/BlockManagerTestUtil.java | 8 ++ .../server/namenode/ha/TestDNFencing.java | 19 ++- .../ha/TestDNFencingWithReplication.java | 4 + 8 files changed, 124 insertions(+), 49 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ceb28ece9e..13f212a8ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -595,6 +595,9 @@ Release 2.7.0 - UNRELEASED HDFS-7516. Fix findbugs warnings in hdfs-nfs project. (brandonli) + HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport + latency. (Ming Ma via kihwal) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 78cae9cb25..52a2fd5cd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -325,6 +325,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio"; public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f; + // Number of blocks to rescan for each iteration of postponedMisreplicatedBlocks. + public static final String DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY = "dfs.namenode.blocks.per.postponedblocks.rescan"; + public static final long DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT = 10000; + // Replication monitoring related keys public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION = "dfs.namenode.invalidate.work.pct.per.iteration"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 5f718e7e76..7e54c0f9c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1050,21 +1050,6 @@ void removeBlocksAssociatedTo(final DatanodeDescriptor node) { node.resetBlocks(); invalidateBlocks.remove(node); - - // If the DN hasn't block-reported since the most recent - // failover, then we may have been holding up on processing - // over-replicated blocks because of it. But we can now - // process those blocks. - boolean stale = false; - for(DatanodeStorageInfo storage : node.getStorageInfos()) { - if (storage.areBlockContentsStale()) { - stale = true; - break; - } - } - if (stale) { - rescanPostponedMisreplicatedBlocks(); - } } /** Remove the blocks associated to the given DatanodeStorageInfo. */ @@ -1818,17 +1803,7 @@ public boolean processReport(final DatanodeID nodeID, invalidatedBlocks = processReport(storageInfo, newReport); } - // Now that we have an up-to-date block report, we know that any - // deletions from a previous NN iteration have been accounted for. - boolean staleBefore = storageInfo.areBlockContentsStale(); storageInfo.receivedBlockReport(); - if (staleBefore && !storageInfo.areBlockContentsStale()) { - LOG.info("BLOCK* processReport: Received first block report from " - + storage + " after starting up or becoming active. Its block " - + "contents are no longer considered stale"); - rescanPostponedMisreplicatedBlocks(); - } - } finally { endTime = Time.now(); namesystem.writeUnlock(); @@ -1857,31 +1832,74 @@ public boolean processReport(final DatanodeID nodeID, /** * Rescan the list of blocks which were previously postponed. */ - private void rescanPostponedMisreplicatedBlocks() { - for (Iterator it = postponedMisreplicatedBlocks.iterator(); - it.hasNext();) { - Block b = it.next(); - - BlockInfo bi = blocksMap.getStoredBlock(b); - if (bi == null) { + void rescanPostponedMisreplicatedBlocks() { + if (getPostponedMisreplicatedBlocksCount() == 0) { + return; + } + long startTimeRescanPostponedMisReplicatedBlocks = Time.now(); + long startPostponedMisReplicatedBlocksCount = + getPostponedMisreplicatedBlocksCount(); + namesystem.writeLock(); + try { + // blocksPerRescan is the configured number of blocks per rescan. + // Randomly select blocksPerRescan consecutive blocks from the HashSet + // when the number of blocks remaining is larger than blocksPerRescan. + // The reason we don't always pick the first blocksPerRescan blocks is to + // handle the case if for some reason some datanodes remain in + // content stale state for a long time and only impact the first + // blocksPerRescan blocks. + int i = 0; + long startIndex = 0; + long blocksPerRescan = + datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan(); + long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan; + if (base > 0) { + startIndex = DFSUtil.getRandom().nextLong() % (base+1); + if (startIndex < 0) { + startIndex += (base+1); + } + } + Iterator it = postponedMisreplicatedBlocks.iterator(); + for (int tmp = 0; tmp < startIndex; tmp++) { + it.next(); + } + + for (;it.hasNext(); i++) { + Block b = it.next(); + if (i >= blocksPerRescan) { + break; + } + + BlockInfo bi = blocksMap.getStoredBlock(b); + if (bi == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " + + "Postponed mis-replicated block " + b + " no longer found " + + "in block map."); + } + it.remove(); + postponedMisreplicatedBlocksCount.decrementAndGet(); + continue; + } + MisReplicationResult res = processMisReplicatedBlock(bi); if (LOG.isDebugEnabled()) { LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " + - "Postponed mis-replicated block " + b + " no longer found " + - "in block map."); + "Re-scanned block " + b + ", result is " + res); + } + if (res != MisReplicationResult.POSTPONE) { + it.remove(); + postponedMisreplicatedBlocksCount.decrementAndGet(); } - it.remove(); - postponedMisreplicatedBlocksCount.decrementAndGet(); - continue; - } - MisReplicationResult res = processMisReplicatedBlock(bi); - if (LOG.isDebugEnabled()) { - LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " + - "Re-scanned block " + b + ", result is " + res); - } - if (res != MisReplicationResult.POSTPONE) { - it.remove(); - postponedMisreplicatedBlocksCount.decrementAndGet(); } + } finally { + namesystem.writeUnlock(); + long endPostponedMisReplicatedBlocksCount = + getPostponedMisreplicatedBlocksCount(); + LOG.info("Rescan of postponedMisreplicatedBlocks completed in " + + (Time.now() - startTimeRescanPostponedMisReplicatedBlocks) + + " msecs. " + endPostponedMisReplicatedBlocksCount + + " blocks are left. " + (startPostponedMisReplicatedBlocksCount - + endPostponedMisReplicatedBlocksCount) + " blocks are removed."); } } @@ -3580,6 +3598,7 @@ public void run() { if (namesystem.isPopulatingReplQueues()) { computeDatanodeWork(); processPendingReplications(); + rescanPostponedMisreplicatedBlocks(); } Thread.sleep(replicationRecheckInterval); } catch (Throwable t) { @@ -3648,6 +3667,8 @@ public void clearQueues() { excessReplicateMap.clear(); invalidateBlocks.clear(); datanodeManager.clearPendingQueues(); + postponedMisreplicatedBlocks.clear(); + postponedMisreplicatedBlocksCount.set(0); }; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 0ff469a8c8..41d03633fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -133,13 +133,18 @@ public class DatanodeManager { * writing to stale datanodes, i.e., continue using stale nodes for writing. */ private final float ratioUseStaleDataNodesForWrite; - + /** The number of stale DataNodes */ private volatile int numStaleNodes; /** The number of stale storages */ private volatile int numStaleStorages; + /** + * Number of blocks to check for each postponedMisreplicatedBlocks iteration + */ + private final long blocksPerPostponedMisreplicatedBlocksRescan; + /** * Whether or not this cluster has ever consisted of more than 1 rack, * according to the NetworkTopology. @@ -259,6 +264,9 @@ public class DatanodeManager { this.timeBetweenResendingCachingDirectivesMs = conf.getLong( DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS, DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT); + this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY, + DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT); } private static long getStaleIntervalFromConf(Configuration conf, @@ -1133,6 +1141,10 @@ public boolean shouldAvoidStaleDataNodesForWrite() { * ratioUseStaleDataNodesForWrite); } + public long getBlocksPerPostponedMisreplicatedBlocksRescan() { + return blocksPerPostponedMisreplicatedBlocksRescan; + } + /** * @return The time interval used to mark DataNodes as stale. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 55a876e0ab..4d60792004 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2253,4 +2253,12 @@ + + dfs.namenode.blocks.per.postponedblocks.rescan + 10000 + Number of blocks to rescan for each iteration of + postponedMisreplicatedBlocks. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 2755b2989d..fccd308e70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -238,6 +238,14 @@ public static DatanodeStorageInfo updateStorage(DatanodeDescriptor dn, return dn.updateStorage(s); } + /** + * Call heartbeat check function of HeartbeatManager + * @param bm the BlockManager to manipulate + */ + public static void rescanPostponedMisreplicatedBlocks(BlockManager bm) { + bm.rescanPostponedMisreplicatedBlocks(); + } + public static DatanodeDescriptor getLocalDatanodeDescriptor( boolean initializeStorage) { DatanodeDescriptor dn = new DatanodeDescriptor(DFSTestUtil.getLocalDatanodeID()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 75d5b70d5e..85864f7af2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -165,7 +165,12 @@ public void testDnFencing() throws Exception { banner("Metadata after nodes have all block-reported"); doMetasave(nn2); - + + // Force a rescan of postponedMisreplicatedBlocks. + BlockManager nn2BM = nn2.getNamesystem().getBlockManager(); + BlockManagerTestUtil.checkHeartbeat(nn2BM); + BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM); + // The blocks should no longer be postponed. assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks()); @@ -251,7 +256,12 @@ public void testNNClearsCommandsOnFailoverAfterStartup() banner("Metadata after nodes have all block-reported"); doMetasave(nn2); - + + // Force a rescan of postponedMisreplicatedBlocks. + BlockManager nn2BM = nn2.getNamesystem().getBlockManager(); + BlockManagerTestUtil.checkHeartbeat(nn2BM); + BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM); + // The block should no longer be postponed. assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks()); @@ -347,6 +357,11 @@ public void testNNClearsCommandsOnFailoverWithReplChanges() banner("Metadata after nodes have all block-reported"); doMetasave(nn2); + // Force a rescan of postponedMisreplicatedBlocks. + BlockManager nn2BM = nn2.getNamesystem().getBlockManager(); + BlockManagerTestUtil.checkHeartbeat(nn2BM); + BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM); + // The block should no longer be postponed. assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java index 93830c1d28..e7cba75011 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java @@ -109,6 +109,10 @@ public void testFencingStress() throws Exception { HAStressTestHarness harness = new HAStressTestHarness(); harness.conf.setInt( DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); + harness.conf.setInt( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); + harness.conf.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); final MiniDFSCluster cluster = harness.startCluster(); try {