diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cd398ade05..999c6c8f5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -2590,6 +2590,9 @@ Release 2.8.0 - UNRELEASED HDFS-9584. NPE in distcp when ssl configuration file does not exist in class path. (Surendra Singh Lilhore via Xiaoyu Yao) + HDFS-9612. DistCp worker threads are not terminated after jobs are done. + (Wei-Chiu Chuang via Yongjun Zhang) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java index 16bf2541c2..906e1ea4dc 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java @@ -70,7 +70,10 @@ public void addWorker(WorkRequestProcessor processor) { * completion of any pending work. */ public void shutdown() { - executor.shutdown(); + if (hasWork()) { + LOG.warn("Shutdown() is called but there are still unprocessed work!"); + } + executor.shutdownNow(); } /** @@ -117,6 +120,8 @@ public void put(WorkRequest workRequest) { /** * Blocking take from ProducerConsumer output queue that can be interrupted. * + * @throws InterruptedException if interrupted before an element becomes + * available. * @return item returned by processor's processItem(). */ public WorkReport take() throws InterruptedException { @@ -143,30 +148,52 @@ public WorkReport blockingTake() { } } + /** + * Worker thread implementation. + * + */ private class Worker implements Runnable { private WorkRequestProcessor processor; + /** + * Constructor. + * @param processor is used to process an item from input queue. + */ public Worker(WorkRequestProcessor processor) { this.processor = processor; } + /** + * The worker continuously gets an item from input queue, process it and + * then put the processed result into output queue. It waits to get an item + * from input queue if there's none. + */ public void run() { while (true) { - try { - WorkRequest work = inputQueue.take(); - WorkReport result = processor.processItem(work); + WorkRequest work; - boolean isDone = false; - while (!isDone) { - try { - outputQueue.put(result); - isDone = true; - } catch (InterruptedException ie) { - LOG.debug("Could not put report into outputQueue. Retrying..."); - } + try { + work = inputQueue.take(); + } catch (InterruptedException e) { + // It is assumed that if an interrupt occurs while taking a work + // out from input queue, the interrupt is likely triggered by + // ProducerConsumer.shutdown(). Therefore, exit the thread. + LOG.debug("Interrupted while waiting for requests from inputQueue."); + return; + } + + boolean isDone = false; + while (!isDone) { + try { + // if the interrupt happens while the work is being processed, + // go back to process the same work again. + WorkReport result = processor.processItem(work); + outputQueue.put(result); + isDone = true; + } catch (InterruptedException ie) { + LOG.debug("Worker thread was interrupted while processing an item," + + " or putting into outputQueue. Retrying..."); } - } catch (InterruptedException ie) { - LOG.debug("Interrupted while waiting for request from inputQueue."); } } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java index 91f738e4ef..6a4c797ab0 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java @@ -26,6 +26,8 @@ public interface WorkRequestProcessor { /** * Work processor. + * The processor should be stateless: that is, it can be repeated after + * being interrupted. * * @param workRequest Input work item. * @return Outputs WorkReport after processing workRequest item. diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java index de0fcfd6e3..ea52f694ab 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.tools.util; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.tools.util.ProducerConsumer; import org.apache.hadoop.tools.util.WorkReport; import org.apache.hadoop.tools.util.WorkRequest; @@ -27,6 +28,7 @@ import java.lang.Exception; import java.lang.Integer; +import java.util.concurrent.TimeoutException; public class TestProducerConsumer { public class CopyProcessor implements WorkRequestProcessor { @@ -64,6 +66,7 @@ public void testSimpleProducerConsumer() { } catch (InterruptedException ie) { Assert.assertTrue(false); } + worker.shutdown(); } @Test @@ -89,6 +92,7 @@ public void testMultipleProducerConsumer() { } Assert.assertEquals(0, sum); Assert.assertEquals(numRequests, numReports); + workers.shutdown(); } @Test @@ -105,5 +109,85 @@ public void testExceptionProducerConsumer() { } catch (InterruptedException ie) { Assert.assertTrue(false); } + worker.shutdown(); + } + + @Test + public void testSimpleProducerConsumerShutdown() throws InterruptedException, + TimeoutException { + // create a producer-consumer thread pool with one thread. + ProducerConsumer worker = + new ProducerConsumer(1); + worker.addWorker(new CopyProcessor()); + // interrupt worker threads + worker.shutdown(); + // Regression test for HDFS-9612 + // Periodically check, and make sure that worker threads are ultimately + // terminated after interrupts + GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000); + } + + @Test(timeout=10000) + public void testMultipleProducerConsumerShutdown() + throws InterruptedException, TimeoutException { + int numWorkers = 10; + // create a producer consumer thread pool with 10 threads. + final ProducerConsumer worker = + new ProducerConsumer(numWorkers); + for (int i=0; i< numWorkers; i++) { + worker.addWorker(new CopyProcessor()); + } + + // starts two thread: a source thread which put in work, and a sink thread + // which takes a piece of work from ProducerConsumer + class SourceThread extends Thread { + public void run() { + while (true) { + try { + worker.put(new WorkRequest(42)); + Thread.sleep(1); + } catch (InterruptedException ie) { + return; + } + } + } + }; + // The source thread put requests into producer-consumer. + SourceThread source = new SourceThread(); + source.start(); + class SinkThread extends Thread { + public void run() { + try { + while (true) { + WorkReport report = worker.take(); + Assert.assertEquals(42, report.getItem().intValue()); + } + } catch (InterruptedException ie) { + return; + } + } + }; + // The sink thread gets proceessed items from producer-consumer + SinkThread sink = new SinkThread(); + sink.start(); + // sleep 1 second and then shut down source. + // This makes sure producer consumer gets some work to do + Thread.sleep(1000); + // after 1 second, stop source thread to stop pushing items. + source.interrupt(); + // wait until all work is consumed by sink + while (worker.hasWork()) { + Thread.sleep(1); + } + worker.shutdown(); + // Regression test for HDFS-9612 + // make sure worker threads are terminated after workers are asked to + // shutdown. + GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000); + + sink.interrupt(); + + source.join(); + sink.join(); } }