YARN-927. Change ContainerRequest to not have more than 1 container count and remove StoreContainerRequest (bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503960 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
865d902bd1
commit
3520039979
@ -491,6 +491,9 @@ Release 2.1.0-beta - 2013-07-02
|
||||
YARN-513. Create common proxy client for communicating with RM. (Xuan Gong
|
||||
& Jian He via bikas)
|
||||
|
||||
YARN-927. Change ContainerRequest to not have more than 1 container count
|
||||
and remove StoreContainerRequest (bikas)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-512. Log aggregation root directory check is more expensive than it
|
||||
|
@ -483,8 +483,10 @@ public boolean run() throws YarnException, IOException {
|
||||
// containers
|
||||
// Keep looping until all the containers are launched and shell script
|
||||
// executed on them ( regardless of success/failure).
|
||||
ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
for (int i = 0; i < numTotalContainers; ++i) {
|
||||
ContainerRequest containerAsk = setupContainerAskForRM();
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
}
|
||||
numRequestedContainers.set(numTotalContainers);
|
||||
|
||||
while (!done) {
|
||||
@ -591,8 +593,10 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
|
||||
numRequestedContainers.addAndGet(askCount);
|
||||
|
||||
if (askCount > 0) {
|
||||
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
for (int i = 0; i < askCount; ++i) {
|
||||
ContainerRequest containerAsk = setupContainerAskForRM();
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
}
|
||||
}
|
||||
|
||||
if (numCompletedContainers.get() == numTotalContainers) {
|
||||
@ -813,7 +817,7 @@ public void run() {
|
||||
* @param numContainers Containers to ask for from RM
|
||||
* @return the setup ResourceRequest to be sent to RM
|
||||
*/
|
||||
private ContainerRequest setupContainerAskForRM(int numContainers) {
|
||||
private ContainerRequest setupContainerAskForRM() {
|
||||
// setup requirements for hosts
|
||||
// using * as any host will do for the distributed shell app
|
||||
// set the priority for the request
|
||||
@ -827,7 +831,7 @@ private ContainerRequest setupContainerAskForRM(int numContainers) {
|
||||
capability.setMemory(containerMemory);
|
||||
|
||||
ContainerRequest request = new ContainerRequest(capability, null, null,
|
||||
pri, numContainers);
|
||||
pri);
|
||||
LOG.info("Requested container ask: " + request.toString());
|
||||
return request;
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ protected AMRMClient(String name) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Object to represent a container request for resources. Scheduler
|
||||
* Object to represent a single container request for resources. Scheduler
|
||||
* documentation should be consulted for the specifics of how the parameters
|
||||
* are honored.
|
||||
*
|
||||
@ -101,7 +101,6 @@ public static class ContainerRequest {
|
||||
final List<String> nodes;
|
||||
final List<String> racks;
|
||||
final Priority priority;
|
||||
final int containerCount;
|
||||
final boolean relaxLocality;
|
||||
|
||||
/**
|
||||
@ -119,12 +118,10 @@ public static class ContainerRequest {
|
||||
* @param priority
|
||||
* The priority at which to request the containers. Higher
|
||||
* priorities have lower numerical values.
|
||||
* @param containerCount
|
||||
* The number of containers to request.
|
||||
*/
|
||||
public ContainerRequest(Resource capability, String[] nodes,
|
||||
String[] racks, Priority priority, int containerCount) {
|
||||
this(capability, nodes, racks, priority, containerCount, true);
|
||||
String[] racks, Priority priority) {
|
||||
this(capability, nodes, racks, priority, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -141,23 +138,18 @@ public ContainerRequest(Resource capability, String[] nodes,
|
||||
* @param priority
|
||||
* The priority at which to request the containers. Higher
|
||||
* priorities have lower numerical values.
|
||||
* @param containerCount
|
||||
* The number of containers to request.
|
||||
* @param relaxLocality
|
||||
* If true, containers for this request may be assigned on hosts
|
||||
* and racks other than the ones explicitly requested.
|
||||
*/
|
||||
public ContainerRequest(Resource capability, String[] nodes,
|
||||
String[] racks, Priority priority, int containerCount,
|
||||
boolean relaxLocality) {
|
||||
String[] racks, Priority priority, boolean relaxLocality) {
|
||||
// Validate request
|
||||
Preconditions.checkArgument(capability != null,
|
||||
"The Resource to be requested for each container " +
|
||||
"should not be null ");
|
||||
Preconditions.checkArgument(priority != null,
|
||||
"The priority at which to request containers should not be null ");
|
||||
Preconditions.checkArgument(containerCount > 0,
|
||||
"The number of containers to request should larger than 0");
|
||||
Preconditions.checkArgument(
|
||||
!(!relaxLocality && (racks == null || racks.length == 0)
|
||||
&& (nodes == null || nodes.length == 0)),
|
||||
@ -167,7 +159,6 @@ public ContainerRequest(Resource capability, String[] nodes,
|
||||
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
|
||||
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
|
||||
this.priority = priority;
|
||||
this.containerCount = containerCount;
|
||||
this.relaxLocality = relaxLocality;
|
||||
}
|
||||
|
||||
@ -187,10 +178,6 @@ public Priority getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
public int getContainerCount() {
|
||||
return containerCount;
|
||||
}
|
||||
|
||||
public boolean getRelaxLocality() {
|
||||
return relaxLocality;
|
||||
}
|
||||
@ -199,32 +186,10 @@ public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Capability[").append(capability).append("]");
|
||||
sb.append("Priority[").append(priority).append("]");
|
||||
sb.append("ContainerCount[").append(containerCount).append("]");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This creates a <code>ContainerRequest</code> for 1 container and the
|
||||
* AMRMClient stores this request internally. <code>getMatchingRequests</code>
|
||||
* can be used to retrieve these requests from AMRMClient. These requests may
|
||||
* be matched with an allocated container to determine which request to assign
|
||||
* the container to. <code>removeContainerRequest</code> must be called using
|
||||
* the same assigned <code>StoredContainerRequest</code> object so that
|
||||
* AMRMClient can remove it from its internal store.
|
||||
*/
|
||||
public static class StoredContainerRequest extends ContainerRequest {
|
||||
public StoredContainerRequest(Resource capability, String[] nodes,
|
||||
String[] racks, Priority priority) {
|
||||
super(capability, nodes, racks, priority, 1);
|
||||
}
|
||||
|
||||
public StoredContainerRequest(Resource capability, String[] nodes,
|
||||
String[] racks, Priority priority, boolean relaxLocality) {
|
||||
super(capability, nodes, racks, priority, 1, relaxLocality);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the application master. This must be called before any
|
||||
* other interaction
|
||||
@ -311,8 +276,8 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu
|
||||
public abstract int getClusterNodeCount();
|
||||
|
||||
/**
|
||||
* Get outstanding <code>StoredContainerRequest</code>s matching the given
|
||||
* parameters. These StoredContainerRequests should have been added via
|
||||
* Get outstanding <code>ContainerRequest</code>s matching the given
|
||||
* parameters. These ContainerRequests should have been added via
|
||||
* <code>addContainerRequest</code> earlier in the lifecycle. For performance,
|
||||
* the AMRMClient may return its internal collection directly without creating
|
||||
* a copy. Users should not perform mutable operations on the return value.
|
||||
|
@ -343,26 +343,26 @@ public synchronized void addContainerRequest(T req) {
|
||||
+ joiner.join(req.getNodes()));
|
||||
}
|
||||
for (String node : dedupedNodes) {
|
||||
addResourceRequest(req.getPriority(), node, req.getCapability(),
|
||||
req.getContainerCount(), req, true);
|
||||
addResourceRequest(req.getPriority(), node, req.getCapability(), req,
|
||||
true);
|
||||
}
|
||||
}
|
||||
|
||||
for (String rack : dedupedRacks) {
|
||||
addResourceRequest(req.getPriority(), rack, req.getCapability(),
|
||||
req.getContainerCount(), req, true);
|
||||
addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
|
||||
true);
|
||||
}
|
||||
|
||||
// Ensure node requests are accompanied by requests for
|
||||
// corresponding rack
|
||||
for (String rack : inferredRacks) {
|
||||
addResourceRequest(req.getPriority(), rack, req.getCapability(),
|
||||
req.getContainerCount(), req, req.getRelaxLocality());
|
||||
addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
|
||||
req.getRelaxLocality());
|
||||
}
|
||||
|
||||
// Off-switch
|
||||
addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
|
||||
req.getContainerCount(), req, req.getRelaxLocality());
|
||||
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
|
||||
req.getCapability(), req, req.getRelaxLocality());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -378,18 +378,16 @@ public synchronized void removeContainerRequest(T req) {
|
||||
// Update resource requests
|
||||
if (req.getNodes() != null) {
|
||||
for (String node : new HashSet<String>(req.getNodes())) {
|
||||
decResourceRequest(req.getPriority(), node, req.getCapability(),
|
||||
req.getContainerCount(), req);
|
||||
decResourceRequest(req.getPriority(), node, req.getCapability(), req);
|
||||
}
|
||||
}
|
||||
|
||||
for (String rack : allRacks) {
|
||||
decResourceRequest(req.getPriority(), rack, req.getCapability(),
|
||||
req.getContainerCount(), req);
|
||||
decResourceRequest(req.getPriority(), rack, req.getCapability(), req);
|
||||
}
|
||||
|
||||
decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
|
||||
req.getContainerCount(), req);
|
||||
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
|
||||
req.getCapability(), req);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -516,7 +514,7 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
||||
}
|
||||
|
||||
private void addResourceRequest(Priority priority, String resourceName,
|
||||
Resource capability, int containerCount, T req, boolean relaxLocality) {
|
||||
Resource capability, T req, boolean relaxLocality) {
|
||||
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
|
||||
this.remoteRequestsTable.get(priority);
|
||||
if (remoteRequests == null) {
|
||||
@ -544,9 +542,9 @@ private void addResourceRequest(Priority priority, String resourceName,
|
||||
}
|
||||
|
||||
resourceRequestInfo.remoteRequest.setNumContainers(
|
||||
resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
|
||||
resourceRequestInfo.remoteRequest.getNumContainers() + 1);
|
||||
|
||||
if (req instanceof StoredContainerRequest && relaxLocality) {
|
||||
if (relaxLocality) {
|
||||
resourceRequestInfo.containerRequests.add(req);
|
||||
}
|
||||
|
||||
@ -565,7 +563,6 @@ private void addResourceRequest(Priority priority, String resourceName,
|
||||
private void decResourceRequest(Priority priority,
|
||||
String resourceName,
|
||||
Resource capability,
|
||||
int containerCount,
|
||||
T req) {
|
||||
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
|
||||
this.remoteRequestsTable.get(priority);
|
||||
@ -597,11 +594,9 @@ private void decResourceRequest(Priority priority,
|
||||
}
|
||||
|
||||
resourceRequestInfo.remoteRequest.setNumContainers(
|
||||
resourceRequestInfo.remoteRequest.getNumContainers() - containerCount);
|
||||
resourceRequestInfo.remoteRequest.getNumContainers() - 1);
|
||||
|
||||
if(req instanceof StoredContainerRequest) {
|
||||
resourceRequestInfo.containerRequests.remove(req);
|
||||
}
|
||||
resourceRequestInfo.containerRequests.remove(req);
|
||||
|
||||
if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
|
||||
// guard against spurious removals
|
||||
|
@ -60,7 +60,6 @@
|
||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
@ -93,6 +92,7 @@ public class TestAMRMClient {
|
||||
public static void setup() throws Exception {
|
||||
// start minicluster
|
||||
conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
|
||||
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
@ -169,10 +169,10 @@ public static void tearDown() {
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
||||
AMRMClient<StoredContainerRequest> amClient = null;
|
||||
AMRMClient<ContainerRequest> amClient = null;
|
||||
try {
|
||||
// start am rm client
|
||||
amClient = AMRMClient.<StoredContainerRequest>createAMRMClient(attemptId);
|
||||
amClient = AMRMClient.<ContainerRequest>createAMRMClient(attemptId);
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
@ -185,20 +185,20 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
||||
Resource capability6 = Resource.newInstance(2000, 1);
|
||||
Resource capability7 = Resource.newInstance(2000, 1);
|
||||
|
||||
StoredContainerRequest storedContainer1 =
|
||||
new StoredContainerRequest(capability1, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer2 =
|
||||
new StoredContainerRequest(capability2, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer3 =
|
||||
new StoredContainerRequest(capability3, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer4 =
|
||||
new StoredContainerRequest(capability4, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer5 =
|
||||
new StoredContainerRequest(capability5, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer6 =
|
||||
new StoredContainerRequest(capability6, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer7 =
|
||||
new StoredContainerRequest(capability7, nodes, racks, priority2, false);
|
||||
ContainerRequest storedContainer1 =
|
||||
new ContainerRequest(capability1, nodes, racks, priority);
|
||||
ContainerRequest storedContainer2 =
|
||||
new ContainerRequest(capability2, nodes, racks, priority);
|
||||
ContainerRequest storedContainer3 =
|
||||
new ContainerRequest(capability3, nodes, racks, priority);
|
||||
ContainerRequest storedContainer4 =
|
||||
new ContainerRequest(capability4, nodes, racks, priority);
|
||||
ContainerRequest storedContainer5 =
|
||||
new ContainerRequest(capability5, nodes, racks, priority);
|
||||
ContainerRequest storedContainer6 =
|
||||
new ContainerRequest(capability6, nodes, racks, priority);
|
||||
ContainerRequest storedContainer7 =
|
||||
new ContainerRequest(capability7, nodes, racks, priority2, false);
|
||||
amClient.addContainerRequest(storedContainer1);
|
||||
amClient.addContainerRequest(storedContainer2);
|
||||
amClient.addContainerRequest(storedContainer3);
|
||||
@ -208,8 +208,8 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
||||
amClient.addContainerRequest(storedContainer7);
|
||||
|
||||
// test matching of containers
|
||||
List<? extends Collection<StoredContainerRequest>> matches;
|
||||
StoredContainerRequest storedRequest;
|
||||
List<? extends Collection<ContainerRequest>> matches;
|
||||
ContainerRequest storedRequest;
|
||||
// exact match
|
||||
Resource testCapability1 = Resource.newInstance(1024, 2);
|
||||
matches = amClient.getMatchingRequests(priority, node, testCapability1);
|
||||
@ -224,7 +224,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
||||
verifyMatches(matches, 2);
|
||||
// must be returned in the order they were made
|
||||
int i = 0;
|
||||
for(StoredContainerRequest storedRequest1 : matches.get(0)) {
|
||||
for(ContainerRequest storedRequest1 : matches.get(0)) {
|
||||
if(i++ == 0) {
|
||||
assertTrue(storedContainer4 == storedRequest1);
|
||||
} else {
|
||||
@ -242,9 +242,9 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
||||
matches = amClient.getMatchingRequests(priority, node, testCapability4);
|
||||
assert(matches.size() == 2);
|
||||
// verify non-fitting containers are not returned and fitting ones are
|
||||
for(Collection<StoredContainerRequest> testSet : matches) {
|
||||
for(Collection<ContainerRequest> testSet : matches) {
|
||||
assertTrue(testSet.size() == 1);
|
||||
StoredContainerRequest testRequest = testSet.iterator().next();
|
||||
ContainerRequest testRequest = testSet.iterator().next();
|
||||
assertTrue(testRequest != storedContainer4);
|
||||
assertTrue(testRequest != storedContainer5);
|
||||
assert(testRequest == storedContainer2 ||
|
||||
@ -275,7 +275,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
||||
}
|
||||
|
||||
private void verifyMatches(
|
||||
List<? extends Collection<StoredContainerRequest>> matches,
|
||||
List<? extends Collection<ContainerRequest>> matches,
|
||||
int matchSize) {
|
||||
assertTrue(matches.size() == 1);
|
||||
assertTrue(matches.get(0).size() == matchSize);
|
||||
@ -283,23 +283,23 @@ private void verifyMatches(
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
|
||||
AMRMClientImpl<StoredContainerRequest> amClient = null;
|
||||
AMRMClientImpl<ContainerRequest> amClient = null;
|
||||
try {
|
||||
// start am rm client
|
||||
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
|
||||
amClient = new AMRMClientImpl<ContainerRequest>(attemptId);
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
|
||||
Resource capability = Resource.newInstance(1024, 2);
|
||||
|
||||
StoredContainerRequest storedContainer1 =
|
||||
new StoredContainerRequest(capability, nodes, null, priority);
|
||||
ContainerRequest storedContainer1 =
|
||||
new ContainerRequest(capability, nodes, null, priority);
|
||||
amClient.addContainerRequest(storedContainer1);
|
||||
|
||||
// verify matching with original node and inferred rack
|
||||
List<? extends Collection<StoredContainerRequest>> matches;
|
||||
StoredContainerRequest storedRequest;
|
||||
List<? extends Collection<ContainerRequest>> matches;
|
||||
ContainerRequest storedRequest;
|
||||
// exact match node
|
||||
matches = amClient.getMatchingRequests(priority, node, capability);
|
||||
verifyMatches(matches, 1);
|
||||
@ -326,14 +326,14 @@ public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOExce
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=60000)
|
||||
@Test //(timeout=60000)
|
||||
public void testAMRMClientMatchStorage() throws YarnException, IOException {
|
||||
AMRMClientImpl<StoredContainerRequest> amClient = null;
|
||||
AMRMClientImpl<ContainerRequest> amClient = null;
|
||||
try {
|
||||
// start am rm client
|
||||
amClient =
|
||||
(AMRMClientImpl<StoredContainerRequest>) AMRMClient
|
||||
.<StoredContainerRequest> createAMRMClient(attemptId);
|
||||
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
||||
.<ContainerRequest> createAMRMClient(attemptId);
|
||||
amClient.init(conf);
|
||||
amClient.start();
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
@ -341,12 +341,12 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
|
||||
Priority priority1 = Records.newRecord(Priority.class);
|
||||
priority1.setPriority(2);
|
||||
|
||||
StoredContainerRequest storedContainer1 =
|
||||
new StoredContainerRequest(capability, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer2 =
|
||||
new StoredContainerRequest(capability, nodes, racks, priority);
|
||||
StoredContainerRequest storedContainer3 =
|
||||
new StoredContainerRequest(capability, null, null, priority1);
|
||||
ContainerRequest storedContainer1 =
|
||||
new ContainerRequest(capability, nodes, racks, priority);
|
||||
ContainerRequest storedContainer2 =
|
||||
new ContainerRequest(capability, nodes, racks, priority);
|
||||
ContainerRequest storedContainer3 =
|
||||
new ContainerRequest(capability, null, null, priority1);
|
||||
amClient.addContainerRequest(storedContainer1);
|
||||
amClient.addContainerRequest(storedContainer2);
|
||||
amClient.addContainerRequest(storedContainer3);
|
||||
@ -358,7 +358,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
|
||||
containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
|
||||
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
|
||||
assertTrue(containersRequestedAny == 1);
|
||||
List<? extends Collection<StoredContainerRequest>> matches =
|
||||
List<? extends Collection<ContainerRequest>> matches =
|
||||
amClient.getMatchingRequests(priority, node, capability);
|
||||
verifyMatches(matches, 2);
|
||||
matches = amClient.getMatchingRequests(priority, rack, capability);
|
||||
@ -383,7 +383,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
|
||||
verifyMatches(matches, 1);
|
||||
|
||||
// test matching of containers
|
||||
StoredContainerRequest storedRequest = matches.get(0).iterator().next();
|
||||
ContainerRequest storedRequest = matches.get(0).iterator().next();
|
||||
assertTrue(storedContainer1 == storedRequest);
|
||||
amClient.removeContainerRequest(storedContainer1);
|
||||
matches =
|
||||
@ -400,7 +400,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
|
||||
amClient.addContainerRequest(storedContainer3);
|
||||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
int iterationsLeft = 2;
|
||||
int iterationsLeft = 3;
|
||||
while (allocatedContainerCount < 2
|
||||
&& iterationsLeft-- > 0) {
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
@ -420,24 +420,23 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
|
||||
verifyMatches(matches, 1);
|
||||
ContainerRequest matchedRequest = matches.get(0).iterator().next();
|
||||
assertTrue(matchedRequest == expectedRequest);
|
||||
|
||||
amClient.removeContainerRequest(matchedRequest);
|
||||
// assign this container, use it and release it
|
||||
amClient.releaseAssignedContainer(container.getId());
|
||||
}
|
||||
if(allocatedContainerCount < containersRequestedAny) {
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(1000);
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(allocatedContainerCount == 2);
|
||||
assertTrue(amClient.release.size() == 2);
|
||||
assertTrue(amClient.ask.size() == 0);
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
assertTrue(amClient.release.size() == 0);
|
||||
assertTrue(amClient.ask.size() == 0);
|
||||
assertTrue(allocResponse.getAllocatedContainers().size() == 0);
|
||||
|
||||
// 0 requests left. everything got cleaned up
|
||||
assertTrue(amClient.remoteRequestsTable.isEmpty());
|
||||
|
||||
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||
null, null);
|
||||
@ -480,11 +479,17 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||
assertTrue(amClient.release.size() == 0);
|
||||
|
||||
amClient.addContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority, 1));
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.addContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority, 3));
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.addContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.addContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority, 2));
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
|
||||
int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
|
||||
.get(node).get(capability).remoteRequest.getNumContainers();
|
||||
@ -501,7 +506,7 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||
|
||||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
int iterationsLeft = 2;
|
||||
int iterationsLeft = 3;
|
||||
Set<ContainerId> releases = new TreeSet<ContainerId>();
|
||||
|
||||
NMTokenCache.clearCache();
|
||||
@ -532,7 +537,7 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||
|
||||
if(allocatedContainerCount < containersRequestedAny) {
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(1000);
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
@ -546,7 +551,9 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||
|
||||
// need to tell the AMRMClient that we dont need these resources anymore
|
||||
amClient.removeContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority, 2));
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
assertTrue(amClient.ask.size() == 3);
|
||||
// send 0 container count request for resources that are no longer needed
|
||||
ResourceRequest snoopRequest = amClient.ask.iterator().next();
|
||||
@ -554,7 +561,9 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
||||
|
||||
// test RPC exception handling
|
||||
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
|
||||
racks, priority, 2));
|
||||
racks, priority));
|
||||
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
|
||||
racks, priority));
|
||||
snoopRequest = amClient.ask.iterator().next();
|
||||
assertTrue(snoopRequest.getNumContainers() == 2);
|
||||
|
||||
@ -567,7 +576,9 @@ public AllocateResponse answer(InvocationOnMock invocation)
|
||||
throws Exception {
|
||||
amClient.removeContainerRequest(
|
||||
new ContainerRequest(capability, nodes,
|
||||
racks, priority, 2));
|
||||
racks, priority));
|
||||
amClient.removeContainerRequest(
|
||||
new ContainerRequest(capability, nodes, racks, priority));
|
||||
throw new Exception();
|
||||
}
|
||||
});
|
||||
@ -585,7 +596,7 @@ public AllocateResponse answer(InvocationOnMock invocation)
|
||||
// has not been lost
|
||||
assertTrue(snoopRequest.getNumContainers() == 0);
|
||||
|
||||
iterationsLeft = 2;
|
||||
iterationsLeft = 3;
|
||||
// do a few iterations to ensure RM is not going send new containers
|
||||
while(!releases.isEmpty() || iterationsLeft-- > 0) {
|
||||
// inform RM of rejection
|
||||
@ -604,7 +615,7 @@ public AllocateResponse answer(InvocationOnMock invocation)
|
||||
}
|
||||
if(iterationsLeft > 0) {
|
||||
// sleep to make sure NM's heartbeat
|
||||
sleep(1000);
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
assertTrue(amClient.ask.size() == 0);
|
||||
|
@ -24,7 +24,6 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
@ -53,7 +52,7 @@ public void testFillInRacks() {
|
||||
Resource capability = Resource.newInstance(1024, 1);
|
||||
ContainerRequest request =
|
||||
new ContainerRequest(capability, new String[] {"host1", "host2"},
|
||||
new String[] {"/rack2"}, Priority.newInstance(1), 4);
|
||||
new String[] {"/rack2"}, Priority.newInstance(1));
|
||||
client.addContainerRequest(request);
|
||||
verifyResourceRequest(client, request, "host1", true);
|
||||
verifyResourceRequest(client, request, "host2", true);
|
||||
@ -75,7 +74,7 @@ public void testDisableLocalityRelaxation() {
|
||||
Resource capability = Resource.newInstance(1024, 1);
|
||||
ContainerRequest nodeLevelRequest =
|
||||
new ContainerRequest(capability, new String[] {"host1", "host2"},
|
||||
null, Priority.newInstance(1), 4, false);
|
||||
null, Priority.newInstance(1), false);
|
||||
client.addContainerRequest(nodeLevelRequest);
|
||||
|
||||
verifyResourceRequest(client, nodeLevelRequest, ResourceRequest.ANY, false);
|
||||
@ -87,12 +86,12 @@ public void testDisableLocalityRelaxation() {
|
||||
// same priority
|
||||
ContainerRequest nodeLevelRequest2 =
|
||||
new ContainerRequest(capability, new String[] {"host2", "host3"},
|
||||
null, Priority.newInstance(1), 4, false);
|
||||
null, Priority.newInstance(1), false);
|
||||
client.addContainerRequest(nodeLevelRequest2);
|
||||
|
||||
AMRMClient.ContainerRequest rackLevelRequest =
|
||||
new AMRMClient.ContainerRequest(capability, null,
|
||||
new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), 3, false);
|
||||
new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), false);
|
||||
client.addContainerRequest(rackLevelRequest);
|
||||
|
||||
verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY, false);
|
||||
@ -103,13 +102,13 @@ public void testDisableLocalityRelaxation() {
|
||||
// same priority
|
||||
AMRMClient.ContainerRequest rackLevelRequest2 =
|
||||
new AMRMClient.ContainerRequest(capability, null,
|
||||
new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), 3, false);
|
||||
new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), false);
|
||||
client.addContainerRequest(rackLevelRequest2);
|
||||
|
||||
ContainerRequest bothLevelRequest =
|
||||
new ContainerRequest(capability, new String[] {"host3", "host4"},
|
||||
new String[] {"rack1", "/otherrack"},
|
||||
Priority.newInstance(3), 4, false);
|
||||
Priority.newInstance(3), false);
|
||||
client.addContainerRequest(bothLevelRequest);
|
||||
|
||||
verifyResourceRequest(client, bothLevelRequest, ResourceRequest.ANY, false);
|
||||
@ -125,7 +124,7 @@ public void testDisableLocalityRelaxation() {
|
||||
ContainerRequest bothLevelRequest2 =
|
||||
new ContainerRequest(capability, new String[] {"host4", "host5"},
|
||||
new String[] {"rack1", "/otherrack2"},
|
||||
Priority.newInstance(3), 4, false);
|
||||
Priority.newInstance(3), false);
|
||||
client.addContainerRequest(bothLevelRequest2);
|
||||
}
|
||||
|
||||
@ -142,11 +141,11 @@ public void testDifferentLocalityRelaxationSamePriority() {
|
||||
Resource capability = Resource.newInstance(1024, 1);
|
||||
ContainerRequest request1 =
|
||||
new ContainerRequest(capability, new String[] {"host1", "host2"},
|
||||
null, Priority.newInstance(1), 4, false);
|
||||
null, Priority.newInstance(1), false);
|
||||
client.addContainerRequest(request1);
|
||||
ContainerRequest request2 =
|
||||
new ContainerRequest(capability, new String[] {"host3"},
|
||||
null, Priority.newInstance(1), 4, true);
|
||||
null, Priority.newInstance(1), true);
|
||||
client.addContainerRequest(request2);
|
||||
}
|
||||
|
||||
@ -163,28 +162,28 @@ public void testInvalidValidWhenOldRemoved() {
|
||||
Resource capability = Resource.newInstance(1024, 1);
|
||||
ContainerRequest request1 =
|
||||
new ContainerRequest(capability, new String[] {"host1", "host2"},
|
||||
null, Priority.newInstance(1), 4, false);
|
||||
null, Priority.newInstance(1), false);
|
||||
client.addContainerRequest(request1);
|
||||
|
||||
client.removeContainerRequest(request1);
|
||||
|
||||
ContainerRequest request2 =
|
||||
new ContainerRequest(capability, new String[] {"host3"},
|
||||
null, Priority.newInstance(1), 4, true);
|
||||
null, Priority.newInstance(1), true);
|
||||
client.addContainerRequest(request2);
|
||||
|
||||
client.removeContainerRequest(request2);
|
||||
|
||||
ContainerRequest request3 =
|
||||
new ContainerRequest(capability, new String[] {"host1", "host2"},
|
||||
null, Priority.newInstance(1), 4, false);
|
||||
null, Priority.newInstance(1), false);
|
||||
client.addContainerRequest(request3);
|
||||
|
||||
client.removeContainerRequest(request3);
|
||||
|
||||
ContainerRequest request4 =
|
||||
new ContainerRequest(capability, null,
|
||||
new String[] {"rack1"}, Priority.newInstance(1), 4, true);
|
||||
new String[] {"rack1"}, Priority.newInstance(1), true);
|
||||
client.addContainerRequest(request4);
|
||||
|
||||
}
|
||||
@ -202,11 +201,11 @@ public void testLocalityRelaxationDifferentLevels() {
|
||||
Resource capability = Resource.newInstance(1024, 1);
|
||||
ContainerRequest request1 =
|
||||
new ContainerRequest(capability, new String[] {"host1", "host2"},
|
||||
null, Priority.newInstance(1), 4, false);
|
||||
null, Priority.newInstance(1), false);
|
||||
client.addContainerRequest(request1);
|
||||
ContainerRequest request2 =
|
||||
new ContainerRequest(capability, null,
|
||||
new String[] {"rack1"}, Priority.newInstance(1), 4, true);
|
||||
new String[] {"rack1"}, Priority.newInstance(1), true);
|
||||
client.addContainerRequest(request2);
|
||||
}
|
||||
|
||||
@ -227,7 +226,7 @@ private void verifyResourceRequest(
|
||||
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
|
||||
.get(location).get(request.getCapability()).remoteRequest;
|
||||
assertEquals(location, ask.getResourceName());
|
||||
assertEquals(request.getContainerCount(), ask.getNumContainers());
|
||||
assertEquals(1, ask.getNumContainers());
|
||||
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,6 @@
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
@ -52,7 +51,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||
@ -220,7 +218,7 @@ private Set<Container> allocateContainers(
|
||||
|
||||
for (int i = 0; i < num; ++i) {
|
||||
rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
|
||||
racks, priority, 1));
|
||||
racks, priority));
|
||||
}
|
||||
|
||||
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
|
||||
|
Loading…
Reference in New Issue
Block a user