From 9a6d00aba47397b6dfbf2900de9e20a64ba3cdc4 Mon Sep 17 00:00:00 2001
From: huhaiyang
Date: Mon, 4 Dec 2023 21:16:38 +0800
Subject: [PATCH] HDFS-17218. NameNode should process time out excess
 redundancy blocks (#6176). Contributed by Haiyang Hu.

Signed-off-by: He Xiaoqiao
---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   7 +
 .../server/blockmanagement/BlockManager.java  | 115 ++++++++++++++++
 .../blockmanagement/ExcessRedundancyMap.java  |  57 +++++++-
 .../src/main/resources/hdfs-default.xml       |  18 +++
 .../apache/hadoop/hdfs/MiniDFSCluster.java    |  19 +++
 .../blockmanagement/TestBlockManager.java     | 128 ++++++++++++++++++
 6 files changed, 337 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 88a18d9cf0..f92a2ad565 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -315,6 +315,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int
       DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = 300;
 
+  public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY =
+      "dfs.namenode.excess.redundancy.timeout-sec";
+  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT = 3600;
+  public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT
+      = "dfs.namenode.excess.redundancy.timeout.check.limit";
+  public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT = 1000;
+
   public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
       "dfs.namenode.maintenance.replication.min";
   public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
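
NOTE (illustrative, not part of this patch): the new keys can also be set
programmatically, for example from a test. The key and constant names come
from the patch above; the values below are arbitrary.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Reap excess replicas that a DataNode has not deleted within 30 minutes.
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, 1800L);
    // Examine at most 500 excess replicas per redundancy-monitor pass.
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT, 500L);
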
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 2d216be945..76efb35300 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -30,6 +30,7 @@
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,6 +51,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -86,6 +88,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -116,6 +119,7 @@ import static
     org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -482,6 +486,16 @@ public int getPendingSPSPaths() {
   /** Storages accessible from multiple DNs. */
   private final ProvidedStorageMap providedStorageMap;
 
+  /**
+   * Timeout for excess redundancy block.
+   */
+  private long excessRedundancyTimeout;
+
+  /**
+   * Limits the number of blocks used to check for excess redundancy timeout.
+   */
+  private long excessRedundancyTimeoutCheckLimit;
+
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -589,6 +603,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
         conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
             DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
 
+    setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY,
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT));
+    setExcessRedundancyTimeoutCheckLimit(conf.getLong(
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
+        DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));
+
     printInitialConfigs();
   }
 
@@ -3041,6 +3061,100 @@ void rescanPostponedMisreplicatedBlocks() {
           (Time.monotonicNow() - startTime), endSize, (startSize - endSize));
     }
   }
+
+  /**
+   * Sets the timeout (in seconds) for excess redundancy blocks. If the provided timeout is
+   * less than or equal to 0, the default value is used instead (converted to milliseconds).
+   * @param timeout The time (in seconds) to set as the excess redundancy block timeout.
+   */
+  public void setExcessRedundancyTimeout(long timeout) {
+    if (timeout <= 0) {
+      this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT * 1000L;
+    } else {
+      this.excessRedundancyTimeout = timeout * 1000L;
+    }
+  }
+
+  /**
+   * Sets the limit on the number of blocks checked per excess redundancy timeout pass.
+   * If the provided limit is less than or equal to 0, the default limit is used.
+   *
+   * @param limit The maximum number of blocks used to check for excess redundancy timeout.
+   */
+  public void setExcessRedundancyTimeoutCheckLimit(long limit) {
+    if (limit <= 0) {
+      this.excessRedundancyTimeoutCheckLimit =
+          DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
+    } else {
+      this.excessRedundancyTimeoutCheckLimit = limit;
+    }
+  }
+
+  /**
+   * Process timed-out blocks in the excess redundancy map.
+   */
+  void processTimedOutExcessBlocks() {
+    if (excessRedundancyMap.size() == 0) {
+      return;
+    }
+    namesystem.writeLock();
+    long now = Time.monotonicNow();
+    int processed = 0;
+    try {
+      Iterator<Map.Entry<String, LightWeightHashSet<Block>>> iter =
+          excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
+      while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
+        Map.Entry<String, LightWeightHashSet<Block>> entry = iter.next();
+        String datanodeUuid = entry.getKey();
+        LightWeightHashSet<Block> blocks = entry.getValue();
+        // Sort blocks by timestamp in ascending order, oldest first.
+        List<ExcessBlockInfo> sortedBlocks = blocks.stream()
+            .filter(block -> block instanceof ExcessBlockInfo)
+            .map(block -> (ExcessBlockInfo) block)
+            .sorted(Comparator.comparingLong(ExcessBlockInfo::getTimeStamp))
+            .collect(Collectors.toList());
+
+        for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
+          if (processed >= excessRedundancyTimeoutCheckLimit) {
+            break;
+          }
+
+          processed++;
+          // Blocks are visited oldest first, so if the current block has not
+          // timed out, none of the remaining blocks on this datanode have either.
+          if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) {
+            break;
+          }
+
+          BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
+          BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
+          if (bi == null || bi.isDeleted()) {
+            continue;
+          }
+
+          Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
+          while (iterator.hasNext()) {
+            DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+            DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
+            if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid) &&
+                datanodeStorageInfo.getState().equals(State.NORMAL)) {
+              final Block b = getBlockOnStorage(blockInfo, datanodeStorageInfo);
+              if (!containsInvalidateBlock(datanodeDescriptor, b)) {
+                addToInvalidates(b, datanodeDescriptor);
+                LOG.debug("Timed-out excess block ({}, {}) is added to invalidateBlocks.",
+                    b, datanodeDescriptor);
+              }
+              excessBlockInfo.setTimeStamp();
+              break;
+            }
+          }
+        }
+      }
+    } finally {
+      namesystem.writeUnlock("processTimedOutExcessBlocks");
+      LOG.info("processTimedOutExcessBlocks took {} msecs.", (Time.monotonicNow() - now));
+    }
+  }
 
   Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
@@ -5232,6 +5346,7 @@ public void run() {
           computeDatanodeWork();
           processPendingReconstructions();
           rescanPostponedMisreplicatedBlocks();
+          processTimedOutExcessBlocks();
           lastRedundancyCycleTS.set(Time.monotonicNow());
         }
         TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
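
NOTE (illustrative, not part of this patch): processTimedOutExcessBlocks() runs
once per RedundancyMonitor pass, examines at most
dfs.namenode.excess.redundancy.timeout.check.limit blocks while holding the
write lock, and re-stamps every block it re-queues. As a worked example,
assuming the default dfs.namenode.redundancy.interval.seconds of 3 and the
default limit of 1000, a backlog of 50,000 timed-out excess replicas would be
drained in about ceil(50000 / 1000) = 50 passes, i.e. on the order of 150
seconds, rather than in one long pass under the lock.
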
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
index 225a962692..92c3c79524 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java
@@ -21,12 +21,15 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.slf4j.Logger;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.util.Time.monotonicNow;
+
 /**
  * Maps a datanode to the set of excess redundancy details.
  *
@@ -35,7 +38,7 @@ class ExcessRedundancyMap {
   public static final Logger blockLog = NameNode.blockStateChangeLog;
 
-  private final Map<String, LightWeightHashSet<BlockInfo>> map =new HashMap<>();
+  private final Map<String, LightWeightHashSet<Block>> map = new HashMap<>();
   private final AtomicLong size = new AtomicLong(0L);
 
   /**
@@ -50,7 +53,7 @@ long size() {
    */
   @VisibleForTesting
   synchronized int getSize4Testing(String dnUuid) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dnUuid);
+    final LightWeightHashSet<Block> set = map.get(dnUuid);
     return set == null? 0: set.size();
   }
 
@@ -64,7 +67,7 @@ synchronized void clear() {
    * datanode and the given block?
    */
   synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     return set != null && set.contains(blk);
   }
 
@@ -75,12 +78,12 @@ synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
    * @return true if the block is added.
    */
   synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
-    LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       set = new LightWeightHashSet<>();
       map.put(dn.getDatanodeUuid(), set);
     }
-    final boolean added = set.add(blk);
+    final boolean added = set.add(new ExcessBlockInfo(blk));
     if (added) {
       size.incrementAndGet();
       blockLog.debug("BLOCK* ExcessRedundancyMap.add({}, {})", dn, blk);
@@ -95,11 +98,10 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
    * @return true if the block is removed.
    */
   synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
-    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+    final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
     if (set == null) {
       return false;
     }
-
     final boolean removed = set.remove(blk);
     if (removed) {
       size.decrementAndGet();
@@ -111,4 +113,45 @@
     }
     return removed;
   }
+
+  synchronized Map<String, LightWeightHashSet<Block>> getExcessRedundancyMap() {
+    return map;
+  }
+
+  /**
+   * An object that holds information about a block with excess redundancy. It records
+   * the timestamp at which the block was added to the excess redundancy map.
+   */
+  static class ExcessBlockInfo extends Block {
+    private long timeStamp;
+    private final BlockInfo blockInfo;
+
+    ExcessBlockInfo(BlockInfo blockInfo) {
+      super(blockInfo.getBlockId(), blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
+      this.timeStamp = monotonicNow();
+      this.blockInfo = blockInfo;
+    }
+
+    public BlockInfo getBlockInfo() {
+      return blockInfo;
+    }
+
+    long getTimeStamp() {
+      return timeStamp;
+    }
+
+    void setTimeStamp() {
+      timeStamp = monotonicNow();
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return super.equals(obj);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b64a7a664b..3ef433d6a4 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5425,6 +5425,24 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.namenode.excess.redundancy.timeout-sec</name>
+    <value>3600</value>
+    <description>
+      Timeout in seconds for excess redundancy blocks. If this value is 0 or less,
+      then it will default to 3600 seconds.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.namenode.excess.redundancy.timeout.check.limit</name>
+    <value>1000</value>
+    <description>
+      Limits the number of blocks used to check for excess redundancy timeout.
+      If this value is 0 or less, then it will default to 1000.
+    </description>
+  </property>
+
   <property>
     <name>dfs.namenode.stale.datanode.minimum.interval</name>
     <value>3</value>
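
NOTE (illustrative, not part of this patch): a minimal hdfs-site.xml override an
operator might use to tune the new settings; the values are examples only.

    <property>
      <name>dfs.namenode.excess.redundancy.timeout-sec</name>
      <value>1800</value>
    </property>
    <property>
      <name>dfs.namenode.excess.redundancy.timeout.check.limit</name>
      <value>500</value>
    </property>
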
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index ca8ae04bbf..6c20c141a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2092,6 +2092,25 @@ public FsDatasetTestUtils getFsDatasetTestUtils(DataNode dn) {
         .newInstance(dn);
   }
 
+  /**
+   * Wait for the datanodes in the cluster to process any block
+   * deletions that have already been asynchronously queued.
+   */
+  public void waitForDNDeletions()
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (DataNode dn : getDataNodes()) {
+          if (getFsDatasetTestUtils(dn).getPendingAsyncDeletions() > 0) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, 1000, 10000);
+  }
+
   /**
    * Gets the rpc port used by the NameNode, because the caller
    * supplied port is not necessarily the actual port used.
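
NOTE (illustrative, not part of this patch): the intended call pattern for the
new helper in a test, assuming a running MiniDFSCluster named cluster whose
NameNode has already scheduled replica invalidations; triggerHeartbeats() and
triggerDeletionReports() are pre-existing MiniDFSCluster helpers.

    cluster.triggerHeartbeats();      // heartbeat responses carry the delete commands
    cluster.waitForDNDeletions();     // block until every DN's async deletion queue drains
    cluster.triggerDeletionReports(); // DNs report the deleted replicas to the NameNode
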
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 3e00491e99..8ce69a45eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.LinkedListMultimap;
@@ -112,6 +113,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -2201,4 +2203,130 @@ public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
       assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
     }
   }
+
+  /**
+   * Test that the NameNode processes timed-out excess redundancy blocks.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test(timeout = 360000)
+  public void testProcessTimedOutExcessBlocks() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration config = new HdfsConfiguration();
+    // Bump up the redundancy recheck interval so the monitor does not run during the test.
+    config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10000);
+    // Set the excess redundancy block timeout.
+    long timeOut = 60L;
+    config.setLong(DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, timeOut);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+    final Semaphore semaphore = new Semaphore(0);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build()) {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
+      cluster.waitActive();
+
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Delay the replica removal until the semaphore is released.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore.
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create a file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // Reduce replication to 2 so that one replica becomes excess.
+      fs.setReplication(path, (short) 2);
+
+      // Check that excessRedundancyMap and invalidateBlocks both have size 1.
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+      DataNode excessDn = Arrays.stream(loc).
+          filter(datanodeInfo -> blockManager.getExcessSize4Testing(
+              datanodeInfo.getDatanodeUuid()) > 0)
+          .map(datanodeInfo -> cluster.getDataNode(datanodeInfo.getIpcPort()))
+          .findFirst()
+          .orElse(null);
+
+      // Schedule blocks for deletion at excessDn.
+      assertEquals(1, blockManager.computeInvalidateWork(1));
+      // Check that excessRedundancyMap still has size 1.
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      // Check that invalidateBlocks now has size 0.
+      assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+      assertNotNull(excessDn);
+
+      // The NameNode will ask the datanode to delete replicas in the heartbeat response.
+      cluster.triggerHeartbeats();
+
+      // Wait for the datanode to process any block deletions
+      // that have already been asynchronously queued.
+      DataNode finalExcessDn = excessDn;
+      GenericTestUtils.waitFor(
+          () -> cluster.getFsDatasetTestUtils(finalExcessDn).getPendingAsyncDeletions() == 1,
+          100, 1000);
+
+      // Restart the datanode.
+      int ipcPort = excessDn.getDatanodeId().getIpcPort();
+      MiniDFSCluster.DataNodeProperties dataNodeProperties = cluster.stopDataNode(
+          excessDn.getDatanodeId().getXferAddr());
+      assertTrue(cluster.restartDataNode(dataNodeProperties, true));
+      semaphore.release(1);
+      cluster.waitActive();
+
+      // Check that the replica still exists on excessDn.
+      excessDn = cluster.getDataNode(ipcPort);
+      assertNotNull(cluster.getFsDatasetTestUtils(excessDn).fetchReplica(extendedBlock));
+      assertEquals(0, cluster.getFsDatasetTestUtils(excessDn).getPendingAsyncDeletions());
+
+      // Verify that the excess redundancy block has not yet timed out.
+      blockManager.processTimedOutExcessBlocks();
+      assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+
+      // Wait for the excess redundancy block to time out.
+      Thread.sleep(timeOut * 1000);
+      blockManager.processTimedOutExcessBlocks();
+
+      // Check that excessRedundancyMap and invalidateBlocks both have size 1.
+      assertEquals(1, blockManager.getExcessSize4Testing(excessDn.getDatanodeUuid()));
+      assertEquals(1, blockManager.getExcessBlocksCount());
+      assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+
+      // Schedule blocks for deletion.
+      assertEquals(1, blockManager.computeInvalidateWork(1));
+
+      cluster.triggerHeartbeats();
+
+      // Resume the delayed removeReplicaFromMem call.
+      semaphore.release(1);
+
+      // Wait for the datanodes in the cluster to process any block
+      // deletions that have already been asynchronously queued.
+      cluster.waitForDNDeletions();
+
+      // Trigger an immediate deletion report.
+      cluster.triggerDeletionReports();
+
+      // The replica count should now be 2.
+      assertEquals(2, DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations().length);
+      assertEquals(0, blockManager.getExcessBlocksCount());
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }
\ No newline at end of file