From 4eb5f7fa32bab1b9ce3fb58eca51e2cd2e194cd5 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 10 Feb 2015 16:54:21 +0000 Subject: [PATCH] YARN-3090. DeletionService can silently ignore deletion task failures. Contributed by Varun Saxena --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../server/nodemanager/DeletionService.java | 40 ++++++++++++++++--- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fbeca6a741..5a3a505955 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -527,6 +527,9 @@ Release 2.7.0 - UNRELEASED YARN-2971. RM uses conf instead of token service address to renew timeline delegation tokens (jeagles) + YARN-3090. DeletionService can silently ignore deletion task failures + (Varun Saxena via jlowe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index e4025f5da1..4e00a1cc26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -113,13 +115,13 @@ protected void serviceInit(Configuration conf) throws Exception { .setNameFormat("DeletionService #%d") .build(); if (conf != null) { - sched = new ScheduledThreadPoolExecutor( - conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), - tf); + sched = new DelServiceSchedThreadPoolExecutor( + conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); } else { - sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, - tf); + sched = new DelServiceSchedThreadPoolExecutor( + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); @@ -155,6 +157,34 @@ public boolean isTerminated() { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } + private static class DelServiceSchedThreadPoolExecutor extends + ScheduledThreadPoolExecutor { + public DelServiceSchedThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + @Override + protected void afterExecute(Runnable task, Throwable exception) { + if (task instanceof FutureTask) { + FutureTask futureTask = (FutureTask) task; + if (!futureTask.isCancelled()) { + try { + futureTask.get(); + } catch (ExecutionException ee) { + exception = ee.getCause(); + } catch (InterruptedException ie) { + exception = ie; + } + } + } + if (exception != null) { + LOG.error("Exception during execution of task in DeletionService", + exception); + } + } + } + public static class FileDeletionTask implements Runnable { public static final int INVALID_TASK_ID = -1; private int taskId;