diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 283d5678b8..b77a90a3d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -344,7 +344,7 @@ void setChildQueues(Collection childQueues) throws IOException { if (Math.abs(childrenPctSum) > PRECISION) { // It is wrong when percent sum != {0, 1} throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum + "Illegal capacity sum of " + childrenPctSum + " for children of queue " + getQueueName() + " for label=" + nodeLabel + ". It should be either 0 or 1.0"); } else{ @@ -357,7 +357,7 @@ void setChildQueues(Collection childQueues) throws IOException { if ((Math.abs(queueCapacities.getCapacity(nodeLabel)) > PRECISION) && (!allowZeroCapacitySum)) { throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum + "Illegal capacity sum of " + childrenPctSum + " for children of queue " + getQueueName() + " for label=" + nodeLabel + ". It is set to 0, but parent percent != 0, and " @@ -372,7 +372,7 @@ void setChildQueues(Collection childQueues) throws IOException { queueCapacities.getCapacity(nodeLabel)) <= 0f && !allowZeroCapacitySum) { throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum + "Illegal capacity sum of " + childrenPctSum + " for children of queue " + getQueueName() + " for label=" + nodeLabel + ". queue=" + getQueueName() + " has zero capacity, but child" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java index 37cfa2ef73..440742b908 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java @@ -61,12 +61,13 @@ public QueuePath(String fullPath) { } /** - * Concatenate queue path parts into one queue path string. - * @param parts Parts of the full queue pathAutoCreatedQueueTemplate - * @return full path of the given queue parts + * Constructor to create Queue path from queue names. + * The provided queue names will be concatenated by dots, giving a full queue path. + * @param parts Parts of queue path + * @return QueuePath object */ - public static String concatenatePath(String... parts) { - return String.join(DOT, parts); + public static QueuePath createFromQueues(String... parts) { + return new QueuePath(String.join(DOT, parts)); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 314b031208..041b37c616 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -2656,8 +2656,7 @@ public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr) initForWritableEndpoints(callerUGI, true); ResourceScheduler scheduler = rm.getResourceScheduler(); - if (scheduler instanceof MutableConfScheduler - && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + if (isConfigurationMutable(scheduler)) { try { MutableConfigurationProvider mutableConfigurationProvider = ((MutableConfScheduler) scheduler).getMutableConfProvider(); @@ -2696,8 +2695,7 @@ public synchronized Response validateAndGetSchedulerConfiguration( UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); initForWritableEndpoints(callerUGI, true); ResourceScheduler scheduler = rm.getResourceScheduler(); - if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler) - scheduler).isConfigurationMutable()) { + if (isConfigurationMutable(scheduler)) { try { MutableConfigurationProvider mutableConfigurationProvider = ((MutableConfScheduler) scheduler).getMutableConfProvider(); @@ -2746,51 +2744,61 @@ public synchronized Response validateAndGetSchedulerConfiguration( public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo, @Context HttpServletRequest hsr) throws AuthorizationException, InterruptedException { - UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); initForWritableEndpoints(callerUGI, true); ResourceScheduler scheduler = rm.getResourceScheduler(); - if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler) - scheduler).isConfigurationMutable()) { + if (isConfigurationMutable(scheduler)) { try { - callerUGI.doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - MutableConfigurationProvider provider = ((MutableConfScheduler) - scheduler).getMutableConfProvider(); - if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI, - mutationInfo)) { - throw new org.apache.hadoop.security.AccessControlException("User" - + " is not admin of all modified queues."); - } - LogMutation logMutation = provider.logAndApplyMutation(callerUGI, - mutationInfo); - try { - rm.getRMContext().getRMAdminService().refreshQueues(); - } catch (IOException | YarnException e) { - provider.confirmPendingMutation(logMutation, false); - throw e; - } - provider.confirmPendingMutation(logMutation, true); - return null; - } + callerUGI.doAs((PrivilegedExceptionAction) () -> { + MutableConfigurationProvider provider = ((MutableConfScheduler) + scheduler).getMutableConfProvider(); + LogMutation logMutation = applyMutation(provider, callerUGI, mutationInfo); + return refreshQueues(provider, logMutation); }); } catch (IOException e) { LOG.error("Exception thrown when modifying configuration.", e); return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) .build(); } - return Response.status(Status.OK).entity("Configuration change " + - "successfully applied.").build(); + return Response.status(Status.OK).entity("Configuration change successfully applied.") + .build(); } else { return Response.status(Status.BAD_REQUEST) - .entity("Configuration change only supported by " + - "MutableConfScheduler.") + .entity(String.format("Configuration change only supported by " + + "%s.", MutableConfScheduler.class.getSimpleName())) .build(); } } + private Void refreshQueues(MutableConfigurationProvider provider, LogMutation logMutation) + throws Exception { + try { + rm.getRMContext().getRMAdminService().refreshQueues(); + } catch (IOException | YarnException e) { + provider.confirmPendingMutation(logMutation, false); + throw e; + } + provider.confirmPendingMutation(logMutation, true); + return null; + } + + private LogMutation applyMutation(MutableConfigurationProvider provider, + UserGroupInformation callerUGI, SchedConfUpdateInfo mutationInfo) throws Exception { + if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI, + mutationInfo)) { + throw new org.apache.hadoop.security.AccessControlException("User" + + " is not admin of all modified queues."); + } + return provider.logAndApplyMutation(callerUGI, + mutationInfo); + } + + private boolean isConfigurationMutable(ResourceScheduler scheduler) { + return scheduler instanceof MutableConfScheduler && ((MutableConfScheduler) + scheduler).isConfigurationMutable(); + } + @GET @Path(RMWSConsts.SCHEDULER_CONF) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, @@ -2803,8 +2811,7 @@ public Response getSchedulerConfiguration(@Context HttpServletRequest hsr) initForWritableEndpoints(callerUGI, true); ResourceScheduler scheduler = rm.getResourceScheduler(); - if (scheduler instanceof MutableConfScheduler - && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + if (isConfigurationMutable(scheduler)) { MutableConfigurationProvider mutableConfigurationProvider = ((MutableConfScheduler) scheduler).getMutableConfProvider(); // We load the cached configuration from configuration store, @@ -2835,8 +2842,7 @@ public Response getSchedulerConfigurationVersion(@Context initForWritableEndpoints(callerUGI, true); ResourceScheduler scheduler = rm.getResourceScheduler(); - if (scheduler instanceof MutableConfScheduler - && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + if (isConfigurationMutable(scheduler)) { MutableConfigurationProvider mutableConfigurationProvider = ((MutableConfScheduler) scheduler).getMutableConfProvider(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java index 673fbbe2ec..d5d534395b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java @@ -1098,4 +1098,4 @@ private RMWebServices prepareWebServiceForValidation( return webService; } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java index 15599863d0..675e79243f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java @@ -22,10 +22,13 @@ import com.google.inject.servlet.ServletModule; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.test.framework.WebAppDescriptor; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Sets; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -34,12 +37,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GuiceServletConfig; import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; -import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -58,10 +62,15 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_CAPACITY; +import static org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils.toJson; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY; +import static org.junit.Assert.assertTrue; /** * Test scheduler configuration mutation via REST API. @@ -74,7 +83,11 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase { "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE); private static final File OLD_CONF_FILE = new File(new File("target", "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp"); - + private static final String LABEL_1 = "label1"; + public static final QueuePath ROOT = new QueuePath("root"); + public static final QueuePath ROOT_A = new QueuePath("root", "a"); + public static final QueuePath ROOT_A_A1 = QueuePath.createFromQueues("root", "a", "a1"); + public static final QueuePath ROOT_A_A2 = QueuePath.createFromQueues("root", "a", "a2"); private static MockRM rm; private static String userName; private static CapacitySchedulerConfiguration csConf; @@ -216,7 +229,7 @@ public void testFormatSchedulerConf() throws Exception { ClientResponse response = r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); newConf = getSchedulerConf(); @@ -284,7 +297,7 @@ public void testAddNestedQueue() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -323,7 +336,7 @@ public void testAddWithUpdate() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -350,7 +363,7 @@ public void testUnsetParentQueueOrderingPolicy() throws Exception { response = r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo1, + .entity(toJson(updateInfo1, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -371,7 +384,7 @@ public void testUnsetParentQueueOrderingPolicy() throws Exception { response = r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo2, + .entity(toJson(updateInfo2, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -401,7 +414,7 @@ public void testUnsetLeafQueueOrderingPolicy() throws Exception { response = r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo1, + .entity(toJson(updateInfo1, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -419,7 +432,7 @@ public void testUnsetLeafQueueOrderingPolicy() throws Exception { response = r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo2, + .entity(toJson(updateInfo2, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -448,7 +461,7 @@ public void testRemoveQueue() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -480,7 +493,7 @@ public void testStopWithRemoveQueue() throws Exception { response = r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -513,7 +526,7 @@ public void testStopWithConvertLeafToParentQueue() throws Exception { response = r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -538,7 +551,7 @@ public void testRemoveParentQueue() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -569,7 +582,7 @@ public void testRemoveParentQueueWithCapacity() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -601,7 +614,7 @@ public void testRemoveMultipleQueues() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); @@ -629,7 +642,7 @@ private void stopQueue(String... queuePaths) throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -664,7 +677,7 @@ public void testUpdateQueue() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); LOG.debug("Response headers: " + response.getHeaders()); @@ -683,7 +696,7 @@ public void testUpdateQueue() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -713,7 +726,7 @@ public void testUpdateQueueCapacity() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -738,7 +751,7 @@ public void testGlobalConfChange() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -753,7 +766,7 @@ public void testGlobalConfChange() throws Exception { r.path("ws").path("v1").path("cluster") .path("scheduler-conf").queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .put(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus()); @@ -764,6 +777,220 @@ public void testGlobalConfChange() throws Exception { newCSConf.getMaximumSystemApplications()); } + @Test + public void testNodeLabelRemovalResidualConfigsAreCleared() throws Exception { + WebResource r = resource(); + ClientResponse response; + + // 1. Create Node Label: label1 + NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo(); + nodeLabelsInfo.getNodeLabelsInfo().add(new NodeLabelInfo(LABEL_1)); + WebResource addNodeLabelsResource = r.path("ws").path("v1").path("cluster") + .path("add-node-labels"); + WebResource getNodeLabelsResource = r.path("ws").path("v1").path("cluster") + .path("get-node-labels"); + WebResource removeNodeLabelsResource = r.path("ws").path("v1").path("cluster") + .path("remove-node-labels"); + WebResource schedulerConfResource = r.path("ws").path("v1").path("cluster") + .path(RMWSConsts.SCHEDULER_CONF); + response = + addNodeLabelsResource.queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(logAndReturnJson(addNodeLabelsResource, + toJson(nodeLabelsInfo, NodeLabelsInfo.class)), + MediaType.APPLICATION_JSON) + .post(ClientResponse.class); + + // 2. Verify new Node Label + response = + getNodeLabelsResource.queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + nodeLabelsInfo = response.getEntity(NodeLabelsInfo.class); + assertEquals(1, nodeLabelsInfo.getNodeLabels().size()); + for (NodeLabelInfo nl : nodeLabelsInfo.getNodeLabelsInfo()) { + assertEquals(LABEL_1, nl.getName()); + assertTrue(nl.getExclusivity()); + } + + // 3. Assign 'label1' to root.a + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + Map updateForRoot = new HashMap<>(); + updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "*"); + QueueConfigInfo rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot); + + Map updateForRootA = new HashMap<>(); + updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, LABEL_1); + QueueConfigInfo rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA); + + updateInfo.getUpdateQueueInfo().add(rootUpdateInfo); + updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo); + + response = + schedulerConfResource + .queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo, + SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + assertEquals(Sets.newHashSet("*"), + cs.getConfiguration().getAccessibleNodeLabels(ROOT.getFullPath())); + assertEquals(Sets.newHashSet(LABEL_1), + cs.getConfiguration().getAccessibleNodeLabels(ROOT_A.getFullPath())); + + // 4. Set partition capacities to queues as below + updateInfo = new SchedConfUpdateInfo(); + updateForRoot = new HashMap<>(); + updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "100"); + updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "100"); + rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot); + + updateForRootA = new HashMap<>(); + updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "100"); + updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "100"); + rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA); + + // Avoid the following exception by adding some capacities to root.a.a1 and root.a.a2 to label1 + // Illegal capacity sum of 0.0 for children of queue a for label=label1. + // It is set to 0, but parent percent != 0, and doesn't allow children capacity to set to 0 + Map updateForRootA_A1 = new HashMap<>(); + updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "20"); + updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "20"); + QueueConfigInfo rootA_A1UpdateInfo = new QueueConfigInfo(ROOT_A_A1.getFullPath(), + updateForRootA_A1); + + Map updateForRootA_A2 = new HashMap<>(); + updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "80"); + updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "80"); + QueueConfigInfo rootA_A2UpdateInfo = new QueueConfigInfo(ROOT_A_A2.getFullPath(), + updateForRootA_A2); + + + updateInfo.getUpdateQueueInfo().add(rootUpdateInfo); + updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo); + updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo); + updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo); + + response = + schedulerConfResource + .queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo, + SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT, LABEL_1), 0.001f); + assertEquals(100.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT, LABEL_1), + 0.001f); + assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A, LABEL_1), 0.001f); + assertEquals(100.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A, LABEL_1), + 0.001f); + assertEquals(20.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A1, LABEL_1), 0.001f); + assertEquals(20.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A1, LABEL_1), + 0.001f); + assertEquals(80.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A2, LABEL_1), 0.001f); + assertEquals(80.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A2, LABEL_1), + 0.001f); + + //5. De-assign node label: "label1" + Remove residual properties + updateInfo = new SchedConfUpdateInfo(); + updateForRoot = new HashMap<>(); + updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "*"); + updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), ""); + updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), ""); + rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot); + + updateForRootA = new HashMap<>(); + updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, ""); + updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), ""); + updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), ""); + rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA); + + updateForRootA_A1 = new HashMap<>(); + updateForRootA_A1.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, ""); + updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), ""); + updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), ""); + rootA_A1UpdateInfo = new QueueConfigInfo(ROOT_A_A1.getFullPath(), updateForRootA_A1); + + updateForRootA_A2 = new HashMap<>(); + updateForRootA_A2.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, ""); + updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), ""); + updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), ""); + rootA_A2UpdateInfo = new QueueConfigInfo(ROOT_A_A2.getFullPath(), updateForRootA_A2); + + updateInfo.getUpdateQueueInfo().add(rootUpdateInfo); + updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo); + updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo); + updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo); + + response = + schedulerConfResource + .queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo, + SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + assertEquals(Sets.newHashSet("*"), + cs.getConfiguration().getAccessibleNodeLabels(ROOT.getFullPath())); + assertNull(cs.getConfiguration().getAccessibleNodeLabels(ROOT_A.getFullPath())); + + //6. Remove node label 'label1' + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("labels", LABEL_1); + response = + removeNodeLabelsResource + .queryParam("user.name", userName) + .queryParams(params) + .accept(MediaType.APPLICATION_JSON) + .post(ClientResponse.class); + + // Verify + response = + getNodeLabelsResource.queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + nodeLabelsInfo = response.getEntity(NodeLabelsInfo.class); + assertEquals(0, nodeLabelsInfo.getNodeLabels().size()); + + //6. Check residual configs + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1, CAPACITY)); + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1, MAXIMUM_CAPACITY)); + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1, CAPACITY)); + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1, MAXIMUM_CAPACITY)); + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1, CAPACITY)); + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1, MAXIMUM_CAPACITY)); + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1, CAPACITY)); + assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1, MAXIMUM_CAPACITY)); + } + + private String getConfValueForQueueAndLabelAndType(CapacityScheduler cs, + QueuePath queuePath, String label, String type) { + return cs.getConfiguration().get( + CapacitySchedulerConfiguration.getNodeLabelPrefix( + queuePath.getFullPath(), label) + type); + } + + private Object logAndReturnJson(WebResource ws, String json) { + LOG.info("Sending to web resource: {}, json: {}", ws, json); + return json; + } + + private String getAccessibleNodeLabelsCapacityPropertyName(String label) { + return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label, CAPACITY); + } + + private String getAccessibleNodeLabelsMaxCapacityPropertyName(String label) { + return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label, MAXIMUM_CAPACITY); + } + @Test public void testValidateWithClusterMaxAllocation() throws Exception { WebResource r = resource(); @@ -784,7 +1011,7 @@ public void testValidateWithClusterMaxAllocation() throws Exception { .path(RMWSConsts.SCHEDULER_CONF_VALIDATE) .queryParam("user.name", userName) .accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, + .entity(toJson(updateInfo, SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) .post(ClientResponse.class); assertEquals(Status.OK.getStatusCode(), response.getStatus());