diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9e1333f952..8ff46e37ae 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -557,7 +557,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // This value uses the times of heartbeat interval to define the minimum value for stale interval.
   public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
   public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
-
+  public static final String DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY
+      = "dfs.namenode.remove.dead.datanode.batchnum";
+  public static final int DFS_NAMENODE_REMOVE_BAD_BATCH_NUM_DEFAULT = 10;
   // When the percentage of stale datanodes reaches this ratio,
   // allow writing to stale nodes to prevent hotspots.
   public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index b923ba3a65..01e1b6392a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -71,6 +71,7 @@ class HeartbeatManager implements DatanodeStatistics {
   /** Heartbeat monitor thread. */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
   private final StopWatch heartbeatStopWatch = new StopWatch();
+  private final int numOfDeadDatanodesRemove;
 
   final Namesystem namesystem;
   final BlockManager blockManager;
@@ -96,6 +97,9 @@ class HeartbeatManager implements DatanodeStatistics {
     enableLogStaleNodes = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT);
+    this.numOfDeadDatanodesRemove = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REMOVE_BAD_BATCH_NUM_DEFAULT);
 
     if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
       this.heartbeatRecheckInterval = staleInterval;
@@ -404,7 +408,7 @@ private void dumpStaleNodes(List<DatanodeDescriptor> staleNodes) {
   /**
    * Check if there are any expired heartbeats, and if so,
    * whether any blocks have to be re-replicated.
-   * While removing dead datanodes, make sure that only one datanode is marked
+   * While removing dead datanodes, make sure only a limited number are marked
    * dead at a time within the synchronized section. Otherwise, a cascading
    * effect causes more datanodes to be declared dead.
    * Check if there are any failed storage and if so,
@@ -436,12 +440,17 @@ void heartbeatCheck() {
       return;
     }
     boolean allAlive = false;
-    while (!allAlive) {
-      // locate the first dead node.
-      DatanodeDescriptor dead = null;
+    // Locate a limited number of dead nodes.
+    List<DatanodeDescriptor> deadDatanodes = new ArrayList<>(
+        numOfDeadDatanodesRemove);
+    // Locate a limited number of failed storages that aren't on dead nodes.
+    List<DatanodeStorageInfo> failedStorages = new ArrayList<>(
+        numOfDeadDatanodesRemove);
 
-      // locate the first failed storage that isn't on a dead node.
-      DatanodeStorageInfo failedStorage = null;
+    while (!allAlive) {
+
+      deadDatanodes.clear();
+      failedStorages.clear();
 
       // check the number of stale storages
       int numOfStaleStorages = 0;
@@ -452,9 +461,10 @@ void heartbeatCheck() {
           if (shouldAbortHeartbeatCheck(0)) {
             return;
          }
-          if (dead == null && dm.isDatanodeDead(d)) {
+          if (deadDatanodes.size() < numOfDeadDatanodesRemove &&
+              dm.isDatanodeDead(d)) {
             stats.incrExpiredHeartbeats();
-            dead = d;
+            deadDatanodes.add(d);
             // remove the node from stale list to adjust the stale list size
             // before setting the stale count of the DatanodeManager
             removeNodeFromStaleList(d);
@@ -476,10 +486,10 @@ void heartbeatCheck() {
               numOfStaleStorages++;
             }
 
-            if (failedStorage == null &&
+            if (failedStorages.size() < numOfDeadDatanodesRemove &&
                 storageInfo.areBlocksOnFailedStorage() &&
-                d != dead) {
-              failedStorage = storageInfo;
+                !deadDatanodes.contains(d)) {
+              failedStorages.add(storageInfo);
             }
           }
         }
@@ -492,12 +502,12 @@ void heartbeatCheck() {
       // log nodes detected as stale since last heartBeat
       dumpStaleNodes(staleNodes);
 
-      allAlive = dead == null && failedStorage == null;
+      allAlive = deadDatanodes.isEmpty() && failedStorages.isEmpty();
       if (!allAlive && namesystem.isInStartupSafeMode()) {
         return;
       }
 
-      if (dead != null) {
+      for (DatanodeDescriptor dead : deadDatanodes) {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
         try {
@@ -506,7 +516,7 @@ void heartbeatCheck() {
           namesystem.writeUnlock("removeDeadDatanode");
         }
       }
-      if (failedStorage != null) {
+      for (DatanodeStorageInfo failedStorage : failedStorages) {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
         namesystem.writeLock();
         try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 33ffd07c8d..28fe382b17 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5365,6 +5365,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.remove.dead.datanode.batchnum</name>
+  <value>10</value>
+  <description>
+    Maximum number of dead datanodes removed by the HeartbeatManager per heartbeat check scan.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.snapshot.capture.openfiles</name>
   <value>false</value>
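For context, the listing below is a minimal, self-contained sketch of the batching pattern the patch applies in heartbeatCheck(): each monitor pass collects at most a configured number of dead nodes before removing them, instead of declaring exactly one node dead per pass. It is not the patched Hadoop code; DataNodeInfo, isDead(), BATCH_NUM, and the simplified heartbeatCheck() here are hypothetical stand-ins for DatanodeDescriptor, DatanodeManager#isDatanodeDead, the numOfDeadDatanodesRemove field read from dfs.namenode.remove.dead.datanode.batchnum, and the real heartbeatCheck() respectively, and locking is omitted.

import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only (not Hadoop code): batched dead-node removal. Each scan
 * collects up to BATCH_NUM dead nodes, then removes them in one batch,
 * mirroring the bounded per-pass work introduced by the patch.
 */
public class BatchedDeadNodeRemoval {

  /** Hypothetical stand-in for DatanodeDescriptor. */
  static class DataNodeInfo {
    final String name;
    final long lastHeartbeatMillis;

    DataNodeInfo(String name, long lastHeartbeatMillis) {
      this.name = name;
      this.lastHeartbeatMillis = lastHeartbeatMillis;
    }

    /** Stand-in for DatanodeManager#isDatanodeDead. */
    boolean isDead(long nowMillis, long expiryMillis) {
      return nowMillis - lastHeartbeatMillis > expiryMillis;
    }
  }

  // Plays the role of dfs.namenode.remove.dead.datanode.batchnum (default 10).
  static final int BATCH_NUM = 10;
  // Illustrative expiry window; the real value is derived from heartbeat settings.
  static final long HEARTBEAT_EXPIRY_MILLIS = 10 * 60 * 1000 + 30 * 1000;

  /**
   * One monitor pass: collect at most BATCH_NUM dead nodes, then remove them.
   * Returns the nodes declared dead in this pass.
   */
  static List<DataNodeInfo> heartbeatCheck(List<DataNodeInfo> datanodes, long now) {
    List<DataNodeInfo> dead = new ArrayList<>(BATCH_NUM);
    for (DataNodeInfo d : datanodes) {
      if (dead.size() < BATCH_NUM && d.isDead(now, HEARTBEAT_EXPIRY_MILLIS)) {
        dead.add(d);  // batch the dead nodes instead of stopping at the first one
      }
    }
    for (DataNodeInfo d : dead) {
      // In the real code each removal happens under namesystem.writeLock().
      datanodes.remove(d);
      System.out.println("removed dead datanode: " + d.name);
    }
    return dead;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    List<DataNodeInfo> nodes = new ArrayList<>();
    for (int i = 0; i < 15; i++) {
      // All 15 nodes last reported far in the past, so all count as dead.
      nodes.add(new DataNodeInfo("dn-" + i, now - 2 * HEARTBEAT_EXPIRY_MILLIS));
    }
    // First pass removes at most BATCH_NUM (10) nodes, the next pass the rest.
    System.out.println("pass 1 removed " + heartbeatCheck(nodes, now).size());
    System.out.println("pass 2 removed " + heartbeatCheck(nodes, now).size());
  }
}

The patch applies the same per-pass bound to failed storages and keeps the outer while (!allAlive) loop, so every dead datanode is still removed eventually; the change only caps how many removals happen per write-lock acquisition, which limits the cascading effect the javadoc describes.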