YARN-1611. Introduced the concept of a configuration provider which can be used by ResourceManager to read configuration locally or from remote systems so as to help RM failover. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1564002 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-03 19:07:55 +00:00
parent 88d8ea9509
commit ca72e11158
11 changed files with 489 additions and 13 deletions

View File

@ -82,6 +82,10 @@ Release 2.4.0 - UNRELEASED
YARN-1633. Defined user-facing entity, entity-info and event objects related
to Application Timeline feature. (Zhijie Shen via vinodkv)
YARN-1611. Introduced the concept of a configuration provider which can be
used by ResourceManager to read configuration locally or from remote systems
so as to help RM failover. (Xuan Gong via vinodkv)
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.conf;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Private
@Unstable
/**
* Base class to implement ConfigurationProvider.
* Real ConfigurationProvider implementations need to derive from it and
* implement load methods to actually load the configuration.
*/
public abstract class ConfigurationProvider {
public void init(Configuration conf) throws Exception {
initInternal(conf);
}
public void close() throws Exception {
closeInternal();
}
/**
* Get the configuration.
* @param name The configuration file name
* @return configuration
* @throws YarnException
* @throws IOException
*/
public abstract Configuration getConfiguration(String name)
throws YarnException, IOException;
/**
* Derived classes initialize themselves using this method.
*/
public abstract void initInternal(Configuration conf) throws Exception;
/**
* Derived classes close themselves using this method.
*/
public abstract void closeInternal() throws Exception;
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.conf;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@Private
@Unstable
/**
* Factory for {@link ConfigurationProvider} implementations.
*/
public class ConfigurationProviderFactory {
/**
* Creates an instance of {@link ConfigurationProvider} using given
* configuration.
* @param conf
* @return configurationProvider
*/
@SuppressWarnings("unchecked")
public static ConfigurationProvider
getConfigurationProvider(Configuration conf) {
Class<? extends ConfigurationProvider> defaultProviderClass;
try {
defaultProviderClass = (Class<? extends ConfigurationProvider>)
Class.forName(
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
} catch (Exception e) {
throw new YarnRuntimeException(
"Invalid default configuration provider class"
+ YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
}
ConfigurationProvider configurationProvider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
defaultProviderClass, ConfigurationProvider.class), conf);
return configurationProvider;
}
}

View File

@ -37,6 +37,9 @@
@Evolving
public class YarnConfiguration extends Configuration {
@Private
public static final String CS_CONFIGURATION_FILE= "capacity-scheduler.xml";
private static final String YARN_DEFAULT_XML_FILE = "yarn-default.xml";
private static final String YARN_SITE_XML_FILE = "yarn-site.xml";
@ -329,6 +332,16 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
/** Store the related configuration files in File System */
public static final String FS_BASED_RM_CONF_STORE = RM_PREFIX
+ "configuration.file-system-based-store";
public static final String DEFAULT_FS_BASED_RM_CONF_STORE = "/yarn/conf";
public static final String RM_CONFIGURATION_PROVIDER_CLASS = RM_PREFIX
+ "configuration.provider-class";
public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS =
"org.apache.hadoop.yarn.LocalConfigurationProvider";
@Private
public static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS =
Collections.unmodifiableList(Arrays.asList(

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Private
@Unstable
public class FileSystemBasedConfigurationProvider
extends ConfigurationProvider {
private static final Log LOG = LogFactory
.getLog(FileSystemBasedConfigurationProvider.class);
private FileSystem fs;
private Path configDir;
@Override
public synchronized Configuration getConfiguration(String name)
throws IOException, YarnException {
Path configPath = new Path(this.configDir, name);
if (!fs.exists(configPath)) {
throw new YarnException("Can not find Configuration: " + name + " in "
+ configDir);
}
Configuration conf = new Configuration(false);
conf.addResource(fs.open(configPath));
return conf;
}
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
configDir =
new Path(conf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
YarnConfiguration.DEFAULT_FS_BASED_RM_CONF_STORE));
fs = configDir.getFileSystem(conf);
if (!fs.exists(configDir)) {
fs.mkdirs(configDir);
}
}
@Override
public synchronized void closeInternal() throws Exception {
fs.close();
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Private
@Unstable
public class LocalConfigurationProvider extends ConfigurationProvider {
@Override
public Configuration getConfiguration(String name)
throws IOException, YarnException {
return new Configuration();
}
@Override
public void initInternal(Configuration conf) throws Exception {
// Do nothing
}
@Override
public void closeInternal() throws Exception {
// Do nothing
}
}

View File

@ -588,6 +588,18 @@
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore</value>
</property>
<property>
<description>The class to use as the configuration provider.
If org.apache.hadoop.yarn.LocalConfigurationProvider is used,
the local configuration will be loaded.
If org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider is used,
the configuration which will be loaded should be uploaded to remote File system first.
</description>>
<name>yarn.resourcemanager.configuration.provider-class</name>
<value>org.apache.hadoop.yarn.LocalConfigurationProvider</value>
<!-- <value>org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider</value> -->
</property>
<!-- Node Manager Configs -->
<property>
<description>The hostname of the NM.</description>

View File

@ -47,6 +47,8 @@
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -89,6 +91,8 @@ public class AdminService extends CompositeService implements
private InetSocketAddress masterServiceAddress;
private AccessControlList adminAcl;
private ConfigurationProvider configurationProvider = null;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -109,6 +113,10 @@ public synchronized void serviceInit(Configuration conf) throws Exception {
}
}
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
configurationProvider.init(conf);
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@ -129,6 +137,9 @@ protected synchronized void serviceStart() throws Exception {
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
if (this.configurationProvider != null) {
configurationProvider.close();
}
super.serviceStop();
}
@ -295,23 +306,28 @@ public synchronized HAServiceStatus getServiceStatus() throws IOException {
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnException, StandbyException {
UserGroupInformation user = checkAcls("refreshQueues");
String argName = "refreshQueues";
UserGroupInformation user = checkAcls(argName);
if (!isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh queues.");
throwStandbyException();
}
RefreshQueuesResponse response =
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues",
Configuration conf =
getConfiguration(YarnConfiguration.CS_CONFIGURATION_FILE);
rmContext.getScheduler().reinitialize(conf, this.rmContext);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
return response;
} catch (IOException ioe) {
LOG.info("Exception refreshing queues ", ioe);
RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"Exception refreshing queues");
throw RPCUtil.getRemoteException(ioe);
@ -483,5 +499,9 @@ public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceResponse.class);
return response;
}
private synchronized Configuration getConfiguration(String confFileName)
throws YarnException, IOException {
return this.configurationProvider.getConfiguration(confFileName);
}
}

View File

@ -195,6 +195,7 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;
private boolean useLocalConfigurationProvider;
public CapacityScheduler() {}
@ -261,7 +262,13 @@ public Resource getClusterResources() {
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
if (!initialized) {
this.conf = new CapacitySchedulerConfiguration(conf);
this.useLocalConfigurationProvider = conf.get(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS).equals(
"org.apache.hadoop.yarn.LocalConfigurationProvider");
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
@ -279,9 +286,10 @@ public Resource getClusterResources() {
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
"maximumAllocation=<" + getMaximumResourceCapability() + ">");
} else {
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = new CapacitySchedulerConfiguration(conf);
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");

View File

@ -140,10 +140,17 @@ public CapacitySchedulerConfiguration() {
}
public CapacitySchedulerConfiguration(Configuration configuration) {
super(configuration);
addResource(CS_CONFIGURATION_FILE);
this(configuration, true);
}
public CapacitySchedulerConfiguration(Configuration configuration,
boolean useLocalConfigurationProvider) {
super(configuration);
if (useLocalConfigurationProvider) {
addResource(CS_CONFIGURATION_FILE);
}
}
private String getQueuePrefix(String queue) {
String queueName = PREFIX + queue + DOT;
return queueName;

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.fail;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestRMAdminService {
private final Configuration configuration = new YarnConfiguration();
private MockRM rm = null;
private FileSystem fs;
private Path workingPath;
private Path tmpDir;
@Before
public void setup() throws IOException {
fs = FileSystem.get(configuration);
workingPath =
new Path(new File("target", this.getClass().getSimpleName()
+ "-remoteDir").getAbsolutePath());
configuration.set(YarnConfiguration.FS_BASED_RM_CONF_STORE,
workingPath.toString());
tmpDir = new Path(new File("target", this.getClass().getSimpleName()
+ "-tmpDir").getAbsolutePath());
fs.delete(workingPath, true);
fs.delete(tmpDir, true);
fs.mkdirs(workingPath);
fs.mkdirs(tmpDir);
}
@After
public void tearDown() throws IOException {
if (rm != null) {
rm.stop();
}
fs.delete(workingPath, true);
fs.delete(tmpDir, true);
}
@Test
public void testAdminRefreshQueuesWithLocalConfigurationProvider()
throws IOException, YarnException {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
try {
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
Assert.assertEquals(maxAppsBefore, cs.getConfiguration()
.getMaximumSystemApplications());
} catch (Exception ex) {
fail("Using localConfigurationProvider. Should not get any exception.");
}
}
@Test
public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
Configuration.addDefaultResource(YarnConfiguration.CS_CONFIGURATION_FILE);
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
// clean the remoteDirectory
cleanRemoteDirectory();
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
try {
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: capacity-scheduler.xml"));
}
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
String csConfFile = writeConfigurationXML(csConf,
"capacity-scheduler.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(csConfFile));
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications();
Assert.assertEquals(maxAppsAfter, 5000);
Assert.assertTrue(maxAppsAfter != maxAppsBefore);
}
private String writeConfigurationXML(Configuration conf, String confXMLName)
throws IOException {
DataOutputStream output = null;
try {
final File confFile = new File(tmpDir.toString(), confXMLName);
if (confFile.exists()) {
confFile.delete();
}
if (!confFile.createNewFile()) {
Assert.fail("Can not create " + confXMLName);
}
output = new DataOutputStream(
new FileOutputStream(confFile));
conf.writeXml(output);
return confFile.getAbsolutePath();
} finally {
if (output != null) {
output.close();
}
}
}
private void uploadToRemoteFileSystem(Path filePath)
throws IOException {
fs.copyFromLocalFile(filePath, workingPath);
}
private void cleanRemoteDirectory() throws IOException {
if (fs.exists(workingPath)) {
for (FileStatus file : fs.listStatus(workingPath)) {
fs.delete(file.getPath(), true);
}
}
}
}