diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1b744e768f..e60703be4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -43,8 +44,6 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -101,7 +100,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.FoldedTreeSet;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.namenode.CacheManager;
@@ -184,7 +182,6 @@ public class BlockManager implements BlockStatsMXBean {
   /** flag indicating whether replication queues have been initialized */
   private boolean initializedReplQueues;
 
-  private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
   private final BlockReportLeaseManager blockReportLeaseManager;
   private ObjectName mxBeanName;
@@ -219,7 +216,7 @@ public long getExcessBlocksCount() {
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
-    return postponedMisreplicatedBlocksCount.get();
+    return postponedMisreplicatedBlocks.size();
   }
   /** Used by metrics */
   public int getPendingDataNodeMessageCount() {
@@ -275,8 +272,10 @@ public long getNumTimedOutPendingReconstructions() {
    * notified of all block deletions that might have been pending
    * when the failover happened.
    */
-  private final LightWeightHashSet<Block> postponedMisreplicatedBlocks =
-      new LightWeightHashSet<>();
+  private final Set<Block> postponedMisreplicatedBlocks =
+      new LinkedHashSet<Block>();
+  private final int blocksPerPostpondedRescan;
+  private final ArrayList<Block> rescannedMisreplicatedBlocks;
 
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
@@ -378,7 +377,10 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
     this.blockIdManager = new BlockIdManager(this);
-
+    blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
+        datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
+    rescannedMisreplicatedBlocks =
+        new ArrayList<Block>(blocksPerPostpondedRescan);
     startupDelayBlockDeletionInMs = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
@@ -1613,9 +1615,7 @@ public void setPostponeBlocksFromFuture(boolean postpone) {
 
   private void postponeBlock(Block blk) {
-    if (postponedMisreplicatedBlocks.add(blk)) {
-      postponedMisreplicatedBlocksCount.incrementAndGet();
-    }
+    postponedMisreplicatedBlocks.add(blk);
   }
 
 
@@ -2375,39 +2375,14 @@ void rescanPostponedMisreplicatedBlocks() {
     if (getPostponedMisreplicatedBlocksCount() == 0) {
       return;
     }
-    long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
     namesystem.writeLock();
-    long startPostponedMisReplicatedBlocksCount =
-        getPostponedMisreplicatedBlocksCount();
+    long startTime = Time.monotonicNow();
+    long startSize = postponedMisreplicatedBlocks.size();
     try {
-      // blocksPerRescan is the configured number of blocks per rescan.
-      // Randomly select blocksPerRescan consecutive blocks from the HashSet
-      // when the number of blocks remaining is larger than blocksPerRescan.
-      // The reason we don't always pick the first blocksPerRescan blocks is to
-      // handle the case if for some reason some datanodes remain in
-      // content stale state for a long time and only impact the first
-      // blocksPerRescan blocks.
-      int i = 0;
-      long startIndex = 0;
-      long blocksPerRescan =
-          datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
-      long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
-      if (base > 0) {
-        startIndex = ThreadLocalRandom.current().nextLong() % (base+1);
-        if (startIndex < 0) {
-          startIndex += (base+1);
-        }
-      }
       Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
-      for (int tmp = 0; tmp < startIndex; tmp++) {
-        it.next();
-      }
-
-      for (;it.hasNext(); i++) {
+      for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
         Block b = it.next();
-        if (i >= blocksPerRescan) {
-          break;
-        }
+        it.remove();
 
         BlockInfo bi = getStoredBlock(b);
         if (bi == null) {
@@ -2416,8 +2391,6 @@ void rescanPostponedMisreplicatedBlocks() {
                 "Postponed mis-replicated block " + b + " no longer found " +
                 "in block map.");
           }
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
           continue;
         }
 
@@ -2425,20 +2398,19 @@ void rescanPostponedMisreplicatedBlocks() {
         MisReplicationResult res = processMisReplicatedBlock(bi);
         if (LOG.isDebugEnabled()) {
           LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
              "Re-scanned block " + b + ", result is " + res);
         }
-        if (res != MisReplicationResult.POSTPONE) {
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
+        if (res == MisReplicationResult.POSTPONE) {
+          rescannedMisreplicatedBlocks.add(b);
         }
       }
     } finally {
-      long endPostponedMisReplicatedBlocksCount =
-          getPostponedMisreplicatedBlocksCount();
+      postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
+      rescannedMisreplicatedBlocks.clear();
+      long endSize = postponedMisreplicatedBlocks.size();
       namesystem.writeUnlock();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
-          (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
-          " msecs. " + endPostponedMisReplicatedBlocksCount +
-          " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
-          endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
+          (Time.monotonicNow() - startTime) + " msecs. " +
+          endSize + " blocks are left. " +
+          (startSize - endSize) + " blocks were removed.");
     }
   }
@@ -4048,9 +4020,7 @@ public void removeBlock(BlockInfo block) {
     // Remove the block from pendingReconstruction and neededReconstruction
     pendingReconstruction.remove(block);
     neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
-    if (postponedMisreplicatedBlocks.remove(block)) {
-      postponedMisreplicatedBlocksCount.decrementAndGet();
-    }
+    postponedMisreplicatedBlocks.remove(block);
   }
 
   public BlockInfo getStoredBlock(Block block) {
@@ -4464,7 +4434,6 @@ public void clearQueues() {
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();
-    postponedMisreplicatedBlocksCount.set(0);
   };
 
   public static LocatedBlock newLocatedBlock(