MAPREDUCE-2696. Fixed NodeManager to cleanup logs in a thread when logs' aggregation is not enabled. Contributed by Siddharth Seth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195383 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
47a381e306
commit
a75c4cf4e4
@ -1874,6 +1874,9 @@ Release 0.23.0 - Unreleased
|
|||||||
MAPREDUCE-2766. Fixed NM to set secure permissions for files and directories
|
MAPREDUCE-2766. Fixed NM to set secure permissions for files and directories
|
||||||
in distributed-cache. (Hitesh Shah via vinodkv)
|
in distributed-cache. (Hitesh Shah via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-2696. Fixed NodeManager to cleanup logs in a thread when logs'
|
||||||
|
aggregation is not enabled. (Siddharth Seth via vinodkv)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -228,9 +228,7 @@ public void testLogsView2() throws IOException {
|
|||||||
params);
|
params);
|
||||||
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
|
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
|
||||||
verify(spyPw).write(
|
verify(spyPw).write(
|
||||||
"Logs not available for container_10_0001_01_000001. Aggregation "
|
"Aggregation is not enabled. Try the nodemanager at "
|
||||||
+ "may not be complete,"
|
|
||||||
+ " Check back later or try the nodemanager on "
|
|
||||||
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
|
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -89,6 +89,12 @@
|
|||||||
<Method name="handle" />
|
<Method name="handle" />
|
||||||
<Bug pattern="BC_UNCONFIRMED_CAST" />
|
<Bug pattern="BC_UNCONFIRMED_CAST" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<Class name="~org\.apache\.hadoop\.yarn\.server\.nodemanager\.containermanager\.loghandler\.NonAggregatingLogHandler.*" />
|
||||||
|
<Method name="handle" />
|
||||||
|
<Bug pattern="BC_UNCONFIRMED_CAST" />
|
||||||
|
</Match>
|
||||||
|
|
||||||
|
|
||||||
<!-- Ignore intentional switch fallthroughs -->
|
<!-- Ignore intentional switch fallthroughs -->
|
||||||
<Match>
|
<Match>
|
||||||
|
@ -288,15 +288,31 @@ public class YarnConfiguration extends Configuration {
|
|||||||
public static final String NM_LOCALIZER_FETCH_THREAD_COUNT =
|
public static final String NM_LOCALIZER_FETCH_THREAD_COUNT =
|
||||||
NM_PREFIX + "localizer.fetch.thread-count";
|
NM_PREFIX + "localizer.fetch.thread-count";
|
||||||
public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4;
|
public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4;
|
||||||
|
|
||||||
/** Where to store container logs.*/
|
/** Where to store container logs.*/
|
||||||
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
|
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
|
||||||
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
|
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
|
||||||
|
|
||||||
/** Whether to enable log aggregation */
|
/** Whether to enable log aggregation */
|
||||||
public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
|
public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
|
||||||
+ "log-aggregation.enable";
|
+ "log-aggregation-enable";
|
||||||
|
public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
|
||||||
|
* aggregation is disabled
|
||||||
|
*/
|
||||||
|
public static final String NM_LOG_RETAIN_SECONDS = NM_PREFIX
|
||||||
|
+ "log.retain-seconds";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of threads used in log cleanup. Only applicable if Log aggregation
|
||||||
|
* is disabled
|
||||||
|
*/
|
||||||
|
public static final String NM_LOG_DELETION_THREADS_COUNT =
|
||||||
|
NM_PREFIX + "log.deletion-threads-count";
|
||||||
|
public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4;
|
||||||
|
|
||||||
/** Where to aggregate logs to.*/
|
/** Where to aggregate logs to.*/
|
||||||
public static final String NM_REMOTE_APP_LOG_DIR =
|
public static final String NM_REMOTE_APP_LOG_DIR =
|
||||||
NM_PREFIX + "remote-app-log-dir";
|
NM_PREFIX + "remote-app-log-dir";
|
||||||
@ -312,11 +328,11 @@ public class YarnConfiguration extends Configuration {
|
|||||||
|
|
||||||
public static final String YARN_LOG_SERVER_URL =
|
public static final String YARN_LOG_SERVER_URL =
|
||||||
YARN_PREFIX + "log.server.url";
|
YARN_PREFIX + "log.server.url";
|
||||||
|
|
||||||
/** Amount of memory in GB that can be allocated for containers.*/
|
/** Amount of memory in GB that can be allocated for containers.*/
|
||||||
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
|
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
|
||||||
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
|
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
|
||||||
|
|
||||||
public static final String NM_VMEM_PMEM_RATIO =
|
public static final String NM_VMEM_PMEM_RATIO =
|
||||||
NM_PREFIX + "vmem-pmem-ratio";
|
NM_PREFIX + "vmem-pmem-ratio";
|
||||||
public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;
|
public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;
|
||||||
|
@ -281,9 +281,18 @@
|
|||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Whether to enable log aggregation</description>
|
<description>Whether to enable log aggregation</description>
|
||||||
<name>yarn.nodemanager.log-aggregation.enable</name>
|
<name>yarn.nodemanager.log-aggregation-enable</name>
|
||||||
<value>false</value>
|
<value>false</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Time in seconds to retain user logs. Only applicable if
|
||||||
|
log aggregation is disabled
|
||||||
|
</description>
|
||||||
|
<name>yarn.nodemanager.log.retain-seconds</name>
|
||||||
|
<value>10800</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Where to aggregate logs to.</description>
|
<description>Where to aggregate logs to.</description>
|
||||||
<name>yarn.nodemanager.remote-app-log-dir</name>
|
<name>yarn.nodemanager.remote-app-log-dir</name>
|
||||||
|
@ -87,7 +87,9 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||||
@ -154,9 +156,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
|||||||
new ContainersMonitorImpl(exec, dispatcher, this.context);
|
new ContainersMonitorImpl(exec, dispatcher, this.context);
|
||||||
addService(this.containersMonitor);
|
addService(this.containersMonitor);
|
||||||
|
|
||||||
LogAggregationService logAggregationService =
|
|
||||||
createLogAggregationService(this.context, this.deletionService);
|
|
||||||
addService(logAggregationService);
|
|
||||||
|
|
||||||
dispatcher.register(ContainerEventType.class,
|
dispatcher.register(ContainerEventType.class,
|
||||||
new ContainerEventDispatcher());
|
new ContainerEventDispatcher());
|
||||||
@ -166,13 +165,35 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
|||||||
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
|
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
|
||||||
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
|
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
|
||||||
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
|
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
|
||||||
dispatcher.register(LogAggregatorEventType.class, logAggregationService);
|
|
||||||
addService(dispatcher);
|
addService(dispatcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected LogAggregationService createLogAggregationService(Context context,
|
@Override
|
||||||
|
public void init(Configuration conf) {
|
||||||
|
LogHandler logHandler =
|
||||||
|
createLogHandler(conf, this.context, this.deletionService);
|
||||||
|
addIfService(logHandler);
|
||||||
|
dispatcher.register(LogHandlerEventType.class, logHandler);
|
||||||
|
|
||||||
|
super.init(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addIfService(Object object) {
|
||||||
|
if (object instanceof Service) {
|
||||||
|
addService((Service) object);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected LogHandler createLogHandler(Configuration conf, Context context,
|
||||||
DeletionService deletionService) {
|
DeletionService deletionService) {
|
||||||
return new LogAggregationService(this.dispatcher, context, deletionService);
|
if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
|
||||||
|
return new LogAggregationService(this.dispatcher, context,
|
||||||
|
deletionService);
|
||||||
|
} else {
|
||||||
|
return new NonAggregatingLogHandler(this.dispatcher, deletionService);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainersMonitor getContainersMonitor() {
|
public ContainersMonitor getContainersMonitor() {
|
||||||
|
@ -32,6 +32,6 @@ public enum ApplicationEventType {
|
|||||||
// Source: Container
|
// Source: Container
|
||||||
APPLICATION_CONTAINER_FINISHED,
|
APPLICATION_CONTAINER_FINISHED,
|
||||||
|
|
||||||
// Source: Log Aggregation
|
// Source: Log Handler
|
||||||
APPLICATION_LOG_AGGREGATION_FINISHED
|
APPLICATION_LOG_HANDLING_FINISHED
|
||||||
}
|
}
|
||||||
|
@ -42,8 +42,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||||
@ -181,7 +181,7 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
|
|||||||
// Transitions from FINISHED state
|
// Transitions from FINISHED state
|
||||||
.addTransition(ApplicationState.FINISHED,
|
.addTransition(ApplicationState.FINISHED,
|
||||||
ApplicationState.FINISHED,
|
ApplicationState.FINISHED,
|
||||||
ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||||
new AppLogsAggregatedTransition())
|
new AppLogsAggregatedTransition())
|
||||||
|
|
||||||
// create the topology tables
|
// create the topology tables
|
||||||
@ -251,7 +251,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
|
|||||||
|
|
||||||
// Inform the logAggregator
|
// Inform the logAggregator
|
||||||
app.dispatcher.getEventHandler().handle(
|
app.dispatcher.getEventHandler().handle(
|
||||||
new LogAggregatorAppStartedEvent(app.appId, app.user,
|
new LogHandlerAppStartedEvent(app.appId, app.user,
|
||||||
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
|
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
|
||||||
app.applicationACLs));
|
app.applicationACLs));
|
||||||
|
|
||||||
@ -351,7 +351,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
|
|||||||
|
|
||||||
// Inform the logService
|
// Inform the logService
|
||||||
app.dispatcher.getEventHandler().handle(
|
app.dispatcher.getEventHandler().handle(
|
||||||
new LogAggregatorAppFinishedEvent(app.appId));
|
new LogHandlerAppFinishedEvent(app.appId));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
@ -410,7 +410,7 @@ private void finished() {
|
|||||||
// Remove the container from the resource-monitor
|
// Remove the container from the resource-monitor
|
||||||
eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
|
eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
|
||||||
// Tell the logService too
|
// Tell the logService too
|
||||||
eventHandler.handle(new LogAggregatorContainerFinishedEvent(
|
eventHandler.handle(new LogHandlerContainerFinishedEvent(
|
||||||
containerID, exitCode));
|
containerID, exitCode));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +179,7 @@ public Object run() throws Exception {
|
|||||||
|
|
||||||
this.dispatcher.getEventHandler().handle(
|
this.dispatcher.getEventHandler().handle(
|
||||||
new ApplicationEvent(this.appId,
|
new ApplicationEvent(this.appId,
|
||||||
ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED));
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
|
||||||
|
|
||||||
this.appAggregationFinished.set(true);
|
this.appAggregationFinished.set(true);
|
||||||
}
|
}
|
||||||
|
@ -43,19 +43,19 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
public class LogAggregationService extends AbstractService implements
|
public class LogAggregationService extends AbstractService implements
|
||||||
EventHandler<LogAggregatorEvent> {
|
LogHandler {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(LogAggregationService.class);
|
.getLog(LogAggregationService.class);
|
||||||
@ -87,7 +87,6 @@ public class LogAggregationService extends AbstractService implements
|
|||||||
Path remoteRootLogDir;
|
Path remoteRootLogDir;
|
||||||
String remoteRootLogDirSuffix;
|
String remoteRootLogDirSuffix;
|
||||||
private NodeId nodeId;
|
private NodeId nodeId;
|
||||||
private boolean isLogAggregationEnabled = false;
|
|
||||||
|
|
||||||
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
|
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
|
||||||
|
|
||||||
@ -117,8 +116,6 @@ public synchronized void init(Configuration conf) {
|
|||||||
this.remoteRootLogDirSuffix =
|
this.remoteRootLogDirSuffix =
|
||||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
|
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
|
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
|
||||||
this.isLogAggregationEnabled =
|
|
||||||
conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
|
|
||||||
|
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
}
|
}
|
||||||
@ -411,31 +408,30 @@ private void stopApp(ApplicationId appId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(LogAggregatorEvent event) {
|
public void handle(LogHandlerEvent event) {
|
||||||
if (this.isLogAggregationEnabled) {
|
switch (event.getType()) {
|
||||||
switch (event.getType()) {
|
case APPLICATION_STARTED:
|
||||||
case APPLICATION_STARTED:
|
LogHandlerAppStartedEvent appStartEvent =
|
||||||
LogAggregatorAppStartedEvent appStartEvent =
|
(LogHandlerAppStartedEvent) event;
|
||||||
(LogAggregatorAppStartedEvent) event;
|
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
|
||||||
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
|
appStartEvent.getCredentials(),
|
||||||
appStartEvent.getCredentials(),
|
appStartEvent.getLogRetentionPolicy(),
|
||||||
appStartEvent.getLogRetentionPolicy(),
|
appStartEvent.getApplicationAcls());
|
||||||
appStartEvent.getApplicationAcls());
|
break;
|
||||||
break;
|
case CONTAINER_FINISHED:
|
||||||
case CONTAINER_FINISHED:
|
LogHandlerContainerFinishedEvent containerFinishEvent =
|
||||||
LogAggregatorContainerFinishedEvent containerFinishEvent =
|
(LogHandlerContainerFinishedEvent) event;
|
||||||
(LogAggregatorContainerFinishedEvent) event;
|
stopContainer(containerFinishEvent.getContainerId(),
|
||||||
stopContainer(containerFinishEvent.getContainerId(),
|
containerFinishEvent.getExitCode());
|
||||||
containerFinishEvent.getExitCode());
|
break;
|
||||||
break;
|
case APPLICATION_FINISHED:
|
||||||
case APPLICATION_FINISHED:
|
LogHandlerAppFinishedEvent appFinishedEvent =
|
||||||
LogAggregatorAppFinishedEvent appFinishedEvent =
|
(LogHandlerAppFinishedEvent) event;
|
||||||
(LogAggregatorAppFinishedEvent) event;
|
stopApp(appFinishedEvent.getApplicationId());
|
||||||
stopApp(appFinishedEvent.getApplicationId());
|
break;
|
||||||
break;
|
default:
|
||||||
default:
|
; // Ignore
|
||||||
; // Ignore
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
|
|
||||||
|
public interface LogHandler extends EventHandler<LogHandlerEvent> {
|
||||||
|
public void handle(LogHandlerEvent event);
|
||||||
|
}
|
@ -0,0 +1,153 @@
|
|||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Log Handler which schedules deletion of log files based on the configured log
|
||||||
|
* retention time.
|
||||||
|
*/
|
||||||
|
public class NonAggregatingLogHandler extends AbstractService implements
|
||||||
|
LogHandler {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(NonAggregatingLogHandler.class);
|
||||||
|
private final Dispatcher dispatcher;
|
||||||
|
private final DeletionService delService;
|
||||||
|
private final Map<ApplicationId, String> appOwners;
|
||||||
|
|
||||||
|
private String[] rootLogDirs;
|
||||||
|
private long deleteDelaySeconds;
|
||||||
|
private ScheduledThreadPoolExecutor sched;
|
||||||
|
|
||||||
|
public NonAggregatingLogHandler(Dispatcher dispatcher,
|
||||||
|
DeletionService delService) {
|
||||||
|
super(NonAggregatingLogHandler.class.getName());
|
||||||
|
this.dispatcher = dispatcher;
|
||||||
|
this.delService = delService;
|
||||||
|
this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Configuration conf) {
|
||||||
|
// Default 3 hours.
|
||||||
|
this.deleteDelaySeconds =
|
||||||
|
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
|
||||||
|
this.rootLogDirs =
|
||||||
|
conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOG_DIRS);
|
||||||
|
sched = createScheduledThreadPoolExecutor(conf);
|
||||||
|
super.init(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
sched.shutdown();
|
||||||
|
boolean isShutdown = false;
|
||||||
|
try {
|
||||||
|
isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
sched.shutdownNow();
|
||||||
|
isShutdown = true;
|
||||||
|
}
|
||||||
|
if (!isShutdown) {
|
||||||
|
sched.shutdownNow();
|
||||||
|
}
|
||||||
|
super.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(LogHandlerEvent event) {
|
||||||
|
switch (event.getType()) {
|
||||||
|
case APPLICATION_STARTED:
|
||||||
|
LogHandlerAppStartedEvent appStartedEvent =
|
||||||
|
(LogHandlerAppStartedEvent) event;
|
||||||
|
this.appOwners.put(appStartedEvent.getApplicationId(),
|
||||||
|
appStartedEvent.getUser());
|
||||||
|
break;
|
||||||
|
case CONTAINER_FINISHED:
|
||||||
|
// Ignore
|
||||||
|
break;
|
||||||
|
case APPLICATION_FINISHED:
|
||||||
|
LogHandlerAppFinishedEvent appFinishedEvent =
|
||||||
|
(LogHandlerAppFinishedEvent) event;
|
||||||
|
// Schedule - so that logs are available on the UI till they're deleted.
|
||||||
|
LOG.info("Scheduling Log Deletion for application: "
|
||||||
|
+ appFinishedEvent.getApplicationId() + ", with delay of "
|
||||||
|
+ this.deleteDelaySeconds + " seconds");
|
||||||
|
sched.schedule(
|
||||||
|
new LogDeleterRunnable(appOwners.remove(appFinishedEvent
|
||||||
|
.getApplicationId()), appFinishedEvent.getApplicationId()),
|
||||||
|
this.deleteDelaySeconds, TimeUnit.SECONDS);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
; // Ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
|
||||||
|
Configuration conf) {
|
||||||
|
ThreadFactory tf =
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
|
||||||
|
sched =
|
||||||
|
new ScheduledThreadPoolExecutor(conf.getInt(
|
||||||
|
YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
|
||||||
|
return sched;
|
||||||
|
}
|
||||||
|
|
||||||
|
class LogDeleterRunnable implements Runnable {
|
||||||
|
private String user;
|
||||||
|
private ApplicationId applicationId;
|
||||||
|
|
||||||
|
public LogDeleterRunnable(String user, ApplicationId applicationId) {
|
||||||
|
this.user = user;
|
||||||
|
this.applicationId = applicationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void run() {
|
||||||
|
Path[] localAppLogDirs =
|
||||||
|
new Path[NonAggregatingLogHandler.this.rootLogDirs.length];
|
||||||
|
int index = 0;
|
||||||
|
for (String rootLogDir : NonAggregatingLogHandler.this.rootLogDirs) {
|
||||||
|
localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
// Inform the application before the actual delete itself, so that links
|
||||||
|
// to logs will no longer be there on NM web-UI.
|
||||||
|
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
|
||||||
|
new ApplicationEvent(this.applicationId,
|
||||||
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
|
||||||
|
NonAggregatingLogHandler.this.delService.delete(user, null,
|
||||||
|
localAppLogDirs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "LogDeleter for AppId " + this.applicationId.toString()
|
||||||
|
+ ", owned by " + user;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -16,16 +16,16 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
public class LogAggregatorAppFinishedEvent extends LogAggregatorEvent {
|
public class LogHandlerAppFinishedEvent extends LogHandlerEvent {
|
||||||
|
|
||||||
private final ApplicationId applicationId;
|
private final ApplicationId applicationId;
|
||||||
|
|
||||||
public LogAggregatorAppFinishedEvent(ApplicationId appId) {
|
public LogHandlerAppFinishedEvent(ApplicationId appId) {
|
||||||
super(LogAggregatorEventType.APPLICATION_FINISHED);
|
super(LogHandlerEventType.APPLICATION_FINISHED);
|
||||||
this.applicationId = appId;
|
this.applicationId = appId;
|
||||||
}
|
}
|
||||||
|
|
@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@ -25,7 +25,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
|
||||||
|
|
||||||
public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
|
public class LogHandlerAppStartedEvent extends LogHandlerEvent {
|
||||||
|
|
||||||
private final ApplicationId applicationId;
|
private final ApplicationId applicationId;
|
||||||
private final ContainerLogsRetentionPolicy retentionPolicy;
|
private final ContainerLogsRetentionPolicy retentionPolicy;
|
||||||
@ -33,10 +33,10 @@ public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
|
|||||||
private final Credentials credentials;
|
private final Credentials credentials;
|
||||||
private final Map<ApplicationAccessType, String> appAcls;
|
private final Map<ApplicationAccessType, String> appAcls;
|
||||||
|
|
||||||
public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
|
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
|
||||||
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
|
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
|
||||||
Map<ApplicationAccessType, String> appAcls) {
|
Map<ApplicationAccessType, String> appAcls) {
|
||||||
super(LogAggregatorEventType.APPLICATION_STARTED);
|
super(LogHandlerEventType.APPLICATION_STARTED);
|
||||||
this.applicationId = appId;
|
this.applicationId = appId;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
this.credentials = credentials;
|
this.credentials = credentials;
|
@ -16,18 +16,18 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
|
||||||
public class LogAggregatorContainerFinishedEvent extends LogAggregatorEvent {
|
public class LogHandlerContainerFinishedEvent extends LogHandlerEvent {
|
||||||
|
|
||||||
private final ContainerId containerId;
|
private final ContainerId containerId;
|
||||||
private final int exitCode;
|
private final int exitCode;
|
||||||
|
|
||||||
public LogAggregatorContainerFinishedEvent(ContainerId containerId,
|
public LogHandlerContainerFinishedEvent(ContainerId containerId,
|
||||||
int exitCode) {
|
int exitCode) {
|
||||||
super(LogAggregatorEventType.CONTAINER_FINISHED);
|
super(LogHandlerEventType.CONTAINER_FINISHED);
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
this.exitCode = exitCode;
|
this.exitCode = exitCode;
|
||||||
}
|
}
|
@ -16,14 +16,14 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||||
|
|
||||||
public class LogAggregatorEvent extends AbstractEvent<LogAggregatorEventType>{
|
public class LogHandlerEvent extends AbstractEvent<LogHandlerEventType>{
|
||||||
|
|
||||||
public LogAggregatorEvent(LogAggregatorEventType type) {
|
public LogHandlerEvent(LogHandlerEventType type) {
|
||||||
super(type);
|
super(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -16,8 +16,8 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
||||||
|
|
||||||
public enum LogAggregatorEventType {
|
public enum LogHandlerEventType {
|
||||||
APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
|
APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
|
||||||
}
|
}
|
@ -53,6 +53,14 @@ protected void render(Block html) {
|
|||||||
logEntity = containerId.toString();
|
logEntity = containerId.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
|
||||||
|
html.h1()
|
||||||
|
._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
|
||||||
|
._();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Path remoteRootLogDir =
|
Path remoteRootLogDir =
|
||||||
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||||
@ -69,7 +77,7 @@ protected void render(Block html) {
|
|||||||
._("Logs not available for "
|
._("Logs not available for "
|
||||||
+ logEntity
|
+ logEntity
|
||||||
+ ". Aggregation may not be complete, "
|
+ ". Aggregation may not be complete, "
|
||||||
+ "Check back later or try the nodemanager on "
|
+ "Check back later or try the nodemanager at "
|
||||||
+ nodeId)._();
|
+ nodeId)._();
|
||||||
return;
|
return;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -86,7 +86,9 @@ public void logs() {
|
|||||||
ApplicationId appId =
|
ApplicationId appId =
|
||||||
containerId.getApplicationAttemptId().getApplicationId();
|
containerId.getApplicationAttemptId().getApplicationId();
|
||||||
Application app = nmContext.getApplications().get(appId);
|
Application app = nmContext.getApplications().get(appId);
|
||||||
if (app == null) {
|
if (app == null
|
||||||
|
&& nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
|
||||||
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
|
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
|
||||||
String redirectUrl = null;
|
String redirectUrl = null;
|
||||||
if (logServerUrl == null || logServerUrl.isEmpty()) {
|
if (logServerUrl == null || logServerUrl.isEmpty()) {
|
||||||
|
@ -24,11 +24,9 @@
|
|||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
@ -49,8 +47,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
|
||||||
public class DummyContainerManager extends ContainerManagerImpl {
|
public class DummyContainerManager extends ContainerManagerImpl {
|
||||||
@ -68,6 +66,7 @@ public DummyContainerManager(Context context, ContainerExecutor exec,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
|
protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
|
||||||
DeletionService deletionContext) {
|
DeletionService deletionContext) {
|
||||||
return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
|
return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
|
||||||
@ -123,6 +122,7 @@ public void handle(LocalizationEvent event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
protected ContainersLauncher createContainersLauncher(Context context,
|
protected ContainersLauncher createContainersLauncher(Context context,
|
||||||
ContainerExecutor exec) {
|
ContainerExecutor exec) {
|
||||||
return new ContainersLauncher(context, super.dispatcher, exec) {
|
return new ContainersLauncher(context, super.dispatcher, exec) {
|
||||||
@ -147,23 +147,23 @@ public void handle(ContainersLauncherEvent event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected LogAggregationService createLogAggregationService(Context context,
|
protected LogHandler createLogHandler(Configuration conf,
|
||||||
DeletionService deletionService) {
|
Context context, DeletionService deletionService) {
|
||||||
return new LogAggregationService(new AsyncDispatcher(), context,
|
return new LogHandler() {
|
||||||
deletionService) {
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(LogAggregatorEvent event) {
|
public void handle(LogHandlerEvent event) {
|
||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
case APPLICATION_STARTED:
|
case APPLICATION_STARTED:
|
||||||
break;
|
break;
|
||||||
case CONTAINER_FINISHED:
|
case CONTAINER_FINISHED:
|
||||||
break;
|
break;
|
||||||
case APPLICATION_FINISHED:
|
case APPLICATION_FINISHED:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// Ignore
|
// Ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -33,8 +33,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
@ -349,7 +349,7 @@ private class WrappedApplication {
|
|||||||
final EventHandler<ContainersMonitorEvent> monitorBus;
|
final EventHandler<ContainersMonitorEvent> monitorBus;
|
||||||
final EventHandler<AuxServicesEvent> auxBus;
|
final EventHandler<AuxServicesEvent> auxBus;
|
||||||
final EventHandler<ContainerEvent> containerBus;
|
final EventHandler<ContainerEvent> containerBus;
|
||||||
final EventHandler<LogAggregatorEvent> logAggregationBus;
|
final EventHandler<LogHandlerEvent> logAggregationBus;
|
||||||
final String user;
|
final String user;
|
||||||
final List<Container> containers;
|
final List<Container> containers;
|
||||||
final Context context;
|
final Context context;
|
||||||
@ -373,7 +373,7 @@ private class WrappedApplication {
|
|||||||
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
|
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
|
||||||
dispatcher.register(AuxServicesEventType.class, auxBus);
|
dispatcher.register(AuxServicesEventType.class, auxBus);
|
||||||
dispatcher.register(ContainerEventType.class, containerBus);
|
dispatcher.register(ContainerEventType.class, containerBus);
|
||||||
dispatcher.register(LogAggregatorEventType.class, logAggregationBus);
|
dispatcher.register(LogHandlerEventType.class, logAggregationBus);
|
||||||
|
|
||||||
context = mock(Context.class);
|
context = mock(Context.class);
|
||||||
|
|
||||||
|
@ -70,9 +70,9 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -114,7 +114,6 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
|
|||||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
this.remoteRootLogDir.getAbsolutePath());
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
|
|
||||||
|
|
||||||
DrainDispatcher dispatcher = createDispatcher();
|
DrainDispatcher dispatcher = createDispatcher();
|
||||||
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
@ -133,7 +132,7 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
|
|||||||
new File(localLogDir, ConverterUtils.toString(application1));
|
new File(localLogDir, ConverterUtils.toString(application1));
|
||||||
app1LogDir.mkdir();
|
app1LogDir.mkdir();
|
||||||
logAggregationService
|
logAggregationService
|
||||||
.handle(new LogAggregatorAppStartedEvent(
|
.handle(new LogHandlerAppStartedEvent(
|
||||||
application1, this.user, null,
|
application1, this.user, null,
|
||||||
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
||||||
|
|
||||||
@ -143,9 +142,9 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
|
|||||||
// Simulate log-file creation
|
// Simulate log-file creation
|
||||||
writeContainerLogs(app1LogDir, container11);
|
writeContainerLogs(app1LogDir, container11);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container11, 0));
|
new LogHandlerContainerFinishedEvent(container11, 0));
|
||||||
|
|
||||||
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
||||||
application1));
|
application1));
|
||||||
|
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
@ -169,7 +168,7 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
|
|||||||
ArgumentCaptor<ApplicationEvent> eventCaptor =
|
ArgumentCaptor<ApplicationEvent> eventCaptor =
|
||||||
ArgumentCaptor.forClass(ApplicationEvent.class);
|
ArgumentCaptor.forClass(ApplicationEvent.class);
|
||||||
verify(appEventHandler).handle(eventCaptor.capture());
|
verify(appEventHandler).handle(eventCaptor.capture());
|
||||||
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
|
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||||
eventCaptor.getValue().getType());
|
eventCaptor.getValue().getType());
|
||||||
assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
|
assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
|
||||||
.getApplicationID());
|
.getApplicationID());
|
||||||
@ -182,7 +181,6 @@ public void testNoContainerOnNode() {
|
|||||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
this.remoteRootLogDir.getAbsolutePath());
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
|
|
||||||
|
|
||||||
DrainDispatcher dispatcher = createDispatcher();
|
DrainDispatcher dispatcher = createDispatcher();
|
||||||
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
@ -200,11 +198,11 @@ public void testNoContainerOnNode() {
|
|||||||
new File(localLogDir, ConverterUtils.toString(application1));
|
new File(localLogDir, ConverterUtils.toString(application1));
|
||||||
app1LogDir.mkdir();
|
app1LogDir.mkdir();
|
||||||
logAggregationService
|
logAggregationService
|
||||||
.handle(new LogAggregatorAppStartedEvent(
|
.handle(new LogHandlerAppStartedEvent(
|
||||||
application1, this.user, null,
|
application1, this.user, null,
|
||||||
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
||||||
|
|
||||||
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
||||||
application1));
|
application1));
|
||||||
|
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
@ -217,7 +215,7 @@ public void testNoContainerOnNode() {
|
|||||||
ArgumentCaptor<ApplicationEvent> eventCaptor =
|
ArgumentCaptor<ApplicationEvent> eventCaptor =
|
||||||
ArgumentCaptor.forClass(ApplicationEvent.class);
|
ArgumentCaptor.forClass(ApplicationEvent.class);
|
||||||
verify(appEventHandler).handle(eventCaptor.capture());
|
verify(appEventHandler).handle(eventCaptor.capture());
|
||||||
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
|
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||||
eventCaptor.getValue().getType());
|
eventCaptor.getValue().getType());
|
||||||
verify(appEventHandler).handle(eventCaptor.capture());
|
verify(appEventHandler).handle(eventCaptor.capture());
|
||||||
assertEquals(application1, eventCaptor.getValue()
|
assertEquals(application1, eventCaptor.getValue()
|
||||||
@ -231,7 +229,6 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
this.remoteRootLogDir.getAbsolutePath());
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
|
|
||||||
|
|
||||||
DrainDispatcher dispatcher = createDispatcher();
|
DrainDispatcher dispatcher = createDispatcher();
|
||||||
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
@ -249,7 +246,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
new File(localLogDir, ConverterUtils.toString(application1));
|
new File(localLogDir, ConverterUtils.toString(application1));
|
||||||
app1LogDir.mkdir();
|
app1LogDir.mkdir();
|
||||||
logAggregationService
|
logAggregationService
|
||||||
.handle(new LogAggregatorAppStartedEvent(
|
.handle(new LogHandlerAppStartedEvent(
|
||||||
application1, this.user, null,
|
application1, this.user, null,
|
||||||
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
||||||
|
|
||||||
@ -260,7 +257,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
// Simulate log-file creation
|
// Simulate log-file creation
|
||||||
writeContainerLogs(app1LogDir, container11);
|
writeContainerLogs(app1LogDir, container11);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container11, 0));
|
new LogHandlerContainerFinishedEvent(container11, 0));
|
||||||
|
|
||||||
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
|
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
|
||||||
ApplicationAttemptId appAttemptId2 =
|
ApplicationAttemptId appAttemptId2 =
|
||||||
@ -269,7 +266,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
File app2LogDir =
|
File app2LogDir =
|
||||||
new File(localLogDir, ConverterUtils.toString(application2));
|
new File(localLogDir, ConverterUtils.toString(application2));
|
||||||
app2LogDir.mkdir();
|
app2LogDir.mkdir();
|
||||||
logAggregationService.handle(new LogAggregatorAppStartedEvent(
|
logAggregationService.handle(new LogHandlerAppStartedEvent(
|
||||||
application2, this.user, null,
|
application2, this.user, null,
|
||||||
ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
|
ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
|
||||||
|
|
||||||
@ -278,13 +275,13 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
|
|
||||||
writeContainerLogs(app2LogDir, container21);
|
writeContainerLogs(app2LogDir, container21);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container21, 0));
|
new LogHandlerContainerFinishedEvent(container21, 0));
|
||||||
|
|
||||||
ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
|
ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
|
||||||
|
|
||||||
writeContainerLogs(app1LogDir, container12);
|
writeContainerLogs(app1LogDir, container12);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container12, 0));
|
new LogHandlerContainerFinishedEvent(container12, 0));
|
||||||
|
|
||||||
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
|
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
|
||||||
ApplicationAttemptId appAttemptId3 =
|
ApplicationAttemptId appAttemptId3 =
|
||||||
@ -293,7 +290,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
File app3LogDir =
|
File app3LogDir =
|
||||||
new File(localLogDir, ConverterUtils.toString(application3));
|
new File(localLogDir, ConverterUtils.toString(application3));
|
||||||
app3LogDir.mkdir();
|
app3LogDir.mkdir();
|
||||||
logAggregationService.handle(new LogAggregatorAppStartedEvent(application3,
|
logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
|
||||||
this.user, null,
|
this.user, null,
|
||||||
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
||||||
|
|
||||||
@ -301,28 +298,28 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
|
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
|
||||||
writeContainerLogs(app3LogDir, container31);
|
writeContainerLogs(app3LogDir, container31);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container31, 0));
|
new LogHandlerContainerFinishedEvent(container31, 0));
|
||||||
|
|
||||||
ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
|
ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
|
||||||
writeContainerLogs(app3LogDir, container32);
|
writeContainerLogs(app3LogDir, container32);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed
|
new LogHandlerContainerFinishedEvent(container32, 1)); // Failed
|
||||||
|
|
||||||
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
|
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
|
||||||
writeContainerLogs(app2LogDir, container22);
|
writeContainerLogs(app2LogDir, container22);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container22, 0));
|
new LogHandlerContainerFinishedEvent(container22, 0));
|
||||||
|
|
||||||
ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
|
ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
|
||||||
writeContainerLogs(app3LogDir, container33);
|
writeContainerLogs(app3LogDir, container33);
|
||||||
logAggregationService.handle(
|
logAggregationService.handle(
|
||||||
new LogAggregatorContainerFinishedEvent(container33, 0));
|
new LogHandlerContainerFinishedEvent(container33, 0));
|
||||||
|
|
||||||
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
||||||
application2));
|
application2));
|
||||||
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
||||||
application3));
|
application3));
|
||||||
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
||||||
application1));
|
application1));
|
||||||
|
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
@ -342,7 +339,7 @@ public void testMultipleAppsLogAggregation() throws IOException {
|
|||||||
List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
|
List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
|
||||||
Set<ApplicationId> appIds = new HashSet<ApplicationId>();
|
Set<ApplicationId> appIds = new HashSet<ApplicationId>();
|
||||||
for (ApplicationEvent cap : capturedEvents) {
|
for (ApplicationEvent cap : capturedEvents) {
|
||||||
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
|
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||||
eventCaptor.getValue().getType());
|
eventCaptor.getValue().getType());
|
||||||
appIds.add(cap.getApplicationID());
|
appIds.add(cap.getApplicationID());
|
||||||
}
|
}
|
||||||
@ -447,7 +444,6 @@ private void verifyContainerLogs(
|
|||||||
public void testLogAggregationForRealContainerLaunch() throws IOException,
|
public void testLogAggregationForRealContainerLaunch() throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
|
||||||
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
|
|
||||||
this.containerManager.start();
|
this.containerManager.start();
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,187 @@
|
|||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.exceptions.verification.WantedButNotInvoked;
|
||||||
|
|
||||||
|
public class TestNonAggregatingLogHandler {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testLogDeletion() {
|
||||||
|
DeletionService delService = mock(DeletionService.class);
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
String user = "testuser";
|
||||||
|
|
||||||
|
File[] localLogDirs = new File[2];
|
||||||
|
localLogDirs[0] =
|
||||||
|
new File("target", this.getClass().getName() + "-localLogDir0")
|
||||||
|
.getAbsoluteFile();
|
||||||
|
localLogDirs[1] =
|
||||||
|
new File("target", this.getClass().getName() + "-localLogDir1")
|
||||||
|
.getAbsoluteFile();
|
||||||
|
String localLogDirsString =
|
||||||
|
localLogDirs[0].getAbsolutePath() + ","
|
||||||
|
+ localLogDirs[1].getAbsolutePath();
|
||||||
|
|
||||||
|
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
|
||||||
|
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
|
||||||
|
|
||||||
|
DrainDispatcher dispatcher = createDispatcher(conf);
|
||||||
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
|
dispatcher.register(ApplicationEventType.class, appEventHandler);
|
||||||
|
|
||||||
|
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
|
||||||
|
ApplicationAttemptId appAttemptId1 =
|
||||||
|
BuilderUtils.newApplicationAttemptId(appId1, 1);
|
||||||
|
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
|
||||||
|
|
||||||
|
NonAggregatingLogHandler logHandler =
|
||||||
|
new NonAggregatingLogHandler(dispatcher, delService);
|
||||||
|
logHandler.init(conf);
|
||||||
|
logHandler.start();
|
||||||
|
|
||||||
|
logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
|
||||||
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
|
||||||
|
|
||||||
|
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
|
||||||
|
|
||||||
|
logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
|
||||||
|
|
||||||
|
Path[] localAppLogDirs = new Path[2];
|
||||||
|
localAppLogDirs[0] =
|
||||||
|
new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
|
||||||
|
localAppLogDirs[1] =
|
||||||
|
new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
|
||||||
|
|
||||||
|
// 5 seconds for the delete which is a separate thread.
|
||||||
|
long verifyStartTime = System.currentTimeMillis();
|
||||||
|
WantedButNotInvoked notInvokedException = null;
|
||||||
|
boolean matched = false;
|
||||||
|
while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) {
|
||||||
|
try {
|
||||||
|
verify(delService).delete(eq(user), (Path) eq(null),
|
||||||
|
eq(localAppLogDirs[0]), eq(localAppLogDirs[1]));
|
||||||
|
matched = true;
|
||||||
|
} catch (WantedButNotInvoked e) {
|
||||||
|
notInvokedException = e;
|
||||||
|
try {
|
||||||
|
Thread.sleep(50l);
|
||||||
|
} catch (InterruptedException i) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!matched) {
|
||||||
|
throw notInvokedException;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testDelayedDelete() {
|
||||||
|
DeletionService delService = mock(DeletionService.class);
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
String user = "testuser";
|
||||||
|
|
||||||
|
File[] localLogDirs = new File[2];
|
||||||
|
localLogDirs[0] =
|
||||||
|
new File("target", this.getClass().getName() + "-localLogDir0")
|
||||||
|
.getAbsoluteFile();
|
||||||
|
localLogDirs[1] =
|
||||||
|
new File("target", this.getClass().getName() + "-localLogDir1")
|
||||||
|
.getAbsoluteFile();
|
||||||
|
String localLogDirsString =
|
||||||
|
localLogDirs[0].getAbsolutePath() + ","
|
||||||
|
+ localLogDirs[1].getAbsolutePath();
|
||||||
|
|
||||||
|
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
|
||||||
|
|
||||||
|
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);
|
||||||
|
|
||||||
|
DrainDispatcher dispatcher = createDispatcher(conf);
|
||||||
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
|
dispatcher.register(ApplicationEventType.class, appEventHandler);
|
||||||
|
|
||||||
|
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
|
||||||
|
ApplicationAttemptId appAttemptId1 =
|
||||||
|
BuilderUtils.newApplicationAttemptId(appId1, 1);
|
||||||
|
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
|
||||||
|
|
||||||
|
NonAggregatingLogHandler logHandler =
|
||||||
|
new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService);
|
||||||
|
logHandler.init(conf);
|
||||||
|
logHandler.start();
|
||||||
|
|
||||||
|
logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
|
||||||
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
|
||||||
|
|
||||||
|
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
|
||||||
|
|
||||||
|
logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
|
||||||
|
|
||||||
|
Path[] localAppLogDirs = new Path[2];
|
||||||
|
localAppLogDirs[0] =
|
||||||
|
new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
|
||||||
|
localAppLogDirs[1] =
|
||||||
|
new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
|
||||||
|
|
||||||
|
ScheduledThreadPoolExecutor mockSched =
|
||||||
|
((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched;
|
||||||
|
|
||||||
|
verify(mockSched).schedule(any(Runnable.class), eq(10800l),
|
||||||
|
eq(TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
private class NonAggregatingLogHandlerWithMockExecutor extends
|
||||||
|
NonAggregatingLogHandler {
|
||||||
|
|
||||||
|
private ScheduledThreadPoolExecutor mockSched;
|
||||||
|
|
||||||
|
public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
|
||||||
|
DeletionService delService) {
|
||||||
|
super(dispatcher, delService);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
|
||||||
|
Configuration conf) {
|
||||||
|
mockSched = mock(ScheduledThreadPoolExecutor.class);
|
||||||
|
return mockSched;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private DrainDispatcher createDispatcher(Configuration conf) {
|
||||||
|
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
dispatcher.init(conf);
|
||||||
|
dispatcher.start();
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
}
|
@ -224,18 +224,13 @@ Hadoop MapReduce Next Generation - Cluster Setup
|
|||||||
| | <<<ResourceManager>>> Scheduler class. | |
|
| | <<<ResourceManager>>> Scheduler class. | |
|
||||||
| | | <<<CapacityScheduler>>> (recommended) or <<<FifoScheduler>>> |
|
| | | <<<CapacityScheduler>>> (recommended) or <<<FifoScheduler>>> |
|
||||||
*-------------------------+-------------------------+------------------------+
|
*-------------------------+-------------------------+------------------------+
|
||||||
| <<<yarn.nodemanager.remote-app-log-dir>>> | | |
|
|
||||||
| | </logs> | |
|
|
||||||
| | | HDFS directory where the application logs are moved on application |
|
|
||||||
| | | completion. Need to set appropriate permissions. |
|
|
||||||
*-------------------------+-------------------------+------------------------+
|
|
||||||
| <<<yarn.resourcemanager.nodes.include-path>>> / | | |
|
| <<<yarn.resourcemanager.nodes.include-path>>> / | | |
|
||||||
| <<<yarn.resourcemanager.nodes.exclude-path>>> | | |
|
| <<<yarn.resourcemanager.nodes.exclude-path>>> | | |
|
||||||
| | List of permitted/excluded NodeManagers. | |
|
| | List of permitted/excluded NodeManagers. | |
|
||||||
| | | If necessary, use these files to control the list of allowable |
|
| | | If necessary, use these files to control the list of allowable |
|
||||||
| | | NodeManagers. |
|
| | | NodeManagers. |
|
||||||
*-------------------------+-------------------------+------------------------+
|
*-------------------------+-------------------------+------------------------+
|
||||||
|
|
|
||||||
* Configurations for NodeManager:
|
* Configurations for NodeManager:
|
||||||
|
|
||||||
*-------------------------+-------------------------+------------------------+
|
*-------------------------+-------------------------+------------------------+
|
||||||
@ -263,6 +258,27 @@ Hadoop MapReduce Next Generation - Cluster Setup
|
|||||||
| | are written. | |
|
| | are written. | |
|
||||||
| | | Multiple paths help spread disk i/o. |
|
| | | Multiple paths help spread disk i/o. |
|
||||||
*-------------------------+-------------------------+------------------------+
|
*-------------------------+-------------------------+------------------------+
|
||||||
|
| <<<yarn.nodemanager.log-aggregation-enable>>> | | |
|
||||||
|
| | <false> | |
|
||||||
|
| | | Configuration to enable or disable log aggregation |
|
||||||
|
*-------------------------+-------------------------+------------------------+
|
||||||
|
| <<<yarn.nodemanager.log.retain-seconds>>> | | |
|
||||||
|
| | <10800> | |
|
||||||
|
| | | Default time (in seconds) to retain log files on the NodeManager |
|
||||||
|
| | | Only applicable if log-aggregation is disabled. |
|
||||||
|
*-------------------------+-------------------------+------------------------+
|
||||||
|
| <<<yarn.nodemanager.remote-app-log-dir>>> | | |
|
||||||
|
| | </logs> | |
|
||||||
|
| | | HDFS directory where the application logs are moved on application |
|
||||||
|
| | | completion. Need to set appropriate permissions. |
|
||||||
|
| | | Only applicable if log-aggregation is enabled. |
|
||||||
|
*-------------------------+-------------------------+------------------------+
|
||||||
|
| <<<yarn.nodemanager.remote-app-log-dir-suffix>>> | | |
|
||||||
|
| | <logs> | |
|
||||||
|
| | | Suffix appended to the remote log dir. Logs will be aggregated to |
|
||||||
|
| | | $\{yarn.nodemanager.remote-app-log-dir\}/$\{user\}/$\{thisParam\} |
|
||||||
|
| | | Only applicable if log-aggregation is enabled. |
|
||||||
|
*-------------------------+-------------------------+------------------------+
|
||||||
| <<<yarn.nodemanager.aux-services>>> | | |
|
| <<<yarn.nodemanager.aux-services>>> | | |
|
||||||
| | mapreduce.shuffle | |
|
| | mapreduce.shuffle | |
|
||||||
| | | Shuffle service that needs to be set for Map Reduce applications. |
|
| | | Shuffle service that needs to be set for Map Reduce applications. |
|
||||||
|
Loading…
Reference in New Issue
Block a user