HADOOP-17313. FileSystem.get to support slow-to-instantiate FS clients. (#2396)

This adds a semaphore to throttle the number of FileSystem instances which
can be created simultaneously, set in "fs.creation.parallel.count".

This is designed to reduce the impact of many threads in an application calling
FileSystem.get() on a filesystem which takes time to instantiate -for example
to an object where HTTPS connections are set up during initialization.
Many threads trying to do this may create spurious delays by conflicting
for access to synchronized blocks, when simply limiting the parallelism
diminishes the conflict, so speeds up all threads trying to access
the store.

The default value, 64, is larger than is likely to deliver any speedup -but
it does mean that there should be no adverse effects from the change.

If a service appears to be blocking on all threads initializing connections to
abfs, s3a or store, try a smaller (possibly significantly smaller) value.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2020-11-25 14:31:02 +00:00 committed by GitHub
parent 3193d8c793
commit ac7045b75f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 329 additions and 35 deletions

View File

@ -166,6 +166,27 @@ public class CommonConfigurationKeysPublic {
public static final String FS_AUTOMATIC_CLOSE_KEY = "fs.automatic.close"; public static final String FS_AUTOMATIC_CLOSE_KEY = "fs.automatic.close";
/** Default value for FS_AUTOMATIC_CLOSE_KEY */ /** Default value for FS_AUTOMATIC_CLOSE_KEY */
public static final boolean FS_AUTOMATIC_CLOSE_DEFAULT = true; public static final boolean FS_AUTOMATIC_CLOSE_DEFAULT = true;
/**
* Number of filesystems instances can be created in parallel.
* <p></p>
* A higher number here does not necessarily improve performance, especially
* for object stores, where multiple threads may be attempting to create an FS
* instance for the same URI.
* <p></p>
* Default value: {@value}.
*/
public static final String FS_CREATION_PARALLEL_COUNT =
"fs.creation.parallel.count";
/**
* Default value for {@link #FS_CREATION_PARALLEL_COUNT}.
* <p></p>
* Default value: {@value}.
*/
public static final int FS_CREATION_PARALLEL_COUNT_DEFAULT =
64;
/** /**
* @see * @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml"> * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">

View File

@ -21,6 +21,7 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue; import java.lang.ref.ReferenceQueue;
import java.net.URI; import java.net.URI;
@ -44,6 +45,7 @@
import java.util.Stack; import java.util.Stack;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -75,6 +77,7 @@
import org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.ClassUtil; import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils; import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@ -200,7 +203,7 @@ public abstract class FileSystem extends Configured
public static final String USER_HOME_PREFIX = "/user"; public static final String USER_HOME_PREFIX = "/user";
/** FileSystem cache. */ /** FileSystem cache. */
static final Cache CACHE = new Cache(); static final Cache CACHE = new Cache(new Configuration());
/** The key this instance is stored under in the cache. */ /** The key this instance is stored under in the cache. */
private Cache.Key key; private Cache.Key key;
@ -2591,8 +2594,11 @@ public void close() throws IOException {
+ "; Object Identity Hash: " + "; Object Identity Hash: "
+ Integer.toHexString(System.identityHashCode(this))); + Integer.toHexString(System.identityHashCode(this)));
// delete all files that were marked as delete-on-exit. // delete all files that were marked as delete-on-exit.
processDeleteOnExit(); try {
CACHE.remove(this.key, this); processDeleteOnExit();
} finally {
CACHE.remove(this.key, this);
}
} }
/** /**
@ -3453,7 +3459,9 @@ public static Class<? extends FileSystem> getFileSystemClass(String scheme,
private static FileSystem createFileSystem(URI uri, Configuration conf) private static FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException { throws IOException {
Tracer tracer = FsTracer.get(conf); Tracer tracer = FsTracer.get(conf);
try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) { try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
DurationInfo ignored =
new DurationInfo(LOGGER, false, "Creating FS %s", uri)) {
scope.addKVAnnotation("scheme", uri.getScheme()); scope.addKVAnnotation("scheme", uri.getScheme());
Class<? extends FileSystem> clazz = Class<? extends FileSystem> clazz =
getFileSystemClass(uri.getScheme(), conf); getFileSystemClass(uri.getScheme(), conf);
@ -3476,15 +3484,39 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
} }
/** Caching FileSystem objects. */ /** Caching FileSystem objects. */
static class Cache { static final class Cache {
private final ClientFinalizer clientFinalizer = new ClientFinalizer(); private final ClientFinalizer clientFinalizer = new ClientFinalizer();
private final Map<Key, FileSystem> map = new HashMap<>(); private final Map<Key, FileSystem> map = new HashMap<>();
private final Set<Key> toAutoClose = new HashSet<>(); private final Set<Key> toAutoClose = new HashSet<>();
/** Semaphore used to serialize creation of new FS instances. */
private final Semaphore creatorPermits;
/**
* Counter of the number of discarded filesystem instances
* in this cache. Primarily for testing, but it could possibly
* be made visible as some kind of metric.
*/
private final AtomicLong discardedInstances = new AtomicLong(0);
/** A variable that makes all objects in the cache unique. */ /** A variable that makes all objects in the cache unique. */
private static AtomicLong unique = new AtomicLong(1); private static AtomicLong unique = new AtomicLong(1);
/**
* Instantiate. The configuration is used to read the
* count of permits issued for concurrent creation
* of filesystem instances.
* @param conf configuration
*/
Cache(final Configuration conf) {
int permits = conf.getInt(FS_CREATION_PARALLEL_COUNT,
FS_CREATION_PARALLEL_COUNT_DEFAULT);
checkArgument(permits > 0, "Invalid value of %s: %s",
FS_CREATION_PARALLEL_COUNT, permits);
creatorPermits = new Semaphore(permits);
}
FileSystem get(URI uri, Configuration conf) throws IOException{ FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf); Key key = new Key(uri, conf);
return getInternal(uri, conf, key); return getInternal(uri, conf, key);
@ -3518,33 +3550,86 @@ private FileSystem getInternal(URI uri, Configuration conf, Key key)
if (fs != null) { if (fs != null) {
return fs; return fs;
} }
// fs not yet created, acquire lock
fs = createFileSystem(uri, conf); // to construct an instance.
final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT, try (DurationInfo d = new DurationInfo(LOGGER, false,
SERVICE_SHUTDOWN_TIMEOUT_DEFAULT, "Acquiring creator semaphore for %s", uri)) {
ShutdownHookManager.TIME_UNIT_DEFAULT); creatorPermits.acquire();
synchronized (this) { // refetch the lock again } catch (InterruptedException e) {
FileSystem oldfs = map.get(key); // acquisition was interrupted; convert to an IOE.
if (oldfs != null) { // a file system is created while lock is releasing throw (IOException)new InterruptedIOException(e.toString())
fs.close(); // close the new file system .initCause(e);
return oldfs; // return the old file system
}
// now insert the new file system into the map
if (map.isEmpty()
&& !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(clientFinalizer,
SHUTDOWN_HOOK_PRIORITY, timeout,
ShutdownHookManager.TIME_UNIT_DEFAULT);
}
fs.key = key;
map.put(key, fs);
if (conf.getBoolean(
FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
toAutoClose.add(key);
}
return fs;
} }
FileSystem fsToClose = null;
try {
// See if FS was instantiated by another thread while waiting
// for the permit.
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
return fs;
}
// create the filesystem
fs = createFileSystem(uri, conf);
final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
ShutdownHookManager.TIME_UNIT_DEFAULT);
// any FS to close outside of the synchronized section
synchronized (this) { // lock on the Cache object
// see if there is now an entry for the FS, which happens
// if another thread's creation overlapped with this one.
FileSystem oldfs = map.get(key);
if (oldfs != null) {
// a file system was created in a separate thread.
// save the FS reference to close outside all locks,
// and switch to returning the oldFS
fsToClose = fs;
fs = oldfs;
} else {
// register the clientFinalizer if needed and shutdown isn't
// already active
if (map.isEmpty()
&& !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(clientFinalizer,
SHUTDOWN_HOOK_PRIORITY, timeout,
ShutdownHookManager.TIME_UNIT_DEFAULT);
}
// insert the new file system into the map
fs.key = key;
map.put(key, fs);
if (conf.getBoolean(
FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
toAutoClose.add(key);
}
}
} // end of synchronized block
} finally {
// release the creator permit.
creatorPermits.release();
}
if (fsToClose != null) {
LOGGER.debug("Duplicate FS created for {}; discarding {}",
uri, fs);
discardedInstances.incrementAndGet();
// close the new file system
// note this will briefly remove and reinstate "fsToClose" from
// the map. It is done in a synchronized block so will not be
// visible to others.
IOUtils.cleanupWithLogger(LOGGER, fsToClose);
}
return fs;
}
/**
* Get the count of discarded instances.
* @return the new instance.
*/
@VisibleForTesting
long getDiscardedInstances() {
return discardedInstances.get();
} }
synchronized void remove(Key key, FileSystem fs) { synchronized void remove(Key key, FileSystem fs) {

View File

@ -21,22 +21,30 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.HadoopTestBase;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.assertj.core.api.Assertions;
import org.junit.Test; import org.junit.Test;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Semaphore;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_CREATION_PARALLEL_COUNT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
public class TestFileSystemCaching { public class TestFileSystemCaching extends HadoopTestBase {
@Test @Test
public void testCacheEnabled() throws Exception { public void testCacheEnabled() throws Exception {
@ -336,4 +344,133 @@ public void testCacheIncludesURIUserInfo() throws Throwable {
assertNotEquals(keyA, new FileSystem.Cache.Key( assertNotEquals(keyA, new FileSystem.Cache.Key(
new URI("wasb://a:password@account.blob.core.windows.net"), conf)); new URI("wasb://a:password@account.blob.core.windows.net"), conf));
} }
/**
* Single semaphore: no surplus FS instances will be created
* and then discarded.
*/
@Test
public void testCacheSingleSemaphoredConstruction() throws Exception {
FileSystem.Cache cache = semaphoredCache(1);
createFileSystems(cache, 10);
Assertions.assertThat(cache.getDiscardedInstances())
.describedAs("Discarded FS instances")
.isEqualTo(0);
}
/**
* Dual semaphore: thread 2 will get as far as
* blocking in the initialize() method while awaiting
* thread 1 to complete its initialization.
* <p></p>
* The thread 2 FS instance will be discarded.
* All other threads will block for a cache semaphore,
* so when they are given an opportunity to proceed,
* they will find that an FS instance exists.
*/
@Test
public void testCacheDualSemaphoreConstruction() throws Exception {
FileSystem.Cache cache = semaphoredCache(2);
createFileSystems(cache, 10);
Assertions.assertThat(cache.getDiscardedInstances())
.describedAs("Discarded FS instances")
.isEqualTo(1);
}
/**
* Construct the FS instances in a cache with effectively no
* limit on the number of instances which can be created
* simultaneously.
* <p></p>
* This is the effective state before HADOOP-17313.
* <p></p>
* All but one thread's FS instance will be discarded.
*/
@Test
public void testCacheLargeSemaphoreConstruction() throws Exception {
FileSystem.Cache cache = semaphoredCache(999);
int count = 10;
createFileSystems(cache, count);
Assertions.assertThat(cache.getDiscardedInstances())
.describedAs("Discarded FS instances")
.isEqualTo(count -1);
}
/**
* Create a cache with a given semaphore size.
* @param semaphores number of semaphores
* @return the cache.
*/
private FileSystem.Cache semaphoredCache(final int semaphores) {
final Configuration conf1 = new Configuration();
conf1.setInt(FS_CREATION_PARALLEL_COUNT, semaphores);
FileSystem.Cache cache = new FileSystem.Cache(conf1);
return cache;
}
/**
* Attempt to create {@code count} filesystems in parallel,
* then assert that they are all equal.
* @param cache cache to use
* @param count count of filesystems to instantiate
*/
private void createFileSystems(final FileSystem.Cache cache, final int count)
throws URISyntaxException, InterruptedException,
java.util.concurrent.ExecutionException {
final Configuration conf = new Configuration();
conf.set("fs.blocking.impl", BlockingInitializer.NAME);
// only one instance can be created at a time.
URI uri = new URI("blocking://a");
ListeningExecutorService pool =
BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
10, TimeUnit.SECONDS,
"creation-threads");
// submit a set of requests to create an FS instance.
// the semaphore will block all but one, and that will block until
// it is allowed to continue
List<ListenableFuture<FileSystem>> futures = new ArrayList<>(count);
// acquire the semaphore so blocking all FS instances from
// being initialized.
Semaphore semaphore = BlockingInitializer.SEM;
semaphore.acquire();
for (int i = 0; i < count; i++) {
futures.add(pool.submit(
() -> cache.get(uri, conf)));
}
// now let all blocked initializers free
semaphore.release();
// get that first FS
FileSystem createdFS = futures.get(0).get();
// verify all the others are the same instance
for (int i = 1; i < count; i++) {
FileSystem fs = futures.get(i).get();
Assertions.assertThat(fs)
.isSameAs(createdFS);
}
}
/**
* An FS which blocks in initialize() until it can acquire the shared
* semaphore (which it then releases).
*/
private static final class BlockingInitializer extends LocalFileSystem {
private static final String NAME = BlockingInitializer.class.getName();
private static final Semaphore SEM = new Semaphore(1);
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
try {
SEM.acquire();
SEM.release();
} catch (InterruptedException e) {
throw new IOException(e.toString(), e);
}
}
}
} }

View File

@ -629,6 +629,8 @@ being placed on the classpath.
## Tuning FileSystem Initialization. ## Tuning FileSystem Initialization.
### Disabling bucket existence checks
When an S3A Filesystem instance is created and initialized, the client When an S3A Filesystem instance is created and initialized, the client
checks if the bucket provided is valid. This can be slow. checks if the bucket provided is valid. This can be slow.
You can ignore bucket validation by configuring `fs.s3a.bucket.probe` as follows: You can ignore bucket validation by configuring `fs.s3a.bucket.probe` as follows:
@ -642,3 +644,52 @@ You can ignore bucket validation by configuring `fs.s3a.bucket.probe` as follows
Note: if the bucket does not exist, this issue will surface when operations are performed Note: if the bucket does not exist, this issue will surface when operations are performed
on the filesystem; you will see `UnknownStoreException` stack traces. on the filesystem; you will see `UnknownStoreException` stack traces.
### Rate limiting parallel FileSystem creation operations
Applications normally ask for filesystems from the shared cache,
via `FileSystem.get()` or `Path.getFileSystem()`.
The cache, `FileSystem.CACHE` will, for each user, cachec one instance of a filesystem
for a given URI.
All calls to `FileSystem.get` for a cached FS for a URI such
as `s3a://landsat-pds/` will return that singe single instance.
FileSystem instances are created on-demand for the cache,
and will be done in each thread which requests an instance.
This is done outside of any synchronisation block.
Once a task has an initialized FileSystem instance, it will, in a synchronized block
add it to the cache.
If it turns out that the cache now already has an instance for that URI, it will
revert the cached copy to it, and close the FS instance it has just created.
If a FileSystem takes time to be initialized, and many threads are trying to
retrieve a FileSystem instance for the same S3 bucket in parallel,
All but one of the threads will be doing useless work, and may unintentionally
be creating lock contention on shared objects.
There is an option, `fs.creation.parallel.count`, which uses a semaphore
to limit the number of FS instances which may be created in parallel.
Setting this to a low number will reduce the amount of wasted work,
at the expense of limiting the number of FileSystem clients which
can be created simultaneously for different object stores/distributed
filesystems.
For example, a value of four would put an upper limit on the number
of wasted instantiations of a connector for the `s3a://landsat-pds/`
bucket.
```xml
<property>
<name>fs.creation.parallel.count</name>
<value>4</value>
</property>
```
It would also mean that if four threads were in the process
of creating such connectors, all threads trying to create
connectors for other buckets, would end up blocking too.
Consider experimenting with this when running applications
where many threads may try to simultaneously interact
with the same slow-to-initialize object stores.