diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 8e421336cc..6af8653aae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -703,7 +703,7 @@ private void sendFinishedEvents() { eventHandler.handle(new ContainerStopMonitoringEvent(containerId)); // Tell the logService too eventHandler.handle(new LogHandlerContainerFinishedEvent( - containerId, exitCode)); + containerId, containerTokenIdentifier.getContainerType(), exitCode)); } @SuppressWarnings("unchecked") // dispatcher not typed diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 1a59e4589a..4938939ae2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -287,7 +286,8 @@ int getNumAggregators() { return this.appLogAggregators.size(); } - private void stopContainer(ContainerId containerId, int exitCode) { + private void stopContainer(ContainerId containerId, + ContainerType containerType, int exitCode) { // A container is complete. Put this containers' logs up for aggregation if // this containers' logs are needed. @@ -298,14 +298,6 @@ private void stopContainer(ContainerId containerId, int exitCode) { + ", did it fail to start?"); return; } - Container container = context.getContainers().get(containerId); - if (null == container) { - LOG.warn("Log aggregation cannot be started for " + containerId - + ", as its an absent container"); - return; - } - ContainerType containerType = - container.getContainerTokenIdentifier().getContainerType(); aggregator.startContainerLogAggregation( new ContainerLogContext(containerId, containerType, exitCode)); } @@ -344,6 +336,7 @@ public void handle(LogHandlerEvent event) { LogHandlerContainerFinishedEvent containerFinishEvent = (LogHandlerContainerFinishedEvent) event; stopContainer(containerFinishEvent.getContainerId(), + containerFinishEvent.getContainerType(), containerFinishEvent.getExitCode()); break; case APPLICATION_FINISHED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java index 038006edcf..3f4b6a076f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java @@ -19,16 +19,19 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.ContainerType; public class LogHandlerContainerFinishedEvent extends LogHandlerEvent { private final ContainerId containerId; + private final ContainerType containerType; private final int exitCode; public LogHandlerContainerFinishedEvent(ContainerId containerId, - int exitCode) { + ContainerType containerType, int exitCode) { super(LogHandlerEventType.CONTAINER_FINISHED); this.containerId = containerId; + this.containerType = containerType; this.exitCode = exitCode; } @@ -36,6 +39,10 @@ public ContainerId getContainerId() { return this.containerId; } + public ContainerType getContainerType() { + return containerType; + } + public int getExitCode() { return this.exitCode; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 0fa012c53a..a3e01afcbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -141,7 +141,6 @@ import com.google.common.base.Supplier; import org.slf4j.LoggerFactory; -//@Ignore public class TestLogAggregationService extends BaseContainerManagerTest { private Map acls = createAppAcls(); @@ -208,13 +207,13 @@ private void verifyLocalFileDeletion( ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container11 = createContainer(appAttemptId, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container11 = ContainerId.newContainerId(appAttemptId, 1); // Simulate log-file creation writeContainerLogs(app1LogDir, container11, new String[] { "stdout", "stderr", "syslog" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container11, 0)); + new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logAggregationService.handle(new LogHandlerAppFinishedEvent( application1)); @@ -332,11 +331,11 @@ public void testNoLogsUploadedOnAppFinish() throws Exception { ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(app, 1); - ContainerId cont = createContainer(appAttemptId, 1, - ContainerType.APPLICATION_MASTER); + ContainerId cont = ContainerId.newContainerId(appAttemptId, 1); writeContainerLogs(appLogDir, cont, new String[] { "stdout", "stderr", "syslog" }); - logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0)); + logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, + ContainerType.APPLICATION_MASTER, 0)); logAggregationService.handle(new LogHandlerAppFinishedEvent(app)); logAggregationService.stop(); delSrvc.stop(); @@ -421,13 +420,13 @@ public void testMultipleAppsLogAggregation() throws Exception { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container11 = createContainer(appAttemptId1, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container11 = ContainerId.newContainerId(appAttemptId1, 1); // Simulate log-file creation writeContainerLogs(app1LogDir, container11, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container11, 0)); + new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2); ApplicationAttemptId appAttemptId2 = @@ -444,19 +443,19 @@ public void testMultipleAppsLogAggregation() throws Exception { logAggregationService.handle(new LogHandlerAppStartedEvent( application2, this.user, null, this.acls, contextWithAMOnly)); - ContainerId container21 = createContainer(appAttemptId2, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container21 = ContainerId.newContainerId(appAttemptId2, 1); writeContainerLogs(app2LogDir, container21, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container21, 0)); + new LogHandlerContainerFinishedEvent(container21, + ContainerType.APPLICATION_MASTER, 0)); - ContainerId container12 = createContainer(appAttemptId1, 2, - ContainerType.TASK); + ContainerId container12 = ContainerId.newContainerId(appAttemptId1, 2); writeContainerLogs(app1LogDir, container12, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container12, 0)); + new LogHandlerContainerFinishedEvent(container12, + ContainerType.TASK, 0)); ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3); ApplicationAttemptId appAttemptId3 = @@ -488,29 +487,29 @@ public void testMultipleAppsLogAggregation() throws Exception { checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID"); reset(appEventHandler); - ContainerId container31 = createContainer(appAttemptId3, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container31 = ContainerId.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container31, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container31, 0)); + new LogHandlerContainerFinishedEvent(container31, + ContainerType.APPLICATION_MASTER, 0)); - ContainerId container32 = createContainer(appAttemptId3, 2, - ContainerType.TASK); + ContainerId container32 = ContainerId.newContainerId(appAttemptId3, 2); writeContainerLogs(app3LogDir, container32, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container32, 1)); // Failed + new LogHandlerContainerFinishedEvent(container32, + ContainerType.TASK, 1)); // Failed - ContainerId container22 = createContainer(appAttemptId2, 2, - ContainerType.TASK); + ContainerId container22 = ContainerId.newContainerId(appAttemptId2, 2); writeContainerLogs(app2LogDir, container22, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container22, 0)); + new LogHandlerContainerFinishedEvent(container22, + ContainerType.TASK, 0)); - ContainerId container33 = createContainer(appAttemptId3, 3, - ContainerType.TASK); + ContainerId container33 = ContainerId.newContainerId(appAttemptId3, 3); writeContainerLogs(app3LogDir, container33, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container33, 0)); + new LogHandlerContainerFinishedEvent(container33, + ContainerType.TASK, 0)); logAggregationService.handle(new LogHandlerAppFinishedEvent( application2)); @@ -800,7 +799,8 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM logAggregationService.handle(new LogHandlerContainerFinishedEvent( - BuilderUtils.newContainerId(4, 1, 1, 1), 0)); + BuilderUtils.newContainerId(4, 1, 1, 1), + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); @@ -866,7 +866,8 @@ public LogAggregationFileController getLogAggregationFileController( // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM logAggregationService.handle(new LogHandlerContainerFinishedEvent( - BuilderUtils.newContainerId(4, 1, 1, 1), 0)); + BuilderUtils.newContainerId(4, 1, 1, 1), + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); @@ -1528,14 +1529,13 @@ public void testLogAggregationServiceWithPatterns() throws Exception { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container1 = createContainer(appAttemptId1, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container1 = ContainerId.newContainerId(appAttemptId1, 1); // Simulate log-file creation writeContainerLogs(appLogDir1, container1, new String[] { "stdout", "stderr", "syslog" }); logAggregationService.handle(new LogHandlerContainerFinishedEvent( - container1, 0)); + container1, ContainerType.APPLICATION_MASTER, 0)); // LogContext for application2 has excludePatten which includes // stdout and syslog. @@ -1551,13 +1551,13 @@ public void testLogAggregationServiceWithPatterns() throws Exception { AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application2, this.user, null, this.acls, LogAggregationContextWithExcludePatterns)); - ContainerId container2 = createContainer(appAttemptId2, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container2 = ContainerId.newContainerId(appAttemptId2, 1); writeContainerLogs(app2LogDir, container2, new String[] { "stdout", "stderr", "syslog" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container2, 0)); + new LogHandlerContainerFinishedEvent(container2, + ContainerType.APPLICATION_MASTER, 0)); // LogContext for application3 has includePattern which is *.log and // excludePatten which includes std.log and sys.log. @@ -1576,12 +1576,12 @@ public void testLogAggregationServiceWithPatterns() throws Exception { AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application3, this.user, null, this.acls, context1)); - ContainerId container3 = createContainer(appAttemptId3, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container3 = ContainerId.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container3, new String[] { "stdout", "sys.log", "std.log", "out.log", "err.log", "log" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container3, 0)); + new LogHandlerContainerFinishedEvent(container3, + ContainerType.APPLICATION_MASTER, 0)); // LogContext for application4 has includePattern // which includes std.log and sys.log and @@ -1601,12 +1601,12 @@ public void testLogAggregationServiceWithPatterns() throws Exception { AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application4, this.user, null, this.acls, context2)); - ContainerId container4 = createContainer(appAttemptId4, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container4 = ContainerId.newContainerId(appAttemptId4, 1); writeContainerLogs(app4LogDir, container4, new String[] { "stdout", "sys.log", "std.log", "out.log", "err.log", "log" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container4, 0)); + new LogHandlerContainerFinishedEvent(container4, + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); ApplicationEvent expectedInitEvents[] = @@ -1733,7 +1733,8 @@ public void testLogAggregationServiceWithPatternsAndIntervals() new ContainerId[] {container}, logFiles, 1, true); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container, 0)); + new LogHandlerContainerFinishedEvent(container, + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); @@ -1857,14 +1858,8 @@ public void testLogAggregationAbsentContainer() throws Exception { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(appId, 1); ContainerId containerId = BuilderUtils.newContainerId(appAttemptId1, 2l); - try { - logAggregationService.handle(new LogHandlerContainerFinishedEvent( - containerId, 100)); - assertTrue("Should skip when null containerID", true); - } catch (Exception e) { - Assert.assertFalse("Exception not expected should skip null containerid", - true); - } + logAggregationService.handle(new LogHandlerContainerFinishedEvent( + containerId, ContainerType.APPLICATION_MASTER, 100)); } @Test (timeout = 50000) @@ -2189,8 +2184,7 @@ private ContainerId finishContainer(ApplicationId application1, long cId, int exitCode, String[] logFiles) throws IOException { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId containerId = createContainer(appAttemptId1, cId, - containerType); + ContainerId containerId = ContainerId.newContainerId(appAttemptId1, cId); // Simulate log-file creation File appLogDir1 = new File(localLogDir, application1.toString()); @@ -2198,7 +2192,7 @@ private ContainerId finishContainer(ApplicationId application1, writeContainerLogs(appLogDir1, containerId, logFiles); logAggregationService.handle(new LogHandlerContainerFinishedEvent( - containerId, exitCode)); + containerId, containerType, exitCode)); return containerId; } @@ -2388,7 +2382,8 @@ private void testLogAggregationService(boolean retentionSizeLimitation) writeContainerLogs(appLogDir, container, logFiles3); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container, 0)); + new LogHandlerContainerFinishedEvent(container, + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index 7a4ea88760..c6fa16df10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -156,7 +157,8 @@ public void testLogDeletion() throws IOException { logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); - logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logHandler.handle(new LogHandlerAppFinishedEvent(appId)); @@ -196,7 +198,8 @@ public void testDelayedDelete() throws IOException { logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); - logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logHandler.handle(new LogHandlerAppFinishedEvent(appId)); @@ -365,7 +368,8 @@ public void testRecovery() throws Exception { logHandler.start(); logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); - logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logHandler.handle(new LogHandlerAppFinishedEvent(appId)); // simulate a restart and verify deletion is rescheduled