diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 29680e5d87..f3cbf63c9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -86,8 +87,8 @@ public interface RMContainer extends EventHandler, ContainerReport createContainerReport(); boolean isAMContainer(); - - List getResourceRequests(); + + ContainerRequest getContainerRequest(); String getNodeHttpAddress(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index a43459cfbb..e26689e98f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -178,7 +179,7 @@ RMContainerEventType.RESERVED, new ContainerReservedTransition()) private long finishTime; private ContainerStatus finishedStatus; private boolean isAMContainer; - private List resourceRequests; + private ContainerRequest containerRequestForRecovery; // Only used for container resource increase and decrease. This is the // resource to rollback to should container resource increase token expires. 
@@ -233,7 +234,6 @@ public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); this.isAMContainer = false; - this.resourceRequests = null; this.nodeLabelExpression = nodeLabelExpression; this.lastConfirmedResource = container.getResource(); this.isExternallyAllocated = isExternallyAllocated; @@ -412,21 +412,21 @@ public ContainerState getContainerState() { readLock.unlock(); } } - + @Override - public List getResourceRequests() { + public ContainerRequest getContainerRequest() { try { readLock.lock(); - return resourceRequests; + return containerRequestForRecovery; } finally { readLock.unlock(); } } - - public void setResourceRequests(List requests) { + + public void setContainerRequest(ContainerRequest request) { + writeLock.lock(); try { - writeLock.lock(); - this.resourceRequests = requests; + this.containerRequestForRecovery = request; } finally { writeLock.unlock(); } @@ -576,7 +576,7 @@ private static final class AcquiredTransition extends BaseTransition { public void transition(RMContainerImpl container, RMContainerEvent event) { // Clear ResourceRequest stored in RMContainer, we don't need to remember // this anymore. - container.setResourceRequests(null); + container.setContainerRequest(null); // Register with containerAllocationExpirer. 
container.containerAllocationExpirer.register( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4749c3d899..d94efb1150 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -600,10 +601,10 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, * @param rmContainer rmContainer */ private void recoverResourceRequestForContainer(RMContainer rmContainer) { - List requests = rmContainer.getResourceRequests(); + ContainerRequest containerRequest = rmContainer.getContainerRequest(); // If container state is moved to ACQUIRED, request will be empty. 
- if (requests == null) { + if (containerRequest == null) { return; } @@ -618,7 +619,7 @@ private void recoverResourceRequestForContainer(RMContainer rmContainer) { SchedulerApplicationAttempt schedulerAttempt = getCurrentAttemptForContainer(rmContainer.getContainerId()); if (schedulerAttempt != null) { - schedulerAttempt.recoverResourceRequestsForContainer(requests); + schedulerAttempt.recoverResourceRequestsForContainer(containerRequest); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 9f49880201..e47f0c1d61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -45,10 +45,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -220,16 +221,14 @@ boolean addRequestToAppPlacement( } // Update AppPlacementAllocator - ResourceRequestUpdateResult pendingAmountChanges = + PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey) - .updateResourceRequests( - entry.getValue().values(), + .updatePendingAsk(entry.getValue().values(), recoverPreemptedRequestForAContainer); if (null != pendingAmountChanges) { updatePendingResources( - pendingAmountChanges.getLastAnyResourceRequest(), - pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey, + pendingAmountChanges, schedulerRequestKey, queue.getMetrics()); offswitchResourcesUpdated = true; } @@ -237,12 +236,17 @@ boolean addRequestToAppPlacement( return offswitchResourcesUpdated; } - private void updatePendingResources(ResourceRequest lastRequest, - ResourceRequest request, SchedulerRequestKey schedulerKey, - QueueMetrics metrics) { + private void updatePendingResources(PendingAskUpdateResult updateResult, + SchedulerRequestKey schedulerKey, QueueMetrics metrics) { + + PendingAsk lastPendingAsk = updateResult.getLastPendingAsk(); + PendingAsk newPendingAsk = updateResult.getNewPendingAsk(); + String lastNodePartition = updateResult.getLastNodePartition(); + String newNodePartition = updateResult.getNewNodePartition(); + int lastRequestContainers = - (lastRequest != null) ? lastRequest.getNumContainers() : 0; - if (request.getNumContainers() <= 0) { + (lastPendingAsk != null) ? 
lastPendingAsk.getCount() : 0; + if (newPendingAsk.getCount() <= 0) { if (lastRequestContainers >= 0) { schedulerKeys.remove(schedulerKey); schedulerKeyToAppPlacementAllocator.remove(schedulerKey); @@ -258,31 +262,23 @@ private void updatePendingResources(ResourceRequest lastRequest, } } - Resource lastRequestCapability = - lastRequest != null ? lastRequest.getCapability() : Resources.none(); - metrics.incrPendingResources(request.getNodeLabelExpression(), user, - request.getNumContainers(), request.getCapability()); - - if(lastRequest != null) { - metrics.decrPendingResources(lastRequest.getNodeLabelExpression(), user, - lastRequestContainers, lastRequestCapability); + if (lastPendingAsk != null) { + // Deduct resources from metrics / pending resources of queue/app. + metrics.decrPendingResources(lastNodePartition, user, + lastPendingAsk.getCount(), lastPendingAsk.getPerAllocationResource()); + Resource decreasedResource = Resources.multiply( + lastPendingAsk.getPerAllocationResource(), lastRequestContainers); + queue.decPendingResource(lastNodePartition, decreasedResource); + appResourceUsage.decPending(lastNodePartition, decreasedResource); } - // update queue: - Resource increasedResource = - Resources.multiply(request.getCapability(), request.getNumContainers()); - queue.incPendingResource(request.getNodeLabelExpression(), - increasedResource); - appResourceUsage.incPending(request.getNodeLabelExpression(), - increasedResource); - if (lastRequest != null) { - Resource decreasedResource = - Resources.multiply(lastRequestCapability, lastRequestContainers); - queue.decPendingResource(lastRequest.getNodeLabelExpression(), - decreasedResource); - appResourceUsage.decPending(lastRequest.getNodeLabelExpression(), - decreasedResource); - } + // Increase resources to metrics / pending resources of queue/app. 
+ metrics.incrPendingResources(newNodePartition, user, + newPendingAsk.getCount(), newPendingAsk.getPerAllocationResource()); + Resource increasedResource = Resources.multiply( + newPendingAsk.getPerAllocationResource(), newPendingAsk.getCount()); + queue.incPendingResource(newNodePartition, increasedResource); + appResourceUsage.incPending(newNodePartition, increasedResource); } public void addRequestedPartition(String partition) { @@ -417,7 +413,7 @@ public boolean isPlaceBlacklisted(String resourceName, } } - public List allocate(NodeType type, + public ContainerRequest allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, Container containerAllocated) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 93995a1dcf..f410db12f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer .RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; @@ -155,16 +156,16 @@ 
private void cancelPreviousRequest(SchedulerNode schedulerNode, AppPlacementAllocator appPlacementAllocator = appSchedulingInfo.getAppPlacementAllocator(schedulerKey); if (appPlacementAllocator != null) { - Map resourceRequests = appPlacementAllocator - .getResourceRequests(); - ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY); + PendingAsk pendingAsk = appPlacementAllocator.getPendingAsk( + ResourceRequest.ANY); // Decrement the pending using a dummy RR with // resource = prev update req capability - if (prevReq != null) { + if (pendingAsk != null && pendingAsk.getCount() > 0) { appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode, schedulerKey, Container.newInstance(UNDEFINED, schedulerNode.getNodeID(), "host:port", - prevReq.getCapability(), schedulerKey.getPriority(), null)); + pendingAsk.getPerAllocationResource(), + schedulerKey.getPriority(), null)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 65673c9f05..dfb0e67fcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; @@ -449,11 +450,12 @@ public boolean updateResourceRequests( } public void recoverResourceRequestsForContainer( - List requests) { + ContainerRequest containerRequest) { try { writeLock.lock(); if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests, true); + appSchedulingInfo.updateResourceRequests( + containerRequest.getResourceRequests(), true); } } finally { writeLock.unlock(); @@ -913,7 +915,7 @@ private List pullNewlyUpdatedContainers( RMContainer c = tempIter.next(); // Mark container for release (set RRs to null, so RM does not think // it is a recoverable container) - ((RMContainerImpl) c).setResourceRequests(null); + ((RMContainerImpl) c).setContainerRequest(null); // Release this container async-ly so as to prevent // 'LeafQueue::completedContainer()' from trying to acquire a lock @@ -1383,13 +1385,6 @@ public AppPlacementAllocator getAppPlacementAllocat SchedulerRequestKey schedulerRequestKey) { return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey); } - - public Map getResourceRequests( - SchedulerRequestKey schedulerRequestKey) { - return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey) - .getResourceRequests(); - } - public void incUnconfirmedRes(Resource res) { unconfirmedAllocatedMem.addAndGet(res.getMemorySize()); unconfirmedAllocatedVcores.addAndGet(res.getVirtualCores()); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 69e90c68a4..264253221e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -299,7 +299,8 @@ private boolean canAssign(SchedulerRequestKey schedulerKey, } // If we have only ANY requests for this schedulerKey, we should not // delay its scheduling. - if (application.getResourceRequests(schedulerKey).size() == 1) { + if (application.getAppPlacementAllocator(schedulerKey) + .getUniqueLocationAsks() == 1) { return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java new file mode 100644 index 0000000000..075db796a4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; + +import java.util.List; + +/** + * ContainerRequest is a class to capture resource requests associated with a + * Container; the scheduler uses it to recover resource requests if the + * container is preempted or cancelled before the AM acquires it. + * + * It should include the resource requests deducted at container allocation. + * + * Lifecycle of the ContainerRequest is: + * + *
+ * 1) It is instantiated when the container is created.
+ * 2) It is set on the RMContainerImpl by the scheduler.
+ * 3) If the container is preempted or cancelled for any reason before it is
+ *    acquired by the AM, the ContainerRequest is added back to the pending
+ *    request pool.
+ * 4) It is cleared from the RMContainerImpl once the container has been
+ *    acquired by the AM.
+ * 
+ */ +public class ContainerRequest { + private List requests; + + public ContainerRequest(List requests) { + this.requests = requests; + } + + public List getResourceRequests() { + return requests; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 93d51d8af3..34594cf409 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; @@ -369,7 +370,7 @@ private boolean commonCheckContainerAllocation( public boolean accept(Resource cluster, ResourceCommitRequest request) { - List resourceRequests = null; + ContainerRequest containerRequest = null; boolean reReservation = false; try { @@ -397,8 +398,8 @@ public 
boolean accept(Resource cluster, if (schedulerContainer.isAllocated()) { // When allocate a new container - resourceRequests = - schedulerContainer.getRmContainer().getResourceRequests(); + containerRequest = + schedulerContainer.getRmContainer().getContainerRequest(); // Check pending resource request if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(), @@ -471,8 +472,8 @@ public boolean accept(Resource cluster, } // When rejected, recover resource requests for this app - if (!accepted && resourceRequests != null) { - recoverResourceRequestsForContainer(resourceRequests); + if (!accepted && containerRequest != null) { + recoverResourceRequestsForContainer(containerRequest); } return accepted; @@ -524,12 +525,12 @@ public void apply(Resource cluster, liveContainers.put(containerId, rmContainer); // Deduct pending resource requests - List requests = appSchedulingInfo.allocate( + ContainerRequest containerRequest = appSchedulingInfo.allocate( allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(), schedulerContainer.getSchedulerRequestKey(), schedulerContainer.getRmContainer().getContainer()); - ((RMContainerImpl) rmContainer).setResourceRequests(requests); + ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest); attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), allocation.getAllocatedOrReservedResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 43daace76e..e095a422ad 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -460,13 +461,13 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - List resourceRequestList = appSchedulingInfo.allocate( + ContainerRequest containerRequest = appSchedulingInfo.allocate( type, node, schedulerKey, container); this.attemptResourceUsage.incUsed(container.getResource()); getQueue().incUsedResource(container.getResource()); // Update resource requests related to "request" and store in RMContainer - ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest); // Inform the container rmContainer.handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index d932e0e089..169b98a82e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -80,14 +81,14 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node, liveContainers.put(containerId, rmContainer); // Update consumption and track allocations - List resourceRequestList = appSchedulingInfo.allocate( + ContainerRequest containerRequest = appSchedulingInfo.allocate( type, node, schedulerKey, container); attemptResourceUsage.incUsed(node.getPartition(), container.getResource()); // Update resource requests related to "request" and store in RMContainer - ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest); // Inform the container rmContainer.handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index 63b22a3be1..dcb38aa005 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -57,14 +58,14 @@ public interface AppPlacementAllocator { Iterator getPreferredNodeIterator(CandidateNodeSet candidateNodeSet); /** - * Replace existing ResourceRequest by the new requests + * Replace existing pending asks by the new requests * - * @param requests new ResourceRequests + * @param requests new asks * @param recoverPreemptedRequestForAContainer if we're recovering resource * requests for preempted container * @return true if total pending resource changed */ - ResourceRequestUpdateResult updateResourceRequests( + PendingAskUpdateResult updatePendingAsk( Collection requests, boolean recoverPreemptedRequestForAContainer); @@ -97,17 +98,13 @@ ResourceRequestUpdateResult updateResourceRequests( * @param schedulerKey SchedulerRequestKey for this ResourceRequest * @param type Type of the allocation * @param node Which node this container allocated on 
- * @return list of ResourceRequests deducted + * @return ContainerRequest which include resource requests associated with + * the container. This will be used by scheduler to recover requests. + * Please refer to {@link ContainerRequest} for more details. */ - List allocate(SchedulerRequestKey schedulerKey, + ContainerRequest allocate(SchedulerRequestKey schedulerKey, NodeType type, SchedulerNode node); - /** - * Returns list of accepted resourceNames. - * @return Iterator of accepted resourceNames - */ - Iterator getAcceptedResouceNames(); - /** * We can still have pending requirement for a given NodeType and node * @param type Locality Type diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 7f89435336..766827ca30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import 
org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -44,7 +45,7 @@ * into account locality preferences (node, rack, any) when allocating * containers. */ -public class LocalityAppPlacementAllocator +public class LocalityAppPlacementAllocator implements AppPlacementAllocator { private static final Log LOG = LogFactory.getLog(LocalityAppPlacementAllocator.class); @@ -122,13 +123,13 @@ private void updateNodeLabels(ResourceRequest request) { } @Override - public ResourceRequestUpdateResult updateResourceRequests( + public PendingAskUpdateResult updatePendingAsk( Collection requests, boolean recoverPreemptedRequestForAContainer) { try { this.writeLock.lock(); - ResourceRequestUpdateResult updateResult = null; + PendingAskUpdateResult updateResult = null; // Update resource requests for (ResourceRequest request : requests) { @@ -156,7 +157,16 @@ public ResourceRequestUpdateResult updateResourceRequests( //update the applications requested labels set appSchedulingInfo.addRequestedPartition(partition); - updateResult = new ResourceRequestUpdateResult(lastRequest, request); + PendingAsk lastPendingAsk = + lastRequest == null ? null : new PendingAsk( + lastRequest.getCapability(), lastRequest.getNumContainers()); + String lastRequestedNodePartition = + lastRequest == null ? 
null : lastRequest.getNodeLabelExpression(); + + updateResult = new PendingAskUpdateResult(lastPendingAsk, + new PendingAsk(request.getCapability(), + request.getNumContainers()), lastRequestedNodePartition, + request.getNodeLabelExpression()); } } return updateResult; @@ -380,7 +390,7 @@ public void showRequests() { } @Override - public List allocate(SchedulerRequestKey schedulerKey, + public ContainerRequest allocate(SchedulerRequestKey schedulerKey, NodeType type, SchedulerNode node) { try { writeLock.lock(); @@ -404,19 +414,9 @@ public List allocate(SchedulerRequestKey schedulerKey, allocateOffSwitch(schedulerKey, request, resourceRequests); } - return resourceRequests; + return new ContainerRequest(resourceRequests); } finally { writeLock.unlock(); } } - - @Override - public Iterator getAcceptedResouceNames() { - try { - readLock.lock(); - return resourceRequestMap.keySet().iterator(); - } finally { - readLock.unlock(); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java new file mode 100644 index 0000000000..8765e865d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; + +/** + * Result of a resource-request update. This will be used by + * {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo} + * to update queue metrics and application/queue's overall pending resources. + * The result is per scheduler key. + * + * The following fields are set if the pending ask changed for a scheduler key: + * - lastPendingAsk: how much resource was asked for before. + * - newPendingAsk: how much resource is asked for now. + * - lastNodePartition: what the node partition was before. + * - newNodePartition: what the node partition is now.
+ */ +public class PendingAskUpdateResult { + private final PendingAsk lastPendingAsk; + private final String lastNodePartition; + private final PendingAsk newPendingAsk; + private final String newNodePartition; + + public PendingAskUpdateResult(PendingAsk lastPendingAsk, + PendingAsk newPendingAsk, String lastNodePartition, + String newNodePartition) { + this.lastPendingAsk = lastPendingAsk; + this.newPendingAsk = newPendingAsk; + this.lastNodePartition = lastNodePartition; + this.newNodePartition = newNodePartition; + } + + public PendingAsk getLastPendingAsk() { + return lastPendingAsk; + } + + public PendingAsk getNewPendingAsk() { + return newPendingAsk; + } + + public String getLastNodePartition() { + return lastNodePartition; + } + + public String getNewNodePartition() { + return newNodePartition; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java deleted file mode 100644 index da356f5f54..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.hadoop.yarn.api.records.ResourceRequest; - -/** - * Result of ResourceRequest update - */ -public class ResourceRequestUpdateResult { - private final ResourceRequest lastAnyResourceRequest; - private final ResourceRequest newResourceRequest; - - public ResourceRequestUpdateResult(ResourceRequest lastAnyResourceRequest, - ResourceRequest newResourceRequest) { - this.lastAnyResourceRequest = lastAnyResourceRequest; - this.newResourceRequest = newResourceRequest; - } - - public ResourceRequest getLastAnyResourceRequest() { - return lastAnyResourceRequest; - } - - public ResourceRequest getNewResourceRequest() { - return newResourceRequest; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index e889de09f4..fbde6813d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -431,7 +431,7 @@ private void updateResourceRequests(Map requests, if (type == NodeType.NODE_LOCAL) { 
for (String host : task.getHosts()) { if(LOG.isDebugEnabled()) { - LOG.debug("updateResourceRequests:" + " application=" + applicationId + LOG.debug("updatePendingAsk:" + " application=" + applicationId + " type=" + type + " host=" + host + " request=" + ((requests == null) ? "null" : requests.get(host))); } @@ -442,7 +442,7 @@ private void updateResourceRequests(Map requests, if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) { for (String rack : task.getRacks()) { if(LOG.isDebugEnabled()) { - LOG.debug("updateResourceRequests:" + " application=" + applicationId + LOG.debug("updatePendingAsk:" + " application=" + applicationId + " type=" + type + " rack=" + rack + " request=" + ((requests == null) ? "null" : requests.get(rack))); } @@ -453,7 +453,7 @@ private void updateResourceRequests(Map requests, updateResourceRequest(requests.get(ResourceRequest.ANY)); if(LOG.isDebugEnabled()) { - LOG.debug("updateResourceRequests:" + " application=" + applicationId + LOG.debug("updatePendingAsk:" + " application=" + applicationId + " #asks=" + ask.size()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index db3144898f..6c189b335d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -278,8 +278,8 @@ public void testExistenceOfResourceRequestInRMContainer() throws Exception { // Verify whether list of 
ResourceRequest is present in RMContainer // while moving to ALLOCATED state - Assert.assertNotNull(scheduler.getRMContainer(containerId2) - .getResourceRequests()); + Assert.assertNotNull( + scheduler.getRMContainer(containerId2).getContainerRequest()); // Allocate container am1.allocate(new ArrayList(), new ArrayList()) @@ -288,8 +288,8 @@ public void testExistenceOfResourceRequestInRMContainer() throws Exception { // After RMContainer moving to ACQUIRED state, list of ResourceRequest will // be empty - Assert.assertNull(scheduler.getRMContainer(containerId2) - .getResourceRequests()); + Assert.assertNull( + scheduler.getRMContainer(containerId2).getContainerRequest()); } @Test (timeout = 180000) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1edb0dab0b..e91f7341a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1696,7 +1696,8 @@ public void testRecoverRequestAfterPreemption() throws Exception { rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED); RMContainer rmContainer = cs.getRMContainer(containerId1); - List requests = rmContainer.getResourceRequests(); + List requests = + rmContainer.getContainerRequest().getResourceRequests(); FiCaSchedulerApp app = cs.getApplicationAttempt(am1 .getApplicationAttemptId());