From 19c743c1bbcaf3df8f1d63e557143c960a538c42 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 26 Aug 2016 16:48:00 -0700 Subject: [PATCH] YARN-4889. Changes in AMRMClient for identifying resource-requests explicitly. (Arun Suresh via wangda) --- .../yarn/api/records/ResourceRequest.java | 19 +- .../hadoop/yarn/client/api/AMRMClient.java | 150 +++++++++++-- .../client/api/async/AMRMClientAsync.java | 18 ++ .../yarn/client/api/impl/AMRMClientImpl.java | 161 +++++++++----- .../client/api/impl/RemoteRequestsTable.java | 11 +- .../yarn/client/api/impl/TestAMRMClient.java | 200 +++++++++++++++--- .../impl/TestAMRMClientContainerRequest.java | 7 +- .../api/impl/TestDistributedScheduling.java | 24 ++- .../yarn/client/api/impl/TestNMClient.java | 6 +- 9 files changed, 476 insertions(+), 120 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 07f132cd2e..2d6f0f460d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -111,6 +111,10 @@ public int compare(ResourceRequest r1, ResourceRequest r2) { // Compare priority, host and capability int ret = r1.getPriority().compareTo(r2.getPriority()); + if (ret == 0) { + ret = Long.compare( + r1.getAllocationRequestId(), r2.getAllocationRequestId()); + } if (ret == 0) { String h1 = r1.getResourceName(); String h2 = r2.getResourceName(); @@ -381,6 +385,7 @@ public int hashCode() { result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); result = prime * result + getNumContainers(); result = prime * result + ((priority == null) ? 0 : priority.hashCode()); + result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode(); return result; } @@ -422,6 +427,11 @@ public boolean equals(Object obj) { .equals(other.getExecutionTypeRequest().getExecutionType())) { return false; } + + if (getAllocationRequestId() != other.getAllocationRequestId()) { + return false; + } + if (getNodeLabelExpression() == null) { if (other.getNodeLabelExpression() != null) { return false; @@ -452,7 +462,14 @@ public int compareTo(ResourceRequest other) { int capabilityComparison = this.getCapability().compareTo(other.getCapability()); if (capabilityComparison == 0) { - return this.getNumContainers() - other.getNumContainers(); + int numContainerComparison = + this.getNumContainers() - other.getNumContainers(); + if (numContainerComparison == 0) { + return Long.compare(getAllocationRequestId(), + other.getAllocationRequestId()); + } else { + return numContainerComparison; + } } else { return capabilityComparison; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 79d587ac1a..2990c05130 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -110,6 +110,7 @@ public static class ContainerRequest { final List nodes; final List racks; final Priority priority; + final long allocationRequestId; final boolean relaxLocality; final String nodeLabelsExpression; final ExecutionTypeRequest executionTypeRequest; @@ -134,6 +135,31 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority) { this(capability, nodes, racks, priority, true, null); } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints and + * locality relaxation enabled. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param allocationRequestId Allocation Request Id + */ + @Public + @InterfaceStability.Evolving + public ContainerRequest(Resource capability, String[] nodes, + String[] racks, Priority priority, long allocationRequestId) { + this(capability, nodes, racks, priority, allocationRequestId, true, null, + ExecutionTypeRequest.newInstance()); + } /** * Instantiates a {@link ContainerRequest} with the given constraints. @@ -175,20 +201,20 @@ public ContainerRequest(Resource capability, String[] nodes, * @param relaxLocality * If true, containers for this request may be assigned on hosts * and racks other than the ones explicitly requested. - * @param nodeLabelsExpression - * Set node labels to allocate resource, now we only support - * asking for only a single node label + * @param allocationRequestId Allocation Request Id */ - public ContainerRequest(Resource capability, String[] nodes, String[] racks, - Priority priority, boolean relaxLocality, String nodeLabelsExpression) { - this(capability, nodes, racks, priority, relaxLocality, - nodeLabelsExpression, - ExecutionTypeRequest.newInstance()); + @Public + @InterfaceStability.Evolving + public ContainerRequest(Resource capability, String[] nodes, + String[] racks, Priority priority, long allocationRequestId, + boolean relaxLocality) { + this(capability, nodes, racks, priority, allocationRequestId, + relaxLocality, null, ExecutionTypeRequest.newInstance()); } - + /** * Instantiates a {@link ContainerRequest} with the given constraints. - * + * * @param capability * The {@link Resource} to be requested for each container. * @param nodes @@ -206,11 +232,79 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, * @param nodeLabelsExpression * Set node labels to allocate resource, now we only support * asking for only a single node label + */ + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority, boolean relaxLocality, String nodeLabelsExpression) { + this(capability, nodes, racks, priority, 0, relaxLocality, + nodeLabelsExpression, + ExecutionTypeRequest.newInstance()); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param allocationRequestId + * The allocationRequestId of the request. To be used as a tracking + * id to match Containers allocated against this request. Will + * default to 0 if not specified. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression + * Set node labels to allocate resource, now we only support + * asking for only a single node label + */ + @Public + @InterfaceStability.Evolving + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority, long allocationRequestId, boolean relaxLocality, + String nodeLabelsExpression) { + this(capability, nodes, racks, priority, allocationRequestId, + relaxLocality, nodeLabelsExpression, + ExecutionTypeRequest.newInstance()); + } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param allocationRequestId + * The allocationRequestId of the request. To be used as a tracking + * id to match Containers allocated against this request. Will + * default to 0 if not specified. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression + * Set node labels to allocate resource, now we only support + * asking for only a single node label * @param executionTypeRequest * Set the execution type of the container request. */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, - Priority priority, boolean relaxLocality, String nodeLabelsExpression, + Priority priority, long allocationRequestId, boolean relaxLocality, + String nodeLabelsExpression, ExecutionTypeRequest executionTypeRequest) { // Validate request Preconditions.checkArgument(capability != null, @@ -223,6 +317,7 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, && (nodes == null || nodes.length == 0)), "Can't turn off locality relaxation on a " + "request with no location constraints"); + this.allocationRequestId = allocationRequestId; this.capability = capability; this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); @@ -247,6 +342,10 @@ public List getRacks() { public Priority getPriority() { return priority; } + + public long getAllocationRequestId() { + return allocationRequestId; + } public boolean getRelaxLocality() { return relaxLocality; @@ -264,6 +363,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); sb.append("Priority[").append(priority).append("]"); + sb.append("AllocationRequestId[").append(allocationRequestId).append("]"); sb.append("ExecutionTypeRequest[").append(executionTypeRequest) .append("]"); return sb.toString(); @@ -390,6 +490,10 @@ public abstract void requestContainerResourceChange( * Each collection in the list contains requests with identical * Resource size that fit in the given capability. In a * collection, requests will be returned in the same order as they were added. + * + * NOTE: This API only matches Container requests that were created by the + * client WITHOUT the allocationRequestId being set. + * * @return Collection of request matching the parameters */ @InterfaceStability.Evolving @@ -407,7 +511,11 @@ public abstract List> getMatchingRequests( * Each collection in the list contains requests with identical * Resource size that fit in the given capability. In a * collection, requests will be returned in the same order as they were added. - * specify an ExecutionType . + * specify an ExecutionType. + * + * NOTE: This API only matches Container requests that were created by the + * client WITHOUT the allocationRequestId being set. + * * @param priority Priority * @param resourceName Location * @param executionType ExecutionType @@ -421,7 +529,23 @@ public List> getMatchingRequests( throw new UnsupportedOperationException("The sub-class extending" + " AMRMClient is expected to implement this !!"); } - + + /** + * Get outstanding ContainerRequests matching the given + * allocationRequestId. These ContainerRequests should have been added via + * addContainerRequest earlier in the lifecycle. For performance, + * the AMRMClient may return its internal collection directly without creating + * a copy. Users should not perform mutable operations on the return value. + * + * NOTE: This API only matches Container requests that were created by the + * client WITH the allocationRequestId being set to a non-default value. + * + * @param allocationRequestId Allocation Request Id + * @return Collection of request matching the parameters + */ + @InterfaceStability.Evolving + public abstract Collection getMatchingRequests(long allocationRequestId); + /** * Update application's blacklist with addition or removal resources. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index d06fadc8b2..10d2a2f638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -202,6 +202,10 @@ public abstract List> getMatchingRequests( /** * Returns all matching ContainerRequests that match the given Priority, * ResourceName, ExecutionType and Capability. + * + * NOTE: This matches only requests that were made by the client WITHOUT the + * allocationRequestId specified. + * * @param priority Priority. * @param resourceName Location. * @param executionType ExecutionType. @@ -214,6 +218,20 @@ public List> getMatchingRequests( return client.getMatchingRequests(priority, resourceName, executionType, capability); } + + /** + * Returns all matching ContainerRequests that match the given + * AllocationRequestId. + * + * NOTE: This matches only requests that were made by the client WITH the + * allocationRequestId specified. + * + * @param allocationRequestId AllocationRequestId. + * @return All matching ContainerRequests + */ + public Collection getMatchingRequests(long allocationRequestId) { + return client.getMatchingRequests(allocationRequestId); + } /** * Registers this application master with the resource manager. On successful diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4145944328..60834f615f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -108,10 +108,11 @@ static class ResourceRequestInfo { ResourceRequest remoteRequest; LinkedHashSet containerRequests; - ResourceRequestInfo(Priority priority, String resourceName, - Resource capability, boolean relaxLocality) { + ResourceRequestInfo(Long allocationRequestId, Priority priority, + String resourceName, Resource capability, boolean relaxLocality) { remoteRequest = ResourceRequest.newInstance(priority, resourceName, capability, 0); + remoteRequest.setAllocationRequestId(allocationRequestId); remoteRequest.setRelaxLocality(relaxLocality); containerRequests = new LinkedHashSet(); } @@ -154,7 +155,8 @@ static boolean canFit(Resource arg0, Resource arg1) { return (mem0 <= mem1 && cpu0 <= cpu1); } - final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable(); + private final Map> remoteRequests = + new HashMap<>(); protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); @@ -263,10 +265,12 @@ public AllocateResponse allocate(float progressIndicator) for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across - askList.add(ResourceRequest.newInstance(r.getPriority(), + ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(), r.getResourceName(), r.getCapability(), r.getNumContainers(), r.getRelaxLocality(), r.getNodeLabelExpression(), - r.getExecutionTypeRequest())); + r.getExecutionTypeRequest()); + rr.setAllocationRequestId(r.getAllocationRequestId()); + askList.add(rr); } List increaseList = new ArrayList<>(); List decreaseList = new ArrayList<>(); @@ -318,11 +322,14 @@ public AllocateResponse allocate(float progressIndicator) synchronized (this) { release.addAll(this.pendingRelease); blacklistAdditions.addAll(this.blacklistedNodes); - @SuppressWarnings("unchecked") - Iterator> reqIter = - remoteRequestsTable.iterator(); - while (reqIter.hasNext()) { - addResourceRequestToAsk(reqIter.next().remoteRequest); + for (RemoteRequestsTable remoteRequestsTable : + remoteRequests.values()) { + @SuppressWarnings("unchecked") + Iterator> reqIter = + remoteRequestsTable.iterator(); + while (reqIter.hasNext()) { + addResourceRequestToAsk(reqIter.next().remoteRequest); + } } change.putAll(this.pendingChange); } @@ -498,15 +505,16 @@ public synchronized void addContainerRequest(T req) { // check that specific and non-specific requests cannot be mixed within a // priority - checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST, - req.getRelaxLocality()); + checkLocalityRelaxationConflict(req.getAllocationRequestId(), + req.getPriority(), ANY_LIST, req.getRelaxLocality()); // check that specific rack cannot be mixed with specific node within a // priority. If node and its rack are both specified then they must be // in the same request. // For explicitly requested racks, we set locality relaxation to true - checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true); - checkLocalityRelaxationConflict(req.getPriority(), inferredRacks, - req.getRelaxLocality()); + checkLocalityRelaxationConflict(req.getAllocationRequestId(), + req.getPriority(), dedupedRacks, true); + checkLocalityRelaxationConflict(req.getAllocationRequestId(), + req.getPriority(), inferredRacks, req.getRelaxLocality()); // check if the node label expression specified is valid checkNodeLabelExpression(req); @@ -607,6 +615,24 @@ public synchronized int getClusterNodeCount() { return clusterNodeCount; } + + @Override + @SuppressWarnings("unchecked") + public Collection getMatchingRequests(long allocationRequestId) { + RemoteRequestsTable remoteRequestsTable = getTable(allocationRequestId); + LinkedHashSet list = new LinkedHashSet<>(); + + if (remoteRequestsTable != null) { + Iterator> reqIter = + remoteRequestsTable.iterator(); + while (reqIter.hasNext()) { + ResourceRequestInfo resReqInfo = reqIter.next(); + list.addAll(resReqInfo.containerRequests); + } + } + return list; + } + @Override public synchronized List> getMatchingRequests( Priority priority, @@ -617,6 +643,7 @@ public synchronized List> getMatchingRequests( } @Override + @SuppressWarnings("unchecked") public synchronized List> getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, Resource capability) { @@ -626,9 +653,9 @@ public synchronized List> getMatchingRequests( "The priority at which to request containers should not be null "); List> list = new LinkedList>(); - @SuppressWarnings("unchecked") + RemoteRequestsTable remoteRequestsTable = getTable(0); List> matchingRequests = - this.remoteRequestsTable.getMatchingRequests(priority, resourceName, + remoteRequestsTable.getMatchingRequests(priority, resourceName, executionType, capability); // If no exact match. Container may be larger than what was requested. // get all resources <= capability. map is reverse sorted. @@ -664,23 +691,26 @@ private Set resolveRacks(List nodes) { * ContainerRequests with locality relaxation cannot be made at the same * priority as ContainerRequests without locality relaxation. */ - private void checkLocalityRelaxationConflict(Priority priority, - Collection locations, boolean relaxLocality) { + private void checkLocalityRelaxationConflict(Long allocationReqId, + Priority priority, Collection locations, boolean relaxLocality) { // Locality relaxation will be set to relaxLocality for all implicitly // requested racks. Make sure that existing rack requests match this. - @SuppressWarnings("unchecked") - List allCapabilityMaps = - remoteRequestsTable.getAllResourceRequestInfos(priority, locations); - for (ResourceRequestInfo reqs : allCapabilityMaps) { - ResourceRequest remoteRequest = reqs.remoteRequest; - boolean existingRelaxLocality = remoteRequest.getRelaxLocality(); - if (relaxLocality != existingRelaxLocality) { - throw new InvalidContainerRequestException("Cannot submit a " - + "ContainerRequest asking for location " - + remoteRequest.getResourceName() + " with locality relaxation " - + relaxLocality + " when it has already been requested" - + "with locality relaxation " + existingRelaxLocality); + RemoteRequestsTable remoteRequestsTable = getTable(allocationReqId); + if (remoteRequestsTable != null) { + @SuppressWarnings("unchecked") + List allCapabilityMaps = + remoteRequestsTable.getAllResourceRequestInfos(priority, locations); + for (ResourceRequestInfo reqs : allCapabilityMaps) { + ResourceRequest remoteRequest = reqs.remoteRequest; + boolean existingRelaxLocality = remoteRequest.getRelaxLocality(); + if (relaxLocality != existingRelaxLocality) { + throw new InvalidContainerRequestException("Cannot submit a " + + "ContainerRequest asking for location " + + remoteRequest.getResourceName() + " with locality relaxation " + + relaxLocality + " when it has already been requested" + + "with locality relaxation " + existingRelaxLocality); + } } } } @@ -742,10 +772,17 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { private void addResourceRequest(Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, Resource capability, T req, boolean relaxLocality, String labelExpression) { + RemoteRequestsTable remoteRequestsTable = + getTable(req.getAllocationRequestId()); + if (remoteRequestsTable == null) { + remoteRequestsTable = new RemoteRequestsTable(); + putTable(req.getAllocationRequestId(), remoteRequestsTable); + } @SuppressWarnings("unchecked") ResourceRequestInfo resourceRequestInfo = remoteRequestsTable - .addResourceRequest(priority, resourceName, - execTypeReq, capability, req, relaxLocality, labelExpression); + .addResourceRequest(req.getAllocationRequestId(), priority, + resourceName, execTypeReq, capability, req, relaxLocality, + labelExpression); // Note this down for next interaction with ResourceManager addResourceRequestToAsk(resourceRequestInfo.remoteRequest); @@ -761,29 +798,37 @@ private void addResourceRequest(Priority priority, String resourceName, private void decResourceRequest(Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, Resource capability, T req) { - @SuppressWarnings("unchecked") - ResourceRequestInfo resourceRequestInfo = - remoteRequestsTable.decResourceRequest(priority, resourceName, - execTypeReq, capability, req); - // send the ResourceRequest to RM even if is 0 because it needs to override - // a previously sent value. If ResourceRequest was not sent previously then - // sending 0 aught to be a no-op on RM - if (resourceRequestInfo != null) { - addResourceRequestToAsk(resourceRequestInfo.remoteRequest); + RemoteRequestsTable remoteRequestsTable = + getTable(req.getAllocationRequestId()); + if (remoteRequestsTable != null) { + @SuppressWarnings("unchecked") + ResourceRequestInfo resourceRequestInfo = + remoteRequestsTable.decResourceRequest(priority, resourceName, + execTypeReq, capability, req); + // send the ResourceRequest to RM even if is 0 because it needs to + // override a previously sent value. If ResourceRequest was not sent + // previously then sending 0 ought to be a no-op on RM + if (resourceRequestInfo != null) { + addResourceRequestToAsk(resourceRequestInfo.remoteRequest); - // delete entry from map if no longer needed - if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { - this.remoteRequestsTable.remove(priority, resourceName, - execTypeReq.getExecutionType(), capability); - } + // delete entry from map if no longer needed + if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { + remoteRequestsTable.remove(priority, resourceName, + execTypeReq.getExecutionType(), capability); + } - if (LOG.isDebugEnabled()) { - LOG.debug("AFTER decResourceRequest:" + " applicationId=" - + " priority=" + priority.getPriority() - + " resourceName=" + resourceName + " numContainers=" - + resourceRequestInfo.remoteRequest.getNumContainers() - + " #asks=" + ask.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("AFTER decResourceRequest:" + + " allocationRequestId=" + req.getAllocationRequestId() + + " priority=" + priority.getPriority() + + " resourceName=" + resourceName + " numContainers=" + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); + } } + } else { + LOG.info("No remoteRequestTable found with allocationRequestId=" + + req.getAllocationRequestId()); } } @@ -829,4 +874,14 @@ private void updateAMRMToken(Token token) throws IOException { currentUGI.addToken(amrmToken); amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); } + + @VisibleForTesting + RemoteRequestsTable getTable(long allocationRequestId) { + return remoteRequests.get(Long.valueOf(allocationRequestId)); + } + + RemoteRequestsTable putTable(long allocationRequestId, + RemoteRequestsTable table) { + return remoteRequests.put(Long.valueOf(allocationRequestId), table); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java index 853a512558..110ca79943 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java @@ -264,15 +264,16 @@ List getMatchingRequests( } @SuppressWarnings("unchecked") - ResourceRequestInfo addResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req, - boolean relaxLocality, String labelExpression) { + ResourceRequestInfo addResourceRequest(Long allocationRequestId, + Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, + Resource capability, T req, boolean relaxLocality, + String labelExpression) { ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, execTypeReq.getExecutionType(), capability); if (resourceRequestInfo == null) { resourceRequestInfo = - new ResourceRequestInfo(priority, resourceName, capability, - relaxLocality); + new ResourceRequestInfo(allocationRequestId, priority, resourceName, + capability, relaxLocality); put(priority, resourceName, execTypeReq.getExecutionType(), capability, resourceRequestInfo); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 1eeeb78455..e0ad2c4bab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -268,6 +269,18 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { amClient.addContainerRequest(storedContainer5); amClient.addContainerRequest(storedContainer6); amClient.addContainerRequest(storedContainer7); + + // Add some CRs with allocReqIds... These will not be returned by + // the default getMatchingRequests + ContainerRequest storedContainer11 = + new ContainerRequest(capability1, nodes, racks, priority, 1); + ContainerRequest storedContainer33 = + new ContainerRequest(capability3, nodes, racks, priority, 3); + ContainerRequest storedContainer43 = + new ContainerRequest(capability4, nodes, racks, priority, 3); + amClient.addContainerRequest(storedContainer11); + amClient.addContainerRequest(storedContainer33); + amClient.addContainerRequest(storedContainer43); // test matching of containers List> matches; @@ -279,6 +292,25 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { storedRequest = matches.get(0).iterator().next(); assertEquals(storedContainer1, storedRequest); amClient.removeContainerRequest(storedContainer1); + + // exact match for allocReqId 1 + Collection reqIdMatches = + amClient.getMatchingRequests(1); + assertEquals(1, reqIdMatches.size()); + storedRequest = reqIdMatches.iterator().next(); + assertEquals(storedContainer11, storedRequest); + amClient.removeContainerRequest(storedContainer11); + + // exact match for allocReqId 3 + reqIdMatches = amClient.getMatchingRequests(3); + assertEquals(2, reqIdMatches.size()); + Iterator iter = reqIdMatches.iterator(); + storedRequest = iter.next(); + assertEquals(storedContainer43, storedRequest); + amClient.removeContainerRequest(storedContainer43); + storedRequest = iter.next(); + assertEquals(storedContainer33, storedRequest); + amClient.removeContainerRequest(storedContainer33); // exact matching with order maintained Resource testCapability2 = Resource.newInstance(2000, 1); @@ -364,26 +396,32 @@ public void testAMRMClientMatchingFitExecType() ContainerRequest storedGuarContainer2 = new ContainerRequest(capability2, nodes, racks, priority); ContainerRequest storedOpportContainer1 = - new ContainerRequest(capability1, nodes, racks, priority, true, null, + new ContainerRequest(capability1, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer2 = - new ContainerRequest(capability2, nodes, racks, priority, true, null, + new ContainerRequest(capability2, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer3 = - new ContainerRequest(capability3, nodes, racks, priority, true, null, + new ContainerRequest(capability3, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer4 = - new ContainerRequest(capability4, nodes, racks, priority, true, null, + new ContainerRequest(capability4, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer5 = - new ContainerRequest(capability5, nodes, racks, priority, true, null, + new ContainerRequest(capability5, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer6 = - new ContainerRequest(capability6, nodes, racks, priority, true, null, + new ContainerRequest(capability6, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer7 = new ContainerRequest(capability7, nodes, racks, priority2, - false, null, + 0, false, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); amClient.addContainerRequest(storedGuarContainer1); amClient.addContainerRequest(storedGuarContainer2); @@ -541,11 +579,13 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { amClient.addContainerRequest(storedContainer3); // test addition and storage - int containersRequestedAny = amClient.remoteRequestsTable.get(priority, + RemoteRequestsTable remoteRequestsTable = + amClient.getTable(0); + int containersRequestedAny = remoteRequestsTable.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedAny); - containersRequestedAny = amClient.remoteRequestsTable.get(priority1, + containersRequestedAny = remoteRequestsTable.get(priority1, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); @@ -584,7 +624,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); assertTrue(matches.isEmpty()); // 0 requests left. everything got cleaned up - assertTrue(amClient.remoteRequestsTable.isEmpty()); + assertTrue(amClient.getTable(0).isEmpty()); // go through an exemplary allocation, matching and release cycle amClient.addContainerRequest(storedContainer1); @@ -628,7 +668,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { assertEquals(0, amClient.ask.size()); assertEquals(0, allocResponse.getAllocatedContainers().size()); // 0 requests left. everything got cleaned up - assertTrue(amClient.remoteRequestsTable.isEmpty()); + assertTrue(remoteRequestsTable.isEmpty()); amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -780,6 +820,16 @@ private int getAllocatedContainersNumber( @Test (timeout=60000) public void testAMRMClient() throws YarnException, IOException { + initAMRMClientAndTest(false); + } + + @Test (timeout=60000) + public void testAMRMClientAllocReqId() throws YarnException, IOException { + initAMRMClientAndTest(true); + } + + private void initAMRMClientAndTest(boolean useAllocReqId) + throws YarnException, IOException { AMRMClient amClient = null; try { // start am rm client @@ -796,7 +846,11 @@ public void testAMRMClient() throws YarnException, IOException { amClient.registerApplicationMaster("Host", 10000, ""); - testAllocation((AMRMClientImpl)amClient); + if (useAllocReqId) { + testAllocRequestId((AMRMClientImpl)amClient); + } else { + testAllocation((AMRMClientImpl) amClient); + } amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -1055,22 +1109,9 @@ private void testAllocation(final AMRMClientImpl amClient) new ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); - - int containersRequestedNode = amClient.remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, capability).remoteRequest - .getNumContainers(); - int containersRequestedRack = amClient.remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, capability).remoteRequest - .getNumContainers(); - int containersRequestedAny = amClient.remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) - .remoteRequest.getNumContainers(); - assertEquals(2, containersRequestedNode); - assertEquals(2, containersRequestedRack); - assertEquals(2, containersRequestedAny); - assertEquals(3, amClient.ask.size()); - assertEquals(0, amClient.release.size()); + assertNumContainers(amClient, 0, 2, 2, 2, 3, 0); + int containersRequestedAny = 2; // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; @@ -1163,10 +1204,15 @@ public AllocateResponse answer(InvocationOnMock invocation) // verify that the remove request made in between makeRequest and allocate // has not been lost assertEquals(0, snoopRequest.getNumContainers()); - - iterationsLeft = 3; + + waitForContainerCompletion(3, amClient, releases); + } + + private void waitForContainerCompletion(int numIterations, + AMRMClientImpl amClient, Set releases) + throws YarnException, IOException { // do a few iterations to ensure RM is not going send new containers - while(!releases.isEmpty() || iterationsLeft-- > 0) { + while(!releases.isEmpty() || numIterations-- > 0) { // inform RM of rejection AllocateResponse allocResponse = amClient.allocate(0.1f); // RM did not send new containers because AM does not need any @@ -1181,7 +1227,7 @@ public AllocateResponse answer(InvocationOnMock invocation) } } } - if(iterationsLeft > 0) { + if(numIterations > 0) { // sleep to make sure NM's heartbeat sleep(100); } @@ -1190,6 +1236,98 @@ public AllocateResponse answer(InvocationOnMock invocation) assertEquals(0, amClient.release.size()); } + private void testAllocRequestId( + final AMRMClientImpl amClient) throws YarnException, + IOException { + // setup container request + + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 1)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 1)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 2)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 2)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 1)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 2)); + + assertNumContainers(amClient, 0, 1, 1, 1, 9, 0); + assertNumContainers(amClient, 1, 1, 1, 1, 9, 0); + assertNumContainers(amClient, 2, 1, 1, 1, 9, 0); + int containersRequestedAny = 3; + + // RM should allocate container within 2 calls to allocate() + List allocatedContainers = new ArrayList<>(); + int iterationsLeft = 5; + Set releases = new TreeSet(); + + while (allocatedContainers.size() < containersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + for(Container container : allocResponse.getAllocatedContainers()) { + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + amClient.releaseAssignedContainer(rejectContainerId); + } + + if(allocatedContainers.size() < containersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(containersRequestedAny, allocatedContainers.size()); + Set expAllocIds = new HashSet<>( + Arrays.asList(Long.valueOf(0), Long.valueOf(1), Long.valueOf(2))); + Set actAllocIds = new HashSet<>(); + for (Container ac : allocatedContainers) { + actAllocIds.add(Long.valueOf(ac.getAllocationRequestId())); + } + assertEquals(expAllocIds, actAllocIds); + assertEquals(3, amClient.release.size()); + assertEquals(0, amClient.ask.size()); + + waitForContainerCompletion(3, amClient, releases); + } + + private void assertNumContainers(AMRMClientImpl amClient, + long allocationReqId, int expNode, int expRack, int expAny, + int expAsks, int expRelease) { + RemoteRequestsTable remoteRequestsTable = + amClient.getTable(allocationReqId); + int containersRequestedNode = remoteRequestsTable.get(priority, + node, ExecutionType.GUARANTEED, capability).remoteRequest + .getNumContainers(); + int containersRequestedRack = remoteRequestsTable.get(priority, + rack, ExecutionType.GUARANTEED, capability).remoteRequest + .getNumContainers(); + int containersRequestedAny = remoteRequestsTable.get(priority, + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) + .remoteRequest.getNumContainers(); + + assertEquals(expNode, containersRequestedNode); + assertEquals(expRack, containersRequestedRack); + assertEquals(expAny, containersRequestedAny); + assertEquals(expAsks, amClient.ask.size()); + assertEquals(expRelease, amClient.release.size()); + } + class CountDownSupplier implements Supplier { int counter = 0; @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index 2db33c1885..ad18da3c53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -61,7 +61,7 @@ public void testOpportunisticAndGuaranteedRequests() { verifyResourceRequest(client, request, ResourceRequest.ANY, true); ContainerRequest request2 = new ContainerRequest(capability, new String[] {"host1", "host2"}, - new String[] {"/rack2"}, Priority.newInstance(1), true, null, + new String[] {"/rack2"}, Priority.newInstance(1), 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true)); client.addContainerRequest(request2); @@ -274,8 +274,9 @@ private void verifyResourceRequest( AMRMClientImpl client, ContainerRequest request, String location, boolean expectedRelaxLocality, ExecutionType executionType) { - ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(), - location, executionType, request.getCapability()).remoteRequest; + ResourceRequest ask = client.getTable(0) + .get(request.getPriority(), location, executionType, + request.getCapability()).remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(1, ask.getNumContainers()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index 40390d40ad..4cfc4ebc35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -366,12 +366,12 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); @@ -381,21 +381,23 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); - int containersRequestedNode = amClient.remoteRequestsTable.get(priority, + RemoteRequestsTable remoteRequestsTable = + amClient.getTable(0); + int containersRequestedNode = remoteRequestsTable.get(priority, node, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedRack = amClient.remoteRequestsTable.get(priority, + int containersRequestedRack = remoteRequestsTable.get(priority, rack, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedAny = amClient.remoteRequestsTable.get(priority, + int containersRequestedAny = remoteRequestsTable.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); int oppContainersRequestedAny = - amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY, + remoteRequestsTable.get(priority2, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); @@ -457,7 +459,7 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); assertEquals(4, amClient.ask.size()); @@ -469,7 +471,7 @@ public void testAMRMClient() throws Exception { nodes, racks, priority)); amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); @@ -490,7 +492,7 @@ public AllocateResponse answer(InvocationOnMock invocation) priority)); amc.removeContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, - priority2, true, null, + priority2, 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); throw new Exception(); @@ -571,7 +573,7 @@ public void testAMOpportunistic() throws Exception { ExecutionTypeRequest execTypeRequest = ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true); ContainerRequest containerRequest = new AMRMClient.ContainerRequest( - capability, nodes, racks, priority, true, null, execTypeRequest); + capability, nodes, racks, priority, 0, true, null, execTypeRequest); amClient.addContainerRequest(containerRequest); // Wait until the container is allocated diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 969fb70f44..3640883fac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -252,9 +252,9 @@ private Set allocateContainers( racks, priority)); } - int containersRequestedAny = rmClient.remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) - .remoteRequest.getNumContainers(); + int containersRequestedAny = rmClient.getTable(0) + .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, + capability).remoteRequest.getNumContainers(); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0;