YARN-1459. Changed ResourceManager to depend its service initialization on the configuration-provider mechanism during startup too. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1566791 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-10 22:50:15 +00:00
parent e74e117ad3
commit 1fa6ab249b
19 changed files with 266 additions and 176 deletions

View File

@ -66,6 +66,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
@ -454,9 +455,10 @@ public void refreshServiceAcl(Configuration conf, PolicyProvider provider) {
* Refresh the service authorization ACL for the service handled by this server
* using the specified Configuration.
*/
public void refreshServiceAclWithConfigration(Configuration conf,
@Private
public void refreshServiceAclWithLoadedConfiguration(Configuration conf,
PolicyProvider provider) {
serviceAuthorizationManager.refreshWithConfiguration(conf, provider);
serviceAuthorizationManager.refreshWithLoadedConfiguration(conf, provider);
}
/**
* Returns a handle to the serviceAuthorizationManager (required in tests)

View File

@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -122,10 +123,11 @@ public synchronized void refresh(Configuration conf,
// Make a copy of the original config, and load the policy file
Configuration policyConf = new Configuration(conf);
policyConf.addResource(policyFile);
refreshWithConfiguration(policyConf, provider);
refreshWithLoadedConfiguration(policyConf, provider);
}
public synchronized void refreshWithConfiguration(Configuration conf,
@Private
public synchronized void refreshWithLoadedConfiguration(Configuration conf,
PolicyProvider provider) {
final Map<Class<?>, AccessControlList> newAcls =
new IdentityHashMap<Class<?>, AccessControlList>();

View File

@ -169,6 +169,10 @@ Release 2.4.0 - UNRELEASED
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
app-attempts separately from apps. (Jian He via vinodkv)
YARN-1459. Changed ResourceManager to depend its service initialization
on the configuration-provider mechanism during startup too. (Xuan Gong via
vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -309,4 +309,10 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Multithreaded correctness warnings need to be ignored here as this is for creating the singleton.-->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider"/>
<Bug pattern="DC_DOUBLECHECK" />
</Match>
</FindBugsFilter>

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.conf;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -34,8 +33,8 @@
*/
public abstract class ConfigurationProvider {
public void init(Configuration conf) throws Exception {
initInternal(conf);
public void init(Configuration bootstrapConf) throws Exception {
initInternal(bootstrapConf);
}
public void close() throws Exception {
@ -43,19 +42,21 @@ public void close() throws Exception {
}
/**
* Get the configuration.
* Get the configuration and combine with bootstrapConf
* @param bootstrapConf Configuration
* @param name The configuration file name
* @return configuration
* @throws YarnException
* @throws IOException
*/
public abstract Configuration getConfiguration(String name)
throws YarnException, IOException;
public abstract Configuration getConfiguration(Configuration bootstrapConf,
String name) throws YarnException, IOException;
/**
* Derived classes initialize themselves using this method.
*/
public abstract void initInternal(Configuration conf) throws Exception;
public abstract void initInternal(Configuration bootstrapConf)
throws Exception;
/**
* Derived classes close themselves using this method.

View File

@ -33,12 +33,12 @@ public class ConfigurationProviderFactory {
/**
* Creates an instance of {@link ConfigurationProvider} using given
* configuration.
* @param conf
* @param bootstrapConf
* @return configurationProvider
*/
@SuppressWarnings("unchecked")
public static ConfigurationProvider
getConfigurationProvider(Configuration conf) {
getConfigurationProvider(Configuration bootstrapConf) {
Class<? extends ConfigurationProvider> defaultProviderClass;
try {
defaultProviderClass = (Class<? extends ConfigurationProvider>)
@ -49,9 +49,11 @@ public class ConfigurationProviderFactory {
"Invalid default configuration provider class"
+ YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
}
ConfigurationProvider configurationProvider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
defaultProviderClass, ConfigurationProvider.class), conf);
ConfigurationProvider configurationProvider =
ReflectionUtils.newInstance(bootstrapConf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
defaultProviderClass, ConfigurationProvider.class),
bootstrapConf);
return configurationProvider;
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -42,24 +41,24 @@ public class FileSystemBasedConfigurationProvider
private Path configDir;
@Override
public synchronized Configuration getConfiguration(String name)
throws IOException, YarnException {
public synchronized Configuration getConfiguration(Configuration bootstrapConf,
String name) throws IOException, YarnException {
Path configPath = new Path(this.configDir, name);
if (!fs.exists(configPath)) {
throw new YarnException("Can not find Configuration: " + name + " in "
+ configDir);
}
Configuration conf = new Configuration(false);
conf.addResource(fs.open(configPath));
return conf;
bootstrapConf.addResource(fs.open(configPath));
return bootstrapConf;
}
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
public synchronized void initInternal(Configuration bootstrapConf)
throws Exception {
configDir =
new Path(conf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
new Path(bootstrapConf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
YarnConfiguration.DEFAULT_FS_BASED_RM_CONF_STORE));
fs = configDir.getFileSystem(conf);
fs = configDir.getFileSystem(bootstrapConf);
if (!fs.exists(configDir)) {
fs.mkdirs(configDir);
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -31,13 +30,13 @@
public class LocalConfigurationProvider extends ConfigurationProvider {
@Override
public Configuration getConfiguration(String name)
throws IOException, YarnException {
return new Configuration();
public Configuration getConfiguration(Configuration bootstrapConf,
String name) throws IOException, YarnException {
return bootstrapConf;
}
@Override
public void initInternal(Configuration conf) throws Exception {
public void initInternal(Configuration bootstrapConf) throws Exception {
// Do nothing
}

View File

@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
@ -45,11 +46,8 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -92,8 +90,6 @@ public class AdminService extends CompositeService implements
private Server server;
private InetSocketAddress masterServiceAddress;
private AccessControlList adminAcl;
private ConfigurationProvider configurationProvider = null;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -115,10 +111,6 @@ public synchronized void serviceInit(Configuration conf) throws Exception {
}
}
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
configurationProvider.init(conf);
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@ -139,9 +131,6 @@ protected synchronized void serviceStart() throws Exception {
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
if (this.configurationProvider != null) {
configurationProvider.close();
}
super.serviceStop();
}
@ -158,7 +147,10 @@ protected void startServer() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
if (rmContext.isHAEnabled()) {
@ -321,8 +313,8 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
RefreshQueuesResponse response =
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try {
Configuration conf =
getConfiguration(YarnConfiguration.CS_CONFIGURATION_FILE);
Configuration conf = getConfiguration(getConfig(),
YarnConfiguration.CS_CONFIGURATION_FILE);
rmContext.getScheduler().reinitialize(conf, this.rmContext);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
@ -376,7 +368,8 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu
}
Configuration conf =
getConfiguration(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
getConfiguration(getConfig(),
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
RMAuditLogger.logSuccess(user.getShortUserName(),
argName, "AdminService");
@ -421,7 +414,7 @@ public RefreshAdminAclsResponse refreshAdminAcls(
throwStandbyException();
}
Configuration conf =
getConfiguration(YarnConfiguration.YARN_SITE_XML_FILE);
getConfiguration(getConfig(), YarnConfiguration.YARN_SITE_XML_FILE);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
@ -452,9 +445,10 @@ public RefreshServiceAclsResponse refreshServiceAcls(
throwStandbyException();
}
PolicyProvider policyProvider = new RMPolicyProvider();
PolicyProvider policyProvider = RMPolicyProvider.getInstance();
Configuration conf =
getConfiguration(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
getConfiguration(getConfig(),
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
refreshServiceAcls(conf, policyProvider);
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
@ -466,12 +460,13 @@ public RefreshServiceAclsResponse refreshServiceAcls(
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
synchronized void refreshServiceAcls(Configuration configuration,
private synchronized void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.configurationProvider instanceof LocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}
@ -521,9 +516,10 @@ public UpdateNodeResourceResponse updateNodeResource(
return response;
}
private synchronized Configuration getConfiguration(String confFileName)
throws YarnException, IOException {
return this.configurationProvider.getConfiguration(confFileName);
private synchronized Configuration getConfiguration(Configuration conf,
String confFileName) throws YarnException, IOException {
return this.rmContext.getConfigurationProvider().getConfiguration(conf,
confFileName);
}
@VisibleForTesting

View File

@ -105,7 +105,6 @@ public class ApplicationMasterService extends AbstractService implements
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
private boolean useLocalConfigurationProvider;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
@ -115,15 +114,6 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
@ -150,7 +140,10 @@ protected void serviceStart() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
this.rmContext.getConfigurationProvider().getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
this.server.start();
@ -591,10 +584,11 @@ public void unregisterAttempt(ApplicationAttemptId attemptId) {
public void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}

View File

@ -136,7 +136,6 @@ public class ClientRMService extends AbstractService implements
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
private boolean useLocalConfigurationProvider;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
@ -154,10 +153,6 @@ public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
@Override
protected void serviceInit(Configuration conf) throws Exception {
clientBindAddress = getBindAddress(conf);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -176,7 +171,10 @@ protected void serviceStart() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
this.rmContext.getConfigurationProvider().getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
this.server.start();
@ -809,10 +807,11 @@ private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -97,4 +98,5 @@ void setRMDelegationTokenSecretManager(
void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter);
ConfigurationProvider getConfigurationProvider();
}

View File

@ -23,8 +23,10 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@ -78,7 +80,7 @@ public class RMContextImpl implements RMContext {
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private ConfigurationProvider configurationProvider;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
@ -119,8 +121,11 @@ public RMContextImpl(Dispatcher rmDispatcher,
} catch (Exception e) {
assert false;
}
ConfigurationProvider provider = new LocalConfigurationProvider();
setConfigurationProvider(provider);
}
@Override
public Dispatcher getDispatcher() {
return this.rmDispatcher;
@ -334,4 +339,13 @@ public void setRMApplicationHistoryWriter(
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Override
public ConfigurationProvider getConfigurationProvider() {
return this.configurationProvider;
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
this.configurationProvider = configurationProvider;
}
}

View File

@ -42,10 +42,13 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -154,7 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private boolean recoveryEnabled;
private String webAppAddress;
private ConfigurationProvider configurationProvider = null;
/** End of Active services */
private Configuration conf;
@ -182,6 +185,21 @@ protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
this.rmContext = new RMContextImpl();
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
this.configurationProvider.init(this.conf);
rmContext.setConfigurationProvider(configurationProvider);
if (!(this.configurationProvider instanceof LocalConfigurationProvider)) {
// load yarn-site.xml
this.conf =
this.configurationProvider.getConfiguration(this.conf,
YarnConfiguration.YARN_SITE_XML_FILE);
// load core-site.xml
this.conf =
this.configurationProvider.getConfiguration(this.conf,
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
}
// register the handlers for all AlwaysOn services using setupDispatcher().
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
@ -884,6 +902,9 @@ protected void serviceStop() throws Exception {
if (fetcher != null) {
fetcher.stop();
}
if (configurationProvider != null) {
configurationProvider.close();
}
super.serviceStop();
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);

View File

@ -95,7 +95,6 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocMb;
private int minAllocVcores;
private boolean useLocalConfigurationProvider;
static {
resync.setNodeAction(NodeAction.RESYNC);
@ -145,10 +144,6 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -169,7 +164,10 @@ protected void serviceStart() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
this.rmContext.getConfigurationProvider().getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
this.server.start();
@ -423,10 +421,11 @@ public static Node resolve(String hostName) {
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}

View File

@ -196,7 +196,6 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;
private boolean useLocalConfigurationProvider;
public CapacityScheduler() {}
@ -262,14 +261,21 @@ public Resource getClusterResources() {
@Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
Configuration configuration = new Configuration(conf);
if (!initialized) {
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);
if (rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.conf = new CapacitySchedulerConfiguration(configuration, true);
} else {
try {
this.conf =
new CapacitySchedulerConfiguration(rmContext
.getConfigurationProvider().getConfiguration(configuration,
YarnConfiguration.CS_CONFIGURATION_FILE), false);
} catch (Exception e) {
throw new IOException(e);
}
}
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
@ -290,7 +296,8 @@ public Resource getClusterResources() {
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);
rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider);
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
@ -316,6 +323,7 @@ public CSQueue hook(CSQueue queue) {
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.security.authorize;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
@ -37,6 +39,23 @@
@InterfaceStability.Unstable
public class RMPolicyProvider extends PolicyProvider {
private static RMPolicyProvider rmPolicyProvider = null;
private RMPolicyProvider() {}
@Private
@Unstable
public static RMPolicyProvider getInstance() {
if (rmPolicyProvider == null) {
synchronized(RMPolicyProvider.class) {
if (rmPolicyProvider == null) {
rmPolicyProvider = new RMPolicyProvider();
}
}
}
return rmPolicyProvider;
}
private static final Service[] resourceManagerServices =
new Service[] {
new Service(

View File

@ -26,7 +26,6 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -105,34 +104,34 @@ public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
fail("Should throw an exception");
} catch(Exception ex) {
// Expect exception here
}
// clean the remoteDirectory
cleanRemoteDirectory();
//upload default configurations
uploadDefaultConfiguration();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
} catch(Exception ex) {
fail("Should not get any exceptions");
}
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
try {
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: capacity-scheduler.xml"));
}
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
String csConfFile = writeConfigurationXML(csConf,
"capacity-scheduler.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(csConfFile));
uploadConfiguration(csConf, "capacity-scheduler.xml");
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
@ -159,20 +158,24 @@ public void testAdminAclsWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
fail("Should throw an exception");
} catch(Exception ex) {
// Expect exception here
}
// clean the remoteDirectory
cleanRemoteDirectory();
//upload default configurations
uploadDefaultConfiguration();
try {
rm.adminService.refreshAdminAcls(RefreshAdminAclsRequest.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: yarn-site.xml"));
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
} catch(Exception ex) {
fail("Should not get any exceptions");
}
String aclStringBefore =
@ -180,10 +183,8 @@ public void testAdminAclsWithFileSystemBasedConfigurationProvider()
YarnConfiguration yarnConf = new YarnConfiguration();
yarnConf.set(YarnConfiguration.YARN_ADMIN_ACL, "world:anyone:rwcda");
String yarnConfFile = writeConfigurationXML(yarnConf, "yarn-site.xml");
uploadConfiguration(yarnConf, "yarn-site.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(yarnConfFile));
rm.adminService.refreshAdminAcls(RefreshAdminAclsRequest.newInstance());
String aclStringAfter =
@ -214,7 +215,6 @@ public void testServiceAclsRefreshWithLocalConfigurationProvider() {
}
}
@SuppressWarnings("resource")
@Test
public void testServiceAclsRefreshWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
@ -224,33 +224,33 @@ public void testServiceAclsRefreshWithFileSystemBasedConfigurationProvider()
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
ResourceManager resourceManager = null;
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
// clean the remoteDirectory
cleanRemoteDirectory();
try {
resourceManager.adminService
.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
fail("Should throw an exception");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: hadoop-policy.xml"));
// expect to get an exception here
}
String aclsString = "alice,bob users,wheel";
//upload default configurations
uploadDefaultConfiguration();
Configuration conf = new Configuration();
conf.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
conf.set("security.applicationclient.protocol.acl", aclsString);
String hadoopConfFile = writeConfigurationXML(conf, "hadoop-policy.xml");
uploadConfiguration(conf, "core-site.xml");
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
} catch (Exception ex) {
fail("Should not get any exceptions");
}
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(hadoopConfFile));
String aclsString = "alice,bob users,wheel";
Configuration newConf = new Configuration();
newConf.set("security.applicationclient.protocol.acl", aclsString);
uploadConfiguration(newConf, "hadoop-policy.xml");
resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
@ -328,31 +328,31 @@ private void verifyServiceACLsRefresh(ServiceAuthorizationManager manager,
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
fail("Should throw an exception");
} catch(Exception ex) {
// Expect exception here
}
// clean the remoteDirectory
cleanRemoteDirectory();
//upload default configurations
uploadDefaultConfiguration();
try {
rm.adminService.refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: core-site.xml"));
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
} catch(Exception ex) {
fail("Should not get any exceptions");
}
Configuration coreConf = new Configuration(false);
coreConf.set("hadoop.proxyuser.test.groups", "test_groups");
coreConf.set("hadoop.proxyuser.test.hosts", "test_hosts");
String coreConfFile = writeConfigurationXML(coreConf,
"core-site.xml");
uploadConfiguration(coreConf, "core-site.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(coreConfFile));
rm.adminService.refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
Assert.assertTrue(ProxyUsers.getProxyGroups()
@ -393,11 +393,29 @@ private void uploadToRemoteFileSystem(Path filePath)
fs.copyFromLocalFile(filePath, workingPath);
}
private void cleanRemoteDirectory() throws IOException {
if (fs.exists(workingPath)) {
for (FileStatus file : fs.listStatus(workingPath)) {
fs.delete(file.getPath(), true);
}
}
private void uploadConfiguration(Configuration conf, String confFileName)
throws IOException {
String csConfFile = writeConfigurationXML(conf, confFileName);
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(csConfFile));
}
private void uploadDefaultConfiguration() throws IOException {
Configuration conf = new Configuration();
uploadConfiguration(conf, "core-site.xml");
YarnConfiguration yarnConf = new YarnConfiguration();
yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
uploadConfiguration(yarnConf, "yarn-site.xml");
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
uploadConfiguration(csConf, "capacity-scheduler.xml");
Configuration hadoopPolicyConf = new Configuration(false);
hadoopPolicyConf
.addResource(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
uploadConfiguration(hadoopPolicyConf, "hadoop-policy.xml");
}
}

View File

@ -40,6 +40,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -104,6 +105,7 @@ public class TestCapacityScheduler {
private static float B3_CAPACITY = 20;
private ResourceManager resourceManager = null;
private RMContext mockContext;
@Before
public void setUp() throws Exception {
@ -118,6 +120,9 @@ public void setUp() throws Exception {
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
mockContext = mock(RMContext.class);
when(mockContext.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider());
}
@After
@ -133,7 +138,7 @@ public void testConfValidation() throws Exception {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
scheduler.reinitialize(conf, null);
scheduler.reinitialize(conf, mockContext);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@ -147,7 +152,7 @@ public void testConfValidation() throws Exception {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
scheduler.reinitialize(conf, null);
scheduler.reinitialize(conf, mockContext);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
@ -353,7 +358,7 @@ null, new RMContainerTokenSecretManager(conf),
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
cs.reinitialize(conf,null);
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, 80f, 20f);
}
@ -503,7 +508,7 @@ null, new RMContainerTokenSecretManager(conf),
conf.setCapacity(B2, B2_CAPACITY);
conf.setCapacity(B3, B3_CAPACITY);
conf.setCapacity(B4, B4_CAPACITY);
cs.reinitialize(conf,null);
cs.reinitialize(conf,mockContext);
checkQueueCapacities(cs, 80f, 20f);
// Verify parent for B4