diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 67dfc50e1f..e0d5652b0a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -105,6 +105,8 @@ Release 0.23.2 - UNRELEASED OPTIMIZATIONS BUG FIXES + MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering + DeletionService threads (Jason Lowe via bobby) MAPREDUCE-3680. FifoScheduler web service rest API can print out invalid JSON. (B Anil Kumar via tgraves) diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 38eff3591f..7d4de873e3 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -25,6 +25,7 @@ import static java.util.concurrent.TimeUnit.*; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -85,6 +86,7 @@ public void init(Configuration conf) { sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); } + sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); super.init(conf); } @@ -92,14 +94,27 @@ public void init(Configuration conf) { @Override public void stop() { sched.shutdown(); + boolean terminated = false; try { - sched.awaitTermination(10, SECONDS); + terminated = sched.awaitTermination(10, SECONDS); } catch (InterruptedException e) { + } + if (terminated != true) { sched.shutdownNow(); } super.stop(); } + /** + * Determine if the service has completely stopped. + * Used only by unit tests + * @return true if service has completely stopped + */ + @Private + public boolean isTerminated() { + return getServiceState() == STATE.STOPPED && sched.isTerminated(); + } + private class FileDeletion implements Runnable { final String user; final Path subDir; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index abaad224d4..28b51c0632 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -27,12 +27,15 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.junit.AfterClass; import org.junit.Test; +import org.mockito.Mockito; + import static org.junit.Assert.*; public class TestDeletionService { @@ -107,12 +110,18 @@ public void testAbsDelete() throws Exception { del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null); } + + int msecToWait = 20 * 1000; + for (Path p : dirs) { + while (msecToWait > 0 && lfs.util().exists(p)) { + Thread.sleep(100); + msecToWait -= 100; + } + assertFalse(lfs.util().exists(p)); + } } finally { del.stop(); } - for (Path p : dirs) { - assertFalse(lfs.util().exists(p)); - } } @Test @@ -137,14 +146,35 @@ public void testRelativeDelete() throws Exception { del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, baseDirs.toArray(new Path[4])); } + + int msecToWait = 20 * 1000; + for (Path p : baseDirs) { + for (Path q : content) { + Path fp = new Path(p, q); + while (msecToWait > 0 && lfs.util().exists(fp)) { + Thread.sleep(100); + msecToWait -= 100; + } + assertFalse(lfs.util().exists(fp)); + } + } } finally { del.stop(); } - for (Path p : baseDirs) { - for (Path q : content) { - assertFalse(lfs.util().exists(new Path(p, q))); - } - } } + @Test + public void testStopWithDelayedTasks() throws Exception { + DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class)); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60); + del.init(conf); + del.start(); + try { + del.delete("dingo", new Path("/does/not/exist")); + } finally { + del.stop(); + } + assertTrue(del.isTerminated()); + } }