diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java index f553a44b23..e3ee8b416b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java @@ -31,7 +31,8 @@ */ @Public @Evolving -public abstract class ExecutionTypeRequest { +public abstract class ExecutionTypeRequest + implements Comparable { @Public @Evolving @@ -39,6 +40,12 @@ public static ExecutionTypeRequest newInstance() { return newInstance(ExecutionType.GUARANTEED, false); } + @Public + @Evolving + public static ExecutionTypeRequest newInstance(ExecutionType execType) { + return newInstance(execType, false); + } + @Public @Evolving public static ExecutionTypeRequest newInstance(ExecutionType execType, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 50a76198e9..07f132cd2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -116,6 +116,10 @@ public int compare(ResourceRequest r1, ResourceRequest r2) { String h2 = r2.getResourceName(); ret = h1.compareTo(h2); } + if (ret == 0) { + ret = r1.getExecutionTypeRequest() + .compareTo(r2.getExecutionTypeRequest()); + } if (ret == 0) { ret = r1.getCapability().compareTo(r2.getCapability()); } @@ -414,7 +418,8 @@ public boolean equals(Object obj) { if (other.getExecutionTypeRequest() != null) { return false; } - } else if (!execTypeRequest.equals(other.getExecutionTypeRequest())) { + } else if (!execTypeRequest.getExecutionType() + .equals(other.getExecutionTypeRequest().getExecutionType())) { return false; } if (getNodeLabelExpression() == null) { @@ -441,12 +446,18 @@ public int compareTo(ResourceRequest other) { int hostNameComparison = this.getResourceName().compareTo(other.getResourceName()); if (hostNameComparison == 0) { - int capabilityComparison = - this.getCapability().compareTo(other.getCapability()); - if (capabilityComparison == 0) { - return this.getNumContainers() - other.getNumContainers(); + int execTypeReqComparison = this.getExecutionTypeRequest() + .compareTo(other.getExecutionTypeRequest()); + if (execTypeReqComparison == 0) { + int capabilityComparison = + this.getCapability().compareTo(other.getCapability()); + if (capabilityComparison == 0) { + return this.getNumContainers() - other.getNumContainers(); + } else { + return capabilityComparison; + } } else { - return capabilityComparison; + return execTypeReqComparison; } } else { return hostNameComparison; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 99bfca51f8..57cdbfb58c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -39,7 +39,6 @@ import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NMToken; @@ -120,7 +120,7 @@ public class TestAMRMClient { static String[] nodes; static String[] racks; private final static int DEFAULT_ITERATION = 3; - + @BeforeClass public static void setup() throws Exception { // start minicluster @@ -335,6 +335,133 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { } } } + + /** + * Test fit of both GUARANTEED and OPPORTUNISTIC containers. + */ + @Test (timeout=60000) + public void testAMRMClientMatchingFitExecType() + throws YarnException, IOException { + AMRMClient amClient = null; + try { + // start am rm client + amClient = AMRMClient.createAMRMClient(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + Resource capability1 = Resource.newInstance(1024, 2); + Resource capability2 = Resource.newInstance(1024, 1); + Resource capability3 = Resource.newInstance(1000, 2); + Resource capability4 = Resource.newInstance(1000, 2); + Resource capability5 = Resource.newInstance(2000, 2); + Resource capability6 = Resource.newInstance(2000, 3); + Resource capability7 = Resource.newInstance(6000, 3); + + // Add 2 GUARANTEED and 7 OPPORTUNISTIC requests. + ContainerRequest storedGuarContainer1 = + new ContainerRequest(capability1, nodes, racks, priority); + ContainerRequest storedGuarContainer2 = + new ContainerRequest(capability2, nodes, racks, priority); + ContainerRequest storedOpportContainer1 = + new ContainerRequest(capability1, nodes, racks, priority, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ContainerRequest storedOpportContainer2 = + new ContainerRequest(capability2, nodes, racks, priority, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ContainerRequest storedOpportContainer3 = + new ContainerRequest(capability3, nodes, racks, priority, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ContainerRequest storedOpportContainer4 = + new ContainerRequest(capability4, nodes, racks, priority, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ContainerRequest storedOpportContainer5 = + new ContainerRequest(capability5, nodes, racks, priority, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ContainerRequest storedOpportContainer6 = + new ContainerRequest(capability6, nodes, racks, priority, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + ContainerRequest storedOpportContainer7 = + new ContainerRequest(capability7, nodes, racks, priority2, + false, null, + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); + amClient.addContainerRequest(storedGuarContainer1); + amClient.addContainerRequest(storedGuarContainer2); + amClient.addContainerRequest(storedOpportContainer1); + amClient.addContainerRequest(storedOpportContainer2); + amClient.addContainerRequest(storedOpportContainer3); + amClient.addContainerRequest(storedOpportContainer4); + amClient.addContainerRequest(storedOpportContainer5); + amClient.addContainerRequest(storedOpportContainer6); + amClient.addContainerRequest(storedOpportContainer7); + + // Make sure 3 entries are generated in the ask list for each added + // container request of a given capability, locality, execution type and + // priority (one node-local, one rack-local, and one ANY). + assertEquals(24, + (((AMRMClientImpl) amClient).ask.size())); + + // test exact matching of GUARANTEED containers + List> matches; + ContainerRequest storedRequest; + Resource testCapability1 = Resource.newInstance(1024, 2); + matches = amClient + .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, + testCapability1); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertEquals(storedGuarContainer1, storedRequest); + amClient.removeContainerRequest(storedGuarContainer1); + + // test exact matching of OPPORTUNISTIC containers + matches = amClient.getMatchingRequests(priority, node, + ExecutionType.OPPORTUNISTIC, testCapability1); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertEquals(storedOpportContainer1, storedRequest); + amClient.removeContainerRequest(storedOpportContainer1); + + // exact OPPORTUNISTIC matching with order maintained + Resource testCapability2 = Resource.newInstance(1000, 2); + matches = amClient.getMatchingRequests(priority, node, + ExecutionType.OPPORTUNISTIC, testCapability2); + verifyMatches(matches, 2); + // must be returned in the order they were made + int i = 0; + for(ContainerRequest storedRequest1 : matches.get(0)) { + if(i++ == 0) { + assertEquals(storedOpportContainer3, storedRequest1); + } else { + assertEquals(storedOpportContainer4, storedRequest1); + } + } + amClient.removeContainerRequest(storedOpportContainer3); + + // matching with larger container + Resource testCapability3 = Resource.newInstance(4000, 4); + matches = amClient.getMatchingRequests(priority, node, + ExecutionType.OPPORTUNISTIC, testCapability3); + assert(matches.size() == 4); + + // verify requests without relaxed locality are only returned at specific + // locations + Resource testCapability4 = Resource.newInstance(6000, 3); + matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, testCapability4); + assert(matches.size() == 0); + matches = amClient.getMatchingRequests(priority2, node, + ExecutionType.OPPORTUNISTIC, testCapability4); + assert(matches.size() == 1); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } private void verifyMatches( List> matches, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java index 0037dd3f6b..f6a5032379 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java @@ -85,6 +85,11 @@ public boolean getEnforceExecutionType() { return p.getEnforceExecutionType(); } + @Override + public int compareTo(ExecutionTypeRequest other) { + return this.getExecutionType().compareTo(other.getExecutionType()); + } + @Override public String toString() { return "{Execution Type: " + getExecutionType()