diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 921aa6385b..48a576a8cd 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1874,6 +1874,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2766. Fixed NM to set secure permissions for files and directories
in distributed-cache. (Hitesh Shah via vinodkv)
+ MAPREDUCE-2696. Fixed NodeManager to cleanup logs in a thread when logs'
+ aggregation is not enabled. (Siddharth Seth via vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java
index aceb562906..063cb6bdda 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java
@@ -228,9 +228,7 @@ public void testLogsView2() throws IOException {
params);
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
verify(spyPw).write(
- "Logs not available for container_10_0001_01_000001. Aggregation "
- + "may not be complete,"
- + " Check back later or try the nodemanager on "
+ "Aggregation is not enabled. Try the nodemanager at "
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
}
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 0efcb3d2eb..b545bf72d0 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -89,6 +89,12 @@
+
+
+
+
+
+
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 52cad1c9dc..0779a5f732 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -288,15 +288,31 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOCALIZER_FETCH_THREAD_COUNT =
NM_PREFIX + "localizer.fetch.thread-count";
public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4;
-
+
/** Where to store container logs.*/
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
-
+
/** Whether to enable log aggregation */
public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
- + "log-aggregation.enable";
+ + "log-aggregation-enable";
+ public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false;
+ /**
+ * Number of seconds to retain logs on the NodeManager. Only applicable if Log
+ * aggregation is disabled
+ */
+ public static final String NM_LOG_RETAIN_SECONDS = NM_PREFIX
+ + "log.retain-seconds";
+
+ /**
+ * Number of threads used in log cleanup. Only applicable if Log aggregation
+ * is disabled
+ */
+ public static final String NM_LOG_DELETION_THREADS_COUNT =
+ NM_PREFIX + "log.deletion-threads-count";
+ public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4;
+
/** Where to aggregate logs to.*/
public static final String NM_REMOTE_APP_LOG_DIR =
NM_PREFIX + "remote-app-log-dir";
@@ -312,11 +328,11 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_LOG_SERVER_URL =
YARN_PREFIX + "log.server.url";
-
+
/** Amount of memory in GB that can be allocated for containers.*/
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
-
+
public static final String NM_VMEM_PMEM_RATIO =
NM_PREFIX + "vmem-pmem-ratio";
public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
index 8ef3487d1a..cc6f496e12 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
@@ -281,9 +281,18 @@
Whether to enable log aggregation
- yarn.nodemanager.log-aggregation.enable
+ yarn.nodemanager.log-aggregation-enable
false
+
+
+ Time in seconds to retain user logs. Only applicable if
+ log aggregation is disabled
+
+ yarn.nodemanager.log.retain-seconds
+ 10800
+
+
Where to aggregate logs to.
yarn.nodemanager.remote-app-log-dir
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 93b3908313..5e3eb26cb5 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -87,7 +87,9 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -154,9 +156,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
- LogAggregationService logAggregationService =
- createLogAggregationService(this.context, this.deletionService);
- addService(logAggregationService);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
@@ -166,13 +165,35 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
- dispatcher.register(LogAggregatorEventType.class, logAggregationService);
+
addService(dispatcher);
}
- protected LogAggregationService createLogAggregationService(Context context,
+ @Override
+ public void init(Configuration conf) {
+ LogHandler logHandler =
+ createLogHandler(conf, this.context, this.deletionService);
+ addIfService(logHandler);
+ dispatcher.register(LogHandlerEventType.class, logHandler);
+
+ super.init(conf);
+ }
+
+ private void addIfService(Object object) {
+ if (object instanceof Service) {
+ addService((Service) object);
+ }
+ }
+
+ protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
- return new LogAggregationService(this.dispatcher, context, deletionService);
+ if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+ return new LogAggregationService(this.dispatcher, context,
+ deletionService);
+ } else {
+ return new NonAggregatingLogHandler(this.dispatcher, deletionService);
+ }
}
public ContainersMonitor getContainersMonitor() {
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
index 91fda5ab29..f988a3e435 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
@@ -32,6 +32,6 @@ public enum ApplicationEventType {
// Source: Container
APPLICATION_CONTAINER_FINISHED,
- // Source: Log Aggregation
- APPLICATION_LOG_AGGREGATION_FINISHED
+ // Source: Log Handler
+ APPLICATION_LOG_HANDLING_FINISHED
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index d1914b5f61..9cc5d6f786 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -42,8 +42,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -181,7 +181,7 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
// Transitions from FINISHED state
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
- ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
new AppLogsAggregatedTransition())
// create the topology tables
@@ -251,7 +251,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
// Inform the logAggregator
app.dispatcher.getEventHandler().handle(
- new LogAggregatorAppStartedEvent(app.appId, app.user,
+ new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
app.applicationACLs));
@@ -351,7 +351,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
// Inform the logService
app.dispatcher.getEventHandler().handle(
- new LogAggregatorAppFinishedEvent(app.appId));
+ new LogHandlerAppFinishedEvent(app.appId));
}
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 4b9bf16502..81b41a7731 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -56,7 +56,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -410,7 +410,7 @@ private void finished() {
// Remove the container from the resource-monitor
eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
// Tell the logService too
- eventHandler.handle(new LogAggregatorContainerFinishedEvent(
+ eventHandler.handle(new LogHandlerContainerFinishedEvent(
containerID, exitCode));
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 391c28e01c..5db1b5de50 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -179,7 +179,7 @@ public Object run() throws Exception {
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
- ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED));
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
this.appAggregationFinished.set(true);
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 528cae63d1..d651cb9853 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -43,19 +43,19 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements
- EventHandler {
+ LogHandler {
private static final Log LOG = LogFactory
.getLog(LogAggregationService.class);
@@ -87,7 +87,6 @@ public class LogAggregationService extends AbstractService implements
Path remoteRootLogDir;
String remoteRootLogDirSuffix;
private NodeId nodeId;
- private boolean isLogAggregationEnabled = false;
private final ConcurrentMap appLogAggregators;
@@ -117,8 +116,6 @@ public synchronized void init(Configuration conf) {
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
- this.isLogAggregationEnabled =
- conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
super.init(conf);
}
@@ -411,31 +408,30 @@ private void stopApp(ApplicationId appId) {
}
@Override
- public void handle(LogAggregatorEvent event) {
- if (this.isLogAggregationEnabled) {
- switch (event.getType()) {
- case APPLICATION_STARTED:
- LogAggregatorAppStartedEvent appStartEvent =
- (LogAggregatorAppStartedEvent) event;
- initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
- appStartEvent.getCredentials(),
- appStartEvent.getLogRetentionPolicy(),
- appStartEvent.getApplicationAcls());
- break;
- case CONTAINER_FINISHED:
- LogAggregatorContainerFinishedEvent containerFinishEvent =
- (LogAggregatorContainerFinishedEvent) event;
- stopContainer(containerFinishEvent.getContainerId(),
- containerFinishEvent.getExitCode());
- break;
- case APPLICATION_FINISHED:
- LogAggregatorAppFinishedEvent appFinishedEvent =
- (LogAggregatorAppFinishedEvent) event;
- stopApp(appFinishedEvent.getApplicationId());
- break;
- default:
- ; // Ignore
- }
+ public void handle(LogHandlerEvent event) {
+ switch (event.getType()) {
+ case APPLICATION_STARTED:
+ LogHandlerAppStartedEvent appStartEvent =
+ (LogHandlerAppStartedEvent) event;
+ initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
+ appStartEvent.getCredentials(),
+ appStartEvent.getLogRetentionPolicy(),
+ appStartEvent.getApplicationAcls());
+ break;
+ case CONTAINER_FINISHED:
+ LogHandlerContainerFinishedEvent containerFinishEvent =
+ (LogHandlerContainerFinishedEvent) event;
+ stopContainer(containerFinishEvent.getContainerId(),
+ containerFinishEvent.getExitCode());
+ break;
+ case APPLICATION_FINISHED:
+ LogHandlerAppFinishedEvent appFinishedEvent =
+ (LogHandlerAppFinishedEvent) event;
+ stopApp(appFinishedEvent.getApplicationId());
+ break;
+ default:
+ ; // Ignore
}
+
}
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
new file mode 100644
index 0000000000..6eb3fb45ab
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+
+public interface LogHandler extends EventHandler {
+ public void handle(LogHandlerEvent event);
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
new file mode 100644
index 0000000000..96833f3d48
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -0,0 +1,153 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Log Handler which schedules deletion of log files based on the configured log
+ * retention time.
+ */
+public class NonAggregatingLogHandler extends AbstractService implements
+ LogHandler {
+
+ private static final Log LOG = LogFactory
+ .getLog(NonAggregatingLogHandler.class);
+ private final Dispatcher dispatcher;
+ private final DeletionService delService;
+ private final Map appOwners;
+
+ private String[] rootLogDirs;
+ private long deleteDelaySeconds;
+ private ScheduledThreadPoolExecutor sched;
+
+ public NonAggregatingLogHandler(Dispatcher dispatcher,
+ DeletionService delService) {
+ super(NonAggregatingLogHandler.class.getName());
+ this.dispatcher = dispatcher;
+ this.delService = delService;
+ this.appOwners = new ConcurrentHashMap();
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ // Default 3 hours.
+ this.deleteDelaySeconds =
+ conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
+ this.rootLogDirs =
+ conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
+ YarnConfiguration.DEFAULT_NM_LOG_DIRS);
+ sched = createScheduledThreadPoolExecutor(conf);
+ super.init(conf);
+ }
+
+ @Override
+ public void stop() {
+ sched.shutdown();
+ boolean isShutdown = false;
+ try {
+ isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ sched.shutdownNow();
+ isShutdown = true;
+ }
+ if (!isShutdown) {
+ sched.shutdownNow();
+ }
+ super.stop();
+ }
+
+ @Override
+ public void handle(LogHandlerEvent event) {
+ switch (event.getType()) {
+ case APPLICATION_STARTED:
+ LogHandlerAppStartedEvent appStartedEvent =
+ (LogHandlerAppStartedEvent) event;
+ this.appOwners.put(appStartedEvent.getApplicationId(),
+ appStartedEvent.getUser());
+ break;
+ case CONTAINER_FINISHED:
+ // Ignore
+ break;
+ case APPLICATION_FINISHED:
+ LogHandlerAppFinishedEvent appFinishedEvent =
+ (LogHandlerAppFinishedEvent) event;
+ // Schedule - so that logs are available on the UI till they're deleted.
+ LOG.info("Scheduling Log Deletion for application: "
+ + appFinishedEvent.getApplicationId() + ", with delay of "
+ + this.deleteDelaySeconds + " seconds");
+ sched.schedule(
+ new LogDeleterRunnable(appOwners.remove(appFinishedEvent
+ .getApplicationId()), appFinishedEvent.getApplicationId()),
+ this.deleteDelaySeconds, TimeUnit.SECONDS);
+ break;
+ default:
+ ; // Ignore
+ }
+ }
+
+ ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
+ Configuration conf) {
+ ThreadFactory tf =
+ new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
+ sched =
+ new ScheduledThreadPoolExecutor(conf.getInt(
+ YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
+ YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
+ return sched;
+ }
+
+ class LogDeleterRunnable implements Runnable {
+ private String user;
+ private ApplicationId applicationId;
+
+ public LogDeleterRunnable(String user, ApplicationId applicationId) {
+ this.user = user;
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ Path[] localAppLogDirs =
+ new Path[NonAggregatingLogHandler.this.rootLogDirs.length];
+ int index = 0;
+ for (String rootLogDir : NonAggregatingLogHandler.this.rootLogDirs) {
+ localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
+ index++;
+ }
+ // Inform the application before the actual delete itself, so that links
+ // to logs will no longer be there on NM web-UI.
+ NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
+ new ApplicationEvent(this.applicationId,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
+ NonAggregatingLogHandler.this.delService.delete(user, null,
+ localAppLogDirs);
+ }
+
+ @Override
+ public String toString() {
+ return "LogDeleter for AppId " + this.applicationId.toString()
+ + ", owned by " + user;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppFinishedEvent.java
similarity index 83%
rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java
rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppFinishedEvent.java
index 5ff2e2194e..4da8f31569 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppFinishedEvent.java
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-public class LogAggregatorAppFinishedEvent extends LogAggregatorEvent {
+public class LogHandlerAppFinishedEvent extends LogHandlerEvent {
private final ApplicationId applicationId;
- public LogAggregatorAppFinishedEvent(ApplicationId appId) {
- super(LogAggregatorEventType.APPLICATION_FINISHED);
+ public LogHandlerAppFinishedEvent(ApplicationId appId) {
+ super(LogHandlerEventType.APPLICATION_FINISHED);
this.applicationId = appId;
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
similarity index 90%
rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java
rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
index 3d93c51c21..a598bed264 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import java.util.Map;
@@ -25,7 +25,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
-public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
+public class LogHandlerAppStartedEvent extends LogHandlerEvent {
private final ApplicationId applicationId;
private final ContainerLogsRetentionPolicy retentionPolicy;
@@ -33,10 +33,10 @@ public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
private final Credentials credentials;
private final Map appAcls;
- public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
+ public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map appAcls) {
- super(LogAggregatorEventType.APPLICATION_STARTED);
+ super(LogHandlerEventType.APPLICATION_STARTED);
this.applicationId = appId;
this.user = user;
this.credentials = credentials;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java
similarity index 84%
rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java
rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java
index 68ec27a73a..038006edcf 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java
@@ -16,18 +16,18 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import org.apache.hadoop.yarn.api.records.ContainerId;
-public class LogAggregatorContainerFinishedEvent extends LogAggregatorEvent {
+public class LogHandlerContainerFinishedEvent extends LogHandlerEvent {
private final ContainerId containerId;
private final int exitCode;
- public LogAggregatorContainerFinishedEvent(ContainerId containerId,
+ public LogHandlerContainerFinishedEvent(ContainerId containerId,
int exitCode) {
- super(LogAggregatorEventType.CONTAINER_FINISHED);
+ super(LogHandlerEventType.CONTAINER_FINISHED);
this.containerId = containerId;
this.exitCode = exitCode;
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEvent.java
similarity index 84%
rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java
rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEvent.java
index 052d080803..3d255467e6 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEvent.java
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-public class LogAggregatorEvent extends AbstractEvent{
+public class LogHandlerEvent extends AbstractEvent{
- public LogAggregatorEvent(LogAggregatorEventType type) {
+ public LogHandlerEvent(LogHandlerEventType type) {
super(type);
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
similarity index 93%
rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java
rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
index 64adf746d0..684d6b2605 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
@@ -16,8 +16,8 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
-public enum LogAggregatorEventType {
+public enum LogHandlerEventType {
APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java
index f36dbab5a6..9a75aa24e4 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java
@@ -53,6 +53,14 @@ protected void render(Block html) {
logEntity = containerId.toString();
}
+ if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+ html.h1()
+ ._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
+ ._();
+ return;
+ }
+
Path remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -69,7 +77,7 @@ protected void render(Block html) {
._("Logs not available for "
+ logEntity
+ ". Aggregation may not be complete, "
- + "Check back later or try the nodemanager on "
+ + "Check back later or try the nodemanager at "
+ nodeId)._();
return;
} catch (IOException e) {
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
index f4e40ad18f..cf69b0f4ee 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
@@ -86,7 +86,9 @@ public void logs() {
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
Application app = nmContext.getApplications().get(appId);
- if (app == null) {
+ if (app == null
+ && nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
String redirectUrl = null;
if (logServerUrl == null || logServerUrl.isEmpty()) {
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index 5f7560fd3c..74d9979691 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -24,11 +24,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@@ -49,8 +47,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
public class DummyContainerManager extends ContainerManagerImpl {
@@ -68,6 +66,7 @@ public DummyContainerManager(Context context, ContainerExecutor exec,
}
@Override
+ @SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
@@ -123,6 +122,7 @@ public void handle(LocalizationEvent event) {
}
@Override
+ @SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, super.dispatcher, exec) {
@@ -147,23 +147,23 @@ public void handle(ContainersLauncherEvent event) {
}
@Override
- protected LogAggregationService createLogAggregationService(Context context,
- DeletionService deletionService) {
- return new LogAggregationService(new AsyncDispatcher(), context,
- deletionService) {
+ protected LogHandler createLogHandler(Configuration conf,
+ Context context, DeletionService deletionService) {
+ return new LogHandler() {
+
@Override
- public void handle(LogAggregatorEvent event) {
+ public void handle(LogHandlerEvent event) {
switch (event.getType()) {
- case APPLICATION_STARTED:
- break;
- case CONTAINER_FINISHED:
- break;
- case APPLICATION_FINISHED:
- break;
- default:
- // Ignore
- }
+ case APPLICATION_STARTED:
+ break;
+ case CONTAINER_FINISHED:
+ break;
+ case APPLICATION_FINISHED:
+ break;
+ default:
+ // Ignore
+ }
}
};
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 10056ff551..111d5c4e8f 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -33,8 +33,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -349,7 +349,7 @@ private class WrappedApplication {
final EventHandler monitorBus;
final EventHandler auxBus;
final EventHandler containerBus;
- final EventHandler logAggregationBus;
+ final EventHandler logAggregationBus;
final String user;
final List containers;
final Context context;
@@ -373,7 +373,7 @@ private class WrappedApplication {
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
dispatcher.register(AuxServicesEventType.class, auxBus);
dispatcher.register(ContainerEventType.class, containerBus);
- dispatcher.register(LogAggregatorEventType.class, logAggregationBus);
+ dispatcher.register(LogHandlerEventType.class, logAggregationBus);
context = mock(Context.class);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index b4db6d1108..bc420e4f92 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -70,9 +70,9 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -114,7 +114,6 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
- this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler appEventHandler = mock(EventHandler.class);
@@ -133,7 +132,7 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
- .handle(new LogAggregatorAppStartedEvent(
+ .handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
@@ -143,9 +142,9 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container11, 0));
+ new LogHandlerContainerFinishedEvent(container11, 0));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
@@ -169,7 +168,7 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
ArgumentCaptor eventCaptor =
ArgumentCaptor.forClass(ApplicationEvent.class);
verify(appEventHandler).handle(eventCaptor.capture());
- assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
+ assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
eventCaptor.getValue().getType());
assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
.getApplicationID());
@@ -182,7 +181,6 @@ public void testNoContainerOnNode() {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
- this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler appEventHandler = mock(EventHandler.class);
@@ -200,11 +198,11 @@ public void testNoContainerOnNode() {
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
- .handle(new LogAggregatorAppStartedEvent(
+ .handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
@@ -217,7 +215,7 @@ public void testNoContainerOnNode() {
ArgumentCaptor eventCaptor =
ArgumentCaptor.forClass(ApplicationEvent.class);
verify(appEventHandler).handle(eventCaptor.capture());
- assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
+ assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
eventCaptor.getValue().getType());
verify(appEventHandler).handle(eventCaptor.capture());
assertEquals(application1, eventCaptor.getValue()
@@ -231,7 +229,6 @@ public void testMultipleAppsLogAggregation() throws IOException {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
- this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler appEventHandler = mock(EventHandler.class);
@@ -249,7 +246,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
- .handle(new LogAggregatorAppStartedEvent(
+ .handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
@@ -260,7 +257,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container11, 0));
+ new LogHandlerContainerFinishedEvent(container11, 0));
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
ApplicationAttemptId appAttemptId2 =
@@ -269,7 +266,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
File app2LogDir =
new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
- logAggregationService.handle(new LogAggregatorAppStartedEvent(
+ logAggregationService.handle(new LogHandlerAppStartedEvent(
application2, this.user, null,
ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
@@ -278,13 +275,13 @@ public void testMultipleAppsLogAggregation() throws IOException {
writeContainerLogs(app2LogDir, container21);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container21, 0));
+ new LogHandlerContainerFinishedEvent(container21, 0));
ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
writeContainerLogs(app1LogDir, container12);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container12, 0));
+ new LogHandlerContainerFinishedEvent(container12, 0));
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
ApplicationAttemptId appAttemptId3 =
@@ -293,7 +290,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
File app3LogDir =
new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
- logAggregationService.handle(new LogAggregatorAppStartedEvent(application3,
+ logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
@@ -301,28 +298,28 @@ public void testMultipleAppsLogAggregation() throws IOException {
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container31);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container31, 0));
+ new LogHandlerContainerFinishedEvent(container31, 0));
ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
writeContainerLogs(app3LogDir, container32);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed
+ new LogHandlerContainerFinishedEvent(container32, 1)); // Failed
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
writeContainerLogs(app2LogDir, container22);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container22, 0));
+ new LogHandlerContainerFinishedEvent(container22, 0));
ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
writeContainerLogs(app3LogDir, container33);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container33, 0));
+ new LogHandlerContainerFinishedEvent(container33, 0));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application2));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application3));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
@@ -342,7 +339,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
List capturedEvents = eventCaptor.getAllValues();
Set appIds = new HashSet();
for (ApplicationEvent cap : capturedEvents) {
- assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
+ assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
eventCaptor.getValue().getType());
appIds.add(cap.getApplicationID());
}
@@ -447,7 +444,6 @@ private void verifyContainerLogs(
public void testLogAggregationForRealContainerLaunch() throws IOException,
InterruptedException {
- this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
this.containerManager.start();
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
new file mode 100644
index 0000000000..6eacb8aa72
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -0,0 +1,187 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.exceptions.verification.WantedButNotInvoked;
+
+public class TestNonAggregatingLogHandler {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testLogDeletion() {
+ DeletionService delService = mock(DeletionService.class);
+ Configuration conf = new YarnConfiguration();
+ String user = "testuser";
+
+ File[] localLogDirs = new File[2];
+ localLogDirs[0] =
+ new File("target", this.getClass().getName() + "-localLogDir0")
+ .getAbsoluteFile();
+ localLogDirs[1] =
+ new File("target", this.getClass().getName() + "-localLogDir1")
+ .getAbsoluteFile();
+ String localLogDirsString =
+ localLogDirs[0].getAbsolutePath() + ","
+ + localLogDirs[1].getAbsolutePath();
+
+ conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+ conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
+
+ DrainDispatcher dispatcher = createDispatcher(conf);
+ EventHandler appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+ ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
+ ApplicationAttemptId appAttemptId1 =
+ BuilderUtils.newApplicationAttemptId(appId1, 1);
+ ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
+
+ NonAggregatingLogHandler logHandler =
+ new NonAggregatingLogHandler(dispatcher, delService);
+ logHandler.init(conf);
+ logHandler.start();
+
+ logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+
+ logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+
+ logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+
+ Path[] localAppLogDirs = new Path[2];
+ localAppLogDirs[0] =
+ new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+ localAppLogDirs[1] =
+ new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+
+ // 5 seconds for the delete which is a separate thread.
+ long verifyStartTime = System.currentTimeMillis();
+ WantedButNotInvoked notInvokedException = null;
+ boolean matched = false;
+ while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) {
+ try {
+ verify(delService).delete(eq(user), (Path) eq(null),
+ eq(localAppLogDirs[0]), eq(localAppLogDirs[1]));
+ matched = true;
+ } catch (WantedButNotInvoked e) {
+ notInvokedException = e;
+ try {
+ Thread.sleep(50l);
+ } catch (InterruptedException i) {
+ }
+ }
+ }
+ if (!matched) {
+ throw notInvokedException;
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDelayedDelete() {
+ DeletionService delService = mock(DeletionService.class);
+ Configuration conf = new YarnConfiguration();
+ String user = "testuser";
+
+ File[] localLogDirs = new File[2];
+ localLogDirs[0] =
+ new File("target", this.getClass().getName() + "-localLogDir0")
+ .getAbsoluteFile();
+ localLogDirs[1] =
+ new File("target", this.getClass().getName() + "-localLogDir1")
+ .getAbsoluteFile();
+ String localLogDirsString =
+ localLogDirs[0].getAbsolutePath() + ","
+ + localLogDirs[1].getAbsolutePath();
+
+ conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+ conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);
+
+ DrainDispatcher dispatcher = createDispatcher(conf);
+ EventHandler appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+ ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
+ ApplicationAttemptId appAttemptId1 =
+ BuilderUtils.newApplicationAttemptId(appId1, 1);
+ ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
+
+ NonAggregatingLogHandler logHandler =
+ new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService);
+ logHandler.init(conf);
+ logHandler.start();
+
+ logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+
+ logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+
+ logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+
+ Path[] localAppLogDirs = new Path[2];
+ localAppLogDirs[0] =
+ new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+ localAppLogDirs[1] =
+ new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+
+ ScheduledThreadPoolExecutor mockSched =
+ ((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched;
+
+ verify(mockSched).schedule(any(Runnable.class), eq(10800l),
+ eq(TimeUnit.SECONDS));
+ }
+
+ private class NonAggregatingLogHandlerWithMockExecutor extends
+ NonAggregatingLogHandler {
+
+ private ScheduledThreadPoolExecutor mockSched;
+
+ public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
+ DeletionService delService) {
+ super(dispatcher, delService);
+ }
+
+ @Override
+ ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
+ Configuration conf) {
+ mockSched = mock(ScheduledThreadPoolExecutor.class);
+ return mockSched;
+ }
+
+ }
+
+ private DrainDispatcher createDispatcher(Configuration conf) {
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ return dispatcher;
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
index f9371594a1..5ce75d21d0 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
@@ -224,18 +224,13 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | <<>> Scheduler class. | |
| | | <<>> (recommended) or <<>> |
*-------------------------+-------------------------+------------------------+
-| <<>> | | |
-| | | |
-| | | HDFS directory where the application logs are moved on application |
-| | | completion. Need to set appropriate permissions. |
-*-------------------------+-------------------------+------------------------+
| <<>> / | | |
| <<>> | | |
| | List of permitted/excluded NodeManagers. | |
| | | If necessary, use these files to control the list of allowable |
| | | NodeManagers. |
*-------------------------+-------------------------+------------------------+
-|
+
* Configurations for NodeManager:
*-------------------------+-------------------------+------------------------+
@@ -263,6 +258,27 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | are written. | |
| | | Multiple paths help spread disk i/o. |
*-------------------------+-------------------------+------------------------+
+| <<>> | | |
+| | | |
+| | | Configuration to enable or disable log aggregation |
+*-------------------------+-------------------------+------------------------+
+| <<>> | | |
+| | <10800> | |
+| | | Default time (in seconds) to retain log files on the NodeManager |
+| | | Only applicable if log-aggregation is disabled. |
+*-------------------------+-------------------------+------------------------+
+| <<>> | | |
+| | | |
+| | | HDFS directory where the application logs are moved on application |
+| | | completion. Need to set appropriate permissions. |
+| | | Only applicable if log-aggregation is enabled. |
+*-------------------------+-------------------------+------------------------+
+| <<>> | | |
+| | | |
+| | | Suffix appended to the remote log dir. Logs will be aggregated to |
+| | | $\{yarn.nodemanager.remote-app-log-dir\}/$\{user\}/$\{thisParam\} |
+| | | Only applicable if log-aggregation is enabled. |
+*-------------------------+-------------------------+------------------------+
| <<>> | | |
| | mapreduce.shuffle | |
| | | Shuffle service that needs to be set for Map Reduce applications. |