diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 1bc6f239c5..106525d915 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -70,10 +70,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -957,7 +957,7 @@ public synchronized List getTransferredContainers( } @Override - protected void completedContainerInternal(RMContainer rmContainer, + protected void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { // do nothing } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a9f4a8409f..24f16b510e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1271,9 +1271,6 @@ Release 2.8.0 - UNRELEASED YARN-4538. QueueMetrics pending cores and memory metrics wrong. (Bibin A Chundatt via wangda) - YARN-4502. Fix two AM containers get allocated when AM restart. - (Vinod Kumar Vavilapalli via wangda) - Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index c68bb8e133..5df2be8402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -257,7 +257,7 @@ private void containerBasedPreemptOrKill(CSQueue root, // kill it rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_PREEMPTED_CONTAINER)); + SchedulerEventType.KILL_CONTAINER)); preempted.remove(container); } else { if (preempted.get(container) != null) { @@ -764,7 +764,7 @@ private void preemptFrom(FiCaSchedulerApp app, if (!observeOnly) { rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent( - appId, c, SchedulerEventType.KILL_RESERVED_CONTAINER)); + appId, c, SchedulerEventType.DROP_RESERVATION)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 83876d0a15..96c4f2772d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -96,7 +97,7 @@ RMContainerEventType.ACQUIRED, new AcquiredTransition()) .addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED, RMContainerEventType.EXPIRE, new FinishedTransition()) .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED, - RMContainerEventType.KILL, new FinishedTransition()) + RMContainerEventType.KILL, new ContainerRescheduledTransition()) // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, @@ -520,8 +521,7 @@ private static final class AcquiredTransition extends BaseTransition { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Clear ResourceRequest stored in RMContainer, we don't need to remember - // this anymore. + // Clear ResourceRequest stored in RMContainer container.setResourceRequests(null); // Register with containerAllocationExpirer. @@ -597,6 +597,17 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { } } + private static final class ContainerRescheduledTransition extends + FinishedTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + // Tell scheduler to recover request of this container to app + container.eventHandler.handle(new ContainerRescheduledEvent(container)); + super.transition(container, event); + } + } + private static class FinishedTransition extends BaseTransition { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 41a04f23da..ed93acfa87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -511,28 +511,20 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, * Recover resource request back from RMContainer when a container is * preempted before AM pulled the same. If container is pulled by * AM, then RMContainer will not have resource request to recover. - * @param rmContainer rmContainer + * @param rmContainer */ - private void recoverResourceRequestForContainer(RMContainer rmContainer) { + protected void recoverResourceRequestForContainer(RMContainer rmContainer) { List requests = rmContainer.getResourceRequests(); // If container state is moved to ACQUIRED, request will be empty. if (requests == null) { return; } - - // Add resource request back to Scheduler ApplicationAttempt. - - // We lookup the application-attempt here again using - // getCurrentApplicationAttempt() because there is only one app-attempt at - // any point in the scheduler. But in corner cases, AMs can crash, - // corresponding containers get killed and recovered to the same-attempt, - // but because the app-attempt is extinguished right after, the recovered - // requests don't serve any purpose, but that's okay. - SchedulerApplicationAttempt schedulerAttempt = - getCurrentAttemptForContainer(rmContainer.getContainerId()); + // Add resource request back to Scheduler. + SchedulerApplicationAttempt schedulerAttempt + = getCurrentAttemptForContainer(rmContainer.getContainerId()); if (schedulerAttempt != null) { - schedulerAttempt.recoverResourceRequestsForContainer(requests); + schedulerAttempt.recoverResourceRequests(requests); } } @@ -567,30 +559,8 @@ public void clearPendingContainerCache() { } } - @VisibleForTesting - @Private // clean up a completed container - public void completedContainer(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - - if (rmContainer == null) { - LOG.info("Container " + containerStatus.getContainerId() - + " completed with event " + event - + ", but corresponding RMContainer doesn't exist."); - return; - } - - completedContainerInternal(rmContainer, containerStatus, event); - - // If the container is getting killed in ACQUIRED state, the requester (AM - // for regular containers and RM itself for AM container) will not know what - // happened. Simply add the ResourceRequest back again so that requester - // doesn't need to do anything conditionally. - recoverResourceRequestForContainer(rmContainer); - } - - // clean up a completed container - protected abstract void completedContainerInternal(RMContainer rmContainer, + protected abstract void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); protected void releaseContainers(List containers, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 631b4182af..973e9d37c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -289,15 +289,12 @@ public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, * application, by asking for more resources and releasing resources acquired * by the application. * - * @param requests - * resources to be acquired - * @param recoverPreemptedRequestForAContainer - * recover ResourceRequest on preemption + * @param requests resources to be acquired + * @param recoverPreemptedRequest recover ResourceRequest on preemption * @return true if any resource was updated, false otherwise */ public synchronized boolean updateResourceRequests( - List requests, - boolean recoverPreemptedRequestForAContainer) { + List requests, boolean recoverPreemptedRequest) { // Flag to track if any incoming requests update "ANY" requests boolean anyResourcesUpdated = false; @@ -318,7 +315,7 @@ public synchronized boolean updateResourceRequests( // Increment number of containers if recovering preempted resources ResourceRequest lastRequest = asks.get(resourceName); - if (recoverPreemptedRequestForAContainer && lastRequest != null) { + if (recoverPreemptedRequest && lastRequest != null) { request.setNumContainers(lastRequest.getNumContainers() + 1); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java similarity index 87% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java index 4b0be0c59e..7ab275820e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java @@ -16,10 +16,12 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; /** * Simple event class used to communicate containers unreservations, preemption, killing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java index 5a37295b08..c89696d153 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java @@ -31,7 +31,7 @@ public interface PreemptableResourceScheduler extends ResourceScheduler { * ask the scheduler to drop the reservation for the given container. * @param container Reference to reserved container allocation. */ - void killReservedContainer(RMContainer container); + void dropContainerReservation(RMContainer container); /** * Ask the scheduler to obtain back the container from a specific application @@ -45,6 +45,6 @@ public interface PreemptableResourceScheduler extends ResourceScheduler { * Ask the scheduler to forcibly interrupt the container given as input * @param container */ - void killPreemptedContainer(RMContainer container); + void killContainer(RMContainer container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index d91c79ed99..b43c1060d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -322,7 +322,7 @@ public synchronized boolean updateResourceRequests( return false; } - public synchronized void recoverResourceRequestsForContainer( + public synchronized void recoverResourceRequests( List requests) { if (!isStopped) { appSchedulingInfo.updateResourceRequests(requests, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 26b6a2ba2b..84b7d9bd06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -93,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -114,14 +115,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -865,7 +865,7 @@ private synchronized void doneApplicationAttempt( LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer( + completedContainer( rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), @@ -874,7 +874,7 @@ private synchronized void doneApplicationAttempt( // Release all reserved containers for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer( + completedContainer( rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), @@ -1047,7 +1047,7 @@ private synchronized void nodeUpdate(RMNode nm) { for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, + completedContainer(container, completedContainer, RMContainerEventType.FINISHED); if (container != null) { releasedContainers++; @@ -1128,7 +1128,7 @@ private synchronized void updateLabelsOnNode(NodeId nodeId, // Unreserve container on this node RMContainer reservedContainer = node.getReservedContainer(); if (null != reservedContainer) { - killReservedContainer(reservedContainer); + dropContainerReservation(reservedContainer); } // Update node labels after we've done this @@ -1372,19 +1372,18 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; - case KILL_RESERVED_CONTAINER: + case DROP_RESERVATION: { - ContainerPreemptEvent killReservedContainerEvent = - (ContainerPreemptEvent) event; - RMContainer container = killReservedContainerEvent.getContainer(); - killReservedContainer(container); + ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event; + RMContainer container = dropReservationEvent.getContainer(); + dropContainerReservation(container); } break; case PREEMPT_CONTAINER: @@ -1396,11 +1395,19 @@ public void handle(SchedulerEvent event) { preemptContainer(aid, containerToBePreempted); } break; - case KILL_PREEMPTED_CONTAINER: + case KILL_CONTAINER: { ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; RMContainer containerToBeKilled = killContainerEvent.getContainer(); - killPreemptedContainer(containerToBeKilled); + killContainer(containerToBeKilled); + } + break; + case CONTAINER_RESCHEDULED: + { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); } break; default: @@ -1455,7 +1462,7 @@ private synchronized void removeNode(RMNode nodeInfo) { // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1465,7 +1472,7 @@ private synchronized void removeNode(RMNode nodeInfo) { // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, + completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( reservedContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1481,8 +1488,13 @@ private synchronized void removeNode(RMNode nodeInfo) { @Lock(CapacityScheduler.class) @Override - protected synchronized void completedContainerInternal(RMContainer rmContainer, + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Container " + containerStatus.getContainerId() + + " completed with event " + event); + return; + } Container container = rmContainer.getContainer(); @@ -1584,14 +1596,11 @@ public void recover(RMState state) throws Exception { } @Override - public void killReservedContainer(RMContainer container) { + public void dropContainerReservation(RMContainer container) { if(LOG.isDebugEnabled()){ - LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":" - + container.toString()); + LOG.debug("DROP_RESERVATION:" + container.toString()); } - // TODO: What happens if this is no longer a reserved container, for e.g if - // the reservation became an allocation. - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.UNRESERVED_CONTAINER), @@ -1601,24 +1610,23 @@ public void killReservedContainer(RMContainer container) { @Override public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { if(LOG.isDebugEnabled()){ - LOG.debug(SchedulerEventType.PREEMPT_CONTAINER + ": appAttempt:" - + aid.toString() + " container: " + cont.toString()); + LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + + " container: " + cont.toString()); } FiCaSchedulerApp app = getApplicationAttempt(aid); if (app != null) { - app.preemptContainer(cont.getContainerId()); + app.addPreemptContainer(cont.getContainerId()); } } @Override - public void killPreemptedContainer(RMContainer cont) { + public void killContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { - LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" - + cont.toString()); + LOG.debug("KILL_CONTAINER: container" + cont.toString()); } - super.completedContainer(cont, SchedulerUtils - .createPreemptedContainerStatus(cont.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); + completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( + cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 39602936e4..4b88415ad8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -301,7 +301,7 @@ public synchronized Resource getTotalPendingRequests() { return ret; } - public synchronized void preemptContainer(ContainerId cont) { + public synchronized void addPreemptContainer(ContainerId cont) { // ignore already completed containers if (liveContainers.containsKey(cont)) { containersToPreempt.add(cont); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java new file mode 100644 index 0000000000..de2ce36d9a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +public class ContainerRescheduledEvent extends SchedulerEvent { + + private RMContainer container; + + public ContainerRescheduledEvent(RMContainer container) { + super(SchedulerEventType.CONTAINER_RESCHEDULED); + this.container = container; + } + + public RMContainer getContainer() { + return container; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index edc148f91d..40dd66b424 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,9 +38,11 @@ public enum SchedulerEventType { // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, + // Source: RMContainer + CONTAINER_RESCHEDULED, + // Source: SchedulingEditPolicy - KILL_RESERVED_CONTAINER, - PREEMPT_CONTAINER, // Mark a container for preemption in the near future - KILL_PREEMPTED_CONTAINER // Kill a container previously marked for - // preemption + DROP_RESERVATION, + PREEMPT_CONTAINER, + KILL_CONTAINER } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4df47cc844..9c16e493e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -497,7 +498,7 @@ protected void warnOrKillContainer(RMContainer container) { // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). - super.completedContainer(container, status, RMContainerEventType.KILL); + completedContainer(container, status, RMContainerEventType.KILL); LOG.info("Killing container" + container + " (after waiting for preemption for " + (getClock().getTime() - time) + "ms)"); @@ -806,7 +807,7 @@ private synchronized void removeApplicationAttempt( LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer(rmContainer, + completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), @@ -815,7 +816,7 @@ private synchronized void removeApplicationAttempt( // Release all reserved containers for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer(rmContainer, + completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), @@ -842,9 +843,13 @@ private synchronized void removeApplicationAttempt( * Clean up a completed container. */ @Override - protected synchronized void completedContainerInternal( - RMContainer rmContainer, ContainerStatus containerStatus, - RMContainerEventType event) { + protected synchronized void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Container " + containerStatus.getContainerId() + + " completed with event " + event); + return; + } Container container = rmContainer.getContainer(); @@ -914,7 +919,7 @@ private synchronized void removeNode(RMNode rmNode) { // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -924,7 +929,7 @@ private synchronized void removeNode(RMNode rmNode) { // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, + completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( reservedContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1052,7 +1057,7 @@ private synchronized void nodeUpdate(RMNode nm) { for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -1297,12 +1302,21 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent)event; ContainerId containerId = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); break; + case CONTAINER_RESCHEDULED: + if (!(event instanceof ContainerRescheduledEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + break; default: LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 5787ba6299..8e75d1167d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -74,10 +74,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -467,7 +468,7 @@ private synchronized void doneApplicationAttempt( LOG.info("Skip killing " + container.getContainerId()); continue; } - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -738,7 +739,7 @@ private synchronized void nodeUpdate(RMNode rmNode) { for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -857,13 +858,21 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerid), + completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; + case CONTAINER_RESCHEDULED: + { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -871,9 +880,12 @@ public void handle(SchedulerEvent event) { @Lock(FifoScheduler.class) @Override - protected synchronized void completedContainerInternal( - RMContainer rmContainer, ContainerStatus containerStatus, - RMContainerEventType event) { + protected synchronized void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Null container completed..."); + return; + } // Get the application for the finished container Container container = rmContainer.getContainer(); @@ -919,7 +931,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a5d14c3b55..0372cd7855 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -275,8 +275,7 @@ public boolean waitForState(Collection nms, ContainerId containerId, nm.nodeHeartbeat(true); } container = getResourceScheduler().getRMContainer(containerId); - System.out.println("Waiting for container " + containerId + " to be " - + containerState + ", container is null right now."); + System.out.println("Waiting for container " + containerId + " to be allocated."); Thread.sleep(100); if (timeoutMillisecs <= timeoutSecs * 100) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index a54aeec1e9..db7c96ab1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.junit.Assert; @@ -55,11 +55,10 @@ public void testSchedulerEventDispatcherForPreemptionEvents() { ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); RMContainer container = mock(RMContainer.class); ContainerPreemptEvent event1 = new ContainerPreemptEvent( - appAttemptId, container, SchedulerEventType.KILL_RESERVED_CONTAINER); + appAttemptId, container, SchedulerEventType.DROP_RESERVATION); rmDispatcher.getEventHandler().handle(event1); - ContainerPreemptEvent event2 = - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_PREEMPTED_CONTAINER); + ContainerPreemptEvent event2 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.KILL_CONTAINER); rmDispatcher.getEventHandler().handle(event2); ContainerPreemptEvent event3 = new ContainerPreemptEvent( appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER); @@ -67,9 +66,9 @@ public void testSchedulerEventDispatcherForPreemptionEvents() { // Wait for events to be processed by scheduler dispatcher. Thread.sleep(1000); verify(sched, times(3)).handle(any(SchedulerEvent.class)); - verify(sched).killReservedContainer(container); + verify(sched).dropContainerReservation(container); verify(sched).preemptContainer(appAttemptId, container); - verify(sched).killPreemptedContainer(container); + verify(sched).killContainer(container); } catch (InterruptedException e) { Assert.fail(); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 3d0c82307c..f1fe1eabb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -566,7 +566,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the first attempt; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); @@ -582,7 +582,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2)); + scheduler.killContainer(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -677,7 +677,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Forcibly preempt the am container; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index d96f09cdd5..7a3ce568b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -76,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -289,7 +289,7 @@ public void testExpireKill() { List events = evtCaptor.getAllValues(); for (ContainerPreemptEvent e : events.subList(20, 20)) { assertEquals(appC, e.getAppId()); - assertEquals(KILL_PREEMPTED_CONTAINER, e.getType()); + assertEquals(KILL_CONTAINER, e.getType()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index fc2d9c436e..7c33f78358 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -29,7 +29,6 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -47,15 +46,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -498,114 +493,6 @@ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() } } - /** - * Test to verify that ResourceRequests recovery back to the right app-attempt - * after a container gets killed at ACQUIRED state: YARN-4502. - * - * @throws Exception - */ - @Test - public void testResourceRequestRecoveryToTheRightAppAttempt() - throws Exception { - - configureScheduler(); - YarnConfiguration conf = getConf(); - MockRM rm = new MockRM(conf); - try { - rm.start(); - RMApp rmApp = - rm.submitApp(200, "name", "user", - new HashMap(), false, "default", -1, - null, "Test", false, true); - MockNM node = - new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService()); - node.registerNode(); - - MockAM am1 = MockRM.launchAndRegisterAM(rmApp, rm, node); - ApplicationAttemptId applicationAttemptOneID = - am1.getApplicationAttemptId(); - ContainerId am1ContainerID = - ContainerId.newContainerId(applicationAttemptOneID, 1); - - // allocate NUM_CONTAINERS containers - am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); - node.nodeHeartbeat(true); - - // wait for containers to be allocated. - List containers = - am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers(); - while (containers.size() != 1) { - node.nodeHeartbeat(true); - containers.addAll(am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers()); - Thread.sleep(200); - } - - // launch a 2nd container, for testing running-containers transfer. - node.nodeHeartbeat(applicationAttemptOneID, 2, ContainerState.RUNNING); - ContainerId runningContainerID = - ContainerId.newContainerId(applicationAttemptOneID, 2); - rm.waitForState(node, runningContainerID, RMContainerState.RUNNING); - - // 3rd container is in Allocated state. - int ALLOCATED_CONTAINER_PRIORITY = 1047; - am1.allocate("127.0.0.1", 1024, 1, ALLOCATED_CONTAINER_PRIORITY, - new ArrayList(), null); - node.nodeHeartbeat(true); - ContainerId allocatedContainerID = - ContainerId.newContainerId(applicationAttemptOneID, 3); - rm.waitForContainerAllocated(node, allocatedContainerID); - rm.waitForState(node, allocatedContainerID, RMContainerState.ALLOCATED); - RMContainer allocatedContainer = - rm.getResourceScheduler().getRMContainer(allocatedContainerID); - - // Capture scheduler app-attempt before AM crash. - SchedulerApplicationAttempt firstSchedulerAppAttempt = - ((AbstractYarnScheduler) rm - .getResourceScheduler()) - .getApplicationAttempt(applicationAttemptOneID); - - // AM crashes, and a new app-attempt gets created - node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE); - rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED); - RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm); - ApplicationAttemptId applicationAttemptTwoID = - rmAppAttempt2.getAppAttemptId(); - Assert.assertEquals(2, applicationAttemptTwoID.getAttemptId()); - - // All outstanding allocated containers will be killed (irrespective of - // keep-alive of container across app-attempts) - Assert.assertEquals(RMContainerState.KILLED, - allocatedContainer.getState()); - - // The core part of this test - // The killed containers' ResourceRequests are recovered back to the - // original app-attempt, not the new one - for (ResourceRequest request : firstSchedulerAppAttempt - .getAppSchedulingInfo().getAllResourceRequests()) { - if (request.getPriority().getPriority() == 0) { - Assert.assertEquals(0, request.getNumContainers()); - } else if (request.getPriority().getPriority() == ALLOCATED_CONTAINER_PRIORITY) { - Assert.assertEquals(1, request.getNumContainers()); - } - } - - // Also, only one running container should be transferred after AM - // launches - MockRM.launchAM(rmApp, rm, node); - List transferredContainers = - rm.getResourceScheduler().getTransferredContainers( - applicationAttemptTwoID); - Assert.assertEquals(1, transferredContainers.size()); - Assert.assertEquals(runningContainerID, transferredContainers.get(0) - .getId()); - - } finally { - rm.stop(); - } - } - private void verifyMaximumResourceCapability( Resource expectedMaximumResource, YarnScheduler scheduler) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index e32a33b71a..2ad805a206 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -205,7 +205,7 @@ public void testApplicationPriorityAllocation() throws Exception { if (++counter > 2) { break; } - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check node report, 12 GB used and 4 GB available @@ -513,7 +513,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority() if (++counter > 2) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } @@ -543,7 +543,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority() if (++counter > 1) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e139df65f3..7c95cdca1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1170,7 +1170,7 @@ public void testPreemptionInfo() throws Exception { // kill the 3 containers for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1179,7 +1179,7 @@ public void testPreemptionInfo() throws Exception { Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); // kill app0-attempt0 AM container - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0 + cs.killContainer(schedulerAppAttempt.getRMContainer(app0 .getCurrentAppAttempt().getMasterContainer().getId())); // wait for app0 failed @@ -1202,7 +1202,7 @@ public void testPreemptionInfo() throws Exception { allocatedContainers = am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1251,7 +1251,7 @@ public void testRecoverRequestAfterPreemption() throws Exception { } // Call killContainer to preempt the container - cs.killPreemptedContainer(rmContainer); + cs.killContainer(rmContainer); Assert.assertEquals(3, requests.size()); for (ResourceRequest request : requests) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 1b1418a82d..430eba7107 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,11 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.isA; @@ -50,7 +52,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.yarn.MockApps; @@ -94,11 +95,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -4735,11 +4735,11 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured() } } } - + @Test(timeout = 5000) public void testRecoverRequestAfterPreemption() throws Exception { conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); - + ControlledClock clock = new ControlledClock(); scheduler.setClock(clock); scheduler.init(conf); @@ -4779,7 +4779,7 @@ public void testRecoverRequestAfterPreemption() throws Exception { assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() .size()); - SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // ResourceRequest will be empty once NodeUpdate is completed Assert.assertNull(app.getResourceRequest(priority, host)); @@ -4797,8 +4797,7 @@ public void testRecoverRequestAfterPreemption() throws Exception { scheduler.warnOrKillContainer(rmContainer); // Trigger container rescheduled event - scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, - SchedulerEventType.KILL_PREEMPTED_CONTAINER)); + scheduler.handle(new ContainerRescheduledEvent(rmContainer)); List requests = rmContainer.getResourceRequests(); // Once recovered, resource request will be present again in app @@ -4821,6 +4820,7 @@ Collections. emptyList(), Assert.assertTrue(containers.size() == 1); } + @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { scheduler.init(conf);