YARN-11048. Add tests that shows how to delete config values with Mutation API (#3799). Contributed by Szilard Nemeth
This commit is contained in:
parent
bdec546671
commit
a9a5830f31
@ -344,7 +344,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
|
||||
if (Math.abs(childrenPctSum) > PRECISION) {
|
||||
// It is wrong when percent sum != {0, 1}
|
||||
throw new IOException(
|
||||
"Illegal" + " capacity sum of " + childrenPctSum
|
||||
"Illegal capacity sum of " + childrenPctSum
|
||||
+ " for children of queue " + getQueueName() + " for label="
|
||||
+ nodeLabel + ". It should be either 0 or 1.0");
|
||||
} else{
|
||||
@ -357,7 +357,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
|
||||
if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
|
||||
> PRECISION) && (!allowZeroCapacitySum)) {
|
||||
throw new IOException(
|
||||
"Illegal" + " capacity sum of " + childrenPctSum
|
||||
"Illegal capacity sum of " + childrenPctSum
|
||||
+ " for children of queue " + getQueueName()
|
||||
+ " for label=" + nodeLabel
|
||||
+ ". It is set to 0, but parent percent != 0, and "
|
||||
@ -372,7 +372,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
|
||||
queueCapacities.getCapacity(nodeLabel)) <= 0f
|
||||
&& !allowZeroCapacitySum) {
|
||||
throw new IOException(
|
||||
"Illegal" + " capacity sum of " + childrenPctSum
|
||||
"Illegal capacity sum of " + childrenPctSum
|
||||
+ " for children of queue " + getQueueName() + " for label="
|
||||
+ nodeLabel + ". queue=" + getQueueName()
|
||||
+ " has zero capacity, but child"
|
||||
|
@ -61,12 +61,13 @@ public QueuePath(String fullPath) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Concatenate queue path parts into one queue path string.
|
||||
* @param parts Parts of the full queue pathAutoCreatedQueueTemplate
|
||||
* @return full path of the given queue parts
|
||||
* Constructor to create Queue path from queue names.
|
||||
* The provided queue names will be concatenated by dots, giving a full queue path.
|
||||
* @param parts Parts of queue path
|
||||
* @return QueuePath object
|
||||
*/
|
||||
public static String concatenatePath(String... parts) {
|
||||
return String.join(DOT, parts);
|
||||
public static QueuePath createFromQueues(String... parts) {
|
||||
return new QueuePath(String.join(DOT, parts));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2656,8 +2656,7 @@ public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr)
|
||||
initForWritableEndpoints(callerUGI, true);
|
||||
|
||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||
if (scheduler instanceof MutableConfScheduler
|
||||
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
|
||||
if (isConfigurationMutable(scheduler)) {
|
||||
try {
|
||||
MutableConfigurationProvider mutableConfigurationProvider =
|
||||
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||
@ -2696,8 +2695,7 @@ public synchronized Response validateAndGetSchedulerConfiguration(
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
initForWritableEndpoints(callerUGI, true);
|
||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||
if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
|
||||
scheduler).isConfigurationMutable()) {
|
||||
if (isConfigurationMutable(scheduler)) {
|
||||
try {
|
||||
MutableConfigurationProvider mutableConfigurationProvider =
|
||||
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||
@ -2746,51 +2744,61 @@ public synchronized Response validateAndGetSchedulerConfiguration(
|
||||
public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo
|
||||
mutationInfo, @Context HttpServletRequest hsr)
|
||||
throws AuthorizationException, InterruptedException {
|
||||
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
initForWritableEndpoints(callerUGI, true);
|
||||
|
||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||
if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
|
||||
scheduler).isConfigurationMutable()) {
|
||||
if (isConfigurationMutable(scheduler)) {
|
||||
try {
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
MutableConfigurationProvider provider = ((MutableConfScheduler)
|
||||
scheduler).getMutableConfProvider();
|
||||
if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
|
||||
mutationInfo)) {
|
||||
throw new org.apache.hadoop.security.AccessControlException("User"
|
||||
+ " is not admin of all modified queues.");
|
||||
}
|
||||
LogMutation logMutation = provider.logAndApplyMutation(callerUGI,
|
||||
mutationInfo);
|
||||
try {
|
||||
rm.getRMContext().getRMAdminService().refreshQueues();
|
||||
} catch (IOException | YarnException e) {
|
||||
provider.confirmPendingMutation(logMutation, false);
|
||||
throw e;
|
||||
}
|
||||
provider.confirmPendingMutation(logMutation, true);
|
||||
return null;
|
||||
}
|
||||
callerUGI.doAs((PrivilegedExceptionAction<Void>) () -> {
|
||||
MutableConfigurationProvider provider = ((MutableConfScheduler)
|
||||
scheduler).getMutableConfProvider();
|
||||
LogMutation logMutation = applyMutation(provider, callerUGI, mutationInfo);
|
||||
return refreshQueues(provider, logMutation);
|
||||
});
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception thrown when modifying configuration.", e);
|
||||
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
|
||||
.build();
|
||||
}
|
||||
return Response.status(Status.OK).entity("Configuration change " +
|
||||
"successfully applied.").build();
|
||||
return Response.status(Status.OK).entity("Configuration change successfully applied.")
|
||||
.build();
|
||||
} else {
|
||||
return Response.status(Status.BAD_REQUEST)
|
||||
.entity("Configuration change only supported by " +
|
||||
"MutableConfScheduler.")
|
||||
.entity(String.format("Configuration change only supported by " +
|
||||
"%s.", MutableConfScheduler.class.getSimpleName()))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
private Void refreshQueues(MutableConfigurationProvider provider, LogMutation logMutation)
|
||||
throws Exception {
|
||||
try {
|
||||
rm.getRMContext().getRMAdminService().refreshQueues();
|
||||
} catch (IOException | YarnException e) {
|
||||
provider.confirmPendingMutation(logMutation, false);
|
||||
throw e;
|
||||
}
|
||||
provider.confirmPendingMutation(logMutation, true);
|
||||
return null;
|
||||
}
|
||||
|
||||
private LogMutation applyMutation(MutableConfigurationProvider provider,
|
||||
UserGroupInformation callerUGI, SchedConfUpdateInfo mutationInfo) throws Exception {
|
||||
if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
|
||||
mutationInfo)) {
|
||||
throw new org.apache.hadoop.security.AccessControlException("User"
|
||||
+ " is not admin of all modified queues.");
|
||||
}
|
||||
return provider.logAndApplyMutation(callerUGI,
|
||||
mutationInfo);
|
||||
}
|
||||
|
||||
private boolean isConfigurationMutable(ResourceScheduler scheduler) {
|
||||
return scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
|
||||
scheduler).isConfigurationMutable();
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path(RMWSConsts.SCHEDULER_CONF)
|
||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||
@ -2803,8 +2811,7 @@ public Response getSchedulerConfiguration(@Context HttpServletRequest hsr)
|
||||
initForWritableEndpoints(callerUGI, true);
|
||||
|
||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||
if (scheduler instanceof MutableConfScheduler
|
||||
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
|
||||
if (isConfigurationMutable(scheduler)) {
|
||||
MutableConfigurationProvider mutableConfigurationProvider =
|
||||
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||
// We load the cached configuration from configuration store,
|
||||
@ -2835,8 +2842,7 @@ public Response getSchedulerConfigurationVersion(@Context
|
||||
initForWritableEndpoints(callerUGI, true);
|
||||
|
||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||
if (scheduler instanceof MutableConfScheduler
|
||||
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
|
||||
if (isConfigurationMutable(scheduler)) {
|
||||
MutableConfigurationProvider mutableConfigurationProvider =
|
||||
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||
|
||||
|
@ -1098,4 +1098,4 @@ private RMWebServices prepareWebServiceForValidation(
|
||||
return webService;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -22,10 +22,13 @@
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.WebResource;
|
||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Sets;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
@ -34,12 +37,13 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
|
||||
import org.codehaus.jettison.json.JSONArray;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
@ -58,10 +62,15 @@
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.CAPACITY;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_CAPACITY;
|
||||
import static org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils.toJson;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test scheduler configuration mutation via REST API.
|
||||
@ -74,7 +83,11 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||
private static final File OLD_CONF_FILE = new File(new File("target",
|
||||
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
|
||||
|
||||
private static final String LABEL_1 = "label1";
|
||||
public static final QueuePath ROOT = new QueuePath("root");
|
||||
public static final QueuePath ROOT_A = new QueuePath("root", "a");
|
||||
public static final QueuePath ROOT_A_A1 = QueuePath.createFromQueues("root", "a", "a1");
|
||||
public static final QueuePath ROOT_A_A2 = QueuePath.createFromQueues("root", "a", "a2");
|
||||
private static MockRM rm;
|
||||
private static String userName;
|
||||
private static CapacitySchedulerConfiguration csConf;
|
||||
@ -216,7 +229,7 @@ public void testFormatSchedulerConf() throws Exception {
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
newConf = getSchedulerConf();
|
||||
@ -284,7 +297,7 @@ public void testAddNestedQueue() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -323,7 +336,7 @@ public void testAddWithUpdate() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -350,7 +363,7 @@ public void testUnsetParentQueueOrderingPolicy() throws Exception {
|
||||
response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo1,
|
||||
.entity(toJson(updateInfo1,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -371,7 +384,7 @@ public void testUnsetParentQueueOrderingPolicy() throws Exception {
|
||||
response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo2,
|
||||
.entity(toJson(updateInfo2,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -401,7 +414,7 @@ public void testUnsetLeafQueueOrderingPolicy() throws Exception {
|
||||
response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo1,
|
||||
.entity(toJson(updateInfo1,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -419,7 +432,7 @@ public void testUnsetLeafQueueOrderingPolicy() throws Exception {
|
||||
response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo2,
|
||||
.entity(toJson(updateInfo2,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -448,7 +461,7 @@ public void testRemoveQueue() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -480,7 +493,7 @@ public void testStopWithRemoveQueue() throws Exception {
|
||||
response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -513,7 +526,7 @@ public void testStopWithConvertLeafToParentQueue() throws Exception {
|
||||
response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -538,7 +551,7 @@ public void testRemoveParentQueue() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -569,7 +582,7 @@ public void testRemoveParentQueueWithCapacity() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -601,7 +614,7 @@ public void testRemoveMultipleQueues() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
@ -629,7 +642,7 @@ private void stopQueue(String... queuePaths) throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -664,7 +677,7 @@ public void testUpdateQueue() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
LOG.debug("Response headers: " + response.getHeaders());
|
||||
@ -683,7 +696,7 @@ public void testUpdateQueue() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -713,7 +726,7 @@ public void testUpdateQueueCapacity() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -738,7 +751,7 @@ public void testGlobalConfChange() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -753,7 +766,7 @@ public void testGlobalConfChange() throws Exception {
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler-conf").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
@ -764,6 +777,220 @@ public void testGlobalConfChange() throws Exception {
|
||||
newCSConf.getMaximumSystemApplications());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLabelRemovalResidualConfigsAreCleared() throws Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response;
|
||||
|
||||
// 1. Create Node Label: label1
|
||||
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
|
||||
nodeLabelsInfo.getNodeLabelsInfo().add(new NodeLabelInfo(LABEL_1));
|
||||
WebResource addNodeLabelsResource = r.path("ws").path("v1").path("cluster")
|
||||
.path("add-node-labels");
|
||||
WebResource getNodeLabelsResource = r.path("ws").path("v1").path("cluster")
|
||||
.path("get-node-labels");
|
||||
WebResource removeNodeLabelsResource = r.path("ws").path("v1").path("cluster")
|
||||
.path("remove-node-labels");
|
||||
WebResource schedulerConfResource = r.path("ws").path("v1").path("cluster")
|
||||
.path(RMWSConsts.SCHEDULER_CONF);
|
||||
response =
|
||||
addNodeLabelsResource.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(logAndReturnJson(addNodeLabelsResource,
|
||||
toJson(nodeLabelsInfo, NodeLabelsInfo.class)),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class);
|
||||
|
||||
// 2. Verify new Node Label
|
||||
response =
|
||||
getNodeLabelsResource.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
nodeLabelsInfo = response.getEntity(NodeLabelsInfo.class);
|
||||
assertEquals(1, nodeLabelsInfo.getNodeLabels().size());
|
||||
for (NodeLabelInfo nl : nodeLabelsInfo.getNodeLabelsInfo()) {
|
||||
assertEquals(LABEL_1, nl.getName());
|
||||
assertTrue(nl.getExclusivity());
|
||||
}
|
||||
|
||||
// 3. Assign 'label1' to root.a
|
||||
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||
Map<String, String> updateForRoot = new HashMap<>();
|
||||
updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "*");
|
||||
QueueConfigInfo rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);
|
||||
|
||||
Map<String, String> updateForRootA = new HashMap<>();
|
||||
updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, LABEL_1);
|
||||
QueueConfigInfo rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA);
|
||||
|
||||
updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
|
||||
|
||||
response =
|
||||
schedulerConfResource
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
assertEquals(Sets.newHashSet("*"),
|
||||
cs.getConfiguration().getAccessibleNodeLabels(ROOT.getFullPath()));
|
||||
assertEquals(Sets.newHashSet(LABEL_1),
|
||||
cs.getConfiguration().getAccessibleNodeLabels(ROOT_A.getFullPath()));
|
||||
|
||||
// 4. Set partition capacities to queues as below
|
||||
updateInfo = new SchedConfUpdateInfo();
|
||||
updateForRoot = new HashMap<>();
|
||||
updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "100");
|
||||
updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "100");
|
||||
rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);
|
||||
|
||||
updateForRootA = new HashMap<>();
|
||||
updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "100");
|
||||
updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "100");
|
||||
rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA);
|
||||
|
||||
// Avoid the following exception by adding some capacities to root.a.a1 and root.a.a2 to label1
|
||||
// Illegal capacity sum of 0.0 for children of queue a for label=label1.
|
||||
// It is set to 0, but parent percent != 0, and doesn't allow children capacity to set to 0
|
||||
Map<String, String> updateForRootA_A1 = new HashMap<>();
|
||||
updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "20");
|
||||
updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "20");
|
||||
QueueConfigInfo rootA_A1UpdateInfo = new QueueConfigInfo(ROOT_A_A1.getFullPath(),
|
||||
updateForRootA_A1);
|
||||
|
||||
Map<String, String> updateForRootA_A2 = new HashMap<>();
|
||||
updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "80");
|
||||
updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "80");
|
||||
QueueConfigInfo rootA_A2UpdateInfo = new QueueConfigInfo(ROOT_A_A2.getFullPath(),
|
||||
updateForRootA_A2);
|
||||
|
||||
|
||||
updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo);
|
||||
|
||||
response =
|
||||
schedulerConfResource
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
|
||||
assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT, LABEL_1), 0.001f);
|
||||
assertEquals(100.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT, LABEL_1),
|
||||
0.001f);
|
||||
assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A, LABEL_1), 0.001f);
|
||||
assertEquals(100.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A, LABEL_1),
|
||||
0.001f);
|
||||
assertEquals(20.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A1, LABEL_1), 0.001f);
|
||||
assertEquals(20.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A1, LABEL_1),
|
||||
0.001f);
|
||||
assertEquals(80.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A2, LABEL_1), 0.001f);
|
||||
assertEquals(80.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A2, LABEL_1),
|
||||
0.001f);
|
||||
|
||||
//5. De-assign node label: "label1" + Remove residual properties
|
||||
updateInfo = new SchedConfUpdateInfo();
|
||||
updateForRoot = new HashMap<>();
|
||||
updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "*");
|
||||
updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
|
||||
updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
|
||||
rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);
|
||||
|
||||
updateForRootA = new HashMap<>();
|
||||
updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "");
|
||||
updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
|
||||
updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
|
||||
rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA);
|
||||
|
||||
updateForRootA_A1 = new HashMap<>();
|
||||
updateForRootA_A1.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "");
|
||||
updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
|
||||
updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
|
||||
rootA_A1UpdateInfo = new QueueConfigInfo(ROOT_A_A1.getFullPath(), updateForRootA_A1);
|
||||
|
||||
updateForRootA_A2 = new HashMap<>();
|
||||
updateForRootA_A2.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "");
|
||||
updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
|
||||
updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
|
||||
rootA_A2UpdateInfo = new QueueConfigInfo(ROOT_A_A2.getFullPath(), updateForRootA_A2);
|
||||
|
||||
updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo);
|
||||
|
||||
response =
|
||||
schedulerConfResource
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
assertEquals(Sets.newHashSet("*"),
|
||||
cs.getConfiguration().getAccessibleNodeLabels(ROOT.getFullPath()));
|
||||
assertNull(cs.getConfiguration().getAccessibleNodeLabels(ROOT_A.getFullPath()));
|
||||
|
||||
//6. Remove node label 'label1'
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("labels", LABEL_1);
|
||||
response =
|
||||
removeNodeLabelsResource
|
||||
.queryParam("user.name", userName)
|
||||
.queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class);
|
||||
|
||||
// Verify
|
||||
response =
|
||||
getNodeLabelsResource.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
nodeLabelsInfo = response.getEntity(NodeLabelsInfo.class);
|
||||
assertEquals(0, nodeLabelsInfo.getNodeLabels().size());
|
||||
|
||||
//6. Check residual configs
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1, CAPACITY));
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1, MAXIMUM_CAPACITY));
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1, CAPACITY));
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1, MAXIMUM_CAPACITY));
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1, CAPACITY));
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1, MAXIMUM_CAPACITY));
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1, CAPACITY));
|
||||
assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1, MAXIMUM_CAPACITY));
|
||||
}
|
||||
|
||||
private String getConfValueForQueueAndLabelAndType(CapacityScheduler cs,
|
||||
QueuePath queuePath, String label, String type) {
|
||||
return cs.getConfiguration().get(
|
||||
CapacitySchedulerConfiguration.getNodeLabelPrefix(
|
||||
queuePath.getFullPath(), label) + type);
|
||||
}
|
||||
|
||||
private Object logAndReturnJson(WebResource ws, String json) {
|
||||
LOG.info("Sending to web resource: {}, json: {}", ws, json);
|
||||
return json;
|
||||
}
|
||||
|
||||
private String getAccessibleNodeLabelsCapacityPropertyName(String label) {
|
||||
return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label, CAPACITY);
|
||||
}
|
||||
|
||||
private String getAccessibleNodeLabelsMaxCapacityPropertyName(String label) {
|
||||
return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label, MAXIMUM_CAPACITY);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateWithClusterMaxAllocation() throws Exception {
|
||||
WebResource r = resource();
|
||||
@ -784,7 +1011,7 @@ public void testValidateWithClusterMaxAllocation() throws Exception {
|
||||
.path(RMWSConsts.SCHEDULER_CONF_VALIDATE)
|
||||
.queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||
.entity(toJson(updateInfo,
|
||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
|
Loading…
Reference in New Issue
Block a user