From 8f4456d4a1552548b8b227243da66c797f8ff103 Mon Sep 17 00:00:00 2001 From: Tamas Domok Date: Tue, 21 Sep 2021 16:13:04 +0200 Subject: [PATCH] YARN-10961. TestCapacityScheduler: reuse appHelper where feasible. Contributed by Tamas Domok Co-authored-by: Tamas Domok --- .../capacity/TestCapacityScheduler.java | 175 ++---------------- 1 file changed, 12 insertions(+), 163 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 074549187e..eea017848a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1185,32 +1185,7 @@ public void testBlackListNodes() throws Exception { MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); cs.handle(new NodeAddedSchedulerEvent(node)); - ApplicationId appId = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - - RMAppAttemptMetrics attemptMetric = - new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); - RMAppImpl app = mock(RMAppImpl.class); - when(app.getApplicationId()).thenReturn(appId); - RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); - Container container = mock(Container.class); - when(attempt.getMasterContainer()).thenReturn(container); - ApplicationSubmissionContext submissionContext = mock( - ApplicationSubmissionContext.class); - when(attempt.getSubmissionContext()).thenReturn(submissionContext); - when(attempt.getAppAttemptId()).thenReturn(appAttemptId); - when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); - when(app.getCurrentAppAttempt()).thenReturn(attempt); - - rm.getRMContext().getRMApps().put(appId, app); - - SchedulerEvent addAppEvent = - new AppAddedSchedulerEvent(appId, "default", "user"); - cs.handle(addAppEvent); - SchedulerEvent addAttemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - cs.handle(addAttemptEvent); + ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user"); // Verify the blacklist can be updated independent of requesting containers cs.allocate(appAttemptId, Collections.emptyList(), null, @@ -1251,60 +1226,8 @@ public void testAllocateReorder() throws Exception { MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); cs.handle(new NodeAddedSchedulerEvent(node)); - //add app begin - ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( - appId1, 1); - - RMAppAttemptMetrics attemptMetric1 = - new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); - RMAppImpl app1 = mock(RMAppImpl.class); - when(app1.getApplicationId()).thenReturn(appId1); - RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); - Container container = mock(Container.class); - when(attempt1.getMasterContainer()).thenReturn(container); - ApplicationSubmissionContext submissionContext = mock( - ApplicationSubmissionContext.class); - when(attempt1.getSubmissionContext()).thenReturn(submissionContext); - when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); - when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); - when(app1.getCurrentAppAttempt()).thenReturn(attempt1); - - rm.getRMContext().getRMApps().put(appId1, app1); - - SchedulerEvent addAppEvent1 = - new AppAddedSchedulerEvent(appId1, "default", "user"); - cs.handle(addAppEvent1); - SchedulerEvent addAttemptEvent1 = - new AppAttemptAddedSchedulerEvent(appAttemptId1, false); - cs.handle(addAttemptEvent1); - //add app end - - //add app begin - ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); - ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( - appId2, 1); - - RMAppAttemptMetrics attemptMetric2 = - new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext()); - RMAppImpl app2 = mock(RMAppImpl.class); - when(app2.getApplicationId()).thenReturn(appId2); - RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); - when(attempt2.getMasterContainer()).thenReturn(container); - when(attempt2.getSubmissionContext()).thenReturn(submissionContext); - when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); - when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); - when(app2.getCurrentAppAttempt()).thenReturn(attempt2); - - rm.getRMContext().getRMApps().put(appId2, app2); - - SchedulerEvent addAppEvent2 = - new AppAddedSchedulerEvent(appId2, "default", "user"); - cs.handle(addAppEvent2); - SchedulerEvent addAttemptEvent2 = - new AppAttemptAddedSchedulerEvent(appAttemptId2, false); - cs.handle(addAttemptEvent2); - //add app end + ApplicationAttemptId appAttemptId1 = appHelper(rm, cs, 100, 1, "default", "user"); + ApplicationAttemptId appAttemptId2 = appHelper(rm, cs, 100, 2, "default", "user"); RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -1326,7 +1249,7 @@ public void testAllocateReorder() throws Exception { //failling back to fifo (start) ordering assertEquals(q.getOrderingPolicy().getAssignmentIterator( IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(), - appId1.toString()); + appAttemptId1.getApplicationId().toString()); //Now, allocate for app2 (this would be the first/AM allocation) ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); @@ -1340,7 +1263,7 @@ public void testAllocateReorder() throws Exception { //Now, the first app for assignment is app2 assertEquals(q.getOrderingPolicy().getAssignmentIterator( IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(), - appId2.toString()); + appAttemptId2.getApplicationId().toString()); rm.stop(); } @@ -2850,34 +2773,9 @@ public void testRemoveAttemptMoveAdded() throws Exception { RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1"); SchedulerEvent addNode = new NodeAddedSchedulerEvent(node); sch.handle(addNode); - // create appid - ApplicationId appId = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId = - BuilderUtils.newApplicationAttemptId(appId, 1); - RMAppAttemptMetrics attemptMetric = - new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); - RMAppImpl app = mock(RMAppImpl.class); - when(app.getApplicationId()).thenReturn(appId); - RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); - Container container = mock(Container.class); - when(attempt.getMasterContainer()).thenReturn(container); - ApplicationSubmissionContext submissionContext = - mock(ApplicationSubmissionContext.class); - when(attempt.getSubmissionContext()).thenReturn(submissionContext); - when(attempt.getAppAttemptId()).thenReturn(appAttemptId); - when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); - when(app.getCurrentAppAttempt()).thenReturn(attempt); + ApplicationAttemptId appAttemptId = appHelper(rm, sch, 100, 1, "a1", "user"); - rm.getRMContext().getRMApps().put(appId, app); - // Add application - SchedulerEvent addAppEvent = - new AppAddedSchedulerEvent(appId, "a1", "user"); - sch.handle(addAppEvent); - // Add application attempt - SchedulerEvent addAttemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - sch.handle(addAttemptEvent); // get Queues CSQueue queueA1 = sch.getQueue("a1"); CSQueue queueB = sch.getQueue("b"); @@ -2908,7 +2806,7 @@ public void testRemoveAttemptMoveAdded() throws Exception { sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId, RMAppAttemptState.KILLED, true)); // Move application to queue b1 - sch.moveApplication(appId, "b1"); + sch.moveApplication(appAttemptId.getApplicationId(), "b1"); // Check queue metrics after move Assert.assertEquals(0, queueA1.getNumApplications()); Assert.assertEquals(1, queueB.getNumApplications()); @@ -2916,7 +2814,7 @@ public void testRemoveAttemptMoveAdded() throws Exception { // Release attempt add event ApplicationAttemptId appAttemptId2 = - BuilderUtils.newApplicationAttemptId(appId, 2); + BuilderUtils.newApplicationAttemptId(appAttemptId.getApplicationId(), 2); SchedulerEvent addAttemptEvent2 = new AppAttemptAddedSchedulerEvent(appAttemptId2, true); sch.handle(addAttemptEvent2); @@ -4259,57 +4157,8 @@ public void testHeadRoomCalculationWithDRC() throws Exception { LeafQueue qb = (LeafQueue)cs.getQueue("default"); qb.setUserLimitFactor((float)0.8); - // add app 1 - ApplicationId appId = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId = - BuilderUtils.newApplicationAttemptId(appId, 1); - - RMAppAttemptMetrics attemptMetric = - new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); - RMAppImpl app = mock(RMAppImpl.class); - when(app.getApplicationId()).thenReturn(appId); - RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); - Container container = mock(Container.class); - when(attempt.getMasterContainer()).thenReturn(container); - ApplicationSubmissionContext submissionContext = mock( - ApplicationSubmissionContext.class); - when(attempt.getSubmissionContext()).thenReturn(submissionContext); - when(attempt.getAppAttemptId()).thenReturn(appAttemptId); - when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); - when(app.getCurrentAppAttempt()).thenReturn(attempt); - - rm.getRMContext().getRMApps().put(appId, app); - - SchedulerEvent addAppEvent = - new AppAddedSchedulerEvent(appId, "default", "user1"); - cs.handle(addAppEvent); - SchedulerEvent addAttemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - cs.handle(addAttemptEvent); - - // add app 2 - ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); - ApplicationAttemptId appAttemptId2 = - BuilderUtils.newApplicationAttemptId(appId2, 1); - - RMAppAttemptMetrics attemptMetric2 = - new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext()); - RMAppImpl app2 = mock(RMAppImpl.class); - when(app2.getApplicationId()).thenReturn(appId2); - RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); - when(attempt2.getMasterContainer()).thenReturn(container); - when(attempt2.getSubmissionContext()).thenReturn(submissionContext); - when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); - when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); - when(app2.getCurrentAppAttempt()).thenReturn(attempt2); - - rm.getRMContext().getRMApps().put(appId2, app2); - addAppEvent = - new AppAddedSchedulerEvent(appId2, "default", "user2"); - cs.handle(addAppEvent); - addAttemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId2, false); - cs.handle(addAttemptEvent); + ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user1"); + ApplicationAttemptId appAttemptId2 = appHelper(rm, cs, 100, 2, "default", "user2"); // add nodes to cluster, so cluster have 20GB and 20 vcores Resource newResource = Resource.newInstance(10 * GB, 10); @@ -4321,11 +4170,11 @@ public void testHeadRoomCalculationWithDRC() throws Exception { cs.handle(new NodeAddedSchedulerEvent(node2)); FiCaSchedulerApp fiCaApp1 = - cs.getSchedulerApplications().get(app.getApplicationId()) + cs.getSchedulerApplications().get(appAttemptId.getApplicationId()) .getCurrentAppAttempt(); FiCaSchedulerApp fiCaApp2 = - cs.getSchedulerApplications().get(app2.getApplicationId()) + cs.getSchedulerApplications().get(appAttemptId2.getApplicationId()) .getCurrentAppAttempt(); Priority u0Priority = TestUtils.createMockPriority(1); RecordFactory recordFactory =