YARN-5351. ResourceRequest should take ExecutionType into account during comparison. (Konstantinos Karanasos via asuresh)
This commit is contained in:
parent
49969b16cd
commit
2d8d183b19
@ -31,7 +31,8 @@
|
|||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Evolving
|
@Evolving
|
||||||
public abstract class ExecutionTypeRequest {
|
public abstract class ExecutionTypeRequest
|
||||||
|
implements Comparable<ExecutionTypeRequest> {
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Evolving
|
@Evolving
|
||||||
@ -39,6 +40,12 @@ public static ExecutionTypeRequest newInstance() {
|
|||||||
return newInstance(ExecutionType.GUARANTEED, false);
|
return newInstance(ExecutionType.GUARANTEED, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public static ExecutionTypeRequest newInstance(ExecutionType execType) {
|
||||||
|
return newInstance(execType, false);
|
||||||
|
}
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Evolving
|
@Evolving
|
||||||
public static ExecutionTypeRequest newInstance(ExecutionType execType,
|
public static ExecutionTypeRequest newInstance(ExecutionType execType,
|
||||||
|
@ -116,6 +116,10 @@ public int compare(ResourceRequest r1, ResourceRequest r2) {
|
|||||||
String h2 = r2.getResourceName();
|
String h2 = r2.getResourceName();
|
||||||
ret = h1.compareTo(h2);
|
ret = h1.compareTo(h2);
|
||||||
}
|
}
|
||||||
|
if (ret == 0) {
|
||||||
|
ret = r1.getExecutionTypeRequest()
|
||||||
|
.compareTo(r2.getExecutionTypeRequest());
|
||||||
|
}
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
ret = r1.getCapability().compareTo(r2.getCapability());
|
ret = r1.getCapability().compareTo(r2.getCapability());
|
||||||
}
|
}
|
||||||
@ -414,7 +418,8 @@ public boolean equals(Object obj) {
|
|||||||
if (other.getExecutionTypeRequest() != null) {
|
if (other.getExecutionTypeRequest() != null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
} else if (!execTypeRequest.equals(other.getExecutionTypeRequest())) {
|
} else if (!execTypeRequest.getExecutionType()
|
||||||
|
.equals(other.getExecutionTypeRequest().getExecutionType())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (getNodeLabelExpression() == null) {
|
if (getNodeLabelExpression() == null) {
|
||||||
@ -441,6 +446,9 @@ public int compareTo(ResourceRequest other) {
|
|||||||
int hostNameComparison =
|
int hostNameComparison =
|
||||||
this.getResourceName().compareTo(other.getResourceName());
|
this.getResourceName().compareTo(other.getResourceName());
|
||||||
if (hostNameComparison == 0) {
|
if (hostNameComparison == 0) {
|
||||||
|
int execTypeReqComparison = this.getExecutionTypeRequest()
|
||||||
|
.compareTo(other.getExecutionTypeRequest());
|
||||||
|
if (execTypeReqComparison == 0) {
|
||||||
int capabilityComparison =
|
int capabilityComparison =
|
||||||
this.getCapability().compareTo(other.getCapability());
|
this.getCapability().compareTo(other.getCapability());
|
||||||
if (capabilityComparison == 0) {
|
if (capabilityComparison == 0) {
|
||||||
@ -448,6 +456,9 @@ public int compareTo(ResourceRequest other) {
|
|||||||
} else {
|
} else {
|
||||||
return capabilityComparison;
|
return capabilityComparison;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return execTypeReqComparison;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return hostNameComparison;
|
return hostNameComparison;
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,6 @@
|
|||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
@ -62,6 +61,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
@ -336,6 +336,133 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test fit of both GUARANTEED and OPPORTUNISTIC containers.
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testAMRMClientMatchingFitExecType()
|
||||||
|
throws YarnException, IOException {
|
||||||
|
AMRMClient<ContainerRequest> amClient = null;
|
||||||
|
try {
|
||||||
|
// start am rm client
|
||||||
|
amClient = AMRMClient.<ContainerRequest>createAMRMClient();
|
||||||
|
amClient.init(conf);
|
||||||
|
amClient.start();
|
||||||
|
amClient.registerApplicationMaster("Host", 10000, "");
|
||||||
|
|
||||||
|
Resource capability1 = Resource.newInstance(1024, 2);
|
||||||
|
Resource capability2 = Resource.newInstance(1024, 1);
|
||||||
|
Resource capability3 = Resource.newInstance(1000, 2);
|
||||||
|
Resource capability4 = Resource.newInstance(1000, 2);
|
||||||
|
Resource capability5 = Resource.newInstance(2000, 2);
|
||||||
|
Resource capability6 = Resource.newInstance(2000, 3);
|
||||||
|
Resource capability7 = Resource.newInstance(6000, 3);
|
||||||
|
|
||||||
|
// Add 2 GUARANTEED and 7 OPPORTUNISTIC requests.
|
||||||
|
ContainerRequest storedGuarContainer1 =
|
||||||
|
new ContainerRequest(capability1, nodes, racks, priority);
|
||||||
|
ContainerRequest storedGuarContainer2 =
|
||||||
|
new ContainerRequest(capability2, nodes, racks, priority);
|
||||||
|
ContainerRequest storedOpportContainer1 =
|
||||||
|
new ContainerRequest(capability1, nodes, racks, priority, true, null,
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
|
||||||
|
ContainerRequest storedOpportContainer2 =
|
||||||
|
new ContainerRequest(capability2, nodes, racks, priority, true, null,
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
|
||||||
|
ContainerRequest storedOpportContainer3 =
|
||||||
|
new ContainerRequest(capability3, nodes, racks, priority, true, null,
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
|
||||||
|
ContainerRequest storedOpportContainer4 =
|
||||||
|
new ContainerRequest(capability4, nodes, racks, priority, true, null,
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
|
||||||
|
ContainerRequest storedOpportContainer5 =
|
||||||
|
new ContainerRequest(capability5, nodes, racks, priority, true, null,
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
|
||||||
|
ContainerRequest storedOpportContainer6 =
|
||||||
|
new ContainerRequest(capability6, nodes, racks, priority, true, null,
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
|
||||||
|
ContainerRequest storedOpportContainer7 =
|
||||||
|
new ContainerRequest(capability7, nodes, racks, priority2,
|
||||||
|
false, null,
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
|
||||||
|
amClient.addContainerRequest(storedGuarContainer1);
|
||||||
|
amClient.addContainerRequest(storedGuarContainer2);
|
||||||
|
amClient.addContainerRequest(storedOpportContainer1);
|
||||||
|
amClient.addContainerRequest(storedOpportContainer2);
|
||||||
|
amClient.addContainerRequest(storedOpportContainer3);
|
||||||
|
amClient.addContainerRequest(storedOpportContainer4);
|
||||||
|
amClient.addContainerRequest(storedOpportContainer5);
|
||||||
|
amClient.addContainerRequest(storedOpportContainer6);
|
||||||
|
amClient.addContainerRequest(storedOpportContainer7);
|
||||||
|
|
||||||
|
// Make sure 3 entries are generated in the ask list for each added
|
||||||
|
// container request of a given capability, locality, execution type and
|
||||||
|
// priority (one node-local, one rack-local, and one ANY).
|
||||||
|
assertEquals(24,
|
||||||
|
(((AMRMClientImpl<ContainerRequest>) amClient).ask.size()));
|
||||||
|
|
||||||
|
// test exact matching of GUARANTEED containers
|
||||||
|
List<? extends Collection<ContainerRequest>> matches;
|
||||||
|
ContainerRequest storedRequest;
|
||||||
|
Resource testCapability1 = Resource.newInstance(1024, 2);
|
||||||
|
matches = amClient
|
||||||
|
.getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
|
||||||
|
testCapability1);
|
||||||
|
verifyMatches(matches, 1);
|
||||||
|
storedRequest = matches.get(0).iterator().next();
|
||||||
|
assertEquals(storedGuarContainer1, storedRequest);
|
||||||
|
amClient.removeContainerRequest(storedGuarContainer1);
|
||||||
|
|
||||||
|
// test exact matching of OPPORTUNISTIC containers
|
||||||
|
matches = amClient.getMatchingRequests(priority, node,
|
||||||
|
ExecutionType.OPPORTUNISTIC, testCapability1);
|
||||||
|
verifyMatches(matches, 1);
|
||||||
|
storedRequest = matches.get(0).iterator().next();
|
||||||
|
assertEquals(storedOpportContainer1, storedRequest);
|
||||||
|
amClient.removeContainerRequest(storedOpportContainer1);
|
||||||
|
|
||||||
|
// exact OPPORTUNISTIC matching with order maintained
|
||||||
|
Resource testCapability2 = Resource.newInstance(1000, 2);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node,
|
||||||
|
ExecutionType.OPPORTUNISTIC, testCapability2);
|
||||||
|
verifyMatches(matches, 2);
|
||||||
|
// must be returned in the order they were made
|
||||||
|
int i = 0;
|
||||||
|
for(ContainerRequest storedRequest1 : matches.get(0)) {
|
||||||
|
if(i++ == 0) {
|
||||||
|
assertEquals(storedOpportContainer3, storedRequest1);
|
||||||
|
} else {
|
||||||
|
assertEquals(storedOpportContainer4, storedRequest1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amClient.removeContainerRequest(storedOpportContainer3);
|
||||||
|
|
||||||
|
// matching with larger container
|
||||||
|
Resource testCapability3 = Resource.newInstance(4000, 4);
|
||||||
|
matches = amClient.getMatchingRequests(priority, node,
|
||||||
|
ExecutionType.OPPORTUNISTIC, testCapability3);
|
||||||
|
assert(matches.size() == 4);
|
||||||
|
|
||||||
|
// verify requests without relaxed locality are only returned at specific
|
||||||
|
// locations
|
||||||
|
Resource testCapability4 = Resource.newInstance(6000, 3);
|
||||||
|
matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY,
|
||||||
|
ExecutionType.OPPORTUNISTIC, testCapability4);
|
||||||
|
assert(matches.size() == 0);
|
||||||
|
matches = amClient.getMatchingRequests(priority2, node,
|
||||||
|
ExecutionType.OPPORTUNISTIC, testCapability4);
|
||||||
|
assert(matches.size() == 1);
|
||||||
|
|
||||||
|
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||||
|
null, null);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
||||||
|
amClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyMatches(
|
private void verifyMatches(
|
||||||
List<? extends Collection<ContainerRequest>> matches,
|
List<? extends Collection<ContainerRequest>> matches,
|
||||||
int matchSize) {
|
int matchSize) {
|
||||||
|
@ -85,6 +85,11 @@ public boolean getEnforceExecutionType() {
|
|||||||
return p.getEnforceExecutionType();
|
return p.getEnforceExecutionType();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(ExecutionTypeRequest other) {
|
||||||
|
return this.getExecutionType().compareTo(other.getExecutionType());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "{Execution Type: " + getExecutionType()
|
return "{Execution Type: " + getExecutionType()
|
||||||
|
Loading…
Reference in New Issue
Block a user