YARN-10287.Update scheduler-conf corrupts the CS configuration when removing queue which is referred in queue mapping (#4515)

* YARN-10287.Update scheduler-conf corrupts the CS configuration when removing queue which is referred in queue mapping

Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
This commit is contained in:
Ashutosh Gupta 2022-07-02 17:28:56 +01:00 committed by GitHub
parent 3cad632709
commit 57cbde9abf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 13 deletions

View File

@ -455,6 +455,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext,
reinitializeQueues(this.conf); reinitializeQueues(this.conf);
} catch (Throwable t) { } catch (Throwable t) {
this.conf = oldConf; this.conf = oldConf;
reinitializeQueues(this.conf);
refreshMaximumAllocation( refreshMaximumAllocation(
ResourceUtils.fetchMaximumAllocationFromConfig(this.conf)); ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
throw new IOException("Failed to re-init queues : " + t.getMessage(), throw new IOException("Failed to re-init queues : " + t.getMessage(),

View File

@ -44,6 +44,8 @@
import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
@ -69,8 +71,8 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
/** /**
* Test scheduler configuration mutation via REST API. * Test scheduler configuration mutation via REST API.
@ -145,7 +147,7 @@ public void setUp() throws Exception {
private static void setupQueueConfiguration( private static void setupQueueConfiguration(
CapacitySchedulerConfiguration config) { CapacitySchedulerConfiguration config) {
config.setQueues(CapacitySchedulerConfiguration.ROOT, config.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{"a", "b", "c"}); new String[]{"a", "b", "c", "mappedqueue"});
final String a = CapacitySchedulerConfiguration.ROOT + ".a"; final String a = CapacitySchedulerConfiguration.ROOT + ".a";
config.setCapacity(a, 25f); config.setCapacity(a, 25f);
@ -166,6 +168,11 @@ private static void setupQueueConfiguration(
final String c1 = c + ".c1"; final String c1 = c + ".c1";
config.setQueues(c, new String[] {"c1"}); config.setQueues(c, new String[] {"c1"});
config.setCapacity(c1, 0f); config.setCapacity(c1, 0f);
final String d = CapacitySchedulerConfiguration.ROOT + ".d";
config.setCapacity(d, 0f);
config.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"g:hadoop:mappedqueue");
} }
public TestRMWebServicesConfigurationMutation() { public TestRMWebServicesConfigurationMutation() {
@ -201,14 +208,14 @@ private CapacitySchedulerConfiguration getSchedulerConf()
public void testGetSchedulerConf() throws Exception { public void testGetSchedulerConf() throws Exception {
CapacitySchedulerConfiguration orgConf = getSchedulerConf(); CapacitySchedulerConfiguration orgConf = getSchedulerConf();
assertNotNull(orgConf); assertNotNull(orgConf);
assertEquals(3, orgConf.getQueues("root").length); assertEquals(4, orgConf.getQueues("root").length);
} }
@Test @Test
public void testFormatSchedulerConf() throws Exception { public void testFormatSchedulerConf() throws Exception {
CapacitySchedulerConfiguration newConf = getSchedulerConf(); CapacitySchedulerConfiguration newConf = getSchedulerConf();
assertNotNull(newConf); assertNotNull(newConf);
assertEquals(3, newConf.getQueues("root").length); assertEquals(4, newConf.getQueues("root").length);
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> nearEmptyCapacity = new HashMap<>(); Map<String, String> nearEmptyCapacity = new HashMap<>();
@ -234,7 +241,7 @@ public void testFormatSchedulerConf() throws Exception {
.put(ClientResponse.class); .put(ClientResponse.class);
newConf = getSchedulerConf(); newConf = getSchedulerConf();
assertNotNull(newConf); assertNotNull(newConf);
assertEquals(4, newConf.getQueues("root").length); assertEquals(5, newConf.getQueues("root").length);
// Format the scheduler config and validate root.formattest is not present // Format the scheduler config and validate root.formattest is not present
response = r.path("ws").path("v1").path("cluster") response = r.path("ws").path("v1").path("cluster")
@ -243,7 +250,7 @@ public void testFormatSchedulerConf() throws Exception {
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
newConf = getSchedulerConf(); newConf = getSchedulerConf();
assertEquals(3, newConf.getQueues("root").length); assertEquals(4, newConf.getQueues("root").length);
} }
private long getConfigVersion() throws Exception { private long getConfigVersion() throws Exception {
@ -269,7 +276,7 @@ public void testSchedulerConfigVersion() throws Exception {
public void testAddNestedQueue() throws Exception { public void testAddNestedQueue() throws Exception {
CapacitySchedulerConfiguration orgConf = getSchedulerConf(); CapacitySchedulerConfiguration orgConf = getSchedulerConf();
assertNotNull(orgConf); assertNotNull(orgConf);
assertEquals(3, orgConf.getQueues("root").length); assertEquals(4, orgConf.getQueues("root").length);
WebResource r = resource(); WebResource r = resource();
@ -304,7 +311,7 @@ public void testAddNestedQueue() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf = CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(4, newCSConf.getQueues("root").length); assertEquals(5, newCSConf.getQueues("root").length);
assertEquals(2, newCSConf.getQueues("root.d").length); assertEquals(2, newCSConf.getQueues("root.d").length);
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d.d1")), assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d.d1")),
0.01f); 0.01f);
@ -313,7 +320,7 @@ public void testAddNestedQueue() throws Exception {
CapacitySchedulerConfiguration newConf = getSchedulerConf(); CapacitySchedulerConfiguration newConf = getSchedulerConf();
assertNotNull(newConf); assertNotNull(newConf);
assertEquals(4, newConf.getQueues("root").length); assertEquals(5, newConf.getQueues("root").length);
} }
@Test @Test
@ -343,7 +350,7 @@ public void testAddWithUpdate() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf = CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(4, newCSConf.getQueues("root").length); assertEquals(5, newCSConf.getQueues("root").length);
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d")), 0.01f); assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d")), 0.01f);
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f); assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f);
} }
@ -504,6 +511,46 @@ public void testStopWithRemoveQueue() throws Exception {
assertEquals("a1", newCSConf.getQueues("root.a")[0]); assertEquals("a1", newCSConf.getQueues("root.a")[0]);
} }
@Test
public void testRemoveQueueWhichHasQueueMapping() throws Exception {
WebResource r = resource();
ClientResponse response;
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Validate Queue 'mappedqueue' exists before deletion
assertNotNull("Failed to setup CapacityScheduler Configuration",
cs.getQueue("mappedqueue"));
// Set state of queue 'mappedqueue' to STOPPED.
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> stoppedParam = new HashMap<>();
stoppedParam.put(CapacitySchedulerConfiguration.STATE, QueueState.STOPPED.toString());
QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.mappedqueue", stoppedParam);
updateInfo.getUpdateQueueInfo().add(stoppedInfo);
// Remove queue 'mappedqueue' using update scheduler-conf
updateInfo.getRemoveQueueInfo().add("root.mappedqueue");
response = r.path("ws").path("v1").path("cluster").path("scheduler-conf")
.queryParam("user.name", userName).accept(MediaType.APPLICATION_JSON)
.entity(YarnWebServiceUtils.toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON).put(ClientResponse.class);
String responseText = response.getEntity(String.class);
// Queue 'mappedqueue' deletion will fail as there is queue mapping present
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertTrue(responseText.contains(
"Failed to re-init queues : " + "org.apache.hadoop.yarn.exceptions.YarnException:"
+ " Path root 'mappedqueue' does not exist. Path 'mappedqueue' is invalid"));
// Validate queue 'mappedqueue' exists after above failure
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(4, newCSConf.getQueues("root").length);
assertNotNull("CapacityScheduler Configuration is corrupt",
cs.getQueue("mappedqueue"));
}
@Test @Test
public void testStopWithConvertLeafToParentQueue() throws Exception { public void testStopWithConvertLeafToParentQueue() throws Exception {
WebResource r = resource(); WebResource r = resource();
@ -558,7 +605,7 @@ public void testRemoveParentQueue() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf = CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(2, newCSConf.getQueues("root").length); assertEquals(3, newCSConf.getQueues("root").length);
assertNull(newCSConf.getQueues("root.c")); assertNull(newCSConf.getQueues("root.c"));
} }
@ -589,7 +636,7 @@ public void testRemoveParentQueueWithCapacity() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf = CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(2, newCSConf.getQueues("root").length); assertEquals(3, newCSConf.getQueues("root").length);
assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")),
0.01f); 0.01f);
} }
@ -621,7 +668,7 @@ public void testRemoveMultipleQueues() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf = CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(1, newCSConf.getQueues("root").length); assertEquals(2, newCSConf.getQueues("root").length);
} }
private void stopQueue(String... queuePaths) throws Exception { private void stopQueue(String... queuePaths) throws Exception {