HDFS-9612. DistCp worker threads are not terminated after jobs are done. (Wei-Chiu Chuang via Yongjun Zhang)

This commit is contained in:
Yongjun Zhang 2016-01-15 10:03:09 -08:00
parent 9fbd579ab5
commit a9c69ebeb7
4 changed files with 130 additions and 14 deletions

View File

@ -2590,6 +2590,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9584. NPE in distcp when ssl configuration file does not exist in HDFS-9584. NPE in distcp when ssl configuration file does not exist in
class path. (Surendra Singh Lilhore via Xiaoyu Yao) class path. (Surendra Singh Lilhore via Xiaoyu Yao)
HDFS-9612. DistCp worker threads are not terminated after jobs are done.
(Wei-Chiu Chuang via Yongjun Zhang)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -70,7 +70,10 @@ public void addWorker(WorkRequestProcessor<T, R> processor) {
* completion of any pending work. * completion of any pending work.
*/ */
public void shutdown() { public void shutdown() {
executor.shutdown(); if (hasWork()) {
LOG.warn("Shutdown() is called but there are still unprocessed work!");
}
executor.shutdownNow();
} }
/** /**
@ -117,6 +120,8 @@ public void put(WorkRequest<T> workRequest) {
/** /**
* Blocking take from ProducerConsumer output queue that can be interrupted. * Blocking take from ProducerConsumer output queue that can be interrupted.
* *
* @throws InterruptedException if interrupted before an element becomes
* available.
* @return item returned by processor's processItem(). * @return item returned by processor's processItem().
*/ */
public WorkReport<R> take() throws InterruptedException { public WorkReport<R> take() throws InterruptedException {
@ -143,30 +148,52 @@ public WorkReport<R> blockingTake() {
} }
} }
/**
* Worker thread implementation.
*
*/
private class Worker implements Runnable { private class Worker implements Runnable {
private WorkRequestProcessor<T, R> processor; private WorkRequestProcessor<T, R> processor;
/**
* Constructor.
* @param processor is used to process an item from input queue.
*/
public Worker(WorkRequestProcessor<T, R> processor) { public Worker(WorkRequestProcessor<T, R> processor) {
this.processor = processor; this.processor = processor;
} }
/**
* The worker continuously gets an item from input queue, process it and
* then put the processed result into output queue. It waits to get an item
* from input queue if there's none.
*/
public void run() { public void run() {
while (true) { while (true) {
WorkRequest<T> work;
try { try {
WorkRequest<T> work = inputQueue.take(); work = inputQueue.take();
WorkReport<R> result = processor.processItem(work); } catch (InterruptedException e) {
// It is assumed that if an interrupt occurs while taking a work
// out from input queue, the interrupt is likely triggered by
// ProducerConsumer.shutdown(). Therefore, exit the thread.
LOG.debug("Interrupted while waiting for requests from inputQueue.");
return;
}
boolean isDone = false; boolean isDone = false;
while (!isDone) { while (!isDone) {
try { try {
// if the interrupt happens while the work is being processed,
// go back to process the same work again.
WorkReport<R> result = processor.processItem(work);
outputQueue.put(result); outputQueue.put(result);
isDone = true; isDone = true;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.debug("Could not put report into outputQueue. Retrying..."); LOG.debug("Worker thread was interrupted while processing an item,"
} + " or putting into outputQueue. Retrying...");
} }
} catch (InterruptedException ie) {
LOG.debug("Interrupted while waiting for request from inputQueue.");
} }
} }
} }

View File

@ -26,6 +26,8 @@ public interface WorkRequestProcessor<T, R> {
/** /**
* Work processor. * Work processor.
* The processor should be stateless: that is, it can be repeated after
* being interrupted.
* *
* @param workRequest Input work item. * @param workRequest Input work item.
* @return Outputs WorkReport after processing workRequest item. * @return Outputs WorkReport after processing workRequest item.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.tools.util; package org.apache.hadoop.tools.util;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.util.ProducerConsumer; import org.apache.hadoop.tools.util.ProducerConsumer;
import org.apache.hadoop.tools.util.WorkReport; import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest; import org.apache.hadoop.tools.util.WorkRequest;
@ -27,6 +28,7 @@
import java.lang.Exception; import java.lang.Exception;
import java.lang.Integer; import java.lang.Integer;
import java.util.concurrent.TimeoutException;
public class TestProducerConsumer { public class TestProducerConsumer {
public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> { public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> {
@ -64,6 +66,7 @@ public void testSimpleProducerConsumer() {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Assert.assertTrue(false); Assert.assertTrue(false);
} }
worker.shutdown();
} }
@Test @Test
@ -89,6 +92,7 @@ public void testMultipleProducerConsumer() {
} }
Assert.assertEquals(0, sum); Assert.assertEquals(0, sum);
Assert.assertEquals(numRequests, numReports); Assert.assertEquals(numRequests, numReports);
workers.shutdown();
} }
@Test @Test
@ -105,5 +109,85 @@ public void testExceptionProducerConsumer() {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Assert.assertTrue(false); Assert.assertTrue(false);
} }
worker.shutdown();
}
@Test
public void testSimpleProducerConsumerShutdown() throws InterruptedException,
TimeoutException {
// create a producer-consumer thread pool with one thread.
ProducerConsumer<Integer, Integer> worker =
new ProducerConsumer<Integer, Integer>(1);
worker.addWorker(new CopyProcessor());
// interrupt worker threads
worker.shutdown();
// Regression test for HDFS-9612
// Periodically check, and make sure that worker threads are ultimately
// terminated after interrupts
GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000);
}
@Test(timeout=10000)
public void testMultipleProducerConsumerShutdown()
throws InterruptedException, TimeoutException {
int numWorkers = 10;
// create a producer consumer thread pool with 10 threads.
final ProducerConsumer<Integer, Integer> worker =
new ProducerConsumer<Integer, Integer>(numWorkers);
for (int i=0; i< numWorkers; i++) {
worker.addWorker(new CopyProcessor());
}
// starts two thread: a source thread which put in work, and a sink thread
// which takes a piece of work from ProducerConsumer
class SourceThread extends Thread {
public void run() {
while (true) {
try {
worker.put(new WorkRequest<Integer>(42));
Thread.sleep(1);
} catch (InterruptedException ie) {
return;
}
}
}
};
// The source thread put requests into producer-consumer.
SourceThread source = new SourceThread();
source.start();
class SinkThread extends Thread {
public void run() {
try {
while (true) {
WorkReport<Integer> report = worker.take();
Assert.assertEquals(42, report.getItem().intValue());
}
} catch (InterruptedException ie) {
return;
}
}
};
// The sink thread gets proceessed items from producer-consumer
SinkThread sink = new SinkThread();
sink.start();
// sleep 1 second and then shut down source.
// This makes sure producer consumer gets some work to do
Thread.sleep(1000);
// after 1 second, stop source thread to stop pushing items.
source.interrupt();
// wait until all work is consumed by sink
while (worker.hasWork()) {
Thread.sleep(1);
}
worker.shutdown();
// Regression test for HDFS-9612
// make sure worker threads are terminated after workers are asked to
// shutdown.
GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000);
sink.interrupt();
source.join();
sink.join();
} }
} }