From 43589a8df70d4dbaf86609961e27cc4e23dda993 Mon Sep 17 00:00:00 2001 From: Mayank Bansal Date: Tue, 15 Jul 2014 21:48:58 +0000 Subject: [PATCH] YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and caused a task timeout for 30mins. (Sunil G via mayank) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1610860 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../rmcontainer/RMContainer.java | 5 ++ .../rmcontainer/RMContainerImpl.java | 28 +++++- .../scheduler/AbstractYarnScheduler.java | 22 +++++ .../scheduler/AppSchedulingInfo.java | 69 +++++++++++---- .../SchedulerApplicationAttempt.java | 9 +- .../scheduler/capacity/CapacityScheduler.java | 1 + .../common/fica/FiCaSchedulerApp.java | 10 ++- .../scheduler/fair/FSSchedulerApp.java | 10 ++- .../scheduler/fair/FairScheduler.java | 3 +- .../rmcontainer/TestRMContainerImpl.java | 42 +++++++++ .../capacity/TestCapacityScheduler.java | 67 +++++++++++++++ .../scheduler/fair/FairSchedulerTestBase.java | 21 +++++ .../scheduler/fair/TestFairScheduler.java | 86 +++++++++++++++++++ 14 files changed, 352 insertions(+), 24 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 999f9fbc05..15c448e1d0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -259,6 +259,9 @@ Release 2.5.0 - UNRELEASED YARN-2241. ZKRMStateStore: On startup, show nicer messages if znodes already exist. (Robert Kanter via kasha) + YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and + caused a task timeout for 30mins. 
(Sunil G via mayank) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 96150c3c2a..9e9dcb9aa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; /** @@ -73,5 +76,7 @@ public interface RMContainer extends EventHandler { ContainerReport createContainerReport(); boolean isAMContainer(); + + List getResourceRequests(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index d79a0b7613..eef361f343 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import java.util.EnumSet; +import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -158,6 +160,7 @@ RMContainerEventType.RELEASED, new KillTransition()) private long finishTime; private ContainerStatus finishedStatus; private boolean isAMContainer; + private List resourceRequests; public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, @@ -180,7 +183,8 @@ public RMContainerImpl(Container container, this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); this.isAMContainer = false; - + this.resourceRequests = null; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -311,6 +315,25 @@ public ContainerState getContainerState() { readLock.unlock(); } } + + @Override + public List getResourceRequests() { + try { + readLock.lock(); + return 
resourceRequests; + } finally { + readLock.unlock(); + } + } + + public void setResourceRequests(List requests) { + try { + writeLock.lock(); + this.resourceRequests = requests; + } finally { + writeLock.unlock(); + } + } @Override public String toString() { @@ -432,6 +455,9 @@ private static final class AcquiredTransition extends BaseTransition { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Clear ResourceRequest stored in RMContainer + container.setResourceRequests(null); + // Register with containerAllocationExpirer. container.containerAllocationExpirer.register(container.getContainerId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 38678845ec..b3e835a54d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -275,6 +276,27 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, return rmContainer; } + /** + * 
Recover the resource requests kept in the RMContainer when a container is + * preempted before the AM has acquired it. Once the AM has acquired the + * container, the RMContainer has no resource requests left to recover. + * @param rmContainer the preempted container whose requests are recovered + */ + protected void recoverResourceRequestForContainer(RMContainer rmContainer) { + List requests = rmContainer.getResourceRequests(); + + // If container state is moved to ACQUIRED, the request list will be null. + if (requests == null) { + return; + } + // Add resource request back to Scheduler. + SchedulerApplicationAttempt schedulerAttempt + = getCurrentAttemptForContainer(rmContainer.getContainerId()); + if (schedulerAttempt != null) { + schedulerAttempt.recoverResourceRequests(requests); + } + } + public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 581321ca35..a127123a76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -127,9 +127,10 @@ public int getNewContainerId() { * by the application. 
* * @param requests resources to be acquired + * @param recoverPreemptedRequest true when recovering requests after preemption */ synchronized public void updateResourceRequests( - List requests) { + List requests, boolean recoverPreemptedRequest) { QueueMetrics metrics = queue.getMetrics(); // Update resource requests @@ -163,8 +164,13 @@ synchronized public void updateResourceRequests( asks = new HashMap(); this.requests.put(priority, asks); this.priorities.add(priority); - } else if (updatePendingResources) { - lastRequest = asks.get(resourceName); + } + lastRequest = asks.get(resourceName); + + if (recoverPreemptedRequest && lastRequest != null) { + // Increment the number of containers by 1, as it is recovering a + // single container. + request.setNumContainers(lastRequest.getNumContainers() + 1); } asks.put(resourceName, request); @@ -254,14 +260,16 @@ public synchronized boolean isBlacklisted(String resourceName) { * @param container * the containers allocated. */ - synchronized public void allocate(NodeType type, SchedulerNode node, - Priority priority, ResourceRequest request, Container container) { + synchronized public List allocate(NodeType type, + SchedulerNode node, Priority priority, ResourceRequest request, + Container container) { + List resourceRequests = new ArrayList(); if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, priority, request, container); + allocateNodeLocal(node, priority, request, container, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, priority, request, container); + allocateRackLocal(node, priority, request, container, resourceRequests); } else { - allocateOffSwitch(node, priority, request, container); + allocateOffSwitch(node, priority, request, container, resourceRequests); } QueueMetrics metrics = queue.getMetrics(); if (pending) { @@ -279,6 +287,7 @@ synchronized public void allocate(NodeType type, SchedulerNode node, + " resource=" + request.getCapability()); } 
metrics.allocateResources(user, 1, request.getCapability(), true); + return resourceRequests; } /** @@ -288,9 +297,9 @@ synchronized public void allocate(NodeType type, SchedulerNode node, * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateNodeLocal( - SchedulerNode node, Priority priority, - ResourceRequest nodeLocalRequest, Container container) { + synchronized private void allocateNodeLocal(SchedulerNode node, + Priority priority, ResourceRequest nodeLocalRequest, Container container, + List resourceRequests) { // Update future requirements nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1); if (nodeLocalRequest.getNumContainers() == 0) { @@ -304,7 +313,14 @@ synchronized private void allocateNodeLocal( this.requests.get(priority).remove(node.getRackName()); } - decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY)); + ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned NodeLocal, RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); } /** @@ -314,16 +330,22 @@ synchronized private void allocateNodeLocal( * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateRackLocal( - SchedulerNode node, Priority priority, - ResourceRequest rackLocalRequest, Container container) { + synchronized private void allocateRackLocal(SchedulerNode node, + Priority priority, ResourceRequest rackLocalRequest, Container container, + List resourceRequests) { // Update future requirements rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1); if (rackLocalRequest.getNumContainers() == 0) { this.requests.get(priority).remove(node.getRackName()); } - 
- decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY)); + ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); } /** @@ -333,11 +355,13 @@ synchronized private void allocateRackLocal( * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateOffSwitch( - SchedulerNode node, Priority priority, - ResourceRequest offSwitchRequest, Container container) { + synchronized private void allocateOffSwitch(SchedulerNode node, + Priority priority, ResourceRequest offSwitchRequest, Container container, + List resourceRequests) { // Update future requirements decrementOutstanding(offSwitchRequest); + // Add the cloned OffSwitch request for recovery + resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } synchronized private void decrementOutstanding( @@ -436,4 +460,11 @@ public synchronized void recoverContainer(RMContainer rmContainer) { metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), false); } + + public ResourceRequest cloneResourceRequest(ResourceRequest request) { + ResourceRequest newRequest = ResourceRequest.newInstance( + request.getPriority(), request.getResourceName(), + request.getCapability(), 1, request.getRelaxLocality()); + return newRequest; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 3a51417cdf..32dd23b08c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -241,7 +241,14 @@ public Queue getQueue() { public synchronized void updateResourceRequests( List requests) { if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests); + appSchedulingInfo.updateResourceRequests(requests, false); + } + } + + public synchronized void recoverResourceRequests( + List requests) { + if (!isStopped) { + appSchedulingInfo.updateResourceRequests(requests, true); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 92727e37f2..649eb92019 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1089,6 +1089,7 @@ public void killContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } + recoverResourceRequestForContainer(cont); completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 470cb106f1..846d1e1396 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +78,9 @@ synchronized public boolean containerCompleted(RMContainer rmContainer, if (null == liveContainers.remove(rmContainer.getContainerId())) { return false; } + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); @@ -129,8 +133,12 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - appSchedulingInfo.allocate(type, node, priority, request, container); + List resourceRequestList = appSchedulingInfo.allocate( + type, node, priority, request, container); Resources.addTo(currentConsumption, container.getResource()); + + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList); // Inform the container rmContainer.handle( diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 63a29e4b09..20cf3952d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; @@ -82,6 +83,9 @@ synchronized public void containerCompleted(RMContainer rmContainer, Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); + // Inform the container rmContainer.handle( new RMContainerFinishedEvent( @@ -281,9 +285,13 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - appSchedulingInfo.allocate(type, node, priority, request, container); + List resourceRequestList = appSchedulingInfo.allocate( + type, node, priority, request, container); Resources.addTo(currentConsumption, container.getResource()); + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + // Inform the container rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.START)); 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 6f9b76f010..7e867554f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -422,7 +422,7 @@ protected void preemptResources(Resource toPreempt) { } } - private void warnOrKillContainer(RMContainer container) { + protected void warnOrKillContainer(RMContainer container) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); FSSchedulerApp app = getSchedulerApp(appAttemptId); FSLeafQueue queue = app.getQueue(); @@ -440,6 +440,7 @@ private void warnOrKillContainer(RMContainer container) { SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + recoverResourceRequestForContainer(container); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). 
completedContainer(container, status, RMContainerEventType.KILL); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index b8f2986c77..44f8381b48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -26,6 +26,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -204,4 +214,36 @@ public void testExpireWhileRunning() { assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(writer, never()).containerFinished(any(RMContainer.class)); } + + @Test + public void testExistenceOfResourceRequestInRMContainer() throws Exception { + Configuration conf = new Configuration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + ResourceScheduler scheduler = rm1.getResourceScheduler(); + + // request a container. 
+ am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // Verify whether list of ResourceRequest is present in RMContainer + // while moving to ALLOCATED state + Assert.assertNotNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + + // Allocate container + am1.allocate(new ArrayList(), new ArrayList()) + .getAllocatedContainers(); + rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED); + + // After RMContainer moves to ACQUIRED state, the list of ResourceRequest + // will be null + Assert.assertNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 3ca3c480c5..0efd48fa28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -947,4 +951,67 @@ public void testPreemptionInfo() throws Exception { rm1.stop(); } + + @Test(timeout = 30000) + public void testRecoverRequestAfterPreemption() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // request a container. 
+ am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId1 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED); + + RMContainer rmContainer = cs.getRMContainer(containerId1); + List requests = rmContainer.getResourceRequests(); + FiCaSchedulerApp app = cs.getApplicationAttempt(am1 + .getApplicationAttemptId()); + + FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode()); + for (ResourceRequest request : requests) { + // Skip the OffRack and RackLocal resource requests. + if (request.getResourceName().equals(node.getRackName()) + || request.getResourceName().equals(ResourceRequest.ANY)) { + continue; + } + + // The node local resource request is already cleared from RM after + // allocation. + Assert.assertNull(app.getResourceRequest(request.getPriority(), + request.getResourceName())); + } + + // Call killContainer to preempt the container + cs.killContainer(rmContainer); + + Assert.assertEquals(3, requests.size()); + for (ResourceRequest request : requests) { + // Resource request must have been added back in RM after preempt event + // handling. + Assert.assertEquals( + 1, + app.getResourceRequest(request.getPriority(), + request.getResourceName()).getNumContainers()); + } + + // New container will be allocated and will move to ALLOCATED state + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // allocate container + List containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + + // Now with updated ResourceRequest, a container is allocated for AM. 
+    Assert.assertTrue(containers.size() == 1);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index ec942f9769..50f61d8d59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -167,6 +167,37 @@ protected ApplicationAttemptId createSchedulingRequest(
         .put(id.getApplicationId(), rmApp);
     return id;
   }
+
+  /**
+   * Creates a new application attempt id, registers the application with the
+   * scheduler and submits the given resource requests on its behalf.
+   *
+   * @param queueId queue passed to the scheduler when adding the application
+   * @param userId user passed to the scheduler when adding the application
+   * @param ask resource requests handed to the scheduler's allocate call
+   * @return id of the newly created application attempt
+   */
+  protected ApplicationAttemptId createSchedulingRequest(String queueId,
+      String userId, List<ResourceRequest> ask) {
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
+        this.ATTEMPT_ID++);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    // This conditional is for testAclSubmitApplication where app is rejected
+    // and no app is added.
+    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+      scheduler.addApplicationAttempt(id, false, true);
+    }
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+    // Publish a mocked RMApp (with attempt metrics) for this id in the RM context.
+    RMApp rmApp = mock(RMApp.class);
+    RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
+    when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
+    when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
+        new RMAppAttemptMetrics(id));
+    resourceManager.getRMContext().getRMApps()
+        .put(id.getApplicationId(), rmApp);
+    return id;
+  }
 
   protected void createSchedulingRequestExistingApplication(
       int memory, int priority, ApplicationAttemptId attId) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index dbc79d907d..20c386714d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -53,10 +53,13 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import
org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -77,11 +80,13 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -2831,6 +2836,91 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
       }
     }
   }
+
+  /**
+   * Verifies that after a container is killed through preemption, the
+   * resource requests recorded on it are present again in the application and
+   * that a container is handed out again on the following node heartbeat.
+   */
+  @Test(timeout=5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create a node and raise the node-added event
+    RMNode node = MockNodes.newNodeInfo(1,
+        Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create node-local, rack-local and off-rack requests and place them in ask
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+        priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+        node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+        ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Create Request and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+        "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+    assertEquals(1, app.getLiveContainers().size());
+
+    // ResourceRequest will be empty once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+
+    // Wait for a few clock ticks
+    clock.tick(5);
+
+    // preempt now
+    scheduler.warnOrKillContainer(rmContainer);
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, resource request will be present again in app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+          app.getResourceRequest(priority, request.getResourceName())
+              .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+        Collections.<ResourceRequest> emptyList(),
+        Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+    // Now with the recovered ResourceRequest, a container is allocated again.
+    Assert.assertEquals(1, containers.size());
+  }
 
   @SuppressWarnings("resource")
   @Test