YARN-9253. Add UT to verify Placement Constraint in Distributed Shell. Contributed by Prabhu Joseph.

This commit is contained in:
Weiwei Yang 2019-02-06 22:36:51 +08:00
parent 911790cc26
commit 711d22f166
2 changed files with 63 additions and 11 deletions

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.applications.distributedshell; package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
@ -35,10 +39,10 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class TestDistributedShellWithNodeLabels { public class TestDSWithMultipleNodeManager {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestDistributedShellWithNodeLabels.class); LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class);
static final int NUM_NMS = 2; static final int NUM_NMS = 2;
TestDistributedShell distShellTest; TestDistributedShell distShellTest;
@ -47,7 +51,7 @@ public void setup() throws Exception {
distShellTest = new TestDistributedShell(); distShellTest = new TestDistributedShell();
distShellTest.setupInternal(NUM_NMS); distShellTest.setupInternal(NUM_NMS);
} }
private void initializeNodeLabels() throws IOException { private void initializeNodeLabels() throws IOException {
RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext(); RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
@ -77,11 +81,11 @@ private void initializeNodeLabels() throws IOException {
// Set label x to NM[1] // Set label x to NM[1]
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels)); labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
} }
@Test(timeout=90000) @Test(timeout=90000)
public void testDSShellWithNodeLabelExpression() throws Exception { public void testDSShellWithNodeLabelExpression() throws Exception {
initializeNodeLabels(); initializeNodeLabels();
// Start NMContainerMonitor // Start NMContainerMonitor
NMContainerMonitor mon = new NMContainerMonitor(); NMContainerMonitor mon = new NMContainerMonitor();
Thread t = new Thread(mon); Thread t = new Thread(mon);
@ -117,9 +121,9 @@ public void testDSShellWithNodeLabelExpression() throws Exception {
LOG.info("Running DS Client"); LOG.info("Running DS Client");
boolean result = client.run(); boolean result = client.run();
LOG.info("Client run completed. Result=" + result); LOG.info("Client run completed. Result=" + result);
t.interrupt(); t.interrupt();
// Check maximum number of containers on each NMs // Check maximum number of containers on each NMs
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
// Check no container allocated on NM[0] // Check no container allocated on NM[0]
@ -127,7 +131,54 @@ public void testDSShellWithNodeLabelExpression() throws Exception {
// Check there're some containers allocated on NM[1] // Check there're some containers allocated on NM[1]
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
} }
@Test(timeout = 90000)
public void testDistributedShellWithPlacementConstraint()
throws Exception {
NMContainerMonitor mon = new NMContainerMonitor();
Thread t = new Thread(mon);
t.start();
String[] args = {
"--jar",
distShellTest.APPMASTER_JAR,
"1",
"--shell_command",
distShellTest.getSleepCommand(15),
"--placement_spec",
"zk=1,NOTIN,NODE,zk:spark=1,NOTIN,NODE,zk"
};
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
t.interrupt();
ConcurrentMap<ApplicationId, RMApp> apps = distShellTest.yarnCluster.
getResourceManager().getRMContext().getRMApps();
RMApp app = apps.values().iterator().next();
RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
NodeManager nm1 = distShellTest.yarnCluster.getNodeManager(0);
int expectedNM1Count = 1;
int expectedNM2Count = 1;
if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
expectedNM1Count++;
} else {
expectedNM2Count++;
}
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
}
/** /**
* Monitor containers running on NMs * Monitor containers running on NMs
*/ */

View File

@ -179,7 +179,8 @@ private void setupInternal(int numNodeManager, float timelineVersion)
true); true);
conf.setBoolean( conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// ATS version specific settings // ATS version specific settings
if (timelineVersion == 1.0f) { if (timelineVersion == 1.0f) {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
@ -782,7 +783,7 @@ private boolean checkIPs(String hostname, String localIP, String appIP)
} }
private String getSleepCommand(int sec) { protected String getSleepCommand(int sec) {
// Windows doesn't have a sleep command, ping -n does the trick // Windows doesn't have a sleep command, ping -n does the trick
return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul" return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul"
: "sleep " + sec; : "sleep " + sec;