From 57cbde9abf0eed348e2bbab89732f881b907af13 Mon Sep 17 00:00:00 2001 From: Ashutosh Gupta Date: Sat, 2 Jul 2022 17:28:56 +0100 Subject: [PATCH] YARN-10287.Update scheduler-conf corrupts the CS configuration when removing queue which is referred in queue mapping (#4515) * YARN-10287.Update scheduler-conf corrupts the CS configuration when removing queue which is referred in queue mapping Co-authored-by: Ashutosh Gupta --- .../scheduler/capacity/CapacityScheduler.java | 1 + ...estRMWebServicesConfigurationMutation.java | 73 +++++++++++++++---- 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cf5034ba22..e513359af0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -455,6 +455,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext, reinitializeQueues(this.conf); } catch (Throwable t) { this.conf = oldConf; + reinitializeQueues(this.conf); refreshMaximumAllocation( ResourceUtils.fetchMaximumAllocationFromConfig(this.conf)); throw new IOException("Failed to re-init queues : " + t.getMessage(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java index 675e79243f..43b5ee7127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; +import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; + import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -69,8 +71,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY; import static org.junit.Assert.assertTrue; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY; /** * Test scheduler configuration mutation via REST API. @@ -145,7 +147,7 @@ public void setUp() throws Exception { private static void setupQueueConfiguration( CapacitySchedulerConfiguration config) { config.setQueues(CapacitySchedulerConfiguration.ROOT, - new String[]{"a", "b", "c"}); + new String[]{"a", "b", "c", "mappedqueue"}); final String a = CapacitySchedulerConfiguration.ROOT + ".a"; config.setCapacity(a, 25f); @@ -166,6 +168,11 @@ private static void setupQueueConfiguration( final String c1 = c + ".c1"; config.setQueues(c, new String[] {"c1"}); config.setCapacity(c1, 0f); + + final String d = CapacitySchedulerConfiguration.ROOT + ".d"; + config.setCapacity(d, 0f); + config.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "g:hadoop:mappedqueue"); } public TestRMWebServicesConfigurationMutation() { @@ -201,14 +208,14 @@ private CapacitySchedulerConfiguration getSchedulerConf() public void testGetSchedulerConf() throws Exception { CapacitySchedulerConfiguration orgConf = getSchedulerConf(); assertNotNull(orgConf); - assertEquals(3, orgConf.getQueues("root").length); + assertEquals(4, orgConf.getQueues("root").length); } @Test public void testFormatSchedulerConf() throws Exception { CapacitySchedulerConfiguration newConf = getSchedulerConf(); assertNotNull(newConf); - assertEquals(3, newConf.getQueues("root").length); + assertEquals(4, newConf.getQueues("root").length); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); Map nearEmptyCapacity = new HashMap<>(); @@ -234,7 +241,7 @@ public void testFormatSchedulerConf() throws Exception { .put(ClientResponse.class); newConf = getSchedulerConf(); assertNotNull(newConf); - assertEquals(4, newConf.getQueues("root").length); + assertEquals(5, newConf.getQueues("root").length); // Format the scheduler config and validate root.formattest is not present response = r.path("ws").path("v1").path("cluster") @@ -243,7 +250,7 @@ public void testFormatSchedulerConf() throws Exception { .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); newConf = getSchedulerConf(); - assertEquals(3, newConf.getQueues("root").length); + assertEquals(4, newConf.getQueues("root").length); } private long getConfigVersion() throws Exception { @@ -269,7 +276,7 @@ public void testSchedulerConfigVersion() throws Exception { public void testAddNestedQueue() throws Exception { CapacitySchedulerConfiguration orgConf = getSchedulerConf(); assertNotNull(orgConf); - assertEquals(3, orgConf.getQueues("root").length); + assertEquals(4, orgConf.getQueues("root").length); WebResource r = resource(); @@ -304,7 +311,7 @@ public void testAddNestedQueue() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); CapacitySchedulerConfiguration newCSConf = ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); - assertEquals(4, newCSConf.getQueues("root").length); + assertEquals(5, newCSConf.getQueues("root").length); assertEquals(2, newCSConf.getQueues("root.d").length); assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d.d1")), 0.01f); @@ -313,7 +320,7 @@ public void testAddNestedQueue() throws Exception { CapacitySchedulerConfiguration newConf = getSchedulerConf(); assertNotNull(newConf); - assertEquals(4, newConf.getQueues("root").length); + assertEquals(5, newConf.getQueues("root").length); } @Test @@ -343,7 +350,7 @@ public void testAddWithUpdate() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); CapacitySchedulerConfiguration newCSConf = ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); - assertEquals(4, newCSConf.getQueues("root").length); + assertEquals(5, newCSConf.getQueues("root").length); assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d")), 0.01f); assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f); } @@ -504,6 +511,46 @@ public void testStopWithRemoveQueue() throws Exception { assertEquals("a1", newCSConf.getQueues("root.a")[0]); } + @Test + public void testRemoveQueueWhichHasQueueMapping() throws Exception { + WebResource r = resource(); + + ClientResponse response; + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Validate Queue 'mappedqueue' exists before deletion + assertNotNull("Failed to setup CapacityScheduler Configuration", + cs.getQueue("mappedqueue")); + + // Set state of queue 'mappedqueue' to STOPPED. + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + Map stoppedParam = new HashMap<>(); + stoppedParam.put(CapacitySchedulerConfiguration.STATE, QueueState.STOPPED.toString()); + QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.mappedqueue", stoppedParam); + updateInfo.getUpdateQueueInfo().add(stoppedInfo); + + // Remove queue 'mappedqueue' using update scheduler-conf + updateInfo.getRemoveQueueInfo().add("root.mappedqueue"); + response = r.path("ws").path("v1").path("cluster").path("scheduler-conf") + .queryParam("user.name", userName).accept(MediaType.APPLICATION_JSON) + .entity(YarnWebServiceUtils.toJson(updateInfo, SchedConfUpdateInfo.class), + MediaType.APPLICATION_JSON).put(ClientResponse.class); + String responseText = response.getEntity(String.class); + + // Queue 'mappedqueue' deletion will fail as there is queue mapping present + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertTrue(responseText.contains( + "Failed to re-init queues : " + "org.apache.hadoop.yarn.exceptions.YarnException:" + + " Path root 'mappedqueue' does not exist. Path 'mappedqueue' is invalid")); + + // Validate queue 'mappedqueue' exists after above failure + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(4, newCSConf.getQueues("root").length); + assertNotNull("CapacityScheduler Configuration is corrupt", + cs.getQueue("mappedqueue")); + } + @Test public void testStopWithConvertLeafToParentQueue() throws Exception { WebResource r = resource(); @@ -558,7 +605,7 @@ public void testRemoveParentQueue() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); CapacitySchedulerConfiguration newCSConf = ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); - assertEquals(2, newCSConf.getQueues("root").length); + assertEquals(3, newCSConf.getQueues("root").length); assertNull(newCSConf.getQueues("root.c")); } @@ -589,7 +636,7 @@ public void testRemoveParentQueueWithCapacity() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); CapacitySchedulerConfiguration newCSConf = ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); - assertEquals(2, newCSConf.getQueues("root").length); + assertEquals(3, newCSConf.getQueues("root").length); assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f); } @@ -621,7 +668,7 @@ public void testRemoveMultipleQueues() throws Exception { assertEquals(Status.OK.getStatusCode(), response.getStatus()); CapacitySchedulerConfiguration newCSConf = ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); - assertEquals(1, newCSConf.getQueues("root").length); + assertEquals(2, newCSConf.getQueues("root").length); } private void stopQueue(String... queuePaths) throws Exception {