From cfe89e6f963ba25b5fff1ce48cad36d74b3c789c Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 3 Jan 2019 23:56:28 +0800 Subject: [PATCH] YARN-9164. Shutdown NM may cause NPE when opportunistic container scheduling is enabled. Contributed by lujie. --- ...ortunisticContainerAllocatorAMService.java | 8 +- .../scheduler/AbstractYarnScheduler.java | 12 ++- .../scheduler/SchedulerUtils.java | 8 +- ...ortunisticContainerAllocatorAMService.java | 78 +++++++++++++++++++ 4 files changed, 97 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 15c2a896e6..69a704ee96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -348,9 +348,11 @@ private void handleNewContainers(List allocContainers, RMContainer rmContainer = SchedulerUtils.createOpportunisticRmContainer( rmContext, container, isRemotelyAllocated); - rmContainer.handle( - new RMContainerEvent(container.getId(), - RMContainerEventType.ACQUIRED)); + if (rmContainer!=null) { + rmContainer.handle( + new RMContainerEvent(container.getId(), + RMContainerEventType.ACQUIRED)); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8612e8001c..0ad47bb8f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -693,8 +693,10 @@ public void completedContainer(RMContainer rmContainer, LOG.debug("Completed container: " + rmContainer.getContainerId() + " in state: " + rmContainer.getState() + " event:" + event); } - getSchedulerNode(rmContainer.getNodeId()).releaseContainer( - rmContainer.getContainerId(), false); + SchedulerNode node = getSchedulerNode(rmContainer.getNodeId()); + if (node != null) { + node.releaseContainer(rmContainer.getContainerId(), false); + } } // If the container is getting killed in ACQUIRED state, the requester (AM @@ -1300,8 +1302,10 @@ private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt, uReq.getContainerUpdateType()) { RMContainer demotedRMContainer = createDemotedRMContainer(appAttempt, oppCntxt, rmContainer); - appAttempt.addToNewlyDemotedContainers( - uReq.getContainerId(), demotedRMContainer); + if (demotedRMContainer != null) { + appAttempt.addToNewlyDemotedContainers( + uReq.getContainerId(), demotedRMContainer); + } } else { RMContainer demotedRMContainer = createDecreasedRMContainer( appAttempt, uReq, rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 9a02b6e75c..a048dacb68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -564,6 +564,11 @@ public static boolean hasPendingResourceRequest(ResourceCalculator rc, public static RMContainer createOpportunisticRmContainer(RMContext rmContext, Container container, boolean isRemotelyAllocated) { + SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler()) + .getNode(container.getNodeId()); + if (node == null) { + return null; + } SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) rmContext.getScheduler()) .getCurrentAttemptForContainer(container.getId()); @@ -572,8 +577,7 @@ public static RMContainer createOpportunisticRmContainer(RMContext rmContext, appAttempt.getApplicationAttemptId(), container.getNodeId(), appAttempt.getUser(), rmContext, isRemotelyAllocated); appAttempt.addRMContainer(container.getId(), rmContainer); - ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( - container.getNodeId()).allocateContainer(rmContainer); + node.allocateContainer(rmContainer); return rmContainer; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 5542157c94..b2be1e732b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; @@ -72,14 +73,19 @@ import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -88,6 +94,7 @@ .FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; @@ -95,12 +102,17 @@ import org.junit.Test; import org.mockito.Mockito; +import com.google.common.base.Supplier; + +import static org.junit.Assert.fail; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.concurrent.TimeoutException; /** * Test cases for {@link OpportunisticContainerAllocatorAMService}. @@ -798,6 +810,72 @@ public void testNodeRemovalDuringAllocate() throws Exception { Assert.assertEquals(1, ctxt.getNodeMap().size()); } + @Test(timeout = 60000) + public void testAppAttemptRemovalAfterNodeRemoval() throws Exception { + MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService()); + nm.registerNode(); + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app.getCurrentAppAttempt().getAppAttemptId(); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + ResourceScheduler scheduler = rm.getResourceScheduler(); + SchedulerApplicationAttempt schedulerAttempt = + ((CapacityScheduler)scheduler).getApplicationAttempt(attemptId); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + nm.nodeHeartbeat(true); + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + try { + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return scheduler.getNumClusterNodes() == 1; + } + }, 10, 200 * 100); + }catch (TimeoutException e) { + fail("timed out while waiting for NM to add."); + } + AllocateResponse allocateResponse = am.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))), + null); + List allocatedContainers = allocateResponse + .getAllocatedContainers(); + Container container = allocatedContainers.get(0); + scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1)); + try { + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return scheduler.getNumClusterNodes() == 0; + } + }, 10, 200 * 100); + }catch (TimeoutException e) { + fail("timed out while waiting for NM to remove."); + } + //test YARN-9165 + RMContainer rmContainer = null; + rmContainer = SchedulerUtils.createOpportunisticRmContainer( + rm.getRMContext(), container, true); + if (rmContainer == null) { + rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), + schedulerAttempt.getApplicationAttemptId(), container.getNodeId(), + schedulerAttempt.getUser(), rm.getRMContext(), true); + } + assert(rmContainer!=null); + //test YARN-9164 + schedulerAttempt.addRMContainer(container.getId(), rmContainer); + scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId, + RMAppAttemptState.FAILED, false)); + } + private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime, int queueLength) { OpportunisticContainersStatus status1 =