HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor and Aaron Fabbri via lei)
This commit is contained in:
parent
19a0c2660c
commit
bff7c90a56
@ -951,6 +951,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HADOOP-12040. Adjust inputs order for the decode API in raw erasure coder.
|
||||
(Kai Zheng via yliu)
|
||||
|
||||
HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor
|
||||
and Aaron Fabbri via lei)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||
|
@ -843,17 +843,11 @@ for ldap providers in the same way as above does.
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.threads.max</name>
|
||||
<value>256</value>
|
||||
<value>10</value>
|
||||
<description> Maximum number of concurrent active (part)uploads,
|
||||
which each use a thread from the threadpool.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.threads.core</name>
|
||||
<value>15</value>
|
||||
<description>Number of core threads in the threadpool.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.threads.keepalivetime</name>
|
||||
<value>60</value>
|
||||
@ -863,7 +857,7 @@ for ldap providers in the same way as above does.
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.max.total.tasks</name>
|
||||
<value>1000</value>
|
||||
<value>5</value>
|
||||
<description>Number of (part)uploads allowed to the queue before
|
||||
blocking additional uploads.</description>
|
||||
</property>
|
||||
|
@ -0,0 +1,274 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
|
||||
/**
|
||||
* This ExecutorService blocks the submission of new tasks when its queue is
|
||||
* already full by using a semaphore. Task submissions require permits, task
|
||||
* completions release permits.
|
||||
* <p>
|
||||
* This is inspired by <a href = "https://github
|
||||
* .com/apache/incubator-s4/blob/master/subprojects
|
||||
* /s4-comm/src/main/java/org/apache/s4/comm/staging
|
||||
* /BlockingThreadPoolExecutorService.java"> this s4 threadpool</a>
|
||||
*/
|
||||
public class BlockingThreadPoolExecutorService
|
||||
extends ForwardingListeningExecutorService {
|
||||
|
||||
private static Logger LOG = LoggerFactory
|
||||
.getLogger(BlockingThreadPoolExecutorService.class);
|
||||
|
||||
private Semaphore queueingPermits;
|
||||
private ListeningExecutorService executorDelegatee;
|
||||
|
||||
private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
|
||||
|
||||
/**
|
||||
* Returns a {@link java.util.concurrent.ThreadFactory} that names each
|
||||
* created thread uniquely,
|
||||
* with a common prefix.
|
||||
*
|
||||
* @param prefix The prefix of every created Thread's name
|
||||
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
|
||||
*/
|
||||
public static ThreadFactory getNamedThreadFactory(final String prefix) {
|
||||
SecurityManager s = System.getSecurityManager();
|
||||
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
|
||||
Thread.currentThread().getThreadGroup();
|
||||
|
||||
return new ThreadFactory() {
|
||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
private final int poolNum = POOLNUMBER.getAndIncrement();
|
||||
private final ThreadGroup group = threadGroup;
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
final String name =
|
||||
prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
|
||||
return new Thread(group, r, name);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a named {@link ThreadFactory} that just builds daemon threads.
|
||||
*
|
||||
* @param prefix name prefix for all threads created from the factory
|
||||
* @return a thread factory that creates named, daemon threads with
|
||||
* the supplied exception handler and normal priority
|
||||
*/
|
||||
private static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
||||
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
||||
return new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = namedFactory.newThread(r);
|
||||
if (!t.isDaemon()) {
|
||||
t.setDaemon(true);
|
||||
}
|
||||
if (t.getPriority() != Thread.NORM_PRIORITY) {
|
||||
t.setPriority(Thread.NORM_PRIORITY);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A thread pool that that blocks clients submitting additional tasks if
|
||||
* there are already {@code activeTasks} running threads and {@code
|
||||
* waitingTasks} tasks waiting in its queue.
|
||||
*
|
||||
* @param activeTasks maximum number of active tasks
|
||||
* @param waitingTasks maximum number of waiting tasks
|
||||
* @param keepAliveTime time until threads are cleaned up in {@code unit}
|
||||
* @param unit time unit
|
||||
* @param prefixName prefix of name for threads
|
||||
*/
|
||||
public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
|
||||
long keepAliveTime, TimeUnit unit, String prefixName) {
|
||||
super();
|
||||
queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
|
||||
/* Although we generally only expect up to waitingTasks tasks in the
|
||||
queue, we need to be able to buffer all tasks in case dequeueing is
|
||||
slower than enqueueing. */
|
||||
final BlockingQueue<Runnable> workQueue =
|
||||
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
|
||||
ThreadPoolExecutor eventProcessingExecutor =
|
||||
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
|
||||
workQueue, newDaemonThreadFactory(prefixName),
|
||||
new RejectedExecutionHandler() {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r,
|
||||
ThreadPoolExecutor executor) {
|
||||
// This is not expected to happen.
|
||||
LOG.error("Could not submit task to executor {}",
|
||||
executor.toString());
|
||||
}
|
||||
});
|
||||
eventProcessingExecutor.allowCoreThreadTimeOut(true);
|
||||
executorDelegatee =
|
||||
MoreExecutors.listeningDecorator(eventProcessingExecutor);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ListeningExecutorService delegate() {
|
||||
return executorDelegatee;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> ListenableFuture<T> submit(Callable<T> task) {
|
||||
try {
|
||||
queueingPermits.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return Futures.immediateFailedCheckedFuture(e);
|
||||
}
|
||||
return super.submit(new CallableWithPermitRelease<T>(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> ListenableFuture<T> submit(Runnable task, T result) {
|
||||
try {
|
||||
queueingPermits.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return Futures.immediateFailedCheckedFuture(e);
|
||||
}
|
||||
return super.submit(new RunnableWithPermitRelease(task), result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<?> submit(Runnable task) {
|
||||
try {
|
||||
queueingPermits.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return Futures.immediateFailedCheckedFuture(e);
|
||||
}
|
||||
return super.submit(new RunnableWithPermitRelease(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
try {
|
||||
queueingPermits.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
super.execute(new RunnableWithPermitRelease(command));
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases a permit after the task is executed.
|
||||
*/
|
||||
class RunnableWithPermitRelease implements Runnable {
|
||||
|
||||
private Runnable delegatee;
|
||||
|
||||
public RunnableWithPermitRelease(Runnable delegatee) {
|
||||
this.delegatee = delegatee;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
delegatee.run();
|
||||
} finally {
|
||||
queueingPermits.release();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases a permit after the task is completed.
|
||||
*/
|
||||
class CallableWithPermitRelease<T> implements Callable<T> {
|
||||
|
||||
private Callable<T> delegatee;
|
||||
|
||||
public CallableWithPermitRelease(Callable<T> delegatee) {
|
||||
this.delegatee = delegatee;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T call() throws Exception {
|
||||
try {
|
||||
return delegatee.call();
|
||||
} finally {
|
||||
queueingPermits.release();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
|
||||
throws InterruptedException {
|
||||
throw new RuntimeException("Not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
|
||||
long timeout, TimeUnit unit) throws InterruptedException {
|
||||
throw new RuntimeException("Not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
|
||||
throws InterruptedException, ExecutionException {
|
||||
throw new RuntimeException("Not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
|
||||
TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
throw new RuntimeException("Not implemented");
|
||||
}
|
||||
|
||||
}
|
@ -61,20 +61,15 @@ public class Constants {
|
||||
|
||||
// the maximum number of threads to allow in the pool used by TransferManager
|
||||
public static final String MAX_THREADS = "fs.s3a.threads.max";
|
||||
public static final int DEFAULT_MAX_THREADS = 256;
|
||||
public static final int DEFAULT_MAX_THREADS = 10;
|
||||
|
||||
// the number of threads to keep in the pool used by TransferManager
|
||||
public static final String CORE_THREADS = "fs.s3a.threads.core";
|
||||
public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
|
||||
|
||||
// when the number of threads is greater than the core, this is the maximum time
|
||||
// that excess idle threads will wait for new tasks before terminating.
|
||||
// the time an idle thread waits before terminating
|
||||
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
|
||||
public static final int DEFAULT_KEEPALIVE_TIME = 60;
|
||||
|
||||
// the maximum number of tasks that the LinkedBlockingQueue can hold
|
||||
// the maximum number of tasks cached if all threads are already uploading
|
||||
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
|
||||
public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
|
||||
public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
|
||||
|
||||
// size of each of or multipart pieces in bytes
|
||||
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
|
||||
|
@ -51,7 +51,7 @@ import java.util.List;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
|
||||
/**
|
||||
@ -108,7 +108,7 @@ public class S3AFastOutputStream extends OutputStream {
|
||||
String bucket, String key, Progressable progress,
|
||||
FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
|
||||
String serverSideEncryptionAlgorithm, long partSize,
|
||||
long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
|
||||
long multiPartThreshold, ExecutorService threadPoolExecutor)
|
||||
throws IOException {
|
||||
this.bucket = bucket;
|
||||
this.key = key;
|
||||
|
@ -26,11 +26,8 @@ import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
@ -86,7 +83,7 @@ public class S3AFileSystem extends FileSystem {
|
||||
private int maxKeys;
|
||||
private long partSize;
|
||||
private TransferManager transfers;
|
||||
private ThreadPoolExecutor threadPoolExecutor;
|
||||
private ExecutorService threadPoolExecutor;
|
||||
private long multiPartThreshold;
|
||||
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
||||
private CannedAccessControlList cannedACL;
|
||||
@ -95,55 +92,6 @@ public class S3AFileSystem extends FileSystem {
|
||||
// The maximum number of entries that can be deleted in any call to s3
|
||||
private static final int MAX_ENTRIES_TO_DELETE = 1000;
|
||||
|
||||
private static final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||
/**
|
||||
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
|
||||
* with a common prefix.
|
||||
* @param prefix The prefix of every created Thread's name
|
||||
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
|
||||
*/
|
||||
public static ThreadFactory getNamedThreadFactory(final String prefix) {
|
||||
SecurityManager s = System.getSecurityManager();
|
||||
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
|
||||
.getThreadGroup();
|
||||
|
||||
return new ThreadFactory() {
|
||||
final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
private final int poolNum = poolNumber.getAndIncrement();
|
||||
final ThreadGroup group = threadGroup;
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
|
||||
return new Thread(group, r, name);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a named {@link ThreadFactory} that just builds daemon threads.
|
||||
* @param prefix name prefix for all threads created from the factory
|
||||
* @return a thread factory that creates named, daemon threads with
|
||||
* the supplied exception handler and normal priority
|
||||
*/
|
||||
private static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
||||
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
||||
return new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = namedFactory.newThread(r);
|
||||
if (!t.isDaemon()) {
|
||||
t.setDaemon(true);
|
||||
}
|
||||
if (t.getPriority() != Thread.NORM_PRIORITY) {
|
||||
t.setPriority(Thread.NORM_PRIORITY);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
/** Called after a new FileSystem instance is constructed.
|
||||
* @param name a uri whose authority section names the host, port, etc.
|
||||
* for this FileSystem
|
||||
@ -264,25 +212,19 @@ public class S3AFileSystem extends FileSystem {
|
||||
}
|
||||
|
||||
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
|
||||
int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
|
||||
if (maxThreads == 0) {
|
||||
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
if (maxThreads < 2) {
|
||||
LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
|
||||
maxThreads = 2;
|
||||
}
|
||||
if (coreThreads == 0) {
|
||||
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
|
||||
if (totalTasks < 1) {
|
||||
LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
|
||||
totalTasks = 1;
|
||||
}
|
||||
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
|
||||
LinkedBlockingQueue<Runnable> workQueue =
|
||||
new LinkedBlockingQueue<>(maxThreads *
|
||||
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
|
||||
threadPoolExecutor = new ThreadPoolExecutor(
|
||||
coreThreads,
|
||||
maxThreads,
|
||||
keepAliveTime,
|
||||
TimeUnit.SECONDS,
|
||||
workQueue,
|
||||
newDaemonThreadFactory("s3a-transfer-shared-"));
|
||||
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
||||
threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
|
||||
maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
|
||||
"s3a-transfer-shared");
|
||||
|
||||
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
|
||||
transferConfiguration.setMinimumUploadPartSize(partSize);
|
||||
|
@ -231,17 +231,11 @@ If you do any of these: change your credentials immediately!
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.threads.max</name>
|
||||
<value>256</value>
|
||||
<value>10</value>
|
||||
<description> Maximum number of concurrent active (part)uploads,
|
||||
which each use a thread from the threadpool.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.threads.core</name>
|
||||
<value>15</value>
|
||||
<description>Number of core threads in the threadpool.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.threads.keepalivetime</name>
|
||||
<value>60</value>
|
||||
@ -251,7 +245,7 @@ If you do any of these: change your credentials immediately!
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.max.total.tasks</name>
|
||||
<value>1000</value>
|
||||
<value>5</value>
|
||||
<description>Number of (part)uploads allowed to the queue before
|
||||
blocking additional uploads.</description>
|
||||
</property>
|
||||
|
@ -0,0 +1,182 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.hadoop.util.StopWatch;
|
||||
import org.junit.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
/**
|
||||
* Basic unit test for S3A's blocking executor service.
|
||||
*/
|
||||
public class TestBlockingThreadPoolExecutorService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
BlockingThreadPoolExecutorService.class);
|
||||
|
||||
private static final int NUM_ACTIVE_TASKS = 4;
|
||||
private static final int NUM_WAITING_TASKS = 2;
|
||||
private static final int TASK_SLEEP_MSEC = 100;
|
||||
private static final int SHUTDOWN_WAIT_MSEC = 200;
|
||||
private static final int SHUTDOWN_WAIT_TRIES = 5;
|
||||
private static final int BLOCKING_THRESHOLD_MSEC = 50;
|
||||
|
||||
private static final Integer SOME_VALUE = 1337;
|
||||
|
||||
private static BlockingThreadPoolExecutorService tpe = null;
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
ensureDestroyed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Basic test of running one trivial task.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitCallable() throws Exception {
|
||||
ensureCreated();
|
||||
ListenableFuture<Integer> f = tpe.submit(callableSleeper);
|
||||
Integer v = f.get();
|
||||
assertEquals(SOME_VALUE, v);
|
||||
}
|
||||
|
||||
/**
|
||||
* More involved test, including detecting blocking when at capacity.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitRunnable() throws Exception {
|
||||
ensureCreated();
|
||||
int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
for (int i = 0; i < totalTasks; i++) {
|
||||
tpe.submit(sleeper);
|
||||
assertDidntBlock(stopWatch);
|
||||
}
|
||||
tpe.submit(sleeper);
|
||||
assertDidBlock(stopWatch);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdown() throws Exception {
|
||||
// Cover create / destroy, regardless of when this test case runs
|
||||
ensureCreated();
|
||||
ensureDestroyed();
|
||||
|
||||
// Cover create, execute, destroy, regardless of when test case runs
|
||||
ensureCreated();
|
||||
testSubmitRunnable();
|
||||
ensureDestroyed();
|
||||
}
|
||||
|
||||
// Helper functions, etc.
|
||||
|
||||
private void assertDidntBlock(StopWatch sw) {
|
||||
try {
|
||||
assertFalse("Non-blocking call took too long.",
|
||||
sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
|
||||
} finally {
|
||||
sw.reset().start();
|
||||
}
|
||||
}
|
||||
|
||||
private void assertDidBlock(StopWatch sw) {
|
||||
try {
|
||||
if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
|
||||
throw new RuntimeException("Blocking call returned too fast.");
|
||||
}
|
||||
} finally {
|
||||
sw.reset().start();
|
||||
}
|
||||
}
|
||||
|
||||
private Runnable sleeper = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
String name = Thread.currentThread().getName();
|
||||
try {
|
||||
Thread.sleep(TASK_SLEEP_MSEC);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Thread {} interrupted.", name);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
private Callable<Integer> callableSleeper = new Callable<Integer>() {
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
sleeper.run();
|
||||
return SOME_VALUE;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Helper function to create thread pool under test.
|
||||
*/
|
||||
private static void ensureCreated() throws Exception {
|
||||
if (tpe == null) {
|
||||
LOG.debug("Creating thread pool");
|
||||
tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
|
||||
NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to terminate thread pool under test, asserting that
|
||||
* shutdown -> terminate works as expected.
|
||||
*/
|
||||
private static void ensureDestroyed() throws Exception {
|
||||
if (tpe == null) {
|
||||
return;
|
||||
}
|
||||
int shutdownTries = SHUTDOWN_WAIT_TRIES;
|
||||
|
||||
tpe.shutdown();
|
||||
if (!tpe.isShutdown()) {
|
||||
throw new RuntimeException("Shutdown had no effect.");
|
||||
}
|
||||
|
||||
while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
|
||||
TimeUnit.MILLISECONDS)) {
|
||||
LOG.info("Waiting for thread pool shutdown.");
|
||||
if (shutdownTries-- <= 0) {
|
||||
LOG.error("Failed to terminate thread pool gracefully.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!tpe.isTerminated()) {
|
||||
tpe.shutdownNow();
|
||||
if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
|
||||
TimeUnit.MILLISECONDS)) {
|
||||
throw new RuntimeException(
|
||||
"Failed to terminate thread pool in timely manner.");
|
||||
}
|
||||
}
|
||||
tpe = null;
|
||||
}
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
/**
|
||||
* Demonstrate that the threadpool blocks additional client requests if
|
||||
* its queue is full (rather than throwing an exception) by initiating an
|
||||
* upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
|
||||
* 4th part should not trigger an exception as it would with a
|
||||
* non-blocking threadpool.
|
||||
*/
|
||||
public class TestS3ABlockingThreadPool {
|
||||
|
||||
private Configuration conf;
|
||||
private S3AFileSystem fs;
|
||||
|
||||
@Rule
|
||||
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
|
||||
|
||||
protected Path getTestPath() {
|
||||
return new Path("/tests3a");
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
|
||||
conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
|
||||
conf.setInt(Constants.MAX_THREADS, 2);
|
||||
conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (fs != null) {
|
||||
fs.delete(getTestPath(), true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegularMultiPartUpload() throws Exception {
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
|
||||
1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFastMultiPartUpload() throws Exception {
|
||||
conf.setBoolean(Constants.FAST_UPLOAD, true);
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
|
||||
1024);
|
||||
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user