YARN-6575. Support global configuration mutation in MutableConfProvider. (Jonathan Hung via Xuan Gong)

This commit is contained in:
Xuan 2017-06-05 16:30:38 -07:00 committed by Jonathan Hung
parent a4e6253046
commit e566fd8b58
12 changed files with 151 additions and 65 deletions

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
/** /**
* Interface for determining whether configuration mutations are allowed. * Interface for determining whether configuration mutations are allowed.
@ -41,7 +41,7 @@ public interface ConfigurationMutationACLPolicy {
* @param confUpdate configurations to be updated * @param confUpdate configurations to be updated
* @return whether provided mutation is allowed or not * @return whether provided mutation is allowed or not
*/ */
boolean isMutationAllowed(UserGroupInformation user, QueueConfigsUpdateInfo boolean isMutationAllowed(UserGroupInformation user, SchedConfUpdateInfo
confUpdate); confUpdate);
} }

View File

@ -22,7 +22,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
/** /**
* Default configuration mutation ACL policy. Checks if user is YARN admin. * Default configuration mutation ACL policy. Checks if user is YARN admin.
@ -39,7 +39,7 @@ public void init(Configuration conf, RMContext rmContext) {
@Override @Override
public boolean isMutationAllowed(UserGroupInformation user, public boolean isMutationAllowed(UserGroupInformation user,
QueueConfigsUpdateInfo confUpdate) { SchedConfUpdateInfo confUpdate) {
return authorizer.isAdmin(user); return authorizer.isAdmin(user);
} }
} }

View File

@ -19,7 +19,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException; import java.io.IOException;
@ -36,7 +36,7 @@ public interface MutableConfScheduler extends ResourceScheduler {
* @throws IOException if update is invalid * @throws IOException if update is invalid
*/ */
void updateConfiguration(UserGroupInformation user, void updateConfiguration(UserGroupInformation user,
QueueConfigsUpdateInfo confUpdate) throws IOException; SchedConfUpdateInfo confUpdate) throws IOException;
/** /**
* Get the scheduler configuration. * Get the scheduler configuration.

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException; import java.io.IOException;
@ -34,7 +34,7 @@ public interface MutableConfigurationProvider {
* @param confUpdate Key-value pairs for configurations to be updated. * @param confUpdate Key-value pairs for configurations to be updated.
* @throws IOException if scheduler could not be reinitialized * @throws IOException if scheduler could not be reinitialized
*/ */
void mutateConfiguration(UserGroupInformation user, QueueConfigsUpdateInfo void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo
confUpdate) throws IOException; confUpdate) throws IOException;
} }

View File

