From bec0864394fbf30d7979bb7359dc0b5403731c0c Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Wed, 25 Sep 2019 10:28:34 +0200 Subject: [PATCH] YARN-9808. Zero length files in container log output haven't got a header. Contributed by Adam Antal --- .../hadoop/yarn/client/cli/TestLogsCLI.java | 59 ++++++++-- .../yarn/logaggregation/LogToolUtils.java | 60 +++++----- .../TestAggregatedLogFormat.java | 42 ++++++- ...stLogAggregationIndexedFileController.java | 31 ++++- .../TestLogAggregationService.java | 106 +++++++++++------- .../nodemanager/webapp/TestNMWebServices.java | 43 ++++--- 6 files changed, 239 insertions(+), 102 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 7a229dc992..03bad0d642 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.collect.ImmutableList; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse.Status; @@ -52,6 +53,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -93,9 +95,14 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestLogsCLI { + private static final Logger LOG = + LoggerFactory.getLogger(TestLogsCLI.class); + ByteArrayOutputStream sysOutStream; private PrintStream sysOut; @@ -402,13 +409,15 @@ public void testFetchFinishedApplictionLogs() throws Exception { List logTypes = new ArrayList(); logTypes.add("syslog"); // create container logs in localLogDir - createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes); - createContainerLogInLocalDir(appLogsDir, containerId2, fs, logTypes); - + createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes, + ImmutableList.of("empty")); + createContainerLogInLocalDir(appLogsDir, containerId2, fs, logTypes, + Collections.emptyList()); // create two logs for container3 in localLogDir logTypes.add("stdout"); logTypes.add("stdout1234"); - createContainerLogInLocalDir(appLogsDir, containerId3, fs, logTypes); + createContainerLogInLocalDir(appLogsDir, containerId3, fs, logTypes, + Collections.emptyList()); Path path = new Path(remoteLogRootDir + ugi.getShortUserName() @@ -449,6 +458,7 @@ public ContainerReport getContainerReport(String containerIdStr) cli.setConf(configuration); int exitCode = cli.run(new String[] { "-applicationId", appId.toString() }); + LOG.info(sysOutStream.toString()); assertTrue(exitCode == 0); assertTrue(sysOutStream.toString().contains( logMessage(containerId1, "syslog"))); @@ -460,6 +470,8 @@ public ContainerReport getContainerReport(String containerIdStr) logMessage(containerId3, "stdout"))); assertTrue(sysOutStream.toString().contains( logMessage(containerId3, "stdout1234"))); + assertTrue(sysOutStream.toString().contains( + createEmptyLog("empty"))); sysOutStream.reset(); exitCode = cli.run(new String[] {"-applicationId", appId.toString(), @@ -475,6 +487,8 @@ public ContainerReport getContainerReport(String containerIdStr) logMessage(containerId3, "stdout"))); assertTrue(sysOutStream.toString().contains( logMessage(containerId3, "stdout1234"))); + assertTrue(sysOutStream.toString().contains( + createEmptyLog("empty"))); sysOutStream.reset(); exitCode = cli.run(new String[] {"-applicationId", appId.toString(), @@ -490,6 +504,8 @@ public ContainerReport getContainerReport(String containerIdStr) logMessage(containerId3, "stdout"))); assertTrue(sysOutStream.toString().contains( logMessage(containerId3, "stdout1234"))); + assertTrue(sysOutStream.toString().contains( + createEmptyLog("empty"))); int fullSize = sysOutStream.toByteArray().length; sysOutStream.reset(); @@ -506,6 +522,8 @@ public ContainerReport getContainerReport(String containerIdStr) logMessage(containerId3, "stdout"))); assertFalse(sysOutStream.toString().contains( logMessage(containerId3, "stdout1234"))); + assertFalse(sysOutStream.toString().contains( + createEmptyLog("empty"))); sysOutStream.reset(); exitCode = cli.run(new String[] {"-applicationId", appId.toString(), @@ -521,6 +539,8 @@ public ContainerReport getContainerReport(String containerIdStr) logMessage(containerId3, "stdout"))); assertTrue(sysOutStream.toString().contains( logMessage(containerId3, "stdout1234"))); + assertFalse(sysOutStream.toString().contains( + createEmptyLog("empty"))); sysOutStream.reset(); exitCode = cli.run(new String[] {"-applicationId", appId.toString(), @@ -591,6 +611,15 @@ public ContainerReport getContainerReport(String containerIdStr) (fullContextSize - fileContentSize - tailContentSize), 5)); sysOutStream.reset(); + // specify how many bytes we should get from an empty log + exitCode = cli.run(new String[] {"-applicationId", appId.toString(), + "-containerId", containerId1.toString(), "-log_files", "empty", + "-size", "5"}); + assertTrue(exitCode == 0); + assertTrue(sysOutStream.toString().contains( + createEmptyLog("empty"))); + sysOutStream.reset(); + // specify a negative number, it would get the last n bytes from // container log exitCode = cli.run(new String[] {"-applicationId", appId.toString(), @@ -794,7 +823,8 @@ public void testGetRunningContainerLogs() throws Exception { List logTypes = new ArrayList(); logTypes.add(fileName); // create container logs in localLogDir - createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes); + createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes, + Collections.emptyList()); Path containerDirPath = new Path(appLogsDir, containerId1.toString()); Path logPath = new Path(containerDirPath, fileName); @@ -968,7 +998,8 @@ public void testFetchApplictionLogsAsAnotherUser() throws Exception { logTypes.add("syslog"); // create container logs in localLogDir for app - createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes); + createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes, + Collections.emptyList()); // create the remote app dir for app but for a different user testUser Path path = new Path(remoteLogRootDir + testUser + @@ -1547,7 +1578,8 @@ private void createContainerLogs(Configuration configuration, logTypes.add("syslog"); // create container logs in localLogDir for (ContainerId containerId : containerIds) { - createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes); + createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes, + Collections.emptyList()); } Path path = new Path(remoteLogRootDir + ugi.getShortUserName() @@ -1564,7 +1596,8 @@ private void createContainerLogs(Configuration configuration, } private static void createContainerLogInLocalDir(Path appLogsDir, - ContainerId containerId, FileSystem fs, List logTypes) throws Exception { + ContainerId containerId, FileSystem fs, List logTypes, + List emptyLogTypes) throws Exception { Path containerLogsDir = new Path(appLogsDir, containerId.toString()); if (fs.exists(containerLogsDir)) { fs.delete(containerLogsDir, true); @@ -1576,6 +1609,12 @@ private static void createContainerLogInLocalDir(Path appLogsDir, writer.write(logMessage(containerId, logType)); writer.close(); } + for (String emptyLogType : emptyLogTypes) { + Writer writer = + new FileWriter(new File(containerLogsDir.toString(), emptyLogType)); + writer.write(""); + writer.close(); + } } private static String logMessage(ContainerId containerId, String logType) { @@ -1584,6 +1623,10 @@ private static String logMessage(ContainerId containerId, String logType) { return sb.toString(); } + private static String createEmptyLog(String logType) { + return "LogContents:\n\nEnd of LogType:" + logType; + } + private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, Configuration configuration, List rootLogDirs, NodeId nodeId, ContainerId containerId, Path appDir, FileSystem fs) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java index c242d89848..5bc0b14e69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java @@ -43,6 +43,26 @@ private LogToolUtils() {} public static final String CONTAINER_ON_NODE_PATTERN = "Container: %s on %s"; + /** + * Formats the header of an aggregated log file. + */ + private static byte[] formatContainerLogHeader(String containerId, + String nodeId, ContainerLogAggregationType logType, String fileName, + String lastModifiedTime, long fileLength) { + StringBuilder sb = new StringBuilder(); + String containerStr = String.format( + LogToolUtils.CONTAINER_ON_NODE_PATTERN, + containerId, nodeId); + sb.append(containerStr + "\n") + .append("LogAggregationType: " + logType + "\n") + .append(StringUtils.repeat("=", containerStr.length()) + "\n") + .append("LogType:" + fileName + "\n") + .append("LogLastModifiedTime:" + lastModifiedTime + "\n") + .append("LogLength:" + fileLength + "\n") + .append("LogContents:\n"); + return sb.toString().getBytes(Charset.forName("UTF-8")); + } + /** * Output container log. * @param containerId the containerId @@ -84,22 +104,10 @@ public static void outputContainerLog(String containerId, String nodeId, : (int) pendingRead; int len = fis.read(buf, 0, toRead); boolean keepGoing = (len != -1 && curRead < totalBytesToRead); - if (keepGoing) { - StringBuilder sb = new StringBuilder(); - String containerStr = String.format( - LogToolUtils.CONTAINER_ON_NODE_PATTERN, - containerId, nodeId); - sb.append(containerStr + "\n") - .append("LogAggregationType: " + logType + "\n") - .append(StringUtils.repeat("=", containerStr.length()) + "\n") - .append("LogType:" + fileName + "\n") - .append("LogLastModifiedTime:" + lastModifiedTime + "\n") - .append("LogLength:" + Long.toString(fileLength) + "\n") - .append("LogContents:\n"); - byte[] b = sb.toString().getBytes( - Charset.forName("UTF-8")); - os.write(b, 0, b.length); - } + + byte[] b = formatContainerLogHeader(containerId, nodeId, logType, fileName, + lastModifiedTime, fileLength); + os.write(b, 0, b.length); while (keepGoing) { os.write(buf, 0, len); curRead += len; @@ -132,22 +140,12 @@ public static void outputContainerLogThroughZeroCopy(String containerId, } } + // output log summary + byte[] b = formatContainerLogHeader(containerId, nodeId, logType, fileName, + lastModifiedTime, fileLength); + os.write(b, 0, b.length); + if (totalBytesToRead > 0) { - // output log summary - StringBuilder sb = new StringBuilder(); - String containerStr = String.format( - LogToolUtils.CONTAINER_ON_NODE_PATTERN, - containerId, nodeId); - sb.append(containerStr + "\n") - .append("LogAggregationType: " + logType + "\n") - .append(StringUtils.repeat("=", containerStr.length()) + "\n") - .append("LogType:" + fileName + "\n") - .append("LogLastModifiedTime:" + lastModifiedTime + "\n") - .append("LogLength:" + Long.toString(fileLength) + "\n") - .append("LogContents:\n"); - byte[] b = sb.toString().getBytes( - Charset.forName("UTF-8")); - os.write(b, 0, b.length); // output log content FileChannel inputChannel = fis.getChannel(); WritableByteChannel outputChannel = Channels.newChannel(os); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 6c26c40290..9ae2983ca0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -67,7 +67,6 @@ public class TestAggregatedLogFormat { private static final File testWorkDir = new File("target", "TestAggregatedLogFormat"); - private static final Configuration conf = new Configuration(); private static final FileSystem fs; private static final char filler = 'x'; private static final Logger LOG = LoggerFactory @@ -75,7 +74,7 @@ public class TestAggregatedLogFormat { static { try { - fs = FileSystem.get(conf); + fs = FileSystem.get(new Configuration()); } catch (IOException e) { throw new RuntimeException(e); } @@ -282,6 +281,45 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception { Assert.assertEquals(expectedLength, s.length()); } + @Test + public void testZeroLengthLog() throws IOException { + Configuration conf = new Configuration(); + File workDir = new File(testWorkDir, "testZeroLength"); + Path remoteAppLogFile = new Path(workDir.getAbsolutePath(), + "aggregatedLogFile"); + Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles"); + ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1); + Path t = new Path(srcFileRoot, testContainerId.getApplicationAttemptId() + .getApplicationId().toString()); + Path srcFilePath = new Path(t, testContainerId.toString()); + + // Create zero byte file + writeSrcFile(srcFilePath, "stdout", 0); + + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(conf, remoteAppLogFile, ugi); + + LogKey logKey = new LogKey(testContainerId); + LogValue logValue = + new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId, ugi.getShortUserName()); + + logWriter.append(logKey, logValue); + } + + LogReader logReader = new LogReader(conf, remoteAppLogFile); + LogKey rLogKey = new LogKey(); + DataInputStream dis = logReader.next(rLogKey); + Writer writer = new StringWriter(); + LogReader.readAcontainerLogs(dis, writer); + + Assert.assertEquals("LogType:stdout\n" + + "LogLength:0\n" + + "Log Contents:\n\n" + + "End of LogType:stdout\n\n", writer.toString()); + } + @Test(timeout=10000) public void testContainerLogsFileAccess() throws IOException { // This test will run only if NativeIO is enabled as SecureIOUtils diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java index e63e469d98..098f3be4ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java @@ -85,6 +85,7 @@ public class TestLogAggregationIndexedFileController .createImmutable((short) (0777)); private static final UserGroupInformation USER_UGI = UserGroupInformation .createRemoteUser("testUser"); + private static final String ZERO_FILE = "zero"; private FileSystem fs; private ApplicationId appId; private ContainerId containerId; @@ -153,6 +154,8 @@ public void testLogAggregationIndexFileFormat() throws Exception { logType); files.add(file); } + files.add(createZeroLocalLogFile(appLogsDir)); + LogValue value = mock(LogValue.class); when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files); @@ -212,12 +215,13 @@ public boolean isRollover(final FileContext fc, for (ContainerLogMeta log : meta) { assertEquals(containerId.toString(), log.getContainerId()); assertEquals(nodeId.toString(), log.getNodeId()); - assertEquals(3, log.getContainerLogMeta().size()); + assertEquals(4, log.getContainerLogMeta().size()); for (ContainerLogFileInfo file : log.getContainerLogMeta()) { fileNames.add(file.getFileName()); } } fileNames.removeAll(logTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); @@ -226,6 +230,7 @@ public boolean isRollover(final FileContext fc, assertTrue(sysOutStream.toString().contains(logMessage( containerId, logType))); } + assertZeroFileIsContained(sysOutStream.toString()); sysOutStream.reset(); Configuration factoryConf = new Configuration(getConf()); @@ -297,12 +302,13 @@ public boolean isRollover(final FileContext fc, for (ContainerLogMeta log : meta) { assertEquals(containerId.toString(), log.getContainerId()); assertEquals(nodeId.toString(), log.getNodeId()); - assertEquals(3, log.getContainerLogMeta().size()); + assertEquals(4, log.getContainerLogMeta().size()); for (ContainerLogFileInfo file : log.getContainerLogMeta()) { fileNames.add(file.getFileName()); } } fileNames.removeAll(logTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); assertTrue(foundLogs); @@ -333,6 +339,7 @@ public boolean isRollover(final FileContext fc, } } fileNames.removeAll(newLogTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); assertTrue(foundLogs); @@ -361,6 +368,7 @@ public boolean isRollover(final FileContext fc, } } fileNames.removeAll(newLogTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); assertTrue(foundLogs); @@ -423,8 +431,25 @@ public void testFetchApplictionLogsHar() throws Exception { sysOutStream.reset(); } + private void assertZeroFileIsContained(String outStream) { + assertTrue(outStream.contains( + "LogContents:\n" + + "\n" + + "End of LogType:zero")); + } + + private File createZeroLocalLogFile(Path localLogDir) throws IOException { + return createAndWriteLocalLogFile(localLogDir, ZERO_FILE, ""); + } + private File createAndWriteLocalLogFile(ContainerId containerId, Path localLogDir, String logType) throws IOException { + return createAndWriteLocalLogFile(localLogDir, logType, + logMessage(containerId, logType)); + } + + private File createAndWriteLocalLogFile(Path localLogDir, String logType, + String message) throws IOException { File file = new File(localLogDir.toString(), logType); if (file.exists()) { file.delete(); @@ -433,7 +458,7 @@ private File createAndWriteLocalLogFile(ContainerId containerId, Writer writer = null; try { writer = new FileWriter(file); - writer.write(logMessage(containerId, logType)); + writer.write(message); writer.close(); return file; } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 063215ee80..0a2d63e08e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -153,6 +153,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { private Map acls = createAppAcls(); + private static final String[] EMPTY_FILES = new String[] {"zero"}; static { LOG = LoggerFactory.getLogger(TestLogAggregationService.class); @@ -219,7 +220,7 @@ private void verifyLocalFileDeletion( ContainerId container11 = ContainerId.newContainerId(appAttemptId, 1); // Simulate log-file creation writeContainerLogs(app1LogDir, container11, new String[] { "stdout", - "stderr", "syslog" }); + "stderr", "syslog" }, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container11, ContainerType.APPLICATION_MASTER, 0)); @@ -342,7 +343,7 @@ public void testNoLogsUploadedOnAppFinish() throws Exception { BuilderUtils.newApplicationAttemptId(app, 1); ContainerId cont = ContainerId.newContainerId(appAttemptId, 1); writeContainerLogs(appLogDir, cont, new String[] { "stdout", - "stderr", "syslog" }); + "stderr", "syslog" }, EMPTY_FILES); logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, ContainerType.APPLICATION_MASTER, 0)); logAggregationService.handle(new LogHandlerAppFinishedEvent(app)); @@ -432,7 +433,7 @@ public void testMultipleAppsLogAggregation() throws Exception { ContainerId container11 = ContainerId.newContainerId(appAttemptId1, 1); // Simulate log-file creation - writeContainerLogs(app1LogDir, container11, fileNames); + writeContainerLogs(app1LogDir, container11, fileNames, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container11, ContainerType.APPLICATION_MASTER, 0)); @@ -454,14 +455,14 @@ public void testMultipleAppsLogAggregation() throws Exception { ContainerId container21 = ContainerId.newContainerId(appAttemptId2, 1); - writeContainerLogs(app2LogDir, container21, fileNames); + writeContainerLogs(app2LogDir, container21, fileNames, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container21, ContainerType.APPLICATION_MASTER, 0)); ContainerId container12 = ContainerId.newContainerId(appAttemptId1, 2); - writeContainerLogs(app1LogDir, container12, fileNames); + writeContainerLogs(app1LogDir, container12, fileNames, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container12, ContainerType.TASK, 0)); @@ -497,25 +498,25 @@ public void testMultipleAppsLogAggregation() throws Exception { reset(appEventHandler); ContainerId container31 = ContainerId.newContainerId(appAttemptId3, 1); - writeContainerLogs(app3LogDir, container31, fileNames); + writeContainerLogs(app3LogDir, container31, fileNames, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container31, ContainerType.APPLICATION_MASTER, 0)); ContainerId container32 = ContainerId.newContainerId(appAttemptId3, 2); - writeContainerLogs(app3LogDir, container32, fileNames); + writeContainerLogs(app3LogDir, container32, fileNames, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container32, ContainerType.TASK, 1)); // Failed ContainerId container22 = ContainerId.newContainerId(appAttemptId2, 2); - writeContainerLogs(app2LogDir, container22, fileNames); + writeContainerLogs(app2LogDir, container22, fileNames, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container22, ContainerType.TASK, 0)); ContainerId container33 = ContainerId.newContainerId(appAttemptId3, 3); - writeContainerLogs(app3LogDir, container33, fileNames); + writeContainerLogs(app3LogDir, container33, fileNames, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container33, ContainerType.TASK, 0)); @@ -531,13 +532,15 @@ public void testMultipleAppsLogAggregation() throws Exception { assertEquals(0, logAggregationService.getNumAggregators()); verifyContainerLogs(logAggregationService, application1, - new ContainerId[] { container11, container12 }, fileNames, 3, false); + new ContainerId[] {container11, container12}, fileNames, 4, false, + EMPTY_FILES); verifyContainerLogs(logAggregationService, application2, - new ContainerId[] { container21 }, fileNames, 3, false); + new ContainerId[] {container21}, fileNames, 4, false, EMPTY_FILES); verifyContainerLogs(logAggregationService, application3, - new ContainerId[] { container31, container32 }, fileNames, 3, false); + new ContainerId[] {container31, container32}, fileNames, 4, false, + EMPTY_FILES); dispatcher.await(); @@ -935,7 +938,7 @@ public LogAggregationFileController getLogAggregationFileController( } private void writeContainerLogs(File appLogDir, ContainerId containerId, - String[] fileName) throws IOException { + String[] fileName, String[] emptyFiles) throws IOException { // ContainerLogDir should be created String containerStr = containerId.toString(); File containerLogDir = new File(appLogDir, containerStr); @@ -947,17 +950,22 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId, writer11.write(containerStr + " Hello " + fileType + "!"); writer11.close(); } + for (String emptyFile : emptyFiles) { + Writer writer11 = new FileWriter(new File(containerLogDir, emptyFile)); + writer11.write(""); + writer11.close(); + } } private LogFileStatusInLastCycle verifyContainerLogs( LogAggregationService logAggregationService, ApplicationId appId, ContainerId[] expectedContainerIds, String[] logFiles, int numOfLogsPerContainer, - boolean multiLogs) throws IOException { + boolean multiLogs, String[] zeroLengthFiles) throws IOException { return verifyContainerLogs(logAggregationService, appId, expectedContainerIds, expectedContainerIds.length, expectedContainerIds.length, logFiles, numOfLogsPerContainer, - multiLogs); + multiLogs, zeroLengthFiles); } // expectedContainerIds is the minimal set of containers to check. @@ -968,7 +976,8 @@ private LogFileStatusInLastCycle verifyContainerLogs( LogAggregationService logAggregationService, ApplicationId appId, ContainerId[] expectedContainerIds, int minNumOfContainers, int maxNumOfContainers, - String[] logFiles, int numOfLogsPerContainer, boolean multiLogs) + String[] logFiles, int numOfLogsPerContainer, boolean multiLogs, + String[] zeroLengthLogFiles) throws IOException { Path appLogDir = logAggregationService.getLogAggregationFileController( conf).getRemoteAppLogDir(appId, this.user); @@ -1089,6 +1098,11 @@ private LogFileStatusInLastCycle verifyContainerLogs( + " not present in aggregated log-file!", foundValue); Assert.assertEquals(expectedValue, foundValue); } + for (String emptyFile : zeroLengthLogFiles) { + String foundValue = thisContainerMap.remove(emptyFile); + String expectedValue = "\nEnd of LogType:" + emptyFile; + Assert.assertEquals(expectedValue, foundValue); + } Assert.assertEquals(0, thisContainerMap.size()); } Assert.assertTrue("number of remaining containers should be at least " + @@ -1584,7 +1598,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { // Simulate log-file creation writeContainerLogs(appLogDir1, container1, new String[] { "stdout", - "stderr", "syslog" }); + "stderr", "syslog" }, EMPTY_FILES); logAggregationService.handle(new LogHandlerContainerFinishedEvent( container1, ContainerType.APPLICATION_MASTER, 0)); @@ -1605,7 +1619,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { ContainerId container2 = ContainerId.newContainerId(appAttemptId2, 1); writeContainerLogs(app2LogDir, container2, new String[] { "stdout", - "stderr", "syslog" }); + "stderr", "syslog" }, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container2, ContainerType.APPLICATION_MASTER, 0)); @@ -1629,7 +1643,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { this.user, null, this.acls, context1)); ContainerId container3 = ContainerId.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container3, new String[] { "stdout", - "sys.log", "std.log", "out.log", "err.log", "log" }); + "sys.log", "std.log", "out.log", "err.log", "log" }, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container3, ContainerType.APPLICATION_MASTER, 0)); @@ -1654,7 +1668,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { this.user, null, this.acls, context2)); ContainerId container4 = ContainerId.newContainerId(appAttemptId4, 1); writeContainerLogs(app4LogDir, container4, new String[] { "stdout", - "sys.log", "std.log", "out.log", "err.log", "log" }); + "sys.log", "std.log", "out.log", "err.log", "log" }, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container4, ContainerType.APPLICATION_MASTER, 0)); @@ -1682,19 +1696,19 @@ public void testLogAggregationServiceWithPatterns() throws Exception { String[] logFiles = new String[] { "stdout", "syslog" }; verifyContainerLogs(logAggregationService, application1, - new ContainerId[] { container1 }, logFiles, 2, false); + new ContainerId[] {container1}, logFiles, 2, false, new String[] {}); logFiles = new String[] { "stderr" }; verifyContainerLogs(logAggregationService, application2, - new ContainerId[] { container2 }, logFiles, 1, false); + new ContainerId[] {container2}, logFiles, 2, false, EMPTY_FILES); logFiles = new String[] { "out.log", "err.log" }; verifyContainerLogs(logAggregationService, application3, - new ContainerId[] { container3 }, logFiles, 2, false); + new ContainerId[] {container3}, logFiles, 2, false, new String[] {}); logFiles = new String[] { "sys.log" }; verifyContainerLogs(logAggregationService, application4, - new ContainerId[] { container4 }, logFiles, 1, false); + new ContainerId[] {container4}, logFiles, 1, false, new String[] {}); dispatcher.await(); @@ -1721,8 +1735,8 @@ public void testLogAggregationServiceWithPatternsAndIntervals() // When the app is running, we only aggregate the log with // the name stdout. After the app finishes, we only aggregate // the log with the name std_final. - logAggregationContext.setRolledLogsIncludePattern("stdout"); - logAggregationContext.setIncludePattern("std_final"); + logAggregationContext.setRolledLogsIncludePattern("stdout|zero"); + logAggregationContext.setIncludePattern("std_final|empty_final"); this.conf.set( YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); //configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to @@ -1767,7 +1781,8 @@ public void testLogAggregationServiceWithPatternsAndIntervals() // until the app finishes. String[] logFilesWithFinalLog = new String[] {"stdout", "std_final"}; - writeContainerLogs(appLogDir, container, logFilesWithFinalLog); + String[] zeroFiles = new String[] {"zero", "empty_final"}; + writeContainerLogs(appLogDir, container, logFilesWithFinalLog, zeroFiles); // Do log aggregation AppLogAggregatorImpl aggregator = @@ -1781,7 +1796,7 @@ public void testLogAggregationServiceWithPatternsAndIntervals() String[] logFiles = new String[] { "stdout" }; verifyContainerLogs(logAggregationService, application, - new ContainerId[] {container}, logFiles, 1, true); + new ContainerId[] {container}, logFiles, 2, true, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container, @@ -1800,8 +1815,9 @@ public void testLogAggregationServiceWithPatternsAndIntervals() // This container finishes. // The log "std_final" should be aggregated this time. String[] logFinalLog = new String[] {"std_final"}; + String[] emptyFinalLog = new String[] {"empty_final"}; verifyContainerLogs(logAggregationService, application, - new ContainerId[] {container}, logFinalLog, 1, true); + new ContainerId[] {container}, logFinalLog, 2, true, emptyFinalLog); logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); @@ -1823,7 +1839,7 @@ public void testNoneContainerPolicy() throws Exception { finishApplication(appId, logAggregationService); verifyContainerLogs(logAggregationService, appId, - new ContainerId[] {container1}, logFiles, 0, false); + new ContainerId[] {container1}, logFiles, 0, false, EMPTY_FILES); verifyLogAggFinishEvent(appId); } @@ -1847,7 +1863,7 @@ public void testFailedContainerPolicy() throws Exception { finishApplication(appId, logAggregationService); verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1 }, logFiles, 1, false); + new ContainerId[] {container1}, logFiles, 2, false, EMPTY_FILES); verifyLogAggFinishEvent(appId); } @@ -1871,7 +1887,8 @@ public void testAMOrFailedContainerPolicy() throws Exception { finishApplication(appId, logAggregationService); verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1, container2 }, logFiles, 1, false); + new ContainerId[] {container1, container2}, logFiles, 2, false, + EMPTY_FILES); verifyLogAggFinishEvent(appId); } @@ -1895,7 +1912,8 @@ public void testFailedOrKilledContainerPolicy() throws Exception { finishApplication(appId, logAggregationService); verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container2, container3 }, logFiles, 1, false); + new ContainerId[] {container2, container3}, logFiles, 2, false, + EMPTY_FILES); verifyLogAggFinishEvent(appId); } @@ -1931,7 +1949,7 @@ public void testAMOnlyContainerPolicy() throws Exception { finishApplication(appId, logAggregationService); verifyContainerLogs(logAggregationService, appId, - new ContainerId[] { container1 }, logFiles, 1, false); + new ContainerId[] {container1}, logFiles, 2, false, EMPTY_FILES); verifyLogAggFinishEvent(appId); } @@ -2080,7 +2098,7 @@ private void verifyDefaultPolicy(ApplicationId appId, verifyContainerLogs(logAggregationService, appId, new ContainerId[] { container1, container2, container3 }, - logFiles, 1, false); + logFiles, 2, false, EMPTY_FILES); verifyLogAggFinishEvent(appId); } @@ -2162,7 +2180,7 @@ private void setupAndTestSampleContainerPolicy(int successfulContainers, verifyContainerLogs(logAggregationService, appId, containerIds.toArray(new ContainerId[containerIds.size()]), minOfContainersWithLogs, maxOfContainersWithLogs, - logFiles, 1, false); + logFiles, 2, false, EMPTY_FILES); verifyLogAggFinishEvent(appId); } @@ -2240,7 +2258,7 @@ private ContainerId finishContainer(ApplicationId application1, File appLogDir1 = new File(localLogDir, application1.toString()); appLogDir1.mkdir(); - writeContainerLogs(appLogDir1, containerId, logFiles); + writeContainerLogs(appLogDir1, containerId, logFiles, EMPTY_FILES); logAggregationService.handle(new LogHandlerContainerFinishedEvent( containerId, containerType, exitCode)); @@ -2361,7 +2379,8 @@ private void testLogAggregationService(boolean retentionSizeLimitation) String[] logFiles1WithFinalLog = new String[] { "stdout", "stderr", "syslog", "std_final" }; String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"}; - writeContainerLogs(appLogDir, container, logFiles1WithFinalLog); + writeContainerLogs(appLogDir, container, logFiles1WithFinalLog, + EMPTY_FILES); // Do log aggregation AppLogAggregatorImpl aggregator = @@ -2378,7 +2397,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) } // Container logs should be uploaded logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application, - new ContainerId[] { container }, logFiles1, 3, true); + new ContainerId[] {container}, logFiles1, 4, true, EMPTY_FILES); for(String logFile : logFiles1) { Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle() .contains(logFile)); @@ -2403,7 +2422,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) // Do log aggregation String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" }; - writeContainerLogs(appLogDir, container, logFiles2); + writeContainerLogs(appLogDir, container, logFiles2, EMPTY_FILES); aggregator.doLogAggregationOutOfBand(); @@ -2416,7 +2435,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) } // Container logs should be uploaded logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application, - new ContainerId[] { container }, logFiles2, 3, true); + new ContainerId[] {container}, logFiles2, 4, true, EMPTY_FILES); for(String logFile : logFiles2) { Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle() @@ -2430,7 +2449,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) // create another logs String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" }; - writeContainerLogs(appLogDir, container, logFiles3); + writeContainerLogs(appLogDir, container, logFiles3, EMPTY_FILES); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container, @@ -2450,7 +2469,8 @@ private void testLogAggregationService(boolean retentionSizeLimitation) String[] logFiles3WithFinalLog = new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" }; verifyContainerLogs(logAggregationService, application, - new ContainerId[] { container }, logFiles3WithFinalLog, 4, true); + new ContainerId[] {container}, logFiles3WithFinalLog, 5, true, + EMPTY_FILES); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 740af8f974..ad17ae8132 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -28,6 +28,7 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -118,6 +119,7 @@ public class TestNMWebServices extends JerseyTestBase { private static LocalDirsHandlerService dirsHandler; private static WebApp nmWebApp; private static final String LOGSERVICEWSADDR = "test:1234"; + private static final String LOG_MESSAGE = "log message\n"; private static final File testRootDir = new File("target", TestNMWebServices.class.getSimpleName()); @@ -441,20 +443,26 @@ public void testSingleNodesXML() throws JSONException, Exception { @Test (timeout = 5000) public void testContainerLogsWithNewAPI() throws Exception { - final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0); - WebResource r = resource(); - r = r.path("ws").path("v1").path("node").path("containers") - .path(containerId.toString()).path("logs"); - testContainerLogs(r, containerId); + ContainerId containerId0 = BuilderUtils.newContainerId(0, 0, 0, 0); + WebResource r0 = resource(); + r0 = r0.path("ws").path("v1").path("node").path("containers") + .path(containerId0.toString()).path("logs"); + testContainerLogs(r0, containerId0, LOG_MESSAGE); + + ContainerId containerId1 = BuilderUtils.newContainerId(0, 0, 0, 1); + WebResource r1 = resource(); + r1 = r1.path("ws").path("v1").path("node").path("containers") + .path(containerId1.toString()).path("logs"); + testContainerLogs(r1, containerId1, ""); } @Test (timeout = 5000) public void testContainerLogsWithOldAPI() throws Exception { - final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1); + final ContainerId containerId2 = BuilderUtils.newContainerId(1, 1, 0, 2); WebResource r = resource(); r = r.path("ws").path("v1").path("node").path("containerlogs") - .path(containerId.toString()); - testContainerLogs(r, containerId); + .path(containerId2.toString()); + testContainerLogs(r, containerId2, LOG_MESSAGE); } @Test (timeout = 10000) @@ -583,15 +591,14 @@ public void testGetYarnGpuResourceInfo() 2, json.getJSONArray("assignedGpuDevices").length()); } - private void testContainerLogs(WebResource r, ContainerId containerId) - throws Exception { + private void testContainerLogs(WebResource r, ContainerId containerId, + String logMessage) throws Exception { final String containerIdStr = containerId.toString(); final ApplicationAttemptId appAttemptId = containerId .getApplicationAttemptId(); final ApplicationId appId = appAttemptId.getApplicationId(); final String appIdStr = appId.toString(); final String filename = "logfile1"; - final String logMessage = "log message\n"; nmContext.getApplications().put(appId, new ApplicationImpl(null, "user", appId, null, nmContext)); @@ -607,6 +614,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId) File logFile = new File(path.toUri().getPath()); logFile.deleteOnExit(); + if (logFile.getParentFile().exists()) { + FileUtils.deleteDirectory(logFile.getParentFile()); + } assertTrue("Failed to create log dir", logFile.getParentFile().mkdirs()); PrintWriter pw = new PrintWriter(logFile); pw.print(logMessage); @@ -628,8 +638,10 @@ private void testContainerLogs(WebResource r, ContainerId containerId) .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); responseLogMessage = getLogContext(responseText); - assertEquals(5, responseLogMessage.getBytes().length); - assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage); + int truncatedLength = Math.min(5, logMessage.getBytes().length); + assertEquals(truncatedLength, responseLogMessage.getBytes().length); + assertEquals(new String(logMessage.getBytes(), 0, truncatedLength), + responseLogMessage); assertTrue(fullTextSize >= responseLogMessage.getBytes().length); // specify the bytes which is larger than the actual file size, @@ -649,9 +661,10 @@ private void testContainerLogs(WebResource r, ContainerId containerId) .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); responseLogMessage = getLogContext(responseText); - assertEquals(5, responseLogMessage.getBytes().length); + assertEquals(truncatedLength, responseLogMessage.getBytes().length); assertEquals(new String(logMessage.getBytes(), - logMessage.getBytes().length - 5, 5), responseLogMessage); + logMessage.getBytes().length - truncatedLength, truncatedLength), + responseLogMessage); assertTrue(fullTextSize >= responseLogMessage.getBytes().length); response = r.path(filename)