From c4e11269808ee8364e46dea743d6b858b87d37eb Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Mon, 14 Dec 2009 22:17:19 +0000 Subject: [PATCH] HADOOP-6433. Introduce asychronous deletion of files via a pool of threads. This can be used to delete files in the Distributed Cache. (Zheng Shao via dhruba) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@890502 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 + .../apache/hadoop/util/AsyncDiskService.java | 155 ++++++++++++++++++ .../hadoop/util/TestAsyncDiskService.java | 84 ++++++++++ 3 files changed, 243 insertions(+) create mode 100644 src/java/org/apache/hadoop/util/AsyncDiskService.java create mode 100644 src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java diff --git a/CHANGES.txt b/CHANGES.txt index 8b40a91c79..71b0c1ec10 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -22,6 +22,10 @@ Trunk (unreleased changes) HADOOP-6323. Add comparators to the serialization API. (Aaron Kimball via cutting) + HADOOP-6433. Introduce asychronous deletion of files via a pool of + threads. This can be used to delete files in the Distributed + Cache. (Zheng Shao via dhruba) + IMPROVEMENTS HADOOP-6283. Improve the exception messages thrown by diff --git a/src/java/org/apache/hadoop/util/AsyncDiskService.java b/src/java/org/apache/hadoop/util/AsyncDiskService.java new file mode 100644 index 0000000000..9a203f8a5b --- /dev/null +++ b/src/java/org/apache/hadoop/util/AsyncDiskService.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * This class is a container of multiple thread pools, each for a volume, + * so that we can schedule async disk operations easily. + * + * Examples of async disk operations are deletion of files. + * We can move the files to a "TO_BE_DELETED" folder before asychronously + * deleting it, to make sure the caller can run it faster. + */ +public class AsyncDiskService { + + public static final Log LOG = LogFactory.getLog(AsyncDiskService.class); + + // ThreadPool core pool size + private static final int CORE_THREADS_PER_VOLUME = 1; + // ThreadPool maximum pool size + private static final int MAXIMUM_THREADS_PER_VOLUME = 4; + // ThreadPool keep-alive time for threads over core pool size + private static final long THREADS_KEEP_ALIVE_SECONDS = 60; + + private final ThreadGroup threadGroup = new ThreadGroup("async disk service"); + + private ThreadFactory threadFactory; + + private HashMap executors + = new HashMap(); + + /** + * Create a AsyncDiskServices with a set of volumes (specified by their + * root directories). + * + * The AsyncDiskServices uses one ThreadPool per volume to do the async + * disk operations. + * + * @param volumes The roots of the file system volumes. + */ + public AsyncDiskService(String[] volumes) throws IOException { + + threadFactory = new ThreadFactory() { + public Thread newThread(Runnable r) { + return new Thread(threadGroup, r); + } + }; + + // Create one ThreadPool per volume + for (int v = 0 ; v < volumes.length; v++) { + ThreadPoolExecutor executor = new ThreadPoolExecutor( + CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, + THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, + new LinkedBlockingQueue(), threadFactory); + + // This can reduce the number of running threads + executor.allowCoreThreadTimeOut(true); + executors.put(volumes[v], executor); + } + + } + + /** + * Execute the task sometime in the future, using ThreadPools. + */ + public synchronized void execute(String root, Runnable task) { + ThreadPoolExecutor executor = executors.get(root); + if (executor == null) { + throw new RuntimeException("Cannot find root " + root + + " for execution of task " + task); + } else { + executor.execute(task); + } + } + + /** + * Gracefully start the shut down of all ThreadPools. + */ + public synchronized void shutdown() { + + LOG.info("Shutting down all AsyncDiskService threads..."); + + for (Map.Entry e + : executors.entrySet()) { + e.getValue().shutdown(); + } + } + + /** + * Wait for the termination of the thread pools. + * + * @param milliseconds The number of milliseconds to wait + * @return true if all thread pools are terminated without time limit + * @throws InterruptedException + */ + public synchronized boolean awaitTermination(long milliseconds) + throws InterruptedException { + + long end = System.currentTimeMillis() + milliseconds; + for (Map.Entry e: + executors.entrySet()) { + ThreadPoolExecutor executor = e.getValue(); + if (!executor.awaitTermination( + Math.max(end - System.currentTimeMillis(), 0), + TimeUnit.MILLISECONDS)) { + LOG.warn("AsyncDiskService awaitTermination timeout."); + return false; + } + } + LOG.info("All AsyncDiskService threads are terminated."); + return true; + } + + /** + * Shut down all ThreadPools immediately. + */ + public synchronized List shutdownNow() { + + LOG.info("Shutting down all AsyncDiskService threads immediately..."); + + List list = new ArrayList(); + for (Map.Entry e + : executors.entrySet()) { + list.addAll(e.getValue().shutdownNow()); + } + return list; + } + +} diff --git a/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java b/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java new file mode 100644 index 0000000000..075ef69fd3 --- /dev/null +++ b/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.AsyncDiskService; +import org.junit.Test; + +/** + * A test for AsyncDiskService. + */ +public class TestAsyncDiskService extends TestCase { + + public static final Log LOG = LogFactory.getLog(TestAsyncDiskService.class); + + // Access by multiple threads from the ThreadPools in AsyncDiskService. + volatile int count; + + /** An example task for incrementing a counter. + */ + class ExampleTask implements Runnable { + + ExampleTask() { + } + + @Override + public void run() { + synchronized (TestAsyncDiskService.this) { + count ++; + } + } + }; + + + /** + * This test creates some ExampleTasks and runs them. + */ + @Test + public void testAsyncDiskService() throws Throwable { + + String[] vols = new String[]{"/0", "/1"}; + AsyncDiskService service = new AsyncDiskService(vols); + + int total = 100; + + for (int i = 0; i < total; i++) { + service.execute(vols[i%2], new ExampleTask()); + } + + Exception e = null; + try { + service.execute("no_such_volume", new ExampleTask()); + } catch (RuntimeException ex) { + e = ex; + } + assertNotNull("Executing a task on a non-existing volume should throw an " + + "Exception.", e); + + service.shutdown(); + if (!service.awaitTermination(5000)) { + fail("AsyncDiskService didn't shutdown in 5 seconds."); + } + + assertEquals(total, count); + } +}