YARN-9755. Fixed RM failing to start when FileSystemBasedConfigurationProvider is configured.
Contributed by Prabhu Joseph
parent 3329257d99
commit 717c853873
@@ -71,10 +71,20 @@ public synchronized InputStream getConfigurationInputStream(
   @Override
   public synchronized void initInternal(Configuration bootstrapConf)
       throws Exception {
+    Configuration conf = new Configuration(bootstrapConf);
     configDir =
-        new Path(bootstrapConf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
+        new Path(conf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
             YarnConfiguration.DEFAULT_FS_BASED_RM_CONF_STORE));
-    fs = configDir.getFileSystem(bootstrapConf);
+    String scheme = configDir.toUri().getScheme();
+    if (scheme == null) {
+      scheme = FileSystem.getDefaultUri(conf).getScheme();
+    }
+    if (scheme != null) {
+      String disableCacheName = String.format("fs.%s.impl.disable.cache",
+          scheme);
+      conf.setBoolean(disableCacheName, true);
+    }
+    fs = configDir.getFileSystem(conf);
     fs.mkdirs(configDir);
   }
 
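For context on what the new fs.%s.impl.disable.cache key buys: Hadoop's FileSystem.get() and Path.getFileSystem() return one cached, JVM-wide instance per scheme, authority and user, so a close() issued by any other component also closes the handle this provider is holding, and later reads fail with "Filesystem closed". Disabling the cache for the store's scheme gives the provider a private instance. A minimal standalone sketch of that behaviour (the FsCacheSketch class and the /rm-conf-store path are illustrative only, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path("/rm-conf-store");   // illustrative path only

    // Default behaviour: both lookups resolve to the same cached instance,
    // so closing one handle closes it for every caller in the JVM.
    FileSystem cachedA = dir.getFileSystem(conf);
    FileSystem cachedB = dir.getFileSystem(conf);
    System.out.println("same cached instance: " + (cachedA == cachedB));

    // The patch's approach: disable the cache for the store's scheme so the
    // provider gets an instance nobody else can close out from under it.
    Configuration privateConf = new Configuration(conf);
    String scheme = FileSystem.getDefaultUri(privateConf).getScheme();
    privateConf.setBoolean(
        String.format("fs.%s.impl.disable.cache", scheme), true);
    FileSystem privateFs = dir.getFileSystem(privateConf);
    System.out.println("separate instance: " + (privateFs != cachedA));
  }
}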
@@ -267,6 +267,22 @@ protected void serviceInit(Configuration conf) throws Exception {
     this.rmContext = new RMContextImpl();
     rmContext.setResourceManager(this);
 
+    // Set HA configuration should be done before login
+    this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
+    if (this.rmContext.isHAEnabled()) {
+      HAUtil.verifyAndSetConfiguration(this.conf);
+    }
+
+    // Set UGI and do login
+    // If security is enabled, use login user
+    // If security is not enabled, use current user
+    this.rmLoginUGI = UserGroupInformation.getCurrentUser();
+    try {
+      doSecureLogin();
+    } catch(IOException ie) {
+      throw new YarnRuntimeException("Failed to login", ie);
+    }
+
     this.configurationProvider =
         ConfigurationProviderFactory.getConfigurationProvider(conf);
     this.configurationProvider.init(this.conf);
@@ -286,22 +302,6 @@ protected void serviceInit(Configuration conf) throws Exception {
 
     validateConfigs(this.conf);
 
-    // Set HA configuration should be done before login
-    this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
-    if (this.rmContext.isHAEnabled()) {
-      HAUtil.verifyAndSetConfiguration(this.conf);
-    }
-
-    // Set UGI and do login
-    // If security is enabled, use login user
-    // If security is not enabled, use current user
-    this.rmLoginUGI = UserGroupInformation.getCurrentUser();
-    try {
-      doSecureLogin();
-    } catch(IOException ie) {
-      throw new YarnRuntimeException("Failed to login", ie);
-    }
-
     // register the handlers for all AlwaysOn services using setupDispatcher().
     rmDispatcher = setupDispatcher();
     addIfService(rmDispatcher);
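The two ResourceManager hunks above are a straight move: HA setup and secure login now run before configurationProvider.init(). One reading of the reorder, an inference rather than something stated in the commit, is that initInternal() (first hunk) now creates the provider's own FileSystem and so benefits from running after doSecureLogin(). For anyone reproducing the original failure, this code path is only taken when the remote provider is configured; a minimal sketch of that configuration follows, where the key names are the standard YarnConfiguration ones and hdfs:///yarn/conf is an illustrative store path, neither of which appears in this diff:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ProviderConfigSketch {
  public static void main(String[] args) {
    // Point the RM at the file-system-based provider and a remote conf store.
    YarnConfiguration conf = new YarnConfiguration();
    conf.set("yarn.resourcemanager.configuration.provider-class",
        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
    conf.set("yarn.resourcemanager.configuration.file-system-based-store",
        "hdfs:///yarn/conf");   // illustrative location only
    System.out.println(
        conf.get("yarn.resourcemanager.configuration.provider-class"));
  }
}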
@@ -39,6 +39,8 @@
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
@@ -50,6 +52,7 @@
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
@@ -59,6 +62,7 @@
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.ConfigurationProvider;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -172,6 +176,39 @@ public void testAdminRefreshQueuesWithLocalConfigurationProvider()
     }
   }
 
+
+  @Test
+  public void testFileSystemCloseWithFileSystemBasedConfigurationProvider()
+      throws Exception {
+    MiniDFSCluster hdfsCluster = null;
+    try {
+      HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+      hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+          .numDataNodes(1).build();
+      FileSystem fs1 = hdfsCluster.getFileSystem();
+      ConfigurationProvider configurationProvider = new
+          FileSystemBasedConfigurationProvider();
+      configurationProvider.init(hdfsConfig);
+      fs1.close();
+      try {
+        configurationProvider.getConfigurationInputStream(hdfsConfig,
+            "yarn-site.xml");
+      } catch (IOException e) {
+        if (e.getMessage().contains("Filesystem closed")) {
+          fail("FileSystemBasedConfigurationProvider failed to handle " +
+              "FileSystem close");
+        } else {
+          fail("Should not get any exceptions");
+        }
+      }
+    } finally {
+      if (hdfsCluster != null) {
+        hdfsCluster.shutdown();
+      }
+    }
+  }
+
+
   @Test
   public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider()
       throws IOException, YarnException {
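A brief note on the new test above: it grabs the shared, cached HDFS client via hdfsCluster.getFileSystem(), closes it, and then expects getConfigurationInputStream to complete without a "Filesystem closed" error, which only holds because the patched provider keeps its own non-cached FileSystem. Assuming the test lives in TestRMAdminService (the class name is not visible in this excerpt), it can be run on its own with Maven surefire via mvn test -Dtest=TestRMAdminService#testFileSystemCloseWithFileSystemBasedConfigurationProvider.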