diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9a6f4d2f4e..7a94e093b6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -643,6 +643,9 @@ Release 2.8.0 - UNRELEASED YARN-3885. ProportionalCapacityPreemptionPolicy doesn't preempt if queue is more than 2 level. (Ajith S via wangda) + YARN-3535. Scheduler must re-request container resources when RMContainer transitions + from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 0ad63b4f35..f7d3f56123 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -94,7 +95,7 @@ RMContainerEventType.ACQUIRED, new AcquiredTransition()) .addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED, RMContainerEventType.EXPIRE, new FinishedTransition()) .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED, - RMContainerEventType.KILL, new FinishedTransition()) + RMContainerEventType.KILL, new ContainerRescheduledTransition()) // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, @@ -495,6 +496,17 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { } } + private static final class ContainerRescheduledTransition extends + FinishedTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + // Tell scheduler to recover request of this container to app + container.eventHandler.handle(new ContainerRescheduledEvent(container)); + super.transition(container, event); + } + } + private static class FinishedTransition extends BaseTransition { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 141aa7fb09..559dfc6713 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -108,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -1370,6 +1371,14 @@ public void handle(SchedulerEvent event) { killContainer(containerToBeKilled); } break; + case CONTAINER_RESCHEDULED: + { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -1543,7 +1552,6 @@ public void killContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } - recoverResourceRequestForContainer(cont); completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java new file mode 100644 index 0000000000..de2ce36d9a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +public class ContainerRescheduledEvent extends SchedulerEvent { + + private RMContainer container; + + public ContainerRescheduledEvent(RMContainer container) { + super(SchedulerEventType.CONTAINER_RESCHEDULED); + this.container = container; + } + + public RMContainer getContainer() { + return container; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 9de935b50f..40dd66b424 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,6 +38,9 @@ public enum SchedulerEventType { // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, + // Source: RMContainer + CONTAINER_RESCHEDULED, + // Source: SchedulingEditPolicy DROP_RESERVATION, PREEMPT_CONTAINER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index efe654455a..3eefb8f728 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -462,7 +463,6 @@ protected void warnOrKillContainer(RMContainer container) { SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); - recoverResourceRequestForContainer(container); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). completedContainer(container, status, RMContainerEventType.KILL); @@ -1236,6 +1236,15 @@ public void handle(SchedulerEvent event) { SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); break; + case CONTAINER_RESCHEDULED: + if (!(event instanceof ContainerRescheduledEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + break; default: LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b8c419cdec..e66c02cc7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -846,6 +847,14 @@ public void handle(SchedulerEvent event) { RMContainerEventType.EXPIRE); } break; + case CONTAINER_RESCHEDULED: + { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 3387f41f51..d579595113 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -247,7 +247,7 @@ public void testAMRestartWithExistingContainers() throws Exception { private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; - while (attempt.getJustFinishedContainers().size() != expectedNum + while (attempt.getJustFinishedContainers().size() < expectedNum && count < 500) { Thread.sleep(100); count++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 91dd2490d1..ffd1c1f40d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -22,14 +22,20 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -40,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -403,6 +410,89 @@ public void testReleasedContainerIfAppAttemptisNull() throws Exception { } } + @Test(timeout = 60000) + public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() + throws Exception { + configureScheduler(); + YarnConfiguration conf = getConf(); + MockRM rm1 = new MockRM(conf); + try { + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", + -1, null, "Test", false, true); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = + new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService()); + nm2.registerNode(); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + int NUM_CONTAINERS = 1; + // allocate NUM_CONTAINERS containers + am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + + // wait for containers to be allocated. + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.size() != NUM_CONTAINERS) { + nm1.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + } + + // launch the 2nd container, for testing running container transferred. + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // 3rd container is in Allocated state. + am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, + new ArrayList()); + nm2.nodeHeartbeat(true); + ContainerId containerId3 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + rm1.waitForContainerAllocated(nm2, containerId3); + rm1.waitForState(nm2, containerId3, RMContainerState.ALLOCATED); + + // NodeManager restart + nm2.registerNode(); + + // NM restart kills all allocated and running containers. + rm1.waitForState(nm2, containerId3, RMContainerState.KILLED); + + // The killed RMContainer request should be restored. In successive + // nodeHeartBeats AM should be able to get container allocated. + containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.size() != NUM_CONTAINERS) { + nm2.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + } + + nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 4, + ContainerState.RUNNING); + ContainerId containerId4 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 4); + rm1.waitForState(nm2, containerId4, RMContainerState.RUNNING); + } finally { + rm1.stop(); + } + } + private void verifyMaximumResourceCapability( Resource expectedMaximumResource, AbstractYarnScheduler scheduler) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2260f734b3..c352cc9ca4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -4453,6 +4454,9 @@ public void testRecoverRequestAfterPreemption() throws Exception { // preempt now scheduler.warnOrKillContainer(rmContainer); + // Trigger container rescheduled event + scheduler.handle(new ContainerRescheduledEvent(rmContainer)); + List requests = rmContainer.getResourceRequests(); // Once recovered, resource request will be present again in app Assert.assertEquals(3, requests.size());