diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4c7cdb125d..0efc81d6bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -1056,7 +1056,7 @@ private void copyPlacementQueueToSubmissionContext( LOG.info("Placed application with ID " + context.getApplicationId() + " in queue: " + placementContext.getQueue() + ", original submission queue was: " + context.getQueue()); - context.setQueue(placementContext.getQueue()); + context.setQueue(placementContext.getFullQueuePath()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index be0dbf2a1b..fe2a7bab9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -283,7 +283,7 @@ public void setUp() throws IOException { setupDispatcher(rmContext, conf); } - private static PlacementManager createMockPlacementManager( + public static PlacementManager createMockPlacementManager( String userRegex, String placementQueue, String placementParentQueue ) throws YarnException { PlacementManager placementMgr = mock(PlacementManager.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 52a34fbf76..a636738053 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1651,6 +1651,97 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { assertUnmanagedAMQueueMetrics(qm2, 1, 0, 0, 1); } + // Test behavior of an app if two same name leaf queue with different queuePath + // during work preserving rm restart with %specified mapping Placement Rule. + // Test case does following: + //1. Submit an apps to queue root.joe.test. + //2. While the applications is running, restart the rm and + // check whether the app submitted to the queue it was submitted initially. + //3. Verify that application running successfully. + @Test(timeout = 60000) + public void testQueueRecoveryOnRMWorkPreservingRestart() throws Exception { + if (getSchedulerType() != SchedulerType.CAPACITY) { + return; + } + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); + + csConf.setQueues( + CapacitySchedulerConfiguration.ROOT, new String[] {"default", "joe", "john"}); + csConf.setCapacity( + CapacitySchedulerConfiguration.ROOT + "." + "joe", 25); + csConf.setCapacity( + CapacitySchedulerConfiguration.ROOT + "." + "john", 25); + csConf.setCapacity( + CapacitySchedulerConfiguration.ROOT + "." + "default", 50); + + final String q1 = CapacitySchedulerConfiguration.ROOT + "." + "joe"; + final String q2 = CapacitySchedulerConfiguration.ROOT + "." + "john"; + csConf.setQueues(q1, new String[] {"test"}); + csConf.setQueues(q2, new String[] {"test"}); + csConf.setCapacity( + CapacitySchedulerConfiguration.ROOT + "." + "joe.test", 100); + csConf.setCapacity( + CapacitySchedulerConfiguration.ROOT + "." + "john.test", 100); + + csConf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON, + "{\"rules\" : [{\"type\": \"user\", \"policy\" : \"specified\", " + + "\"fallbackResult\" : \"skip\", \"matches\" : \"*\"}]}"); + + // start RM + rm1 = new MockRM(csConf); + rm1.start(); + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMContext newMockRMContext = rm1.getRMContext(); + newMockRMContext.setQueuePlacementManager(TestAppManager.createMockPlacementManager( + "user1|user2", "test", "root.joe")); + + MockRMAppSubmissionData data = + MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1) + .withAppName("app") + .withQueue("root.joe.test") + .withUser("user1") + .withAcls(null) + .build(); + + RMApp app = MockRMAppSubmitter.submit(rm1, data); + MockAM am = MockRM.launchAndRegisterAM(app, rm1, nm1); + rm1.waitForState(app.getApplicationId(), RMAppState.RUNNING); + + MockRM rm2 = new MockRM(csConf, memStore) { + @Override + protected RMAppManager createRMAppManager() { + return new RMAppManager(this.rmContext, this.scheduler, + this.masterService, this.applicationACLsManager, conf) { + @Override + ApplicationPlacementContext placeApplication( + PlacementManager placementManager, + ApplicationSubmissionContext context, String user, + boolean isRecovery) throws YarnException { + return super.placeApplication( + newMockRMContext.getQueuePlacementManager(), context, user, isRecovery); + } + }; + } + }; + + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.start(); + RMApp recoveredApp0 = + rm2.getRMContext().getRMApps().get(app.getApplicationId()); + + rm2.waitForState(recoveredApp0.getApplicationId(), RMAppState.ACCEPTED); + am.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am.registerAppAttempt(true); + rm2.waitForState(recoveredApp0.getApplicationId(), RMAppState.RUNNING); + + Assert.assertEquals("root.joe.test", recoveredApp0.getQueue()); + } + private void assertUnmanagedAMQueueMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted) { Assert.assertEquals(appsSubmitted, qm.getUnmanagedAppsSubmitted());