From 8598b498bcaf4deffa822f871a26635bdf3d9d5c Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Fri, 28 Sep 2018 15:32:07 +0000 Subject: [PATCH] YARN-8774. Memory leak when CapacityScheduler allocates from reserved container with non-default label. Contributed by Tao Yang. --- .../allocator/RegularContainerAllocator.java | 18 +++++- .../TestNodeLabelContainerAllocation.java | 62 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index c0a11a0aae..c692fdf8af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -750,9 +750,25 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, // When reserving container RMContainer updatedContainer = reservedContainer; if (updatedContainer == null) { + AppPlacementAllocator ps = + application.getAppSchedulingInfo() + .getAppPlacementAllocator(schedulerKey); + if (null == ps) { + LOG.warn("Failed to get " + AppPlacementAllocator.class.getName() + + " for application=" + application.getApplicationId() + + " schedulerRequestKey=" + schedulerKey); + ActivitiesLogger.APP + .recordAppActivityWithoutAllocation(activitiesManager, node, + application, schedulerKey.getPriority(), + ActivityDiagnosticConstant. + PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST, + ActivityState.REJECTED); + return ContainerAllocation.PRIORITY_SKIPPED; + } updatedContainer = new RMContainerImpl(container, schedulerKey, application.getApplicationAttemptId(), node.getNodeID(), - application.getAppSchedulingInfo().getUser(), rmContext); + application.getAppSchedulingInfo().getUser(), rmContext, + ps.getPrimaryRequestedNodePartition()); } allocationResult.updatedContainer = updatedContainer; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 1836919d40..9cfddd66f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -547,6 +547,68 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } + @Test (timeout = 120000) + public void testRMContainerLeakInLeafQueue() throws Exception { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = + new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { + @Override public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x + rm1.registerNode("h2:1234", 8 * GB); // label = x + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(1 * GB, "app1", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + RMApp app2 = rm1.submitApp(1 * GB, "app2", "user", null, "a1"); + MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // request a container. + am1.allocate("*", 7 * GB, 2, new ArrayList()); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + + // Do node heartbeats 1 time + // scheduler will reserve a container for app1 + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Check if a 4G container allocated for app1, and 4G is reserved + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); + + // kill app2 then do node heartbeat 1 time + // scheduler will allocate a container from the reserved container on nm1 + rm1.killApp(app2.getApplicationId()); + rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(0, schedulerApp1.getReservedContainers().size()); + + // After kill app1, LeafQueue#ignorePartitionExclusivityRMContainers should + // be clean, otherwise resource leak happened + rm1.killApp(app1.getApplicationId()); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertEquals(0, leafQueue.getIgnoreExclusivityRMContainers().size()); + + rm1.close(); + } + private void checkPendingResource(MockRM rm, int priority, ApplicationAttemptId attemptId, int memory) { CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();