@ -137,7 +137,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -2617,7 +2617,7 @@ public long getMaximumApplicationLifetime(String queueName) {
@Override @Override
public void updateConfiguration(UserGroupInformation user, public void updateConfiguration(UserGroupInformation user,
QueueConfigsUpdateInfo confUpdate) throws IOException { SchedConfUpdateInfo confUpdate) throws IOException {
if (csConfProvider instanceof MutableConfigurationProvider) { if (csConfProvider instanceof MutableConfigurationProvider) {
((MutableConfigurationProvider) csConfProvider).mutateConfiguration( ((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
user, confUpdate); user, confUpdate);

View File

@ -32,7 +32,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -98,7 +98,7 @@ public CapacitySchedulerConfiguration loadConfiguration(Configuration
@Override @Override
public void mutateConfiguration(UserGroupInformation user, public void mutateConfiguration(UserGroupInformation user,
QueueConfigsUpdateInfo confUpdate) throws IOException { SchedConfUpdateInfo confUpdate) throws IOException {
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) { if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
throw new AccessControlException("User is not admin of all modified" + throw new AccessControlException("User is not admin of all modified" +
" queues."); " queues.");
@ -126,7 +126,7 @@ public void mutateConfiguration(UserGroupInformation user,
private Map<String, String> constructKeyValueConfUpdate( private Map<String, String> constructKeyValueConfUpdate(
QueueConfigsUpdateInfo mutationInfo) throws IOException { SchedConfUpdateInfo mutationInfo) throws IOException {
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
CapacitySchedulerConfiguration proposedConf = CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(cs.getConfiguration(), false); new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
@ -140,6 +140,10 @@ private Map<String, String> constructKeyValueConfUpdate(
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
updateQueue(updateQueueInfo, proposedConf, confUpdate); updateQueue(updateQueueInfo, proposedConf, confUpdate);
} }
for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
.entrySet()) {
confUpdate.put(global.getKey(), global.getValue());
}
return confUpdate; return confUpdate;
} }

View File

@ -22,15 +22,17 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
@ -40,16 +42,29 @@
public class QueueAdminConfigurationMutationACLPolicy implements public class QueueAdminConfigurationMutationACLPolicy implements
ConfigurationMutationACLPolicy { ConfigurationMutationACLPolicy {
private Configuration conf;
private RMContext rmContext; private RMContext rmContext;
private YarnAuthorizationProvider authorizer;
@Override @Override
public void init(Configuration conf, RMContext context) { public void init(Configuration config, RMContext context) {
this.conf = config;
this.rmContext = context; this.rmContext = context;
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
} }
@Override @Override
public boolean isMutationAllowed(UserGroupInformation user, public boolean isMutationAllowed(UserGroupInformation user,
QueueConfigsUpdateInfo confUpdate) { SchedConfUpdateInfo confUpdate) {
// If there are global config changes, check if user is admin.
Map<String, String> globalParams = confUpdate.getGlobalParams();
if (globalParams != null && globalParams.size() != 0) {
if (!authorizer.isAdmin(user)) {
return false;
}
}
// Check if user is admin of all modified queues.
Set<String> queues = new HashSet<>(); Set<String> queues = new HashSet<>();
for (QueueConfigInfo addQueueInfo : confUpdate.getAddQueueInfo()) { for (QueueConfigInfo addQueueInfo : confUpdate.getAddQueueInfo()) {
queues.add(addQueueInfo.getQueue()); queues.add(addQueueInfo.getQueue());
@ -71,7 +86,6 @@ public boolean isMutationAllowed(UserGroupInformation user,
// Queue is not found, do nothing. // Queue is not found, do nothing.
} }
String parentPath = queuePath; String parentPath = queuePath;
// TODO: handle global config change.
while (queueInfo == null) { while (queueInfo == null) {
// We are adding a queue (whose parent we are possibly also adding). // We are adding a queue (whose parent we are possibly also adding).
// Check ACL of lowest parent queue which already exists. // Check ACL of lowest parent queue which already exists.

View File

@ -2459,11 +2459,11 @@ protected List<ContainerReport> getContainersReport(
} }
@PUT @PUT
@Path("/queues") @Path("/sched-conf")
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response updateSchedulerConfiguration(QueueConfigsUpdateInfo public Response updateSchedulerConfiguration(SchedConfUpdateInfo
mutationInfo, @Context HttpServletRequest hsr) mutationInfo, @Context HttpServletRequest hsr)
throws AuthorizationException, InterruptedException { throws AuthorizationException, InterruptedException {
init(); init();

View File

@ -19,30 +19,34 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
/** /**
* Information for making scheduler configuration changes (supports adding, * Information for making scheduler configuration changes (supports adding,
* removing, or updating a queue). * removing, or updating a queue, as well as global scheduler conf changes).
*/ */
@XmlRootElement(name = "schedConf") @XmlRootElement(name = "schedConf")
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
public class QueueConfigsUpdateInfo { public class SchedConfUpdateInfo {
@XmlElement(name = "add") @XmlElement(name = "add-queue")
private ArrayList<QueueConfigInfo> addQueueInfo = new ArrayList<>(); private ArrayList<QueueConfigInfo> addQueueInfo = new ArrayList<>();
@XmlElement(name = "remove") @XmlElement(name = "remove-queue")
private ArrayList<String> removeQueueInfo = new ArrayList<>(); private ArrayList<String> removeQueueInfo = new ArrayList<>();
@XmlElement(name = "update") @XmlElement(name = "update-queue")
private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>(); private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
public QueueConfigsUpdateInfo() { private HashMap<String, String> global = new HashMap<>();
public SchedConfUpdateInfo() {
// JAXB needs this // JAXB needs this
} }
@ -57,4 +61,9 @@ public ArrayList<String> getRemoveQueueInfo() {
public ArrayList<QueueConfigInfo> getUpdateQueueInfo() { public ArrayList<QueueConfigInfo> getUpdateQueueInfo() {
return updateQueueInfo; return updateQueueInfo;
} }
@XmlElementWrapper(name = "global-updates")
public HashMap<String, String> getGlobalParams() {
return global;
}
} }

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueAdminConfigurationMutationACLPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueAdminConfigurationMutationACLPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -77,6 +77,7 @@ private void mockQueue(String queueName, MutableConfScheduler scheduler)
.thenReturn(false); .thenReturn(false);
when(scheduler.getQueue(eq(queueName))).thenReturn(queue); when(scheduler.getQueue(eq(queueName))).thenReturn(queue);
} }
@Test @Test
public void testDefaultPolicy() { public void testDefaultPolicy() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -98,7 +99,7 @@ public void testQueueAdminBasedPolicy() {
ConfigurationMutationACLPolicy.class); ConfigurationMutationACLPolicy.class);
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext); policy.init(conf, rmContext);
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
QueueConfigInfo configInfo = new QueueConfigInfo("root.a", EMPTY_MAP); QueueConfigInfo configInfo = new QueueConfigInfo("root.a", EMPTY_MAP);
updateInfo.getUpdateQueueInfo().add(configInfo); updateInfo.getUpdateQueueInfo().add(configInfo);
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
@ -114,7 +115,7 @@ public void testQueueAdminPolicyAddQueue() {
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext); policy.init(conf, rmContext);
// Add root.b.b1. Should check ACL of root.b queue. // Add root.b.b1. Should check ACL of root.b queue.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2", EMPTY_MAP); QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2", EMPTY_MAP);
updateInfo.getAddQueueInfo().add(configInfo); updateInfo.getAddQueueInfo().add(configInfo);
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
@ -130,7 +131,7 @@ public void testQueueAdminPolicyAddNestedQueue() {
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext); policy.init(conf, rmContext);
// Add root.b.b1.b11. Should check ACL of root.b queue. // Add root.b.b1.b11. Should check ACL of root.b queue.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2.b21", EMPTY_MAP); QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2.b21", EMPTY_MAP);
updateInfo.getAddQueueInfo().add(configInfo); updateInfo.getAddQueueInfo().add(configInfo);
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
@ -146,9 +147,26 @@ public void testQueueAdminPolicyRemoveQueue() {
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext); policy.init(conf, rmContext);
// Remove root.b.b1. // Remove root.b.b1.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.b.b1"); updateInfo.getRemoveQueueInfo().add("root.b.b1");
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo)); assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
} }
@Test
public void testQueueAdminPolicyGlobal() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.YARN_ADMIN_ACL, GOOD_USER.getShortUserName());
conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
QueueAdminConfigurationMutationACLPolicy.class,
ConfigurationMutationACLPolicy.class);
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
policy.init(conf, rmContext);
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
assertTrue(policy.isMutationAllowed(BAD_USER, updateInfo));
updateInfo.getGlobalParams().put("globalKey", "globalValue");
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
}
} }

View File

@ -24,7 +24,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -47,8 +47,8 @@ public class TestMutableCSConfigurationProvider {
private MutableCSConfigurationProvider confProvider; private MutableCSConfigurationProvider confProvider;
private RMContext rmContext; private RMContext rmContext;
private QueueConfigsUpdateInfo goodUpdate; private SchedConfUpdateInfo goodUpdate;
private QueueConfigsUpdateInfo badUpdate; private SchedConfUpdateInfo badUpdate;
private CapacityScheduler cs; private CapacityScheduler cs;
private static final UserGroupInformation TEST_USER = UserGroupInformation private static final UserGroupInformation TEST_USER = UserGroupInformation
@ -62,14 +62,14 @@ public void setUp() {
when(cs.getConfiguration()).thenReturn( when(cs.getConfiguration()).thenReturn(
new CapacitySchedulerConfiguration()); new CapacitySchedulerConfiguration());
confProvider = new MutableCSConfigurationProvider(rmContext); confProvider = new MutableCSConfigurationProvider(rmContext);
goodUpdate = new QueueConfigsUpdateInfo(); goodUpdate = new SchedConfUpdateInfo();
Map<String, String> goodUpdateMap = new HashMap<>(); Map<String, String> goodUpdateMap = new HashMap<>();
goodUpdateMap.put("goodKey", "goodVal"); goodUpdateMap.put("goodKey", "goodVal");
QueueConfigInfo goodUpdateInfo = new QueueConfigInfo goodUpdateInfo = new
QueueConfigInfo("root.a", goodUpdateMap); QueueConfigInfo("root.a", goodUpdateMap);
goodUpdate.getUpdateQueueInfo().add(goodUpdateInfo); goodUpdate.getUpdateQueueInfo().add(goodUpdateInfo);
badUpdate = new QueueConfigsUpdateInfo(); badUpdate = new SchedConfUpdateInfo();
Map<String, String> badUpdateMap = new HashMap<>(); Map<String, String> badUpdateMap = new HashMap<>();
badUpdateMap.put("badKey", "badVal"); badUpdateMap.put("badKey", "badVal");
QueueConfigInfo badUpdateInfo = new QueueConfigInfo badUpdateInfo = new

View File

@ -36,7 +36,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig; import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.JerseyTestBase;
@ -162,7 +162,7 @@ public void testAddNestedQueue() throws Exception {
ClientResponse response; ClientResponse response;
// Add parent queue root.d with two children d1 and d2. // Add parent queue root.d with two children d1 and d2.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> d1Capacity = new HashMap<>(); Map<String, String> d1Capacity = new HashMap<>();
d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25"); d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25"); d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25");
@ -181,9 +181,9 @@ public void testAddNestedQueue() throws Exception {
updateInfo.getAddQueueInfo().add(d); updateInfo.getAddQueueInfo().add(d);
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
@ -205,7 +205,7 @@ public void testAddWithUpdate() throws Exception {
ClientResponse response; ClientResponse response;
// Add root.d with capacity 25, reducing root.b capacity from 75 to 50. // Add root.d with capacity 25, reducing root.b capacity from 75 to 50.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> dCapacity = new HashMap<>(); Map<String, String> dCapacity = new HashMap<>();
dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25"); dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
Map<String, String> bCapacity = new HashMap<>(); Map<String, String> bCapacity = new HashMap<>();
@ -216,9 +216,9 @@ public void testAddWithUpdate() throws Exception {
updateInfo.getUpdateQueueInfo().add(b); updateInfo.getUpdateQueueInfo().add(b);
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
@ -238,13 +238,13 @@ public void testRemoveQueue() throws Exception {
stopQueue("root.a.a2"); stopQueue("root.a.a2");
// Remove root.a.a2 // Remove root.a.a2
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.a.a2"); updateInfo.getRemoveQueueInfo().add("root.a.a2");
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
@ -263,13 +263,13 @@ public void testRemoveParentQueue() throws Exception {
stopQueue("root.c", "root.c.c1"); stopQueue("root.c", "root.c.c1");
// Remove root.c (parent queue) // Remove root.c (parent queue)
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.c"); updateInfo.getRemoveQueueInfo().add("root.c");
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
@ -288,7 +288,7 @@ public void testRemoveParentQueueWithCapacity() throws Exception {
stopQueue("root.a", "root.a.a1", "root.a.a2"); stopQueue("root.a", "root.a.a1", "root.a.a2");
// Remove root.a (parent queue) with capacity 25 // Remove root.a (parent queue) with capacity 25
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.a"); updateInfo.getRemoveQueueInfo().add("root.a");
// Set root.b capacity to 100 // Set root.b capacity to 100
@ -298,9 +298,9 @@ public void testRemoveParentQueueWithCapacity() throws Exception {
updateInfo.getUpdateQueueInfo().add(b); updateInfo.getUpdateQueueInfo().add(b);
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
@ -320,7 +320,7 @@ public void testRemoveMultipleQueues() throws Exception {
stopQueue("root.b", "root.c", "root.c.c1"); stopQueue("root.b", "root.c", "root.c.c1");
// Remove root.b and root.c // Remove root.b and root.c
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.b"); updateInfo.getRemoveQueueInfo().add("root.b");
updateInfo.getRemoveQueueInfo().add("root.c"); updateInfo.getRemoveQueueInfo().add("root.c");
Map<String, String> aCapacity = new HashMap<>(); Map<String, String> aCapacity = new HashMap<>();
@ -330,9 +330,9 @@ public void testRemoveMultipleQueues() throws Exception {
updateInfo.getUpdateQueueInfo().add(configInfo); updateInfo.getUpdateQueueInfo().add(configInfo);
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
@ -348,7 +348,7 @@ private void stopQueue(String... queuePaths) throws Exception {
ClientResponse response; ClientResponse response;
// Set state of queues to STOPPED. // Set state of queues to STOPPED.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> stoppedParam = new HashMap<>(); Map<String, String> stoppedParam = new HashMap<>();
stoppedParam.put(CapacitySchedulerConfiguration.STATE, stoppedParam.put(CapacitySchedulerConfiguration.STATE,
QueueState.STOPPED.toString()); QueueState.STOPPED.toString());
@ -358,9 +358,9 @@ private void stopQueue(String... queuePaths) throws Exception {
} }
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
@ -378,7 +378,7 @@ public void testUpdateQueue() throws Exception {
ClientResponse response; ClientResponse response;
// Update config value. // Update config value.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> updateParam = new HashMap<>(); Map<String, String> updateParam = new HashMap<>();
updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX, updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
"0.2"); "0.2");
@ -393,9 +393,9 @@ public void testUpdateQueue() throws Exception {
0.001f); 0.001f);
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
@ -411,9 +411,9 @@ public void testUpdateQueue() throws Exception {
updateInfo.getUpdateQueueInfo().add(aUpdateInfo); updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
@ -431,7 +431,7 @@ public void testUpdateQueueCapacity() throws Exception {
ClientResponse response; ClientResponse response;
// Update root.a and root.b capacity to 50. // Update root.a and root.b capacity to 50.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> updateParam = new HashMap<>(); Map<String, String> updateParam = new HashMap<>();
updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50"); updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50");
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam); QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
@ -441,9 +441,9 @@ public void testUpdateQueueCapacity() throws Exception {
response = response =
r.path("ws").path("v1").path("cluster") r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName) .path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), .entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON) MediaType.APPLICATION_JSON)
.put(ClientResponse.class); .put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
@ -453,6 +453,47 @@ public void testUpdateQueueCapacity() throws Exception {
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f); assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
} }
@Test
public void testGlobalConfChange() throws Exception {
WebResource r = resource();
ClientResponse response;
// Set maximum-applications to 30000.
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
updateInfo.getGlobalParams().put(CapacitySchedulerConfiguration.PREFIX +
"maximum-applications", "30000");
response =
r.path("ws").path("v1").path("cluster")
.path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(30000, newCSConf.getMaximumSystemApplications());
updateInfo.getGlobalParams().put(CapacitySchedulerConfiguration.PREFIX +
"maximum-applications", null);
// Unset maximum-applications. Should be set to default.
response =
r.path("ws").path("v1").path("cluster")
.path("sched-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(CapacitySchedulerConfiguration
.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS,
newCSConf.getMaximumSystemApplications());
}
@Override @Override
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {