HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp.

Kihwal Lee 2016-12-01 12:11:27 -06:00
parent e0fa49234f
commit 96c574927a

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -30,6 +30,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -43,8 +44,6 @@
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.management.ObjectName;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -101,7 +100,6 @@
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.FoldedTreeSet;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.namenode.CacheManager;
@@ -184,7 +182,6 @@ public class BlockManager implements BlockStatsMXBean {
   /** flag indicating whether replication queues have been initialized */
   private boolean initializedReplQueues;
-  private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
   private final BlockReportLeaseManager blockReportLeaseManager;
   private ObjectName mxBeanName;
@ -219,7 +216,7 @@ public long getExcessBlocksCount() {
} }
/** Used by metrics */ /** Used by metrics */
public long getPostponedMisreplicatedBlocksCount() { public long getPostponedMisreplicatedBlocksCount() {
return postponedMisreplicatedBlocksCount.get(); return postponedMisreplicatedBlocks.size();
} }
/** Used by metrics */ /** Used by metrics */
public int getPendingDataNodeMessageCount() { public int getPendingDataNodeMessageCount() {
@@ -275,8 +272,10 @@ public long getNumTimedOutPendingReconstructions() {
    * notified of all block deletions that might have been pending
    * when the failover happened.
    */
-  private final LightWeightHashSet<Block> postponedMisreplicatedBlocks =
-      new LightWeightHashSet<>();
+  private final Set<Block> postponedMisreplicatedBlocks =
+      new LinkedHashSet<Block>();
+  private final int blocksPerPostpondedRescan;
+  private final ArrayList<Block> rescannedMisreplicatedBlocks;
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
@@ -378,7 +377,10 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
     this.blockIdManager = new BlockIdManager(this);
+    blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
+        datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
+    rescannedMisreplicatedBlocks =
+        new ArrayList<Block>(blocksPerPostpondedRescan);
     startupDelayBlockDeletionInMs = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
@@ -1613,9 +1615,7 @@ public void setPostponeBlocksFromFuture(boolean postpone) {
   private void postponeBlock(Block blk) {
-    if (postponedMisreplicatedBlocks.add(blk)) {
-      postponedMisreplicatedBlocksCount.incrementAndGet();
-    }
+    postponedMisreplicatedBlocks.add(blk);
   }
@@ -2375,39 +2375,14 @@ void rescanPostponedMisreplicatedBlocks() {
     if (getPostponedMisreplicatedBlocksCount() == 0) {
       return;
     }
-    long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
     namesystem.writeLock();
-    long startPostponedMisReplicatedBlocksCount =
-        getPostponedMisreplicatedBlocksCount();
+    long startTime = Time.monotonicNow();
+    long startSize = postponedMisreplicatedBlocks.size();
     try {
-      // blocksPerRescan is the configured number of blocks per rescan.
-      // Randomly select blocksPerRescan consecutive blocks from the HashSet
-      // when the number of blocks remaining is larger than blocksPerRescan.
-      // The reason we don't always pick the first blocksPerRescan blocks is to
-      // handle the case if for some reason some datanodes remain in
-      // content stale state for a long time and only impact the first
-      // blocksPerRescan blocks.
-      int i = 0;
-      long startIndex = 0;
-      long blocksPerRescan =
-          datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
-      long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
-      if (base > 0) {
-        startIndex = ThreadLocalRandom.current().nextLong() % (base+1);
-        if (startIndex < 0) {
-          startIndex += (base+1);
-        }
-      }
       Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
-      for (int tmp = 0; tmp < startIndex; tmp++) {
-        it.next();
-      }
-      for (;it.hasNext(); i++) {
+      for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
         Block b = it.next();
-        if (i >= blocksPerRescan) {
-          break;
-        }
+        it.remove();
         BlockInfo bi = getStoredBlock(b);
         if (bi == null) {
@@ -2416,8 +2391,6 @@ void rescanPostponedMisreplicatedBlocks() {
                 "Postponed mis-replicated block " + b + " no longer found " +
                 "in block map.");
           }
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
           continue;
         }
         MisReplicationResult res = processMisReplicatedBlock(bi);
@@ -2425,20 +2398,19 @@ void rescanPostponedMisreplicatedBlocks() {
           LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
               "Re-scanned block " + b + ", result is " + res);
         }
-        if (res != MisReplicationResult.POSTPONE) {
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
+        if (res == MisReplicationResult.POSTPONE) {
+          rescannedMisreplicatedBlocks.add(b);
         }
       }
     } finally {
-      long endPostponedMisReplicatedBlocksCount =
-          getPostponedMisreplicatedBlocksCount();
+      postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
+      rescannedMisreplicatedBlocks.clear();
+      long endSize = postponedMisreplicatedBlocks.size();
       namesystem.writeUnlock();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
-          (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
-          " msecs. " + endPostponedMisReplicatedBlocksCount +
-          " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
-          endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
+          (Time.monotonicNow() - startTime) + " msecs. " +
+          endSize + " blocks are left. " +
+          (startSize - endSize) + " blocks were removed.");
     }
   }
@@ -4048,9 +4020,7 @@ public void removeBlock(BlockInfo block) {
     // Remove the block from pendingReconstruction and neededReconstruction
     pendingReconstruction.remove(block);
    neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
-    if (postponedMisreplicatedBlocks.remove(block)) {
-      postponedMisreplicatedBlocksCount.decrementAndGet();
-    }
+    postponedMisreplicatedBlocks.remove(block);
   }
   public BlockInfo getStoredBlock(Block block) {
@@ -4464,7 +4434,6 @@ public void clearQueues() {
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();
-    postponedMisreplicatedBlocksCount.set(0);
   };
   public static LocatedBlock newLocatedBlock(
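
For reviewers skimming the patch: it drops the LightWeightHashSet plus the separate AtomicLong counter and keeps postponed blocks in an insertion-ordered LinkedHashSet. Each rescan now consumes at most blocksPerPostpondedRescan blocks from the head of the set and re-appends the ones that must stay postponed, so every block is eventually revisited without the old random-start-index walk, and the metrics count is simply size(). Below is a minimal standalone sketch of that batching pattern under these assumptions; the class and method names (PostponedScanDemo, rescanBatch, stillMisreplicated) are illustrative only and are not part of the Hadoop code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo class; it mirrors the batching pattern the commit
// introduces in BlockManager#rescanPostponedMisreplicatedBlocks.
public class PostponedScanDemo {
  // Insertion-ordered set: size() is cheap and iteration starts at the
  // oldest postponed entry, so no random start index is needed.
  private final Set<Long> postponed = new LinkedHashSet<>();
  private final List<Long> rescanned = new ArrayList<>();
  private final int blocksPerRescan;

  PostponedScanDemo(int blocksPerRescan) {
    this.blocksPerRescan = blocksPerRescan;
  }

  void postpone(long blockId) {
    postponed.add(blockId);
  }

  long size() {
    return postponed.size();          // replaces the separate AtomicLong
  }

  /** Rescan at most blocksPerRescan entries from the head of the set. */
  void rescanBatch() {
    Iterator<Long> it = postponed.iterator();
    for (int i = 0; i < blocksPerRescan && it.hasNext(); i++) {
      long b = it.next();
      it.remove();                    // always take the block out first
      if (stillMisreplicated(b)) {
        rescanned.add(b);             // remember it for re-postponing
      }
    }
    // Re-append survivors at the tail so the next batch reaches the
    // remaining blocks before coming back to these.
    postponed.addAll(rescanned);
    rescanned.clear();
  }

  // Stand-in for processMisReplicatedBlock(); here: even ids stay postponed.
  private boolean stillMisreplicated(long blockId) {
    return blockId % 2 == 0;
  }

  public static void main(String[] args) {
    PostponedScanDemo demo = new PostponedScanDemo(3);
    for (long id = 1; id <= 10; id++) {
      demo.postpone(id);
    }
    demo.rescanBatch();
    // ids 1, 2, 3 are scanned; only 2 is re-postponed, so 8 remain.
    System.out.println("postponed blocks left: " + demo.size());
  }
}
```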