YARN-11497 : Support removal of only selective node states in untracked removal flow (#5681)

Co-authored-by: mudit.sharma <mudit.sharma@flipkart.com>
Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
mudit-97 2023-06-05 15:06:10 +05:30 committed by GitHub
parent 2243cfd225
commit e69a077af8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 120 additions and 0 deletions

View File

@ -1210,6 +1210,19 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean
DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH = false;
/**
* When non empty, untracked nodes are deleted only if their state is one of
* the states defined by this config. When empty, all the states are eligible
* for removal
* Eligible states are defined by enum values here:
* @see org.apache.hadoop.yarn.api.records.NodeState
* Example: LOST,DECOMMISSIONED
*/
public static final String RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE =
RM_PREFIX + "node-removal-untracked.node-selective-states-to-remove";
public static final String[]
DEFAULT_RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE = {};
/**
* RM proxy users' prefix
*/

View File

@ -5316,4 +5316,16 @@
</description>
</property>
<property>
<description>
If Yarn untracked removal is enabled, then this config can control what all
node states can be removed. If the untracked node is not having one of these
states, then node will skipped for removal. If this config value is set to
empty, all node states, will be eligible for removal
NodeState is an ENUM: org.apache.hadoop.yarn.api.records.NodeState
</description>
<name>yarn.resourcemanager.node-removal-untracked.node-selective-states-to-remove</name>
<value></value>
</property>
</configuration>

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -30,7 +31,9 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -128,6 +131,10 @@ protected void serviceInit(Configuration conf) throws Exception {
enableNodeUntrackedWithoutIncludePath = conf.getBoolean(
YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH,
YarnConfiguration.DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH);
final Set<String> untrackedSelectiveStatesToRemove = Arrays.stream(conf.getStrings(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE))
.collect(Collectors.toSet());
final int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
@ -146,6 +153,13 @@ public void run() {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(rmNode.getHostName())) {
if(CollectionUtils.isNotEmpty(untrackedSelectiveStatesToRemove) &&
!untrackedSelectiveStatesToRemove.contains(rmNode.getState().toString())) {
LOG.warn("Untracked node {}, with node state {} is not part of " +
"node-removal-untracked.node-selective-states-to-remove config",
rmNode.getHostName(), rmNode.getState().toString());
continue;
}
if (rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
} else

View File

@ -3222,4 +3222,85 @@ public void testDecommissionWithoutIncludeFile() throws Exception {
rm.close();
}
/**
* Decommissioning with selective states for untracked nodes.
*/
@Test
public void testDecommissionWithSelectiveStates() throws Exception {
// clear exclude hosts
writeToHostsFile(excludeHostFile, "");
// init conf:
// (1) set untracked removal timeout to 500ms
// (2) set exclude path (no include path)
// (3) enable node untracked without pre-configured include path
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, 500);
conf.setBoolean(YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true);
conf.setStrings(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE,
"DECOMMISSIONED", "SHUTDOWN");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 10240);
MockNM nm2 = rm.registerNode("host2:1234", 10240);
MockNM nm3 = rm.registerNode("host3:1234", 10240);
MockNM nm4 = rm.registerNode("host4:1234", 10240);
assertEquals(4, rm.getRMContext().getRMNodes().size());
assertEquals(0, rm.getRMContext().getInactiveRMNodes().size());
// decommission nm1 via adding nm1 into exclude hosts
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
writeToHostsFile(excludeHostFile, "host1");
rm.getNodesListManager().refreshNodes(conf);
rm.drainEvents();
assertEquals(rmNode1.getState(), NodeState.DECOMMISSIONED);
assertEquals(3, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
assertEquals(new HashSet(Arrays.asList(nm1.getNodeId())),
rm.getRMContext().getInactiveRMNodes().keySet());
// remove nm1 from exclude hosts, so that it will be marked as untracked
// and removed from inactive nodes after the timeout
writeToHostsFile(excludeHostFile, "");
rm.getNodesListManager().refreshNodes(conf);
// confirmed that nm1 should be removed from inactive nodes in 1 second
GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 0,
100, 1000);
// lost nm2
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
rm.getRMContext().getDispatcher().getEventHandler()
.handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE));
rm.drainEvents();
assertEquals(rmNode2.getState(), NodeState.LOST);
assertEquals(2, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
// confirmed that nm2 should not be removed from inactive nodes in 1 second
GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1,
100, 1000);
// shutdown nm3
RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
rm.getRMContext().getDispatcher().getEventHandler()
.handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN));
rm.drainEvents();
assertEquals(rmNode3.getState(), NodeState.SHUTDOWN);
assertEquals(1, rm.getRMContext().getRMNodes().size());
assertEquals(2, rm.getRMContext().getInactiveRMNodes().size());
// confirmed that nm3 should be removed from inactive nodes in 1 second
GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1,
100, 1000);
// nm4 is still active node at last
assertEquals(new HashSet(Arrays.asList(nm4.getNodeId())),
rm.getRMContext().getRMNodes().keySet());
// nm2 is still inactive node at last, not removed
assertEquals(new HashSet(Arrays.asList(nm2.getNodeId())),
rm.getRMContext().getInactiveRMNodes().keySet());
rm.close();
}
}