diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c8f031e8fe..7856419f3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -483,6 +483,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // Whether to enable datanode's stale state detection and usage for writes
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
   public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
+  // Enable and disable logging datanode staleness. Disabled by default.
+  public static final String DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY =
+      "dfs.namenode.enable.log.stale.datanode";
+  public static final boolean DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT =
+      false;
   // The default value of the time interval for marking datanodes as stale
   public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
   public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 46444bc9b2..9e4d867a0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -43,7 +45,15 @@ import com.google.common.annotations.VisibleForTesting;
  */
 class HeartbeatManager implements DatanodeStatistics {
   static final Logger LOG = LoggerFactory.getLogger(HeartbeatManager.class);
-
+  private static final String REPORT_DELTA_STALE_DN_HEADER =
+      "StaleNodes Report: [New Stale Nodes]: %d";
+  private static final String REPORT_STALE_DN_LINE_ENTRY = "%n\t %s";
+  private static final String REPORT_STALE_DN_LINE_TAIL = ", %s";
+  private static final String REPORT_REMOVE_DEAD_NODE_ENTRY =
+      "StaleNodes Report: [Remove DeadNode]: %s";
+  private static final String REPORT_REMOVE_STALE_NODE_ENTRY =
+      "StaleNodes Report: [Remove StaleNode]: %s";
+  private static final int REPORT_STALE_NODE_NODES_PER_LINE = 10;
   /**
    * Stores a subset of the datanodeMap in DatanodeManager,
    * containing nodes that are considered alive.
@@ -56,14 +66,19 @@ class HeartbeatManager implements DatanodeStatistics {
   /** Statistics, which are synchronized by the heartbeat manager lock. */
   private final DatanodeStats stats = new DatanodeStats();
 
-  /** The time period to check for expired datanodes */
+  /** The time period to check for expired datanodes. */
   private final long heartbeatRecheckInterval;
-  /** Heartbeat monitor thread */
+  /** Heartbeat monitor thread. */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
   private final StopWatch heartbeatStopWatch = new StopWatch();
 
   final Namesystem namesystem;
   final BlockManager blockManager;
+  /** Enable log for datanode staleness. */
+  private final boolean enableLogStaleNodes;
+
+  /** Reports for stale datanodes. */
+  private final Set<DatanodeDescriptor> staleDataNodes = new HashSet<>();
 
   HeartbeatManager(final Namesystem namesystem,
       final BlockManager blockManager, final Configuration conf) {
@@ -78,6 +93,9 @@ class HeartbeatManager implements DatanodeStatistics {
     long staleInterval = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
         DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
+    enableLogStaleNodes = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT);
 
     if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
       this.heartbeatRecheckInterval = staleInterval;
@@ -228,6 +246,7 @@ class HeartbeatManager implements DatanodeStatistics {
     if (node.isAlive()) {
       stats.subtract(node);
       datanodes.remove(node);
+      removeNodeFromStaleList(node);
       node.setAlive(false);
     }
   }
@@ -323,6 +342,59 @@ class HeartbeatManager implements DatanodeStatistics {
     return elapsed + offset > heartbeatRecheckInterval;
   }
 
+  /**
+   * Remove deadNode from StaleNodeList if it exists.
+   * This method assumes that it is called inside a synchronized block.
+   *
+   * @param d node descriptor to be marked as dead.
+   * @return true if the node was already on the stale list.
+   */
+  private boolean removeNodeFromStaleList(DatanodeDescriptor d) {
+    return removeNodeFromStaleList(d, true);
+  }
+
+  /**
+   * Remove node from StaleNodeList if it exists.
+   * If enabled, the log will show whether the node is removed from list because
+   * it is dead or not.
+   * This method assumes that it is called inside a synchronized block.
+   *
+   * @param d node descriptor to be removed from the stale list.
+   * @param isDead true if the node is being removed because it is dead.
+   * @return true if the node was already in the stale list.
+   */
+  private boolean removeNodeFromStaleList(DatanodeDescriptor d,
+      boolean isDead) {
+    boolean result = false;
+    result = staleDataNodes.remove(d);
+    if (enableLogStaleNodes && result) {
+      LOG.info(String.format(isDead ?
+          REPORT_REMOVE_DEAD_NODE_ENTRY : REPORT_REMOVE_STALE_NODE_ENTRY,
+          d));
+    }
+    return result;
+  }
+
+  /**
+   * Dump the new stale data nodes added since last heartbeat check.
+   *
+   * @param staleNodes list of datanodes added in the last heartbeat check.
+   */
+  private void dumpStaleNodes(List<DatanodeDescriptor> staleNodes) {
+    // log nodes detected as stale
+    if (enableLogStaleNodes && (!staleNodes.isEmpty())) {
+      StringBuilder staleLogMSG =
+          new StringBuilder(String.format(REPORT_DELTA_STALE_DN_HEADER,
+              staleNodes.size()));
+      for (int ind = 0; ind < staleNodes.size(); ind++) {
+        String logFormat = (ind % REPORT_STALE_NODE_NODES_PER_LINE == 0) ?
+            REPORT_STALE_DN_LINE_ENTRY : REPORT_STALE_DN_LINE_TAIL;
+        staleLogMSG.append(String.format(logFormat, staleNodes.get(ind)));
+      }
+      LOG.info(staleLogMSG.toString());
+    }
+  }
+
   /**
    * Check if there are any expired heartbeats, and if so,
    * whether any blocks have to be re-replicated.
@@ -365,9 +437,9 @@ class HeartbeatManager implements DatanodeStatistics {
       // locate the first failed storage that isn't on a dead node.
       DatanodeStorageInfo failedStorage = null;
 
-      // check the number of stale nodes
-      int numOfStaleNodes = 0;
+      // check the number of stale storages
       int numOfStaleStorages = 0;
+      List<DatanodeDescriptor> staleNodes = new ArrayList<>();
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
           // check if an excessive GC pause has occurred
@@ -377,13 +449,21 @@ class HeartbeatManager implements DatanodeStatistics {
           if (dead == null && dm.isDatanodeDead(d)) {
             stats.incrExpiredHeartbeats();
             dead = d;
+            // remove the node from stale list to adjust the stale list size
+            // before setting the stale count of the DatanodeManager
+            removeNodeFromStaleList(d);
+          } else {
+            if (d.isStale(dm.getStaleInterval())) {
+              if (staleDataNodes.add(d)) {
+                // the node is newly detected as stale
+                staleNodes.add(d);
+              }
+            } else {
+              // remove the node if it is no longer stale
+              removeNodeFromStaleList(d, false);
+            }
           }
-          if (d.isStale(dm.getStaleInterval())) {
-            LOG.warn(String.format("Stale datanode {}."
-                + " No heartbeat received since last {} milliseconds"),
-                d.getName(), dm.getStaleInterval());
-            numOfStaleNodes++;
-          }
+
           DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
           for(DatanodeStorageInfo storageInfo : storageInfos) {
             if (storageInfo.areBlockContentsStale()) {
@@ -396,18 +476,21 @@ class HeartbeatManager implements DatanodeStatistics {
               failedStorage = storageInfo;
             }
           }
-
         }
         // Set the number of stale nodes in the DatanodeManager
-        dm.setNumStaleNodes(numOfStaleNodes);
+        dm.setNumStaleNodes(staleDataNodes.size());
         dm.setNumStaleStorages(numOfStaleStorages);
       }
 
+      // log nodes detected as stale since last heartbeat
+      dumpStaleNodes(staleNodes);
+
       allAlive = dead == null && failedStorage == null;
       if (!allAlive && namesystem.isInStartupSafeMode()) {
         return;
       }
 
+
       if (dead != null) {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c291c892f9..6c54d85802 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2040,6 +2040,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.enable.log.stale.datanode</name>
+  <value>false</value>
+  <description>
+    Enable and disable logging datanode staleness. Disabled by default.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.stale.datanode.interval</name>
   <value>30000</value>
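
Note (reviewer sketch, not part of the patch): the new key ships disabled in hdfs-default.xml above, so an operator who wants the staleness reports would override it in hdfs-site.xml, for example:

<property>
  <name>dfs.namenode.enable.log.stale.datanode</name>
  <value>true</value>
</property>

With the override in place, the NameNode emits the "StaleNodes Report" lines built by dumpStaleNodes() and removeNodeFromStaleList() whenever the heartbeat check detects new stale datanodes or removes dead or recovered ones from the stale list.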