YARN-9864. Format CS Configuration present in Configuration Store. Contributeed by Prabhu Joseph

This commit is contained in:
Sunil G 2019-10-01 09:08:41 +05:30
parent b3275ab1f2
commit 137546a78a
16 changed files with 431 additions and 12 deletions

View File

@ -65,6 +65,16 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>

View File

@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@ -30,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
@ -56,6 +58,7 @@ public class SchedConfCLI extends Configured implements Tool {
private static final String REMOVE_QUEUES_OPTION = "removeQueues";
private static final String UPDATE_QUEUES_OPTION = "updateQueues";
private static final String GLOBAL_OPTIONS = "globalUpdates";
private static final String FORMAT_CONF = "formatConfig";
private static final String HELP_CMD = "help";
private static final String CONF_ERR_MSG = "Specify configuration key " +
@ -83,6 +86,9 @@ public int run(String[] args) throws Exception {
"Update queue configurations");
opts.addOption("global", GLOBAL_OPTIONS, true,
"Update global scheduler configurations");
opts.addOption("format", FORMAT_CONF, false,
"Format Scheduler Configuration and reload from" +
" capacity-scheduler.xml");
opts.addOption("h", HELP_CMD, false, "Displays help for all commands.");
int exitCode = -1;
@ -101,6 +107,7 @@ public int run(String[] args) throws Exception {
}
boolean hasOption = false;
boolean format = false;
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
try {
if (parsedCli.hasOption(ADD_QUEUES_OPTION)) {
@ -121,6 +128,11 @@ public int run(String[] args) throws Exception {
hasOption = true;
globalUpdates(parsedCli.getOptionValue(GLOBAL_OPTIONS), updateInfo);
}
if (parsedCli.hasOption((FORMAT_CONF))) {
hasOption = true;
format = true;
}
} catch (IllegalArgumentException e) {
System.err.println(e.getMessage());
return -1;
@ -133,18 +145,78 @@ public int run(String[] args) throws Exception {
}
Configuration conf = getConf();
return WebAppUtils.execOnActiveRM(conf,
this::updateSchedulerConfOnRMNode, updateInfo);
if (format) {
return WebAppUtils.execOnActiveRM(conf, this::formatSchedulerConf, null);
} else {
return WebAppUtils.execOnActiveRM(conf,
this::updateSchedulerConfOnRMNode, updateInfo);
}
}
private int updateSchedulerConfOnRMNode(String webAppAddress,
@VisibleForTesting
int formatSchedulerConf(String webAppAddress, WebResource resource)
throws Exception {
Client webServiceClient = Client.create();
ClientResponse response = null;
resource = (resource != null) ? resource :
webServiceClient.resource(webAppAddress);
try {
Builder builder = null;
if (UserGroupInformation.isSecurityEnabled()) {
builder = resource
.path("ws").path("v1").path("cluster")
.path("/scheduler-conf/format")
.accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON);
} else {
builder = resource
.path("ws").path("v1").path("cluster")
.path("/scheduler-conf/format").queryParam("user.name",
UserGroupInformation.getCurrentUser().getShortUserName())
.accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON);
}
response = builder.get(ClientResponse.class);
if (response != null) {
if (response.getStatus() == Status.OK.getStatusCode()) {
System.out.println(response.getEntity(String.class));
return 0;
} else {
System.err.println("Failed to format scheduler configuration: " +
response.getEntity(String.class));
}
} else {
System.err.println("Failed to format scheduler configuration: " +
"null response");
}
return -1;
} finally {
if (response != null) {
response.close();
}
webServiceClient.destroy();
}
}
@VisibleForTesting
int updateSchedulerConfOnRMNode(String webAppAddress,
SchedConfUpdateInfo updateInfo) throws Exception {
Client webServiceClient = Client.create();
ClientResponse response = null;
WebResource resource = webServiceClient.resource(webAppAddress);
try {
Builder builder = webServiceClient.resource(webAppAddress)
.path("ws").path("v1").path("cluster")
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
Builder builder = null;
if (UserGroupInformation.isSecurityEnabled()) {
builder = resource.path("ws").path("v1").path("cluster")
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
} else {
builder = resource.path("ws").path("v1").path("cluster")
.queryParam("user.name",
UserGroupInformation.getCurrentUser().getShortUserName())
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
}
builder.entity(YarnWebServiceUtils.toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON);
response = builder.put(ClientResponse.class);
@ -253,7 +325,8 @@ private void printUsage() {
+ "[-remove \"queueRemovePath1;queueRemovePath2\"] "
+ "[-update \"queueUpdatePath1:confKey1=confVal1\"] "
+ "[-global globalConfKey1=globalConfVal1,"
+ "globalConfKey2=globalConfVal2]\n"
+ "globalConfKey2=globalConfVal2] "
+ "[-format]\n"
+ "Example (adding queues): yarn schedulerconf -add "
+ "\"root.a.a1:capacity=100,maximum-capacity=100;root.a.a2:capacity=0,"
+ "maximum-capacity=0\"\n"
@ -264,6 +337,8 @@ private void printUsage() {
+ "maximum-capacity=75\"\n"
+ "Example (global scheduler update): yarn schedulerconf "
+ "-global yarn.scheduler.capacity.maximum-applications=10000\n"
+ "Example (format scheduler configuration): yarn schedulerconf "
+ "-format\n"
+ "Note: This is an alpha feature, the syntax/options are subject to "
+ "change, please run at your own risk.");
}

View File

@ -22,12 +22,48 @@
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.security.Principal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.inject.Guice;
import com.google.inject.Singleton;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -35,7 +71,7 @@
/**
* Class for testing {@link SchedConfCLI}.
*/
public class TestSchedConfCLI {
public class TestSchedConfCLI extends JerseyTestBase {
private ByteArrayOutputStream sysOutStream;
private PrintStream sysOut;
@ -45,6 +81,23 @@ public class TestSchedConfCLI {
private SchedConfCLI cli;
private static MockRM rm;
private static String userName;
private static CapacitySchedulerConfiguration csConf;
private static final File CONF_FILE = new File(new File("target",
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
private static final File OLD_CONF_FILE = new File(new File("target",
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
public TestSchedConfCLI() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
}
@Before
public void setUp() {
sysOutStream = new ByteArrayOutputStream();
@ -58,6 +111,138 @@ public void setUp() {
cli = new SchedConfCLI();
}
private static class WebServletModule extends ServletModule {
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
try {
userName = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ioe) {
throw new RuntimeException("Unable to get current user name "
+ ioe.getMessage(), ioe);
}
csConf = new CapacitySchedulerConfiguration(new Configuration(false),
false);
setupQueueConfiguration(csConf);
try {
if (CONF_FILE.exists()) {
if (!CONF_FILE.renameTo(OLD_CONF_FILE)) {
throw new RuntimeException("Failed to rename conf file");
}
}
FileOutputStream out = new FileOutputStream(CONF_FILE);
csConf.writeXml(out);
out.close();
} catch (IOException e) {
throw new RuntimeException("Failed to write XML file", e);
}
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);
filter("/*").through(TestRMCustomAuthFilter.class);
}
}
/**
* Custom filter which sets the Remote User for testing purpose.
*/
@Singleton
public static class TestRMCustomAuthFilter extends AuthenticationFilter {
@Override
public void init(FilterConfig filterConfig) {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest)request;
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpRequest = new HttpServletRequestWrapper(httpRequest) {
public String getAuthType() {
return null;
}
public String getRemoteUser() {
return userName;
}
public Principal getUserPrincipal() {
return new Principal() {
@Override
public String getName() {
return userName;
}
};
}
};
doFilter(filterChain, httpRequest, httpResponse);
}
}
private static void setupQueueConfiguration(
CapacitySchedulerConfiguration config) {
config.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{"testqueue"});
String a = CapacitySchedulerConfiguration.ROOT + ".testqueue";
config.setCapacity(a, 100f);
config.setMaximumCapacity(a, 100f);
}
@Test(timeout = 10000)
public void testFormatSchedulerConf() throws Exception {
try {
super.setUp();
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
ResourceScheduler scheduler = rm.getResourceScheduler();
MutableConfigurationProvider provider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
HashMap<String, String> globalUpdates = new HashMap<>();
globalUpdates.put("schedKey1", "schedVal1");
schedUpdateInfo.setGlobalParams(globalUpdates);
provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(),
schedUpdateInfo);
rm.getRMContext().getRMAdminService().refreshQueues();
provider.confirmPendingMutation(true);
Configuration schedulerConf = provider.getConfiguration();
assertEquals("schedVal1", schedulerConf.get("schedKey1"));
int exitCode = cli.formatSchedulerConf("", resource());
assertEquals(0, exitCode);
schedulerConf = provider.getConfiguration();
assertNull(schedulerConf.get("schedKey1"));
} finally {
if (rm != null) {
rm.stop();
}
CONF_FILE.delete();
if (OLD_CONF_FILE.exists()) {
if (!OLD_CONF_FILE.renameTo(CONF_FILE)) {
throw new RuntimeException("Failed to re-copy old" +
" configuration file");
}
}
super.tearDown();
}
}
@Test(timeout = 10000)
public void testInvalidConf() throws Exception {
// conf pair with no key should be invalid

View File

@ -65,6 +65,8 @@ void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
*/
Configuration getConfiguration();
void formatConfigurationInStore(Configuration conf) throws Exception;
/**
* Closes the configuration provider, releasing any required resources.
* @throws IOException on failure to close

View File

@ -156,6 +156,11 @@ private void finalizeFileSystemFile() throws IOException {
+ finalConfigPath);
}
@Override
public void format() throws Exception {
fileSystem.delete(schedulerConfDir, true);
}
private Path getFinalConfigPath(Path tempPath) {
String tempConfigPathStr = tempPath.getName();
if (!tempConfigPathStr.endsWith(TMP)) {

View File

@ -60,6 +60,11 @@ public void confirmMutation(boolean isValid) {
pendingMutation = null;
}
@Override
public void format() {
this.schedConf = null;
}
@Override
public synchronized Configuration retrieve() {
return schedConf;

View File

@ -98,6 +98,13 @@ public void initialize(Configuration config, Configuration schedConf,
}
}
@Override
public void format() throws Exception {
close();
FileSystem fs = FileSystem.getLocal(conf);
fs.delete(getStorageDir(), true);
}
private void initDatabase(Configuration config) throws Exception {
Path storeRoot = createStorageDir();
Options options = new Options();

View File

@ -40,6 +40,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* CS configuration provider which implements
@ -58,6 +59,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
private ConfigurationMutationACLPolicy aclMutationPolicy;
private RMContext rmContext;
private final ReentrantReadWriteLock formatLock =
new ReentrantReadWriteLock();
public MutableCSConfigurationProvider(RMContext rmContext) {
this.rmContext = rmContext;
}
@ -151,17 +155,51 @@ public void logAndApplyMutation(UserGroupInformation user,
}
}
@Override
public void formatConfigurationInStore(Configuration config)
throws Exception {
formatLock.writeLock().lock();
try {
confStore.format();
Configuration initialSchedConf = new Configuration(false);
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
this.schedConf = new Configuration(false);
// We need to explicitly set the key-values in schedConf, otherwise
// these configuration keys cannot be deleted when
// configuration is reloaded.
for (Map.Entry<String, String> kv : initialSchedConf) {
schedConf.set(kv.getKey(), kv.getValue());
}
confStore.initialize(config, schedConf, rmContext);
confStore.checkVersion();
} catch (Exception e) {
throw new IOException(e);
} finally {
formatLock.writeLock().unlock();
}
}
@Override
public void confirmPendingMutation(boolean isValid) throws Exception {
confStore.confirmMutation(isValid);
if (!isValid) {
schedConf = oldConf;
formatLock.readLock().lock();
try {
confStore.confirmMutation(isValid);
if (!isValid) {
schedConf = oldConf;
}
} finally {
formatLock.readLock().unlock();
}
}
@Override
public void reloadConfigurationFromStore() throws Exception {
schedConf = confStore.retrieve();
formatLock.readLock().lock();
try {
schedConf = confStore.retrieve();
} finally {
formatLock.readLock().unlock();
}
}
private List<String> getSiblingQueues(String queuePath, Configuration conf) {

View File

@ -125,6 +125,13 @@ public void close() throws IOException {}
*/
public abstract Configuration retrieve() throws IOException;
/**
* Format the persisted configuration.
* @throws IOException on failure to format
*/
public abstract void format() throws Exception;
/**
* Get a list of confirmed configuration mutations starting from a given id.
* @param fromId id from which to start getting mutations, inclusive

View File

@ -132,6 +132,11 @@ public Version getConfStoreVersion() throws Exception {
return null;
}
@Override
public void format() throws Exception {
zkManager.delete(confStorePath);
}
@Override
public synchronized void storeVersion() throws Exception {
byte[] data =

View File

@ -48,6 +48,9 @@ public final class RMWSConsts {
/** Path for {@code RMWebServices#updateSchedulerConfiguration}. */
public static final String SCHEDULER_CONF = "/scheduler-conf";
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
public static final String SCHEDULER_LOGS = "/scheduler/logs";

View File

@ -2564,6 +2564,37 @@ protected List<ContainerReport> getContainersReport(
return rm.getClientRMService().getContainers(request).getContainerList();
}
@GET
@Path(RMWSConsts.FORMAT_SCHEDULER_CONF)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr)
throws AuthorizationException {
// Only admin user allowed to format scheduler conf in configuration store
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
try {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
mutableConfigurationProvider.formatConfigurationInStore(conf);
return Response.status(Status.OK).entity("Configuration under " +
"store successfully formatted.").build();
} catch (Exception e) {
LOG.error("Exception thrown when formating configuration", e);
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build();
}
} else {
return Response.status(Status.BAD_REQUEST)
.entity("Configuration change only supported by " +
"MutableConfScheduler.").build();
}
}
@PUT
@Path(RMWSConsts.SCHEDULER_CONF)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,

View File

@ -36,6 +36,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
/**
* Tests {@link FSSchedulerConfigurationStore}.
@ -135,6 +138,19 @@ public void confirmMutationWithInValid() throws Exception {
compareConfig(conf, storeConf);
}
@Test
public void testFormatConfiguration() throws Exception {
assertTrue(testSchedulerConfigurationDir.exists());
Configuration schedulerConf = new Configuration();
schedulerConf.set("a", "a");
writeConf(schedulerConf);
configurationStore.initialize(conf, conf, null);
Configuration storedConfig = configurationStore.retrieve();
assertEquals("a", storedConfig.get("a"));
configurationStore.format();
assertFalse(testSchedulerConfigurationDir.exists());
}
@Test
public void retrieve() throws Exception {
Configuration schedulerConf = new Configuration();

View File

@ -104,6 +104,10 @@ public void testInMemoryBackedProvider() throws Exception {
confProvider.confirmPendingMutation(false);
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
confProvider.formatConfigurationInStore(conf);
assertNull(confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
}
@Test
@ -174,6 +178,10 @@ public void testHDFSBackedProvider() throws Exception {
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
confProvider.formatConfigurationInStore(conf);
assertNull(confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
}
private void writeConf(Configuration conf, String storePath)

View File

@ -128,6 +128,15 @@ public void testPersistConfiguration() throws Exception {
}
@Test
public void testFormatConfiguration() throws Exception {
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.format();
assertNull(confStore.retrieve());
}
@Test
public void testPersistUpdatedConfiguration() throws Exception {
confStore.initialize(conf, schedConf, rmContext);

View File

@ -189,6 +189,19 @@ public void testGetSchedulerConf() throws Exception {
assertEquals(3, orgConf.getQueues("root").length);
}
@Test
public void testFormatSchedulerConf() throws Exception {
testAddNestedQueue();
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.queryParam("user.name", userName)
.path(RMWSConsts.FORMAT_SCHEDULER_CONF)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration orgConf = getSchedulerConf();
assertEquals(3, orgConf.getQueues("root").length);
}
@Test
public void testAddNestedQueue() throws Exception {
CapacitySchedulerConfiguration orgConf = getSchedulerConf();