HDFS-3513. HttpFS should cache filesystems. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1368304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-08-01 23:09:19 +00:00
parent 28320c5220
commit fe17d871d0
5 changed files with 264 additions and 47 deletions

View File

@ -30,4 +30,9 @@
<Field name="authority" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.lib.service.hadoop.FileSystemAccessService" />
<Method name="closeFileSystem" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
</Match>
</FindBugsFilter>

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -42,6 +43,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
@ -54,6 +57,8 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
public static final String AUTHENTICATION_TYPE = "authentication.type";
public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";
public static final String NAME_NODE_WHITELIST = "name.node.whitelist";
@ -63,6 +68,61 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";
private static class CachedFileSystem {
private FileSystem fs;
private long lastUse;
private long timeout;
private int count;
public CachedFileSystem(long timeout) {
this.timeout = timeout;
lastUse = -1;
count = 0;
}
synchronized FileSystem getFileSytem(Configuration conf)
throws IOException {
if (fs == null) {
fs = FileSystem.get(conf);
}
lastUse = -1;
count++;
return fs;
}
synchronized void release() throws IOException {
count--;
if (count == 0) {
if (timeout == 0) {
fs.close();
fs = null;
lastUse = -1;
}
else {
lastUse = System.currentTimeMillis();
}
}
}
// to avoid race conditions in the map cache adding removing entries
// an entry in the cache remains forever, it just closes/opens filesystems
// based on their utilization. Worse case scenario, the penalty we'll
// pay is that the amount of entries in the cache will be the total
// number of users in HDFS (which seems a resonable overhead).
synchronized boolean purgeIfIdle() throws IOException {
boolean ret = false;
if (count == 0 && lastUse != -1 &&
(System.currentTimeMillis() - lastUse) > timeout) {
fs.close();
fs = null;
lastUse = -1;
ret = true;
}
return ret;
}
}
public FileSystemAccessService() {
super(PREFIX);
}
@ -73,6 +133,11 @@ public FileSystemAccessService() {
private AtomicInteger unmanagedFileSystems = new AtomicInteger();
private ConcurrentHashMap<String, CachedFileSystem> fsCache =
new ConcurrentHashMap<String, CachedFileSystem>();
private long purgeTimeout;
@Override
protected void init() throws ServiceException {
LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
@ -157,6 +222,30 @@ public Long getValue() {
return (long) unmanagedFileSystems.get();
}
});
Scheduler scheduler = getServer().get(Scheduler.class);
int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
if (purgeTimeout > 0) {
scheduler.schedule(new FileSystemCachePurger(),
purgeInterval, purgeInterval, TimeUnit.SECONDS);
}
}
private class FileSystemCachePurger implements Runnable {
@Override
public void run() {
int count = 0;
for (CachedFileSystem cacheFs : fsCache.values()) {
try {
count += cacheFs.purgeIfIdle() ? 1 : 0;
} catch (Throwable ex) {
LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
}
}
LOG.debug("Purged [{}} filesystem instances", count);
}
}
private Set<String> toLowerCase(Collection<String> collection) {
@ -174,7 +263,7 @@ public Class getInterface() {
@Override
public Class[] getServiceDependencies() {
return new Class[]{Instrumentation.class};
return new Class[]{Instrumentation.class, Scheduler.class};
}
protected UserGroupInformation getUGI(String user) throws IOException {
@ -185,12 +274,25 @@ protected void setRequiredServiceHadoopConf(Configuration conf) {
conf.set("fs.hdfs.impl.disable.cache", "true");
}
protected FileSystem createFileSystem(Configuration namenodeConf) throws IOException {
return FileSystem.get(namenodeConf);
private static final String HTTPFS_FS_USER = "httpfs.fs.user";
protected FileSystem createFileSystem(Configuration namenodeConf)
throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
if (cachedFS == null) {
cachedFS = newCachedFS;
}
Configuration conf = new Configuration(namenodeConf);
conf.set(HTTPFS_FS_USER, user);
return cachedFS.getFileSytem(conf);
}
protected void closeFileSystem(FileSystem fs) throws IOException {
fs.close();
if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
}
}
protected void validateNamenode(String namenode) throws FileSystemAccessException {

View File

@ -210,4 +210,20 @@
</description>
</property>
<property>
<name>httpfs.hadoop.filesystem.cache.purge.frequency</name>
<value>60</value>
<description>
Frequency, in seconds, for the idle filesystem purging daemon runs.
</description>
</property>
<property>
<name>httpfs.hadoop.filesystem.cache.purge.timeout</name>
<value>60</value>
<description>
Timeout, in seconds, for an idle filesystem to be purged.
</description>
</property>
</configuration>

View File

@ -18,10 +18,6 @@
package org.apache.hadoop.lib.service.hadoop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -37,6 +33,7 @@
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.instrumentation.InstrumentationService;
import org.apache.hadoop.lib.service.scheduler.SchedulerService;
import org.apache.hadoop.test.HFSTestCase;
import org.apache.hadoop.test.TestDir;
import org.apache.hadoop.test.TestDirHelper;
@ -44,6 +41,7 @@
import org.apache.hadoop.test.TestHdfs;
import org.apache.hadoop.test.TestHdfsHelper;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -68,13 +66,15 @@ public void createHadoopConf() throws Exception {
@TestDir
public void simpleSecurity() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
assertNotNull(server.get(FileSystemAccess.class));
Assert.assertNotNull(server.get(FileSystemAccess.class));
server.destroy();
}
@ -83,8 +83,10 @@ public void simpleSecurity() throws Exception {
@TestDir
public void noKerberosKeytabProperty() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "kerberos");
@ -98,8 +100,10 @@ public void noKerberosKeytabProperty() throws Exception {
@TestDir
public void noKerberosPrincipalProperty() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "kerberos");
@ -114,8 +118,10 @@ public void noKerberosPrincipalProperty() throws Exception {
@TestDir
public void kerberosInitializationFailure() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "kerberos");
@ -130,8 +136,10 @@ public void kerberosInitializationFailure() throws Exception {
@TestDir
public void invalidSecurity() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.authentication.type", "foo");
@ -143,15 +151,17 @@ public void invalidSecurity() throws Exception {
@TestDir
public void serviceHadoopConf() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccessService fsAccess = (FileSystemAccessService) server.get(FileSystemAccess.class);
assertEquals(fsAccess.serviceHadoopConf.get("foo"), "FOO");
Assert.assertEquals(fsAccess.serviceHadoopConf.get("foo"), "FOO");
server.destroy();
}
@ -161,8 +171,10 @@ public void serviceHadoopConfCustomDir() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String hadoopConfDir = new File(dir, "confx").getAbsolutePath();
new File(hadoopConfDir).mkdirs();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.config.dir", hadoopConfDir);
@ -177,7 +189,7 @@ public void serviceHadoopConfCustomDir() throws Exception {
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccessService fsAccess = (FileSystemAccessService) server.get(FileSystemAccess.class);
assertEquals(fsAccess.serviceHadoopConf.get("foo"), "BAR");
Assert.assertEquals(fsAccess.serviceHadoopConf.get("foo"), "BAR");
server.destroy();
}
@ -185,8 +197,10 @@ public void serviceHadoopConfCustomDir() throws Exception {
@TestDir
public void inWhitelists() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
Server server = new Server("server", dir, dir, dir, dir, conf);
@ -219,8 +233,10 @@ public void inWhitelists() throws Exception {
@TestDir
public void NameNodeNotinWhitelists() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.name.node.whitelist", "NN");
@ -235,8 +251,10 @@ public void NameNodeNotinWhitelists() throws Exception {
@TestHdfs
public void createFileSystem() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
@ -244,19 +262,20 @@ public void createFileSystem() throws Exception {
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.timeout", "0");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
FileSystem fs = hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
assertNotNull(fs);
Assert.assertNotNull(fs);
fs.mkdirs(new Path("/tmp/foo"));
hadoop.releaseFileSystem(fs);
try {
fs.mkdirs(new Path("/tmp/foo"));
fail();
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
fail();
Assert.fail();
}
server.destroy();
}
@ -266,8 +285,10 @@ public void createFileSystem() throws Exception {
@TestHdfs
public void fileSystemExecutor() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
@ -275,6 +296,7 @@ public void fileSystemExecutor() throws Exception {
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.timeout", "0");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
@ -291,10 +313,10 @@ public Void execute(FileSystem fs) throws IOException {
});
try {
fsa[0].mkdirs(new Path("/tmp/foo"));
fail();
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
fail();
Assert.fail();
}
server.destroy();
}
@ -305,8 +327,10 @@ public Void execute(FileSystem fs) throws IOException {
@TestHdfs
public void fileSystemExecutorNoNameNode() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
createHadoopConf(hadoopConf);
@ -332,8 +356,10 @@ public Void execute(FileSystem fs) throws IOException {
@TestHdfs
public void fileSystemExecutorException() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
FileSystemAccessService.class.getName()));
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
@ -341,6 +367,7 @@ public void fileSystemExecutorException() throws Exception {
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.timeout", "0");
Server server = new Server("server", dir, dir, dir, dir, conf);
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
@ -354,21 +381,86 @@ public Void execute(FileSystem fs) throws IOException {
throw new IOException();
}
});
fail();
Assert.fail();
} catch (FileSystemAccessException ex) {
assertEquals(ex.getError(), FileSystemAccessException.ERROR.H03);
Assert.assertEquals(ex.getError(), FileSystemAccessException.ERROR.H03);
} catch (Exception ex) {
fail();
Assert.fail();
}
try {
fsa[0].mkdirs(new Path("/tmp/foo"));
fail();
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
fail();
Assert.fail();
}
server.destroy();
}
@Test
@TestDir
@TestHdfs
public void fileSystemCache() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
String services = StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName()));
Configuration hadoopConf = new Configuration(false);
hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
createHadoopConf(hadoopConf);
Configuration conf = new Configuration(false);
conf.set("server.services", services);
conf.set("server.hadoop.filesystem.cache.purge.frequency", "1");
conf.set("server.hadoop.filesystem.cache.purge.timeout", "1");
Server server = new Server("server", dir, dir, dir, dir, conf);
try {
server.init();
FileSystemAccess hadoop = server.get(FileSystemAccess.class);
FileSystem fs1 =
hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
Assert.assertNotNull(fs1);
fs1.mkdirs(new Path("/tmp/foo1"));
hadoop.releaseFileSystem(fs1);
//still around because of caching
fs1.mkdirs(new Path("/tmp/foo2"));
FileSystem fs2 =
hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
//should be same instance because of caching
Assert.assertEquals(fs1, fs2);
Thread.sleep(4 * 1000);
//still around because of lease count is 1 (fs2 is out)
fs1.mkdirs(new Path("/tmp/foo2"));
Thread.sleep(4 * 1000);
//still around because of lease count is 1 (fs2 is out)
fs2.mkdirs(new Path("/tmp/foo"));
hadoop.releaseFileSystem(fs2);
Thread.sleep(4 * 1000);
//should not be around as lease count is 0
try {
fs2.mkdirs(new Path("/tmp/foo"));
Assert.fail();
} catch (IOException ex) {
} catch (Exception ex) {
Assert.fail();
}
} finally {
server.destroy();
}
}
}

View File

@ -201,6 +201,8 @@ Branch-2 ( Unreleased changes )
HDFS-3113. httpfs does not support delegation tokens. (tucu)
HDFS-3513. HttpFS should cache filesystems. (tucu)
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG