diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ff531cdc2a..56bbe8843d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1210,6 +1210,19 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH = false; + /** + * When non empty, untracked nodes are deleted only if their state is one of + * the states defined by this config. When empty, all the states are eligible + * for removal + * Eligible states are defined by enum values here: + * @see org.apache.hadoop.yarn.api.records.NodeState + * Example: LOST,DECOMMISSIONED + */ + public static final String RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE = + RM_PREFIX + "node-removal-untracked.node-selective-states-to-remove"; + public static final String[] + DEFAULT_RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE = {}; + /** * RM proxy users' prefix */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index aea9226001..0069e9ef36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5316,4 +5316,16 @@ + + + If Yarn untracked removal is enabled, then this config can control what all + node states can be removed. If the untracked node is not having one of these + states, then node will skipped for removal. If this config value is set to + empty, all node states, will be eligible for removal + NodeState is an ENUM: org.apache.hadoop.yarn.api.records.NodeState + + yarn.resourcemanager.node-removal-untracked.node-selective-states-to-remove + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 21be92169a..6b5bf0e4f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -30,7 +31,9 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -128,6 +131,10 @@ protected void serviceInit(Configuration conf) throws Exception { enableNodeUntrackedWithoutIncludePath = conf.getBoolean( YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, YarnConfiguration.DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH); + final Set untrackedSelectiveStatesToRemove = Arrays.stream(conf.getStrings( + YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE, + YarnConfiguration.DEFAULT_RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE)) + .collect(Collectors.toSet()); final int nodeRemovalTimeout = conf.getInt( YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, @@ -146,6 +153,13 @@ public void run() { NodeId nodeId = entry.getKey(); RMNode rmNode = entry.getValue(); if (isUntrackedNode(rmNode.getHostName())) { + if(CollectionUtils.isNotEmpty(untrackedSelectiveStatesToRemove) && + !untrackedSelectiveStatesToRemove.contains(rmNode.getState().toString())) { + LOG.warn("Untracked node {}, with node state {} is not part of " + + "node-removal-untracked.node-selective-states-to-remove config", + rmNode.getHostName(), rmNode.getState().toString()); + continue; + } if (rmNode.getUntrackedTimeStamp() == 0) { rmNode.setUntrackedTimeStamp(now); } else diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index e4f0b79e37..358cf9e0f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -3222,4 +3222,85 @@ public void testDecommissionWithoutIncludeFile() throws Exception { rm.close(); } + + /** + * Decommissioning with selective states for untracked nodes. + */ + @Test + public void testDecommissionWithSelectiveStates() throws Exception { + // clear exclude hosts + writeToHostsFile(excludeHostFile, ""); + // init conf: + // (1) set untracked removal timeout to 500ms + // (2) set exclude path (no include path) + // (3) enable node untracked without pre-configured include path + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, 500); + conf.setBoolean(YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true); + conf.setStrings(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE, + "DECOMMISSIONED", "SHUTDOWN"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath()); + + rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 10240); + MockNM nm2 = rm.registerNode("host2:1234", 10240); + MockNM nm3 = rm.registerNode("host3:1234", 10240); + MockNM nm4 = rm.registerNode("host4:1234", 10240); + assertEquals(4, rm.getRMContext().getRMNodes().size()); + assertEquals(0, rm.getRMContext().getInactiveRMNodes().size()); + + // decommission nm1 via adding nm1 into exclude hosts + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + writeToHostsFile(excludeHostFile, "host1"); + rm.getNodesListManager().refreshNodes(conf); + rm.drainEvents(); + assertEquals(rmNode1.getState(), NodeState.DECOMMISSIONED); + assertEquals(3, rm.getRMContext().getRMNodes().size()); + assertEquals(1, rm.getRMContext().getInactiveRMNodes().size()); + assertEquals(new HashSet(Arrays.asList(nm1.getNodeId())), + rm.getRMContext().getInactiveRMNodes().keySet()); + + // remove nm1 from exclude hosts, so that it will be marked as untracked + // and removed from inactive nodes after the timeout + writeToHostsFile(excludeHostFile, ""); + rm.getNodesListManager().refreshNodes(conf); + // confirmed that nm1 should be removed from inactive nodes in 1 second + GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 0, + 100, 1000); + + // lost nm2 + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + rm.getRMContext().getDispatcher().getEventHandler() + .handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE)); + rm.drainEvents(); + assertEquals(rmNode2.getState(), NodeState.LOST); + assertEquals(2, rm.getRMContext().getRMNodes().size()); + assertEquals(1, rm.getRMContext().getInactiveRMNodes().size()); + // confirmed that nm2 should not be removed from inactive nodes in 1 second + GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1, + 100, 1000); + + // shutdown nm3 + RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId()); + rm.getRMContext().getDispatcher().getEventHandler() + .handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN)); + rm.drainEvents(); + assertEquals(rmNode3.getState(), NodeState.SHUTDOWN); + assertEquals(1, rm.getRMContext().getRMNodes().size()); + assertEquals(2, rm.getRMContext().getInactiveRMNodes().size()); + // confirmed that nm3 should be removed from inactive nodes in 1 second + GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1, + 100, 1000); + + // nm4 is still active node at last + assertEquals(new HashSet(Arrays.asList(nm4.getNodeId())), + rm.getRMContext().getRMNodes().keySet()); + + // nm2 is still inactive node at last, not removed + assertEquals(new HashSet(Arrays.asList(nm2.getNodeId())), + rm.getRMContext().getInactiveRMNodes().keySet()); + + rm.close(); + } }