From f2d3ac2a3f27a849e00f529c5c2df6ef0bd82911 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Thu, 19 Mar 2020 12:48:30 +0530 Subject: [PATCH] YARN-10034. Remove Allocation Tags from released container from Decommission node Contributed by Kyungwan Nam. Reviewed by Adam Antal. --- .../scheduler/capacity/CapacityScheduler.java | 1 + .../scheduler/fair/FairScheduler.java | 1 + .../scheduler/TestAbstractYarnScheduler.java | 86 +++++++++++++++++++ 3 files changed, 88 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 174a699545..fbd7a9388c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2044,6 +2044,7 @@ public class CapacityScheduler extends super.completedContainer(container, SchedulerUtils .createAbnormalContainerStatus(container.getContainerId(), SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + node.releaseContainer(container.getContainerId(), true); } // Remove reservations, if any diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 
index 3c9dcb155a..bb3a863a26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -795,6 +795,7 @@ public class FairScheduler extends
         super.completedContainer(container, SchedulerUtils
             .createAbnormalContainerStatus(container.getContainerId(),
                 SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+        node.releaseContainer(container.getContainerId(), true);
       }
 
       // Remove reservations, if any
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index 48f6654c55..a30d3782b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -537,6 +537,92 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
     }
   }
 
+
+  @Test(timeout = 30000L)
+  public void testNodeRemovedWithAllocationTags() throws Exception {
+    // Currently this can only be tested against the capacity scheduler.
+    if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
+      final String testTag1 = "some-tag";
+      YarnConfiguration conf = getConf();
+      conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler");
+      MockRM rm1 = new MockRM(conf);
+      rm1.start();
+      MockNM nm1 = new MockNM("127.0.0.1:1234",
+          10240, rm1.getResourceTrackerService());
+      nm1.registerNode();
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
+              .withAppName("name")
+              .withUser("user")
+              .withAcls(new HashMap<>())
+              .withUnmanagedAM(false)
+              .withQueue("default")
+              .withMaxAppAttempts(-1)
+              .withCredentials(null)
+              .withAppType("Test")
+              .withWaitForAppAcceptedState(false)
+              .withKeepContainers(true)
+              .build();
+      RMApp app1 =
+          MockRMAppSubmitter.submit(rm1, data);
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+      // Allocate 1 container tagged with testTag1
+      SchedulingRequest sr = SchedulingRequest
+          .newInstance(1L, Priority.newInstance(1),
+              ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
+              Sets.newHashSet(testTag1),
+              ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+              null);
+
+      AllocateRequest ar = AllocateRequest.newBuilder()
+          .schedulingRequests(Lists.newArrayList(sr)).build();
+      am1.allocate(ar);
+      nm1.nodeHeartbeat(true);
+
+      List<Container> allocated = new ArrayList<>();
+      while (allocated.size() < 1) {
+        AllocateResponse rsp = am1
+            .allocate(new ArrayList<>(), new ArrayList<>());
+        allocated.addAll(rsp.getAllocatedContainers());
+        nm1.nodeHeartbeat(true);
+        Thread.sleep(1000);
+      }
+
+      Assert.assertEquals(1, allocated.size());
+
+      Set<Container> containers = allocated.stream()
+          .filter(container -> container.getAllocationRequestId() == 1L)
+          .collect(Collectors.toSet());
+      Assert.assertNotNull(containers);
+      Assert.assertEquals(1, containers.size());
+      ContainerId cid = containers.iterator().next().getId();
+
+      // Mock the container start on the scheduler node
+      rm1.getRMContext().getScheduler()
+          .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);
+
+      // Verify the allocation was made with the correct number of tags
+      Map<String, Long> nodeTags = rm1.getRMContext()
+          .getAllocationTagsManager()
+          .getAllocationTagsWithCount(nm1.getNodeId());
+      Assert.assertNotNull(nodeTags.get(testTag1));
+      Assert.assertEquals(1, nodeTags.get(testTag1).intValue());
+
+      // Remove the node from the scheduler
+      RMNode node1 = MockNodes.newNodeInfo(
+          0, Resources.createResource(nm1.getMemory()), 1, "127.0.0.1", 1234);
+      rm1.getRMContext().getScheduler().handle(
+          new NodeRemovedSchedulerEvent(node1));
+
+      // Once the node is removed, the tag should be removed immediately
+      nodeTags = rm1.getRMContext().getAllocationTagsManager()
+          .getAllocationTagsWithCount(nm1.getNodeId());
+      Assert.assertNull(nodeTags);
+    }
+  }
+
+
   @Test(timeout=60000)
   public void testContainerReleasedByNode() throws Exception {
     System.out.println("Starting testContainerReleasedByNode");