From 2887bbb33cefaac0c548eb2450a1f8e3e60f5ea7 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Thu, 11 Oct 2012 18:08:27 +0000 Subject: [PATCH] HDFS-3912. Detect and avoid stale datanodes for writes. Contributed by Jing Zhao git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1397211 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 12 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 +- .../BlockPlacementPolicyDefault.java | 142 ++++++++---- .../BlockPlacementPolicyWithNodeGroup.java | 99 ++++---- .../blockmanagement/DatanodeManager.java | 169 +++++++++++--- .../blockmanagement/HeartbeatManager.java | 73 +++++- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../hdfs/server/namenode/FSClusterStats.java | 12 +- .../hdfs/server/namenode/FSNamesystem.java | 6 + .../src/main/resources/hdfs-default.xml | 50 +++- .../TestReplicationPolicy.java | 219 +++++++++++++++++- 11 files changed, 635 insertions(+), 163 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 012ece743d..a3c61900d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -13,10 +13,6 @@ Trunk (Unreleased) HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement with 4-layer network topology. (Junping Du via szetszwo) - HDFS-3703. Datanodes are marked stale if heartbeat is not received in - configured timeout and are selected as the last location to read from. - (Jing Zhao via suresh) - IMPROVEMENTS HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants -> @@ -238,6 +234,9 @@ Release 2.0.3-alpha - Unreleased HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS. (Jaimin D Jetly and Jing Zhao via szetszwo) + HDFS-3912. Detect and avoid stale datanodes for writes. + (Jing Zhao via suresh) + IMPROVEMENTS HDFS-3925. Prettify PipelineAck#toString() for printing to a log @@ -349,6 +348,11 @@ Release 2.0.2-alpha - 2012-09-07 HDFS-2793. Add an admin command to trigger an edit log roll. (todd) + HDFS-3703. Datanodes are marked stale if heartbeat is not received in + configured timeout and are selected as the last location to read from. + (Jing Zhao via suresh) + + IMPROVEMENTS HDFS-3390. 
DFSAdmin should print full stack traces of errors when DEBUG diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 918a790a5a..cb48c00ea7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -180,9 +180,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Whether to enable datanode's stale state detection and usage public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode"; public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false; + // Whether to avoid using stale datanodes as targets for writes + public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode"; + public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false; // The default value of the time interval for marking datanodes as stale public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval"; - public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s + public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s + // The stale interval cannot be too small, since otherwise stale states would churn too frequently. + // This key defines the minimum stale interval as a multiple of the heartbeat interval. + public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval"; + public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s + + // When the ratio of datanodes marked as stale reaches this threshold, + // stop avoiding writing to stale nodes so as to prevent causing hotspots. + public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio"; + public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f; // Replication monitoring related keys public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index bc396539c6..211a574f1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -62,6 +62,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { protected NetworkTopology clusterMap; private FSClusterStats stats; protected long heartbeatInterval; // interval for DataNode heartbeats + private long staleInterval; // interval used to identify stale DataNodes + /** * A miss of that many heartbeats is tolerated for replica deletion policy.
*/ @@ -78,7 +80,8 @@ protected BlockPlacementPolicyDefault() { @Override public void initialize(Configuration conf, FSClusterStats stats, NetworkTopology clusterMap) { - this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); + this.considerLoad = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); this.stats = stats; this.clusterMap = clusterMap; this.heartbeatInterval = conf.getLong( @@ -87,6 +90,9 @@ public void initialize(Configuration conf, FSClusterStats stats, this.tolerateHeartbeatMultiplier = conf.getInt( DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT); + this.staleInterval = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT); } protected ThreadLocal threadLocalBuilder = @@ -155,9 +161,10 @@ DatanodeDescriptor[] chooseTarget(int numOfReplicas, writer=null; } - DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, - excludedNodes, blocksize, - maxNodesPerRack, results); + boolean avoidStaleNodes = (stats != null + && stats.isAvoidingStaleDataNodesForWrite()); + DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, + excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes); if (!returnChosenNodes) { results.removeAll(chosenNodes); } @@ -173,8 +180,8 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas, HashMap excludedNodes, long blocksize, int maxNodesPerRack, - List results) { - + List results, + final boolean avoidStaleNodes) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return writer; } @@ -185,18 +192,21 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas, if (writer == null && !newBlock) { writer = results.get(0); } - + + // Keep a copy of original excludedNodes + final HashMap oldExcludedNodes = avoidStaleNodes ? 
+ new HashMap(excludedNodes) : null; try { if (numOfResults == 0) { - writer = chooseLocalNode(writer, excludedNodes, - blocksize, maxNodesPerRack, results); + writer = chooseLocalNode(writer, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); if (--numOfReplicas == 0) { return writer; } } if (numOfResults <= 1) { - chooseRemoteRack(1, results.get(0), excludedNodes, - blocksize, maxNodesPerRack, results); + chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); if (--numOfReplicas == 0) { return writer; } @@ -204,24 +214,36 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas, if (numOfResults <= 2) { if (clusterMap.isOnSameRack(results.get(0), results.get(1))) { chooseRemoteRack(1, results.get(0), excludedNodes, - blocksize, maxNodesPerRack, results); + blocksize, maxNodesPerRack, + results, avoidStaleNodes); } else if (newBlock){ chooseLocalRack(results.get(1), excludedNodes, blocksize, - maxNodesPerRack, results); + maxNodesPerRack, results, avoidStaleNodes); } else { - chooseLocalRack(writer, excludedNodes, blocksize, - maxNodesPerRack, results); + chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, + results, avoidStaleNodes); } if (--numOfReplicas == 0) { return writer; } } - chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } catch (NotEnoughReplicasException e) { LOG.warn("Not able to place enough replicas, still in need of " + numOfReplicas + " to reach " + totalReplicasExpected + "\n" + e.getMessage()); + if (avoidStaleNodes) { + // excludedNodes now contains the initial excludedNodes, the nodes that + // were chosen, and the nodes that were tried but not chosen because they + // were stale, decommissioned, or otherwise unsuitable as write targets. + // Retry the placement, this time without avoiding stale nodes + for (Node node : results) { + oldExcludedNodes.put(node, node); + } + return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, + maxNodesPerRack, results, false); + } } return writer; } @@ -236,26 +258,27 @@ protected DatanodeDescriptor chooseLocalNode( HashMap excludedNodes, long blocksize, int maxNodesPerRack, - List results) + List results, + boolean avoidStaleNodes) throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); if (preferLocalNode) { // otherwise try local machine first Node oldNode = excludedNodes.put(localMachine, localMachine); if (oldNode == null) { // was not in the excluded list - if (isGoodTarget(localMachine, blocksize, - maxNodesPerRack, false, results)) { + if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false, + results, avoidStaleNodes)) { results.add(localMachine); return localMachine; } } } // try a node on local rack - return chooseLocalRack(localMachine, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseLocalRack(localMachine, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } /* choose one node from the rack that localMachine is on.
@@ -270,19 +293,19 @@ protected DatanodeDescriptor chooseLocalRack( HashMap excludedNodes, long blocksize, int maxNodesPerRack, - List results) + List results, + boolean avoidStaleNodes) throws NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } // choose one from the local rack try { - return chooseRandom( - localMachine.getNetworkLocation(), - excludedNodes, blocksize, maxNodesPerRack, results); + return chooseRandom(localMachine.getNetworkLocation(), excludedNodes, + blocksize, maxNodesPerRack, results, avoidStaleNodes); } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; @@ -296,18 +319,17 @@ protected DatanodeDescriptor chooseLocalRack( } if (newLocal != null) { try { - return chooseRandom( - newLocal.getNetworkLocation(), - excludedNodes, blocksize, maxNodesPerRack, results); + return chooseRandom(newLocal.getNetworkLocation(), excludedNodes, + blocksize, maxNodesPerRack, results, avoidStaleNodes); } catch(NotEnoughReplicasException e2) { //otherwise randomly choose one from the network - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } } else { //otherwise randomly choose one from the network - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } } } @@ -323,17 +345,19 @@ protected void chooseRemoteRack(int numOfReplicas, HashMap excludedNodes, long blocksize, int maxReplicasPerRack, - List results) + List results, + boolean avoidStaleNodes) throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); // randomly choose one node from remote racks try { - chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(), - excludedNodes, blocksize, maxReplicasPerRack, results); + chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(), + excludedNodes, blocksize, maxReplicasPerRack, results, + avoidStaleNodes); } catch (NotEnoughReplicasException e) { chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas), localMachine.getNetworkLocation(), excludedNodes, blocksize, - maxReplicasPerRack, results); + maxReplicasPerRack, results, avoidStaleNodes); } } @@ -345,7 +369,8 @@ protected DatanodeDescriptor chooseRandom( HashMap excludedNodes, long blocksize, int maxNodesPerRack, - List results) + List results, + boolean avoidStaleNodes) throws NotEnoughReplicasException { int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet()); @@ -363,7 +388,8 @@ protected DatanodeDescriptor chooseRandom( Node oldNode = excludedNodes.put(chosenNode, chosenNode); if (oldNode == null) { // chosenNode was not in the excluded list numOfAvailableNodes--; - if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) { + if (isGoodTarget(chosenNode, blocksize, + maxNodesPerRack, results, avoidStaleNodes)) { results.add(chosenNode); adjustExcludedNodes(excludedNodes, chosenNode); return chosenNode; @@ -390,7 +416,8 @@ protected void chooseRandom(int numOfReplicas, HashMap excludedNodes, long blocksize, int maxNodesPerRack, - List results) + List 
results, + boolean avoidStaleNodes) throws NotEnoughReplicasException { int numOfAvailableNodes = @@ -409,7 +436,8 @@ protected void chooseRandom(int numOfReplicas, if (oldNode == null) { numOfAvailableNodes--; - if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) { + if (isGoodTarget(chosenNode, blocksize, + maxNodesPerRack, results, avoidStaleNodes)) { numOfReplicas--; results.add(chosenNode); adjustExcludedNodes(excludedNodes, chosenNode); @@ -451,9 +479,10 @@ protected void adjustExcludedNodes(HashMap excludedNodes, */ private boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerRack, - List results) { - return isGoodTarget(node, blockSize, maxTargetPerRack, - this.considerLoad, results); + List results, + boolean avoidStaleNodes) { + return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad, + results, avoidStaleNodes); } /** @@ -466,7 +495,8 @@ private boolean isGoodTarget(DatanodeDescriptor node, * the cluster and total number of replicas for a block * @param considerLoad whether or not to consider load of the target node * @param results A list containing currently chosen nodes. Used to check if - * too many nodes has been chosen in the target rack. + * too many nodes have been chosen in the target rack. + * @param avoidStaleNodes Whether or not to avoid choosing stale nodes * @return Return true if node has enough space, * does not have too much load, * and the rack does not have too many nodes. @@ -474,7 +504,8 @@ private boolean isGoodTarget(DatanodeDescriptor node, protected boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerRack, boolean considerLoad, - List results) { + List results, + boolean avoidStaleNodes) { // check if the node is (being) decommissed if (node.isDecommissionInProgress() || node.isDecommissioned()) { if(LOG.isDebugEnabled()) { @@ -485,6 +516,17 @@ protected boolean isGoodTarget(DatanodeDescriptor node, return false; } + if (avoidStaleNodes) { + if (node.isStale(this.staleInterval)) { + if (LOG.isDebugEnabled()) { + threadLocalBuilder.get().append(node.toString()).append(": ") + .append("Node ").append(NodeBase.getPath(node)) + .append(" is not chosen because the node is stale "); + } + return false; + } + } + long remaining = node.getRemaining() - (node.getBlocksScheduled() * blockSize); // check the remaining capacity of the target machine diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index 00b0b0723b..c575fa8e11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -64,23 +64,20 @@ public void initialize(Configuration conf, FSClusterStats stats, * @return the chosen node */ @Override - protected DatanodeDescriptor chooseLocalNode( - DatanodeDescriptor localMachine, - HashMap excludedNodes, - long blocksize, - int maxNodesPerRack, - List results) + protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine, + HashMap excludedNodes, long blocksize, int maxNodesPerRack, + List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { // if no local machine, randomly choose one node if
(localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + blocksize, maxNodesPerRack, results, avoidStaleNodes); // otherwise try local machine first Node oldNode = excludedNodes.put(localMachine, localMachine); if (oldNode == null) { // was not in the excluded list if (isGoodTarget(localMachine, blocksize, - maxNodesPerRack, false, results)) { + maxNodesPerRack, false, results, avoidStaleNodes)) { results.add(localMachine); // Nodes under same nodegroup should be excluded. addNodeGroupToExcludedNodes(excludedNodes, @@ -92,13 +89,13 @@ protected DatanodeDescriptor chooseLocalNode( // try a node on local node group DatanodeDescriptor chosenNode = chooseLocalNodeGroup( (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, - blocksize, maxNodesPerRack, results); + blocksize, maxNodesPerRack, results, avoidStaleNodes); if (chosenNode != null) { return chosenNode; } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, - blocksize, maxNodesPerRack, results); + blocksize, maxNodesPerRack, results, avoidStaleNodes); } @Override @@ -119,17 +116,15 @@ private void addNodeGroupToExcludedNodes(HashMap excludedNodes, } @Override - protected DatanodeDescriptor chooseLocalRack( - DatanodeDescriptor localMachine, - HashMap excludedNodes, - long blocksize, - int maxNodesPerRack, - List results) - throws NotEnoughReplicasException { + protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, + HashMap excludedNodes, long blocksize, int maxNodesPerRack, + List results, boolean avoidStaleNodes) + throws NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + blocksize, maxNodesPerRack, results, + avoidStaleNodes); } // choose one from the local rack, but off-nodegroup @@ -137,7 +132,8 @@ protected DatanodeDescriptor chooseLocalRack( return chooseRandom(NetworkTopology.getFirstHalf( localMachine.getNetworkLocation()), excludedNodes, blocksize, - maxNodesPerRack, results); + maxNodesPerRack, results, + avoidStaleNodes); } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; @@ -151,39 +147,39 @@ protected DatanodeDescriptor chooseLocalRack( } if (newLocal != null) { try { - return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()), - excludedNodes, blocksize, maxNodesPerRack, results); + return chooseRandom( + clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes, + blocksize, maxNodesPerRack, results, avoidStaleNodes); } catch(NotEnoughReplicasException e2) { //otherwise randomly choose one from the network - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } } else { //otherwise randomly choose one from the network - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } } } @Override protected void chooseRemoteRack(int numOfReplicas, - DatanodeDescriptor localMachine, - HashMap excludedNodes, - long blocksize, - int maxReplicasPerRack, - List results) - throws NotEnoughReplicasException { + DatanodeDescriptor localMachine, HashMap excludedNodes, + long blocksize, int maxReplicasPerRack, List 
results, + boolean avoidStaleNodes) throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); // randomly choose one node from remote racks try { - chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf( - localMachine.getNetworkLocation()), - excludedNodes, blocksize, maxReplicasPerRack, results); + chooseRandom( + numOfReplicas, + "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()), + excludedNodes, blocksize, maxReplicasPerRack, results, + avoidStaleNodes); } catch (NotEnoughReplicasException e) { - chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas), - localMachine.getNetworkLocation(), excludedNodes, blocksize, - maxReplicasPerRack, results); + chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas), + localMachine.getNetworkLocation(), excludedNodes, blocksize, + maxReplicasPerRack, results, avoidStaleNodes); } } @@ -193,19 +189,22 @@ protected void chooseRemoteRack(int numOfReplicas, * if still no such node is available, choose a random node in the cluster. * @return the chosen node */ - private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap, - DatanodeDescriptor localMachine, HashMap excludedNodes, long blocksize, - int maxNodesPerRack, List results) throws NotEnoughReplicasException { + private DatanodeDescriptor chooseLocalNodeGroup( + NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine, + HashMap excludedNodes, long blocksize, int maxNodesPerRack, + List results, boolean avoidStaleNodes) + throws NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + blocksize, maxNodesPerRack, results, avoidStaleNodes); } // choose one from the local node group try { - return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()), - excludedNodes, blocksize, maxNodesPerRack, results); + return chooseRandom( + clusterMap.getNodeGroup(localMachine.getNetworkLocation()), + excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes); } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; @@ -219,17 +218,19 @@ private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clu } if (newLocal != null) { try { - return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()), - excludedNodes, blocksize, maxNodesPerRack, results); + return chooseRandom( + clusterMap.getNodeGroup(newLocal.getNetworkLocation()), + excludedNodes, blocksize, maxNodesPerRack, results, + avoidStaleNodes); } catch(NotEnoughReplicasException e2) { //otherwise randomly choose one from the network - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } } else { //otherwise randomly choose one from the network - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results); + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 817b183e64..cad3540d7e 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -76,6 +76,7 @@ import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.net.InetAddresses; /** @@ -88,8 +89,8 @@ public class DatanodeManager { private final Namesystem namesystem; private final BlockManager blockManager; - private final HeartbeatManager heartbeatManager; + private Daemon decommissionthread = null; /** * Stores the datanode -> block map. @@ -127,28 +128,33 @@ public class DatanodeManager { /** Ask Datanode only up to this many blocks to delete. */ final int blockInvalidateLimit; + /** Whether or not to check stale DataNodes for read/write */ + private final boolean checkForStaleDataNodes; + + /** The interval for judging stale DataNodes for read/write */ + private final long staleInterval; + + /** Whether or not to avoid using stale DataNodes for writing */ + private volatile boolean avoidStaleDataNodesForWrite; + + /** The number of stale DataNodes */ + private volatile int numStaleNodes; + /** * Whether or not this cluster has ever consisted of more than 1 rack, * according to the NetworkTopology. */ private boolean hasClusterEverBeenMultiRack = false; - /** Whether or not to check the stale datanodes */ - private volatile boolean checkForStaleNodes; - /** The time interval for detecting stale datanodes */ - private volatile long staleInterval; - - DatanodeManager(final BlockManager blockManager, - final Namesystem namesystem, final Configuration conf - ) throws IOException { + DatanodeManager(final BlockManager blockManager, final Namesystem namesystem, + final Configuration conf) throws IOException { this.namesystem = namesystem; this.blockManager = blockManager; Class networkTopologyClass = conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, NetworkTopology.class, NetworkTopology.class); - networktopology = (NetworkTopology) ReflectionUtils.newInstance( - networkTopologyClass, conf); + networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf); this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf); @@ -181,25 +187,69 @@ public class DatanodeManager { DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit); LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit); - // set the value of stale interval based on configuration - this.checkForStaleNodes = conf.getBoolean( + + checkForStaleDataNodes = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT); - if (this.checkForStaleNodes) { - this.staleInterval = conf.getLong( - DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, - DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT); - if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) { - LOG.warn("The given interval for marking stale datanode = " - + this.staleInterval + ", which is smaller than the default value " - + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT - + "."); - } - } + + staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval); + avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf, + checkForStaleDataNodes); } - - private Daemon decommissionthread = null; 
- + + private static long getStaleIntervalFromConf(Configuration conf, + long heartbeatExpireInterval) { + long staleInterval = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT); + Preconditions.checkArgument(staleInterval > 0, + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY + + " = '" + staleInterval + "' is invalid. " + + "It should be a positive non-zero value."); + + final long heartbeatIntervalSeconds = conf.getLong( + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); + // The stale interval value cannot be smaller than + // 3 times of heartbeat interval + final long minStaleInterval = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT) + * heartbeatIntervalSeconds * 1000; + if (staleInterval < minStaleInterval) { + LOG.warn("The given interval for marking stale datanode = " + + staleInterval + ", which is less than " + + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT + + " heartbeat intervals. This may cause too frequent changes of " + + "stale states of DataNodes since a heartbeat msg may be missing " + + "due to temporary short-term failures. Reset stale interval to " + + minStaleInterval + "."); + staleInterval = minStaleInterval; + } + if (staleInterval > heartbeatExpireInterval) { + LOG.warn("The given interval for marking stale datanode = " + + staleInterval + ", which is larger than heartbeat expire interval " + + heartbeatExpireInterval + "."); + } + return staleInterval; + } + + static boolean getAvoidStaleForWriteFromConf(Configuration conf, + boolean checkForStale) { + boolean avoid = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); + boolean avoidStaleDataNodesForWrite = checkForStale && avoid; + if (!checkForStale && avoid) { + LOG.warn("Cannot set " + + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY + + " as false while setting " + + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY + + " as true."); + } + return avoidStaleDataNodesForWrite; + } + void activate(final Configuration conf) { final DecommissionManager dm = new DecommissionManager(namesystem, blockManager); this.decommissionthread = new Daemon(dm.new Monitor( @@ -253,9 +303,10 @@ public void sortLocatedBlocks(final String targethost, client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost); } - Comparator comparator = checkForStaleNodes ? - new DFSUtil.DecomStaleComparator(staleInterval) : - DFSUtil.DECOM_COMPARATOR; + Comparator comparator = checkForStaleDataNodes ? + new DFSUtil.DecomStaleComparator(staleInterval) : + DFSUtil.DECOM_COMPARATOR; + for (LocatedBlock b : locatedblocks) { networktopology.pseudoSortByDistance(client, b.getLocations()); // Move decommissioned/stale datanodes to the bottom @@ -723,7 +774,7 @@ private void refreshHostsReader(Configuration conf) throws IOException { * 3. Added to exclude --> start decommission. * 4. Removed from exclude --> stop decommission. */ - private void refreshDatanodes() throws IOException { + private void refreshDatanodes() { for(DatanodeDescriptor node : datanodeMap.values()) { // Check if not include. 
if (!inHostsList(node)) { @@ -782,7 +833,61 @@ public List getDecommissioningNodes() { namesystem.readUnlock(); } } + + /* Getters and setters for stale-DataNode-related attributes */ + + /** + * @return Whether or not to avoid writing to stale datanodes + */ + public boolean isAvoidingStaleDataNodesForWrite() { + return avoidStaleDataNodesForWrite; + } + /** + * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}. + * The HeartbeatManager disables avoidStaleDataNodesForWrite when more than + * half of the DataNodes are marked as stale. + * + * @param avoidStaleDataNodesForWrite + * The value to set to + * {@link DatanodeManager#avoidStaleDataNodesForWrite} + */ + void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) { + this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite; + } + + /** + * @return Whether or not to check stale DataNodes for R/W + */ + boolean isCheckingForStaleDataNodes() { + return checkForStaleDataNodes; + } + + /** + * @return The time interval used to mark DataNodes as stale. + */ + long getStaleInterval() { + return staleInterval; + } + + /** + * Set the current number of stale DataNodes. The HeartbeatManager derives + * this number from DataNode heartbeats. + * + * @param numStaleNodes + * The number of stale DataNodes to be set. + */ + void setNumStaleNodes(int numStaleNodes) { + this.numStaleNodes = numStaleNodes; + } + + /** + * @return The current number of stale DataNodes (as detected by the + * HeartbeatManager). + */ + int getNumStaleNodes() { + return this.numStaleNodes; + } /** Fetch live and dead datanodes. */ public void fetchDatanodes(final List live, @@ -961,7 +1066,7 @@ public List getDatanodeListForReport( return nodes; } - private void setDatanodeDead(DatanodeDescriptor node) throws IOException { + private void setDatanodeDead(DatanodeDescriptor node) { node.setLastUpdate(0); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index 3030007707..6ee65d38c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -30,6 +30,8 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; +import com.google.common.base.Preconditions; + /** * Manage the heartbeats received from datanodes. * The datanode list and statistics are synchronized @@ -54,18 +56,48 @@ class HeartbeatManager implements DatanodeStatistics { private final long heartbeatRecheckInterval; /** Heartbeat monitor thread */ private final Daemon heartbeatThread = new Daemon(new Monitor()); - + /** + * The initial, user-configured setting that indicates whether or not to + * avoid writing to stale datanodes. + */ + private final boolean initialAvoidWriteStaleNodes; + /** + * When the ratio of stale datanodes reaches this number, stop avoiding + * writing to stale datanodes, i.e., continue using stale nodes for writing.
+ */ + private final float ratioUseStaleDataNodesForWrite; + final Namesystem namesystem; final BlockManager blockManager; - HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager, - final Configuration conf) { - this.heartbeatRecheckInterval = conf.getInt( - DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, - DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes - + HeartbeatManager(final Namesystem namesystem, + final BlockManager blockManager, final Configuration conf) { this.namesystem = namesystem; this.blockManager = blockManager; + boolean checkStaleNodes = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT); + long recheckInterval = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min + long staleInterval = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s + this.initialAvoidWriteStaleNodes = DatanodeManager + .getAvoidStaleForWriteFromConf(conf, checkStaleNodes); + this.ratioUseStaleDataNodesForWrite = conf.getFloat( + DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY, + DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT); + Preconditions.checkArgument( + (ratioUseStaleDataNodesForWrite > 0 && + ratioUseStaleDataNodesForWrite <= 1.0f), + DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY + + " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + + "It should be a positive non-zero float value, not greater than 1.0f."); + + this.heartbeatRecheckInterval = (checkStaleNodes + && initialAvoidWriteStaleNodes + && staleInterval < recheckInterval) ? staleInterval : recheckInterval; } void activate(Configuration conf) { @@ -210,16 +242,39 @@ void heartbeatCheck() { if (namesystem.isInSafeMode()) { return; } + boolean checkStaleNodes = dm.isCheckingForStaleDataNodes(); boolean allAlive = false; while (!allAlive) { // locate the first dead node. 
DatanodeID dead = null; + // check the number of stale nodes + int numOfStaleNodes = 0; synchronized(this) { for (DatanodeDescriptor d : datanodes) { - if (dm.isDatanodeDead(d)) { + if (dead == null && dm.isDatanodeDead(d)) { stats.incrExpiredHeartbeats(); dead = d; - break; + if (!checkStaleNodes) { + break; + } + } + if (checkStaleNodes && + d.isStale(dm.getStaleInterval())) { + numOfStaleNodes++; + } + } + + // Change whether to avoid using stale datanodes for writing + // based on proportion of stale datanodes + if (checkStaleNodes) { + dm.setNumStaleNodes(numOfStaleNodes); + if (numOfStaleNodes > + datanodes.size() * ratioUseStaleDataNodesForWrite) { + dm.setAvoidStaleDataNodesForWrite(false); + } else { + if (this.initialAvoidWriteStaleNodes) { + dm.setAvoidStaleDataNodesForWrite(true); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c79b3fd08c..1983f40243 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -251,7 +251,7 @@ public static InetSocketAddress createSocketAddr(String target) { Daemon dataXceiverServer = null; ThreadGroup threadGroup = null; private DNConf dnConf; - private boolean heartbeatsDisabledForTests = false; + private volatile boolean heartbeatsDisabledForTests = false; private DataStorage storage = null; private HttpServer infoServer = null; DataNodeMetrics metrics; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java index 0840b840d4..f4827f38c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java @@ -32,8 +32,16 @@ public interface FSClusterStats { * @return a count of the total number of block transfers and block * writes that are currently occuring on the cluster. */ - - public int getTotalLoad() ; + public int getTotalLoad(); + + /** + * Indicate whether or not the cluster is now avoiding + * to use stale DataNodes for writing. + * + * @return True if the cluster is currently avoiding using stale DataNodes + * for writing targets, and false otherwise. 
+ */ + public boolean isAvoidingStaleDataNodesForWrite(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 36cde19722..a907a57ff7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -5539,4 +5539,10 @@ public SafeModeInfo getSafeModeInfoForTests() { public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) { this.nnResourceChecker = nnResourceChecker; } + + @Override + public boolean isAvoidingStaleDataNodesForWrite() { + return this.blockManager.getDatanodeManager() + .isAvoidingStaleDataNodesForWrite(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 153e21ba15..2452f28433 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -988,12 +988,28 @@ dfs.namenode.check.stale.datanode false - Indicate whether or not to check "stale" datanodes whose - heartbeat messages have not been received by the namenode - for more than a specified time interval. If this configuration - parameter is set as true, the stale datanodes will be moved to - the end of the target node list for reading. The writing will - also try to avoid stale nodes. + Indicate whether or not to check "stale" datanodes whose + heartbeat messages have not been received by the namenode + for more than a specified time interval. If this configuration + parameter is set as true, the system will keep track + of the number of stale datanodes. The stale datanodes will be + moved to the end of the node list returned for reading. See + dfs.namenode.avoid.write.stale.datanode for details on how this + affects writes. + + + + + dfs.namenode.avoid.write.stale.datanode + false + + Indicate whether or not to avoid writing to "stale" datanodes whose + heartbeat messages have not been received by the namenode + for more than a specified time interval. If this configuration + parameter and dfs.namenode.check.stale.datanode are both set as true, + the writing will avoid using stale datanodes unless a high number + of datanodes are marked as stale. See + dfs.namenode.write.stale.datanode.ratio for details. @@ -1001,10 +1017,24 @@ dfs.namenode.stale.datanode.interval 30000 - Default time interval for marking a datanode as "stale", i.e., if - the namenode has not received heartbeat msg from a datanode for - more than this time interval, the datanode will be marked and treated - as "stale" by default. + Default time interval for marking a datanode as "stale", i.e., if + the namenode has not received heartbeat msg from a datanode for + more than this time interval, the datanode will be marked and treated + as "stale" by default. The stale interval cannot be too small since + otherwise this may cause too frequent change of stale states. + We thus set a minimum stale interval value (the default value is 3 times + of heartbeat interval) and guarantee that the stale interval cannot be less + than the minimum value. 
+ + + + dfs.namenode.write.stale.datanode.ratio + 0.5f + When the ratio of stale datanodes to total datanodes is greater + than this ratio, stop avoiding writing to stale nodes so + as to prevent causing hotspots. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 4d7356eb4f..7fa2fcbf72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -38,9 +38,12 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; +import org.apache.hadoop.util.Time; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -55,6 +58,9 @@ public class TestReplicationPolicy { private static BlockPlacementPolicy replicator; private static final String filename = "/dummyfile.txt"; private static DatanodeDescriptor dataNodes[]; + // The interval for marking a datanode as stale. + private static long staleInterval = + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT; @Rule public ExpectedException exception = ExpectedException.none(); @@ -77,6 +83,8 @@ public static void setupCluster() throws Exception { "test.build.data", "build/test/data"), "dfs/"); conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(baseDir, "name").getPath()); + // Enable checking for stale datanodes from the start + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true); DFSTestUtil.formatNameNode(conf); namenode = new NameNode(conf); @@ -229,7 +237,7 @@ public void testChooseTarget2() throws Exception { assertEquals(2, targets.length); //make sure that the chosen node is in the target.
int i = 0; - for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++); + for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++); assertTrue(i < targets.length); } @@ -369,6 +377,202 @@ public void testChooseTarget5() throws Exception { assertTrue(cluster.isOnSameRack(targets[1], targets[2])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); } + + private boolean containsWithinRange(DatanodeDescriptor target, + DatanodeDescriptor[] nodes, int startIndex, int endIndex) { + assert startIndex >= 0 && startIndex < nodes.length; + assert endIndex >= startIndex && endIndex < nodes.length; + for (int i = startIndex; i <= endIndex; i++) { + if (nodes[i].equals(target)) { + return true; + } + } + return false; + } + + @Test + public void testChooseTargetWithStaleNodes() throws Exception { + // Enable avoiding writing to stale datanodes + namenode.getNamesystem().getBlockManager().getDatanodeManager() + .setAvoidStaleDataNodesForWrite(true); + // Set dataNodes[0] as stale + dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1); + + DatanodeDescriptor[] targets; + // Since dataNodes[0] is stale, the policy should choose dataNodes[1], + // which is on the same rack as dataNodes[0] (the writer) + targets = replicator.chooseTarget(filename, 1, dataNodes[0], + new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 1); + assertEquals(targets[0], dataNodes[1]); + + HashMap excludedNodes = new HashMap(); + excludedNodes.put(dataNodes[1], dataNodes[1]); + List chosenNodes = new ArrayList(); + BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator; + targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes, + BLOCK_SIZE); + assertEquals(targets.length, 1); + assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); + + // reset + namenode.getNamesystem().getBlockManager().getDatanodeManager() + .setAvoidStaleDataNodesForWrite(false); + dataNodes[0].setLastUpdate(Time.now()); + } + + /** + * In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale, + * and when the number of replicas is less than or equal to 3, all the healthy + * datanodes should be returned by the chooseTarget method. When the number + * of replicas is 4, a stale node should be included.
+ * + * @throws Exception + */ + @Test + public void testChooseTargetWithHalfStaleNodes() throws Exception { + // Enable stale datanodes checking + namenode.getNamesystem().getBlockManager().getDatanodeManager() + .setAvoidStaleDataNodesForWrite(true); + // Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale + for (int i = 0; i < 3; i++) { + dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1); + } + + DatanodeDescriptor[] targets; + targets = replicator.chooseTarget(filename, 0, dataNodes[0], + new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 0); + + // We set the datanode[0] as stale, thus should choose datanode[1] + targets = replicator.chooseTarget(filename, 1, dataNodes[0], + new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 1); + assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2)); + + targets = replicator.chooseTarget(filename, 2, dataNodes[0], + new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 2); + assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2)); + assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2)); + + targets = replicator.chooseTarget(filename, 3, dataNodes[0], + new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 3); + assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5)); + assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5)); + assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5)); + + targets = replicator.chooseTarget(filename, 4, dataNodes[0], + new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 4); + assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3)); + assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3)); + assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3)); + + // reset + namenode.getNamesystem().getBlockManager().getDatanodeManager() + .setAvoidStaleDataNodesForWrite(false); + for (int i = 0; i < dataNodes.length; i++) { + dataNodes[i].setLastUpdate(Time.now()); + } + } + + @Test + public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); + String[] hosts = new String[]{"host1", "host2", "host3", + "host4", "host5", "host6"}; + String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2", + "/d1/r2", "/d2/r3", "/d2/r3"}; + MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).racks(racks) + .hosts(hosts).numDataNodes(hosts.length).build(); + miniCluster.waitActive(); + + try { + // Step 1. Make two datanodes as stale, check whether the + // avoidStaleDataNodesForWrite calculation is correct. 
+ // First stop the heartbeat of host1 and host2 + for (int i = 0; i < 2; i++) { + DataNode dn = miniCluster.getDataNodes().get(i); + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getDatanode(dn.getDatanodeId()) + .setLastUpdate(Time.now() - staleInterval - 1); + } + // Instead of waiting, explicitly call heartbeatCheck to + // let heartbeat manager to detect stale nodes + miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getHeartbeatManager().heartbeatCheck(); + int numStaleNodes = miniCluster.getNameNode().getNamesystem() + .getBlockManager().getDatanodeManager().getNumStaleNodes(); + assertEquals(numStaleNodes, 2); + assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().isAvoidingStaleDataNodesForWrite()); + // Call chooseTarget + DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode() + .getNamesystem().getBlockManager().getDatanodeManager() + .getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId()); + BlockPlacementPolicy replicator = miniCluster.getNameNode() + .getNamesystem().getBlockManager().getBlockPlacementPolicy(); + DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3, + staleNodeInfo, new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 3); + assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo)); + + // Step 2. Set more than half of the datanodes as stale + for (int i = 0; i < 4; i++) { + DataNode dn = miniCluster.getDataNodes().get(i); + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getDatanode(dn.getDatanodeId()) + .setLastUpdate(Time.now() - staleInterval - 1); + } + // Explicitly call heartbeatCheck + miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getHeartbeatManager().heartbeatCheck(); + numStaleNodes = miniCluster.getNameNode().getNamesystem() + .getBlockManager().getDatanodeManager().getNumStaleNodes(); + assertEquals(numStaleNodes, 4); + // According to our strategy, stale datanodes will be included for writing + // to avoid hotspots + assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().isAvoidingStaleDataNodesForWrite()); + // Call chooseTarget + targets = replicator.chooseTarget(filename, 3, + staleNodeInfo, new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 3); + assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo)); + + // Step 3. 
Set 2 stale datanodes back to healthy nodes, + // still have 2 stale nodes + for (int i = 2; i < 4; i++) { + DataNode dn = miniCluster.getDataNodes().get(i); + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); + miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getDatanode(dn.getDatanodeId()) + .setLastUpdate(Time.now()); + } + // Explicitly call heartbeatCheck + miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getHeartbeatManager().heartbeatCheck(); + numStaleNodes = miniCluster.getNameNode().getNamesystem() + .getBlockManager().getDatanodeManager().getNumStaleNodes(); + assertEquals(numStaleNodes, 2); + assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().isAvoidingStaleDataNodesForWrite()); + // Call chooseTarget + targets = replicator.chooseTarget(filename, 3, + staleNodeInfo, new ArrayList(), BLOCK_SIZE); + assertEquals(targets.length, 3); + assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo)); + } finally { + miniCluster.shutdown(); + } + } /** * This testcase tests re-replication, when dataNodes[0] is already chosen. @@ -490,8 +694,8 @@ public void testReplicationWithPriority() throws Exception { .format(true).build(); try { cluster.waitActive(); - final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster - .getNameNode().getNamesystem().getBlockManager().neededReplications; + final UnderReplicatedBlocks neededReplications = cluster.getNameNode() + .getNamesystem().getBlockManager().neededReplications; for (int i = 0; i < 100; i++) { // Adding the blocks directly to normal priority neededReplications.add(new Block(random.nextLong()), 2, 0, 3); @@ -529,10 +733,10 @@ public void testChooseUnderReplicatedBlocks() throws Exception { // Adding QUEUE_VERY_UNDER_REPLICATED block underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7); - // Adding QUEUE_UNDER_REPLICATED block + // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6); - // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block + // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6); // Adding QUEUE_WITH_CORRUPT_BLOCKS block @@ -618,6 +822,11 @@ public void testChooseReplicaToDelete() throws Exception { dataNodes[5].setRemaining(1*1024*1024); replicaNodeList.add(dataNodes[5]); + // Refresh the last update time for all the datanodes + for (int i = 0; i < dataNodes.length; i++) { + dataNodes[i].setLastUpdate(Time.now()); + } + List first = new ArrayList(); List second = new ArrayList(); replicator.splitNodesWithRack(
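
For reference, exercising the write-side behavior added by this patch requires both the existing stale-detection switch and the new write-avoidance switch to be enabled; a minimal hdfs-site.xml fragment (illustrative values only, taken from the defaults documented above) might look like the snippet below. If dfs.namenode.stale.datanode.interval is set below the minimum of dfs.namenode.stale.datanode.minimum.interval (default 3) heartbeat intervals, getStaleIntervalFromConf() logs a warning and raises it to that minimum.

    <property>
      <name>dfs.namenode.check.stale.datanode</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.namenode.avoid.write.stale.datanode</name>
      <value>true</value>
    </property>
    <property>
      <!-- mark a datanode stale after 30s without a heartbeat (default) -->
      <name>dfs.namenode.stale.datanode.interval</name>
      <value>30000</value>
    </property>
    <property>
      <!-- once more than half of the datanodes are stale, write to stale nodes again -->
      <name>dfs.namenode.write.stale.datanode.ratio</name>
      <value>0.5f</value>
    </property>

The ratio guard reflects a deliberate trade-off: avoiding stale nodes only helps while most of the cluster is healthy, so once the stale fraction exceeds the configured ratio the HeartbeatManager switches avoidStaleDataNodesForWrite back off and the remaining fresh nodes are not turned into write hotspots.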