diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 7556831701..57446d3d64 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -166,6 +166,27 @@ public class CommonConfigurationKeysPublic {
public static final String FS_AUTOMATIC_CLOSE_KEY = "fs.automatic.close";
/** Default value for FS_AUTOMATIC_CLOSE_KEY */
public static final boolean FS_AUTOMATIC_CLOSE_DEFAULT = true;
+
+ /**
+ * Number of filesystems instances can be created in parallel.
+ *
+ * A higher number here does not necessarily improve performance, especially
+ * for object stores, where multiple threads may be attempting to create an FS
+ * instance for the same URI.
+ *
+ * Default value: {@value}.
+ */
+ public static final String FS_CREATION_PARALLEL_COUNT =
+ "fs.creation.parallel.count";
+
+ /**
+ * Default value for {@link #FS_CREATION_PARALLEL_COUNT}.
+ *
+ * Default value: {@value}.
+ */
+ public static final int FS_CREATION_PARALLEL_COUNT_DEFAULT =
+ 64;
+
/**
* @see
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 9fffb8aa5b..e814b3da91 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue;
import java.net.URI;
@@ -44,6 +45,7 @@
import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -75,6 +77,7 @@
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -200,7 +203,7 @@ public abstract class FileSystem extends Configured
public static final String USER_HOME_PREFIX = "/user";
/** FileSystem cache. */
- static final Cache CACHE = new Cache();
+ static final Cache CACHE = new Cache(new Configuration());
/** The key this instance is stored under in the cache. */
private Cache.Key key;
@@ -2591,8 +2594,11 @@ public void close() throws IOException {
+ "; Object Identity Hash: "
+ Integer.toHexString(System.identityHashCode(this)));
// delete all files that were marked as delete-on-exit.
- processDeleteOnExit();
- CACHE.remove(this.key, this);
+ try {
+ processDeleteOnExit();
+ } finally {
+ CACHE.remove(this.key, this);
+ }
}
/**
@@ -3453,7 +3459,9 @@ public static Class extends FileSystem> getFileSystemClass(String scheme,
private static FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
Tracer tracer = FsTracer.get(conf);
- try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
+ try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
+ DurationInfo ignored =
+ new DurationInfo(LOGGER, false, "Creating FS %s", uri)) {
scope.addKVAnnotation("scheme", uri.getScheme());
Class extends FileSystem> clazz =
getFileSystemClass(uri.getScheme(), conf);
@@ -3476,15 +3484,39 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
}
/** Caching FileSystem objects. */
- static class Cache {
+ static final class Cache {
private final ClientFinalizer clientFinalizer = new ClientFinalizer();
private final Map map = new HashMap<>();
private final Set toAutoClose = new HashSet<>();
+ /** Semaphore used to serialize creation of new FS instances. */
+ private final Semaphore creatorPermits;
+
+ /**
+ * Counter of the number of discarded filesystem instances
+ * in this cache. Primarily for testing, but it could possibly
+ * be made visible as some kind of metric.
+ */
+ private final AtomicLong discardedInstances = new AtomicLong(0);
+
/** A variable that makes all objects in the cache unique. */
private static AtomicLong unique = new AtomicLong(1);
+ /**
+ * Instantiate. The configuration is used to read the
+ * count of permits issued for concurrent creation
+ * of filesystem instances.
+ * @param conf configuration
+ */
+ Cache(final Configuration conf) {
+ int permits = conf.getInt(FS_CREATION_PARALLEL_COUNT,
+ FS_CREATION_PARALLEL_COUNT_DEFAULT);
+ checkArgument(permits > 0, "Invalid value of %s: %s",
+ FS_CREATION_PARALLEL_COUNT, permits);
+ creatorPermits = new Semaphore(permits);
+ }
+
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
@@ -3518,33 +3550,86 @@ private FileSystem getInternal(URI uri, Configuration conf, Key key)
if (fs != null) {
return fs;
}
-
- fs = createFileSystem(uri, conf);
- final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
- SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
- ShutdownHookManager.TIME_UNIT_DEFAULT);
- synchronized (this) { // refetch the lock again
- FileSystem oldfs = map.get(key);
- if (oldfs != null) { // a file system is created while lock is releasing
- fs.close(); // close the new file system
- return oldfs; // return the old file system
- }
-
- // now insert the new file system into the map
- if (map.isEmpty()
- && !ShutdownHookManager.get().isShutdownInProgress()) {
- ShutdownHookManager.get().addShutdownHook(clientFinalizer,
- SHUTDOWN_HOOK_PRIORITY, timeout,
- ShutdownHookManager.TIME_UNIT_DEFAULT);
- }
- fs.key = key;
- map.put(key, fs);
- if (conf.getBoolean(
- FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
- toAutoClose.add(key);
- }
- return fs;
+ // fs not yet created, acquire lock
+ // to construct an instance.
+ try (DurationInfo d = new DurationInfo(LOGGER, false,
+ "Acquiring creator semaphore for %s", uri)) {
+ creatorPermits.acquire();
+ } catch (InterruptedException e) {
+ // acquisition was interrupted; convert to an IOE.
+ throw (IOException)new InterruptedIOException(e.toString())
+ .initCause(e);
}
+ FileSystem fsToClose = null;
+ try {
+ // See if FS was instantiated by another thread while waiting
+ // for the permit.
+ synchronized (this) {
+ fs = map.get(key);
+ }
+ if (fs != null) {
+ LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
+ return fs;
+ }
+ // create the filesystem
+ fs = createFileSystem(uri, conf);
+ final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
+ SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
+ // any FS to close outside of the synchronized section
+ synchronized (this) { // lock on the Cache object
+
+ // see if there is now an entry for the FS, which happens
+ // if another thread's creation overlapped with this one.
+ FileSystem oldfs = map.get(key);
+ if (oldfs != null) {
+ // a file system was created in a separate thread.
+ // save the FS reference to close outside all locks,
+ // and switch to returning the oldFS
+ fsToClose = fs;
+ fs = oldfs;
+ } else {
+ // register the clientFinalizer if needed and shutdown isn't
+ // already active
+ if (map.isEmpty()
+ && !ShutdownHookManager.get().isShutdownInProgress()) {
+ ShutdownHookManager.get().addShutdownHook(clientFinalizer,
+ SHUTDOWN_HOOK_PRIORITY, timeout,
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
+ }
+ // insert the new file system into the map
+ fs.key = key;
+ map.put(key, fs);
+ if (conf.getBoolean(
+ FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
+ toAutoClose.add(key);
+ }
+ }
+ } // end of synchronized block
+ } finally {
+ // release the creator permit.
+ creatorPermits.release();
+ }
+ if (fsToClose != null) {
+ LOGGER.debug("Duplicate FS created for {}; discarding {}",
+ uri, fs);
+ discardedInstances.incrementAndGet();
+ // close the new file system
+ // note this will briefly remove and reinstate "fsToClose" from
+ // the map. It is done in a synchronized block so will not be
+ // visible to others.
+ IOUtils.cleanupWithLogger(LOGGER, fsToClose);
+ }
+ return fs;
+ }
+
+ /**
+ * Get the count of discarded instances.
+ * @return the new instance.
+ */
+ @VisibleForTesting
+ long getDiscardedInstances() {
+ return discardedInstances.get();
}
synchronized void remove(Key key, FileSystem fs) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
index b3c38475d4..01abeaaf57 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
@@ -21,22 +21,30 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
-import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Semaphore;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_CREATION_PARALLEL_COUNT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
-public class TestFileSystemCaching {
+public class TestFileSystemCaching extends HadoopTestBase {
@Test
public void testCacheEnabled() throws Exception {
@@ -336,4 +344,133 @@ public void testCacheIncludesURIUserInfo() throws Throwable {
assertNotEquals(keyA, new FileSystem.Cache.Key(
new URI("wasb://a:password@account.blob.core.windows.net"), conf));
}
+
+
+ /**
+ * Single semaphore: no surplus FS instances will be created
+ * and then discarded.
+ */
+ @Test
+ public void testCacheSingleSemaphoredConstruction() throws Exception {
+ FileSystem.Cache cache = semaphoredCache(1);
+ createFileSystems(cache, 10);
+ Assertions.assertThat(cache.getDiscardedInstances())
+ .describedAs("Discarded FS instances")
+ .isEqualTo(0);
+ }
+
+ /**
+ * Dual semaphore: thread 2 will get as far as
+ * blocking in the initialize() method while awaiting
+ * thread 1 to complete its initialization.
+ *
+ * The thread 2 FS instance will be discarded.
+ * All other threads will block for a cache semaphore,
+ * so when they are given an opportunity to proceed,
+ * they will find that an FS instance exists.
+ */
+ @Test
+ public void testCacheDualSemaphoreConstruction() throws Exception {
+ FileSystem.Cache cache = semaphoredCache(2);
+ createFileSystems(cache, 10);
+ Assertions.assertThat(cache.getDiscardedInstances())
+ .describedAs("Discarded FS instances")
+ .isEqualTo(1);
+ }
+
+ /**
+ * Construct the FS instances in a cache with effectively no
+ * limit on the number of instances which can be created
+ * simultaneously.
+ *
+ * This is the effective state before HADOOP-17313.
+ *
+ * All but one thread's FS instance will be discarded.
+ */
+ @Test
+ public void testCacheLargeSemaphoreConstruction() throws Exception {
+ FileSystem.Cache cache = semaphoredCache(999);
+ int count = 10;
+ createFileSystems(cache, count);
+ Assertions.assertThat(cache.getDiscardedInstances())
+ .describedAs("Discarded FS instances")
+ .isEqualTo(count -1);
+ }
+
+ /**
+ * Create a cache with a given semaphore size.
+ * @param semaphores number of semaphores
+ * @return the cache.
+ */
+ private FileSystem.Cache semaphoredCache(final int semaphores) {
+ final Configuration conf1 = new Configuration();
+ conf1.setInt(FS_CREATION_PARALLEL_COUNT, semaphores);
+ FileSystem.Cache cache = new FileSystem.Cache(conf1);
+ return cache;
+ }
+
+ /**
+ * Attempt to create {@code count} filesystems in parallel,
+ * then assert that they are all equal.
+ * @param cache cache to use
+ * @param count count of filesystems to instantiate
+ */
+ private void createFileSystems(final FileSystem.Cache cache, final int count)
+ throws URISyntaxException, InterruptedException,
+ java.util.concurrent.ExecutionException {
+ final Configuration conf = new Configuration();
+ conf.set("fs.blocking.impl", BlockingInitializer.NAME);
+ // only one instance can be created at a time.
+ URI uri = new URI("blocking://a");
+ ListeningExecutorService pool =
+ BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
+ 10, TimeUnit.SECONDS,
+ "creation-threads");
+
+ // submit a set of requests to create an FS instance.
+ // the semaphore will block all but one, and that will block until
+ // it is allowed to continue
+ List> futures = new ArrayList<>(count);
+
+ // acquire the semaphore so blocking all FS instances from
+ // being initialized.
+ Semaphore semaphore = BlockingInitializer.SEM;
+ semaphore.acquire();
+
+ for (int i = 0; i < count; i++) {
+ futures.add(pool.submit(
+ () -> cache.get(uri, conf)));
+ }
+ // now let all blocked initializers free
+ semaphore.release();
+ // get that first FS
+ FileSystem createdFS = futures.get(0).get();
+ // verify all the others are the same instance
+ for (int i = 1; i < count; i++) {
+ FileSystem fs = futures.get(i).get();
+ Assertions.assertThat(fs)
+ .isSameAs(createdFS);
+ }
+ }
+
+ /**
+ * An FS which blocks in initialize() until it can acquire the shared
+ * semaphore (which it then releases).
+ */
+ private static final class BlockingInitializer extends LocalFileSystem {
+
+ private static final String NAME = BlockingInitializer.class.getName();
+
+ private static final Semaphore SEM = new Semaphore(1);
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ try {
+ SEM.acquire();
+ SEM.release();
+ } catch (InterruptedException e) {
+ throw new IOException(e.toString(), e);
+ }
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
index 68e768d38d..c8f1c0ed46 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
@@ -629,6 +629,8 @@ being placed on the classpath.
## Tuning FileSystem Initialization.
+### Disabling bucket existence checks
+
When an S3A Filesystem instance is created and initialized, the client
checks if the bucket provided is valid. This can be slow.
You can ignore bucket validation by configuring `fs.s3a.bucket.probe` as follows:
@@ -642,3 +644,52 @@ You can ignore bucket validation by configuring `fs.s3a.bucket.probe` as follows
Note: if the bucket does not exist, this issue will surface when operations are performed
on the filesystem; you will see `UnknownStoreException` stack traces.
+
+### Rate limiting parallel FileSystem creation operations
+
+Applications normally ask for filesystems from the shared cache,
+via `FileSystem.get()` or `Path.getFileSystem()`.
+The cache, `FileSystem.CACHE` will, for each user, cachec one instance of a filesystem
+for a given URI.
+All calls to `FileSystem.get` for a cached FS for a URI such
+as `s3a://landsat-pds/` will return that singe single instance.
+
+FileSystem instances are created on-demand for the cache,
+and will be done in each thread which requests an instance.
+This is done outside of any synchronisation block.
+Once a task has an initialized FileSystem instance, it will, in a synchronized block
+add it to the cache.
+If it turns out that the cache now already has an instance for that URI, it will
+revert the cached copy to it, and close the FS instance it has just created.
+
+If a FileSystem takes time to be initialized, and many threads are trying to
+retrieve a FileSystem instance for the same S3 bucket in parallel,
+All but one of the threads will be doing useless work, and may unintentionally
+be creating lock contention on shared objects.
+
+There is an option, `fs.creation.parallel.count`, which uses a semaphore
+to limit the number of FS instances which may be created in parallel.
+
+Setting this to a low number will reduce the amount of wasted work,
+at the expense of limiting the number of FileSystem clients which
+can be created simultaneously for different object stores/distributed
+filesystems.
+
+For example, a value of four would put an upper limit on the number
+of wasted instantiations of a connector for the `s3a://landsat-pds/`
+bucket.
+
+```xml
+
+ fs.creation.parallel.count
+ 4
+
+```
+
+It would also mean that if four threads were in the process
+of creating such connectors, all threads trying to create
+connectors for other buckets, would end up blocking too.
+
+Consider experimenting with this when running applications
+where many threads may try to simultaneously interact
+with the same slow-to-initialize object stores.
\ No newline at end of file