diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f9148ef47b..a0eaff7143 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -42,7 +42,6 @@ import java.util.Set; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -213,7 +212,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private boolean enableMultiObjectsDelete; private TransferManager transfers; private ListeningExecutorService boundedThreadPool; - private ExecutorService unboundedThreadPool; + private ThreadPoolExecutor unboundedThreadPool; private int executorCapacity; private long multiPartThreshold; public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); @@ -440,6 +439,7 @@ private void initThreadPools(Configuration conf) { new LinkedBlockingQueue<>(), BlockingThreadPoolExecutorService.newDaemonThreadFactory( "s3a-transfer-unbounded")); + unboundedThreadPool.allowCoreThreadTimeOut(true); executorCapacity = intOption(conf, EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java index e320bb2191..df5cd46fff 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer; +import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; @@ -52,10 +53,8 @@ public class ITestS3AConcurrentOps extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger( ITestS3AConcurrentOps.class); private final int concurrentRenames = 10; + private final int fileSize = 1024 * 1024; private Path testRoot; - private Path[] source = new Path[concurrentRenames]; - private Path[] target = new Path[concurrentRenames]; - private S3AFileSystem fs; private S3AFileSystem auxFs; @Override @@ -66,43 +65,10 @@ protected int getTestTimeoutSeconds() { @Override public void setup() throws Exception { super.setup(); - fs = getRestrictedFileSystem(); auxFs = getNormalFileSystem(); testRoot = path("/ITestS3AConcurrentOps"); testRoot = S3ATestUtils.createTestPath(testRoot); - - for (int i = 0; i < concurrentRenames; i++){ - source[i] = new Path(testRoot, "source" + i); - target[i] = new Path(testRoot, "target" + i); - } - - LOG.info("Generating data..."); - auxFs.mkdirs(testRoot); - byte[] zeroes = ContractTestUtils.dataset(1024*1024, 0, Integer.MAX_VALUE); - for (Path aSource : source) { - try(FSDataOutputStream out = auxFs.create(aSource)) { - for (int mb = 0; mb < 20; mb++) { - LOG.debug("{}: Block {}...", aSource, mb); - out.write(zeroes); - } - } - } - LOG.info("Data generated..."); - } - - private S3AFileSystem getRestrictedFileSystem() throws Exception { - Configuration conf = getConfiguration(); - conf.setInt(MAX_THREADS, 2); - conf.setInt(MAX_TOTAL_TASKS, 1); - - conf.set(MIN_MULTIPART_THRESHOLD, "10M"); - conf.set(MULTIPART_SIZE, "5M"); - - S3AFileSystem s3a = getFileSystem(); - URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME)); - s3a.initialize(rootURI, conf); - return s3a; } private S3AFileSystem getNormalFileSystem() throws Exception { @@ -118,9 +84,76 @@ public void teardown() throws Exception { super.teardown(); if (auxFs != null) { auxFs.delete(testRoot, true); + auxFs.close(); } } + private void parallelRenames(int concurrentRenames, final S3AFileSystem fs, + String sourceNameBase, String targetNameBase) throws ExecutionException, + InterruptedException, IOException { + + Path[] source = new Path[concurrentRenames]; + Path[] target = new Path[concurrentRenames]; + + for (int i = 0; i < concurrentRenames; i++){ + source[i] = new Path(testRoot, sourceNameBase + i); + target[i] = new Path(testRoot, targetNameBase + i); + } + + LOG.info("Generating data..."); + auxFs.mkdirs(testRoot); + byte[] zeroes = ContractTestUtils.dataset(fileSize, 0, Integer.MAX_VALUE); + for (Path aSource : source) { + try(FSDataOutputStream out = auxFs.create(aSource)) { + for (int mb = 0; mb < 20; mb++) { + LOG.debug("{}: Block {}...", aSource, mb); + out.write(zeroes); + } + } + } + LOG.info("Data generated..."); + + ExecutorService executor = Executors.newFixedThreadPool( + concurrentRenames, new ThreadFactory() { + private AtomicInteger count = new AtomicInteger(0); + + public Thread newThread(Runnable r) { + return new Thread(r, + "testParallelRename" + count.getAndIncrement()); + } + }); + try { + ((ThreadPoolExecutor)executor).prestartAllCoreThreads(); + Future[] futures = new Future[concurrentRenames]; + for (int i = 0; i < concurrentRenames; i++) { + final int index = i; + futures[i] = executor.submit(new Callable() { + @Override + public Boolean call() throws Exception { + NanoTimer timer = new NanoTimer(); + boolean result = fs.rename(source[index], target[index]); + timer.end("parallel rename %d", index); + LOG.info("Rename {} ran from {} to {}", index, + timer.getStartTime(), timer.getEndTime()); + return result; + } + }); + } + LOG.info("Waiting for tasks to complete..."); + LOG.info("Deadlock may have occurred if nothing else is logged" + + " or the test times out"); + for (int i = 0; i < concurrentRenames; i++) { + assertTrue("No future " + i, futures[i].get()); + assertPathExists("target path", target[i]); + assertPathDoesNotExist("source path", source[i]); + } + LOG.info("All tasks have completed successfully"); + } finally { + executor.shutdown(); + } + } + + /** * Attempts to trigger a deadlock that would happen if any bounded resource * pool became saturated with control tasks that depended on other tasks @@ -130,39 +163,51 @@ public void teardown() throws Exception { @SuppressWarnings("unchecked") public void testParallelRename() throws InterruptedException, ExecutionException, IOException { - ExecutorService executor = Executors.newFixedThreadPool( - concurrentRenames, new ThreadFactory() { - private AtomicInteger count = new AtomicInteger(0); - public Thread newThread(Runnable r) { - return new Thread(r, - "testParallelRename" + count.getAndIncrement()); - } - }); - ((ThreadPoolExecutor)executor).prestartAllCoreThreads(); - Future[] futures = new Future[concurrentRenames]; - for (int i = 0; i < concurrentRenames; i++) { - final int index = i; - futures[i] = executor.submit(new Callable() { - @Override - public Boolean call() throws Exception { - NanoTimer timer = new NanoTimer(); - boolean result = fs.rename(source[index], target[index]); - timer.end("parallel rename %d", index); - LOG.info("Rename {} ran from {} to {}", index, - timer.getStartTime(), timer.getEndTime()); - return result; - } - }); + Configuration conf = getConfiguration(); + conf.setInt(MAX_THREADS, 2); + conf.setInt(MAX_TOTAL_TASKS, 1); + + conf.set(MIN_MULTIPART_THRESHOLD, "10K"); + conf.set(MULTIPART_SIZE, "5K"); + + try (S3AFileSystem tinyThreadPoolFs = new S3AFileSystem()) { + tinyThreadPoolFs.initialize(auxFs.getUri(), conf); + + parallelRenames(concurrentRenames, tinyThreadPoolFs, + "testParallelRename-source", "testParallelRename-target"); } - LOG.info("Waiting for tasks to complete..."); - LOG.info("Deadlock may have occurred if nothing else is logged" + - " or the test times out"); - for (int i = 0; i < concurrentRenames; i++) { - assertTrue("No future " + i, futures[i].get()); - assertPathExists("target path", target[i]); - assertPathDoesNotExist("source path", source[i]); + } + + @Test + public void testThreadPoolCoolDown() throws InterruptedException, + ExecutionException, IOException { + + int hotThreads = 0; + int coldThreads = 0; + + parallelRenames(concurrentRenames, auxFs, + "testThreadPoolCoolDown-source", "testThreadPoolCoolDown-target"); + + for (Thread t : Thread.getAllStackTraces().keySet()) { + if (t.getName().startsWith("s3a-transfer")) { + hotThreads++; + } } - LOG.info("All tasks have completed successfully"); + + int timeoutMs = Constants.DEFAULT_KEEPALIVE_TIME * 1000; + Thread.sleep((int)(1.1 * timeoutMs)); + + for (Thread t : Thread.getAllStackTraces().keySet()) { + if (t.getName().startsWith("s3a-transfer")) { + coldThreads++; + } + } + + assertNotEquals("Failed to find threads in active FS - test is flawed", + hotThreads, 0); + assertTrue("s3a-transfer threads went from " + hotThreads + " to " + + coldThreads + ", should have gone to 0", 0 == coldThreads); + } }