diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/dev-support/findbugsExcludeFile.xml
index b72f5bc26e..2ffc069596 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/dev-support/findbugsExcludeFile.xml
@@ -30,4 +30,9 @@
+
+
+
+
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
index eb31b06084..35255d3405 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.lib.service.FileSystemAccessException;
 import org.apache.hadoop.lib.service.Instrumentation;
+import org.apache.hadoop.lib.service.Scheduler;
 import org.apache.hadoop.lib.util.Check;
 import org.apache.hadoop.lib.util.ConfigurationUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -42,6 +43,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

 public class FileSystemAccessService extends BaseService implements FileSystemAccess {
@@ -54,6 +57,8 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
   public static final String AUTHENTICATION_TYPE = "authentication.type";
   public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
   public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
+  public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
+  public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";

   public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

@@ -63,6 +68,61 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
   private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";

+  private static class CachedFileSystem {
+    private FileSystem fs;
+    private long lastUse;
+    private long timeout;
+    private int count;
+
+    public CachedFileSystem(long timeout) {
+      this.timeout = timeout;
+      lastUse = -1;
+      count = 0;
+    }
+
+    synchronized FileSystem getFileSystem(Configuration conf)
+      throws IOException {
+      if (fs == null) {
+        fs = FileSystem.get(conf);
+      }
+      lastUse = -1;
+      count++;
+      return fs;
+    }
+
+    synchronized void release() throws IOException {
+      count--;
+      if (count == 0) {
+        if (timeout == 0) {
+          fs.close();
+          fs = null;
+          lastUse = -1;
+        }
+        else {
+          lastUse = System.currentTimeMillis();
+        }
+      }
+    }
+
+    // To avoid race conditions when adding/removing entries to/from the map
+    // cache, an entry in the cache remains forever; it just closes/opens
+    // filesystems based on their utilization. Worst-case scenario, the penalty
+    // we'll pay is that the number of entries in the cache will be the total
+    // number of users in HDFS (which seems a reasonable overhead).
+    synchronized boolean purgeIfIdle() throws IOException {
+      boolean ret = false;
+      if (count == 0 && lastUse != -1 &&
+          (System.currentTimeMillis() - lastUse) > timeout) {
+        fs.close();
+        fs = null;
+        lastUse = -1;
+        ret = true;
+      }
+      return ret;
+    }
+
+  }
+
   public FileSystemAccessService() {
     super(PREFIX);
   }
@@ -73,6 +133,11 @@ public FileSystemAccessService() {

   private AtomicInteger unmanagedFileSystems = new AtomicInteger();

+  private ConcurrentHashMap<String, CachedFileSystem> fsCache =
+    new ConcurrentHashMap<String, CachedFileSystem>();
+
+  private long purgeTimeout;
+
   @Override
   protected void init() throws ServiceException {
     LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
@@ -157,6 +222,30 @@ public Long getValue() {
         return (long) unmanagedFileSystems.get();
       }
     });
+    Scheduler scheduler = getServer().get(Scheduler.class);
+    int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
+    purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
+    purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
+    if (purgeTimeout > 0) {
+      scheduler.schedule(new FileSystemCachePurger(),
+                         purgeInterval, purgeInterval, TimeUnit.SECONDS);
+    }
+  }
+
+  private class FileSystemCachePurger implements Runnable {
+
+    @Override
+    public void run() {
+      int count = 0;
+      for (CachedFileSystem cacheFs : fsCache.values()) {
+        try {
+          count += cacheFs.purgeIfIdle() ? 1 : 0;
+        } catch (Throwable ex) {
+          LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
+        }
+      }
+      LOG.debug("Purged [{}] filesystem instances", count);
+    }
   }

   private Set<String> toLowerCase(Collection<String> collection) {
@@ -174,7 +263,7 @@ public Class getInterface() {

   @Override
   public Class[] getServiceDependencies() {
-    return new Class[]{Instrumentation.class};
+    return new Class[]{Instrumentation.class, Scheduler.class};
   }

   protected UserGroupInformation getUGI(String user) throws IOException {
@@ -185,12 +274,25 @@ protected void setRequiredServiceHadoopConf(Configuration conf) {
     conf.set("fs.hdfs.impl.disable.cache", "true");
   }

-  protected FileSystem createFileSystem(Configuration namenodeConf) throws IOException {
-    return FileSystem.get(namenodeConf);
+  private static final String HTTPFS_FS_USER = "httpfs.fs.user";
+
+  protected FileSystem createFileSystem(Configuration namenodeConf)
+    throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
+    CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
+    if (cachedFS == null) {
+      cachedFS = newCachedFS;
+    }
+    Configuration conf = new Configuration(namenodeConf);
+    conf.set(HTTPFS_FS_USER, user);
+    return cachedFS.getFileSystem(conf);
   }

   protected void closeFileSystem(FileSystem fs) throws IOException {
-    fs.close();
+    if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
+      fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
+    }
   }

   protected void validateNamenode(String namenode) throws FileSystemAccessException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml
index fc4faf5563..7171d388fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/resources/httpfs-default.xml
@@ -210,4 +210,20 @@
+
+  <property>
+    <name>httpfs.hadoop.filesystem.cache.purge.frequency</name>
+    <value>60</value>
+    <description>
+      Frequency, in seconds, at which the idle filesystem purging daemon runs.
+    </description>
+  </property>
+  <property>
+    <name>httpfs.hadoop.filesystem.cache.purge.timeout</name>
+    <value>60</value>
+    <description>
+      Timeout, in seconds, for an idle filesystem to be purged.
+    </description>
+  </property>
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/hadoop/TestFileSystemAccessService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/hadoop/TestFileSystemAccessService.java
index 192fdd1704..ed9efa945f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/hadoop/TestFileSystemAccessService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/hadoop/TestFileSystemAccessService.java
@@ -18,10 +18,6 @@
 package org.apache.hadoop.lib.service.hadoop;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -37,6 +33,7 @@ import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.lib.service.FileSystemAccessException;
 import org.apache.hadoop.lib.service.instrumentation.InstrumentationService;
+import org.apache.hadoop.lib.service.scheduler.SchedulerService;
 import org.apache.hadoop.test.HFSTestCase;
 import org.apache.hadoop.test.TestDir;
 import org.apache.hadoop.test.TestDirHelper;
@@ -44,6 +41,7 @@ import org.apache.hadoop.test.TestHdfs;
 import org.apache.hadoop.test.TestHdfsHelper;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

@@ -68,13 +66,15 @@ public void createHadoopConf() throws Exception {
   @TestDir
   public void simpleSecurity() throws Exception {
     String dir = TestDirHelper.getTestDir().getAbsolutePath();
-    String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
-      FileSystemAccessService.class.getName()));
+    String services = StringUtils.join(",",
+      Arrays.asList(InstrumentationService.class.getName(),
+                    SchedulerService.class.getName(),
+                    FileSystemAccessService.class.getName()));
     Configuration conf = new Configuration(false);
     conf.set("server.services", services);
     Server server = new Server("server", dir, dir, dir, dir, conf);
     server.init();
-    assertNotNull(server.get(FileSystemAccess.class));
+    Assert.assertNotNull(server.get(FileSystemAccess.class));
     server.destroy();
   }

@@ -83,8 +83,10 @@ public void simpleSecurity() throws Exception {
   @TestDir
   public void noKerberosKeytabProperty() throws Exception {
     String dir = TestDirHelper.getTestDir().getAbsolutePath();
-    String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
-      FileSystemAccessService.class.getName()));
+    String services = StringUtils.join(",",
+      Arrays.asList(InstrumentationService.class.getName(),
+                    SchedulerService.class.getName(),
+                    FileSystemAccessService.class.getName()));
     Configuration conf = new Configuration(false);
     conf.set("server.services", services);
     conf.set("server.hadoop.authentication.type", "kerberos");
@@ -98,8 +100,10 @@ public void noKerberosPrincipalProperty() throws Exception {
     String dir = TestDirHelper.getTestDir().getAbsolutePath();
-    String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(),
-      FileSystemAccessService.class.getName()));
+    String services = StringUtils.join(",",
+      Arrays.asList(InstrumentationService.class.getName(),
+                    SchedulerService.class.getName(),
FileSystemAccessService.class.getName())); Configuration conf = new Configuration(false); conf.set("server.services", services); conf.set("server.hadoop.authentication.type", "kerberos"); @@ -114,8 +118,10 @@ public void noKerberosPrincipalProperty() throws Exception { @TestDir public void kerberosInitializationFailure() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration conf = new Configuration(false); conf.set("server.services", services); conf.set("server.hadoop.authentication.type", "kerberos"); @@ -130,8 +136,10 @@ public void kerberosInitializationFailure() throws Exception { @TestDir public void invalidSecurity() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration conf = new Configuration(false); conf.set("server.services", services); conf.set("server.hadoop.authentication.type", "foo"); @@ -143,15 +151,17 @@ public void invalidSecurity() throws Exception { @TestDir public void serviceHadoopConf() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration conf = new Configuration(false); conf.set("server.services", services); Server server = new Server("server", dir, dir, dir, dir, conf); server.init(); FileSystemAccessService fsAccess = (FileSystemAccessService) server.get(FileSystemAccess.class); - assertEquals(fsAccess.serviceHadoopConf.get("foo"), "FOO"); + Assert.assertEquals(fsAccess.serviceHadoopConf.get("foo"), "FOO"); server.destroy(); } @@ -161,8 +171,10 @@ public void serviceHadoopConfCustomDir() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); String hadoopConfDir = new File(dir, "confx").getAbsolutePath(); new File(hadoopConfDir).mkdirs(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration conf = new Configuration(false); conf.set("server.services", services); conf.set("server.hadoop.config.dir", hadoopConfDir); @@ -177,7 +189,7 @@ public void serviceHadoopConfCustomDir() throws Exception { Server server = new Server("server", dir, dir, dir, dir, conf); server.init(); FileSystemAccessService fsAccess = (FileSystemAccessService) server.get(FileSystemAccess.class); - assertEquals(fsAccess.serviceHadoopConf.get("foo"), "BAR"); + Assert.assertEquals(fsAccess.serviceHadoopConf.get("foo"), "BAR"); server.destroy(); } 
@@ -185,8 +197,10 @@ public void serviceHadoopConfCustomDir() throws Exception { @TestDir public void inWhitelists() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration conf = new Configuration(false); conf.set("server.services", services); Server server = new Server("server", dir, dir, dir, dir, conf); @@ -219,8 +233,10 @@ public void inWhitelists() throws Exception { @TestDir public void NameNodeNotinWhitelists() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration conf = new Configuration(false); conf.set("server.services", services); conf.set("server.hadoop.name.node.whitelist", "NN"); @@ -235,8 +251,10 @@ public void NameNodeNotinWhitelists() throws Exception { @TestHdfs public void createFileSystem() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration hadoopConf = new Configuration(false); hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)); @@ -244,19 +262,20 @@ public void createFileSystem() throws Exception { Configuration conf = new Configuration(false); conf.set("server.services", services); + conf.set("server.hadoop.filesystem.cache.purge.timeout", "0"); Server server = new Server("server", dir, dir, dir, dir, conf); server.init(); FileSystemAccess hadoop = server.get(FileSystemAccess.class); FileSystem fs = hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration()); - assertNotNull(fs); + Assert.assertNotNull(fs); fs.mkdirs(new Path("/tmp/foo")); hadoop.releaseFileSystem(fs); try { fs.mkdirs(new Path("/tmp/foo")); - fail(); + Assert.fail(); } catch (IOException ex) { } catch (Exception ex) { - fail(); + Assert.fail(); } server.destroy(); } @@ -266,8 +285,10 @@ public void createFileSystem() throws Exception { @TestHdfs public void fileSystemExecutor() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration hadoopConf = new Configuration(false); hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)); @@ -275,6 +296,7 @@ public void fileSystemExecutor() throws Exception { 
Configuration conf = new Configuration(false); conf.set("server.services", services); + conf.set("server.hadoop.filesystem.cache.purge.timeout", "0"); Server server = new Server("server", dir, dir, dir, dir, conf); server.init(); FileSystemAccess hadoop = server.get(FileSystemAccess.class); @@ -291,10 +313,10 @@ public Void execute(FileSystem fs) throws IOException { }); try { fsa[0].mkdirs(new Path("/tmp/foo")); - fail(); + Assert.fail(); } catch (IOException ex) { } catch (Exception ex) { - fail(); + Assert.fail(); } server.destroy(); } @@ -305,8 +327,10 @@ public Void execute(FileSystem fs) throws IOException { @TestHdfs public void fileSystemExecutorNoNameNode() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration hadoopConf = new Configuration(false); hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)); createHadoopConf(hadoopConf); @@ -332,8 +356,10 @@ public Void execute(FileSystem fs) throws IOException { @TestHdfs public void fileSystemExecutorException() throws Exception { String dir = TestDirHelper.getTestDir().getAbsolutePath(); - String services = StringUtils.join(",", Arrays.asList(InstrumentationService.class.getName(), - FileSystemAccessService.class.getName())); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); Configuration hadoopConf = new Configuration(false); hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)); @@ -341,6 +367,7 @@ public void fileSystemExecutorException() throws Exception { Configuration conf = new Configuration(false); conf.set("server.services", services); + conf.set("server.hadoop.filesystem.cache.purge.timeout", "0"); Server server = new Server("server", dir, dir, dir, dir, conf); server.init(); FileSystemAccess hadoop = server.get(FileSystemAccess.class); @@ -354,21 +381,86 @@ public Void execute(FileSystem fs) throws IOException { throw new IOException(); } }); - fail(); + Assert.fail(); } catch (FileSystemAccessException ex) { - assertEquals(ex.getError(), FileSystemAccessException.ERROR.H03); + Assert.assertEquals(ex.getError(), FileSystemAccessException.ERROR.H03); } catch (Exception ex) { - fail(); + Assert.fail(); } try { fsa[0].mkdirs(new Path("/tmp/foo")); - fail(); + Assert.fail(); } catch (IOException ex) { } catch (Exception ex) { - fail(); + Assert.fail(); } server.destroy(); } + @Test + @TestDir + @TestHdfs + public void fileSystemCache() throws Exception { + String dir = TestDirHelper.getTestDir().getAbsolutePath(); + String services = StringUtils.join(",", + Arrays.asList(InstrumentationService.class.getName(), + SchedulerService.class.getName(), + FileSystemAccessService.class.getName())); + + Configuration hadoopConf = new Configuration(false); + hadoopConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + TestHdfsHelper.getHdfsConf().get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)); + createHadoopConf(hadoopConf); + + Configuration conf = new 
Configuration(false);
+    conf.set("server.services", services);
+    conf.set("server.hadoop.filesystem.cache.purge.frequency", "1");
+    conf.set("server.hadoop.filesystem.cache.purge.timeout", "1");
+    Server server = new Server("server", dir, dir, dir, dir, conf);
+    try {
+      server.init();
+      FileSystemAccess hadoop = server.get(FileSystemAccess.class);
+
+      FileSystem fs1 =
+        hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
+      Assert.assertNotNull(fs1);
+      fs1.mkdirs(new Path("/tmp/foo1"));
+      hadoop.releaseFileSystem(fs1);
+
+      // still around because of caching
+      fs1.mkdirs(new Path("/tmp/foo2"));
+
+      FileSystem fs2 =
+        hadoop.createFileSystem("u", hadoop.getFileSystemConfiguration());
+
+      // should be the same instance because of caching
+      Assert.assertEquals(fs1, fs2);
+
+      Thread.sleep(4 * 1000);
+
+      // still around because the lease count is 1 (fs2 has not been released)
+      fs1.mkdirs(new Path("/tmp/foo2"));
+
+      Thread.sleep(4 * 1000);
+
+      // still around because the lease count is 1 (fs2 has not been released)
+      fs2.mkdirs(new Path("/tmp/foo"));
+
+      hadoop.releaseFileSystem(fs2);
+      Thread.sleep(4 * 1000);
+
+      // should not be around anymore, as the lease count is 0
+      try {
+        fs2.mkdirs(new Path("/tmp/foo"));
+        Assert.fail();
+      } catch (IOException ex) {
+      } catch (Exception ex) {
+        Assert.fail();
+      }
+    } finally {
+      server.destroy();
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 08adf3195b..346c9820ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -201,6 +201,8 @@ Branch-2 ( Unreleased changes )

     HDFS-3113. httpfs does not support delegation tokens. (tucu)

+    HDFS-3513. HttpFS should cache filesystems. (tucu)
+
   IMPROVEMENTS

     HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
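
For reference, a minimal caller-side sketch of the lifecycle the new cache enforces. It mirrors the wiring used in TestFileSystemAccessService; the CachedFsUsageSketch class name, the "alice" user, the /user/alice/tmp path and the single server directory taken from args[0] are illustrative only, and the directory is assumed to be prepared (hadoop config, log and temp dirs) the way the test helpers prepare it.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.lib.server.Server;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.hadoop.FileSystemAccessService;
import org.apache.hadoop.lib.service.instrumentation.InstrumentationService;
import org.apache.hadoop.lib.service.scheduler.SchedulerService;
import org.apache.hadoop.util.StringUtils;

public class CachedFsUsageSketch {

  public static void main(String[] args) throws Exception {
    // Same three services the tests wire together; FileSystemAccessService now
    // declares SchedulerService as a dependency for the purger task.
    String services = StringUtils.join(",",
      Arrays.asList(InstrumentationService.class.getName(),
                    SchedulerService.class.getName(),
                    FileSystemAccessService.class.getName()));
    String dir = args[0];  // used as home, config, log and temp dir, as in the tests
    Configuration conf = new Configuration(false);
    conf.set("server.services", services);
    Server server = new Server("server", dir, dir, dir, dir, conf);
    server.init();

    FileSystemAccess fsAccess = server.get(FileSystemAccess.class);
    FileSystem fs =
      fsAccess.createFileSystem("alice", fsAccess.getFileSystemConfiguration());
    try {
      fs.mkdirs(new Path("/user/alice/tmp"));
    } finally {
      // No longer closes the FileSystem: it only decrements the per-user lease
      // count; the scheduled FileSystemCachePurger closes it once it has been
      // idle for longer than filesystem.cache.purge.timeout seconds.
      fsAccess.releaseFileSystem(fs);
    }
    server.destroy();
  }
}

The point the sketch relies on is the behavioural change in createFileSystem/closeFileSystem above: repeated calls for the same user reuse one cached FileSystem per user, and release only becomes a real close after the purge timeout elapses (or immediately when the timeout is configured as 0, as several tests do).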