YARN-9808. Zero length files in container log output haven't got a header. Contributed by Adam Antal

This commit is contained in:
Szilard Nemeth 2019-09-25 10:28:34 +02:00
parent 3f89084ac7
commit bec0864394
6 changed files with 239 additions and 102 deletions

View File

@ -36,6 +36,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
@ -52,6 +53,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -93,9 +95,14 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestLogsCLI {
private static final Logger LOG =
LoggerFactory.getLogger(TestLogsCLI.class);
ByteArrayOutputStream sysOutStream;
private PrintStream sysOut;
@ -402,13 +409,15 @@ public void testFetchFinishedApplictionLogs() throws Exception {
List<String> logTypes = new ArrayList<String>();
logTypes.add("syslog");
// create container logs in localLogDir
createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes);
createContainerLogInLocalDir(appLogsDir, containerId2, fs, logTypes);
createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes,
ImmutableList.of("empty"));
createContainerLogInLocalDir(appLogsDir, containerId2, fs, logTypes,
Collections.emptyList());
// create two logs for container3 in localLogDir
logTypes.add("stdout");
logTypes.add("stdout1234");
createContainerLogInLocalDir(appLogsDir, containerId3, fs, logTypes);
createContainerLogInLocalDir(appLogsDir, containerId3, fs, logTypes,
Collections.emptyList());
Path path =
new Path(remoteLogRootDir + ugi.getShortUserName()
@ -449,6 +458,7 @@ public ContainerReport getContainerReport(String containerIdStr)
cli.setConf(configuration);
int exitCode = cli.run(new String[] { "-applicationId", appId.toString() });
LOG.info(sysOutStream.toString());
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toString().contains(
logMessage(containerId1, "syslog")));
@ -460,6 +470,8 @@ public ContainerReport getContainerReport(String containerIdStr)
logMessage(containerId3, "stdout")));
assertTrue(sysOutStream.toString().contains(
logMessage(containerId3, "stdout1234")));
assertTrue(sysOutStream.toString().contains(
createEmptyLog("empty")));
sysOutStream.reset();
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
@ -475,6 +487,8 @@ public ContainerReport getContainerReport(String containerIdStr)
logMessage(containerId3, "stdout")));
assertTrue(sysOutStream.toString().contains(
logMessage(containerId3, "stdout1234")));
assertTrue(sysOutStream.toString().contains(
createEmptyLog("empty")));
sysOutStream.reset();
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
@ -490,6 +504,8 @@ public ContainerReport getContainerReport(String containerIdStr)
logMessage(containerId3, "stdout")));
assertTrue(sysOutStream.toString().contains(
logMessage(containerId3, "stdout1234")));
assertTrue(sysOutStream.toString().contains(
createEmptyLog("empty")));
int fullSize = sysOutStream.toByteArray().length;
sysOutStream.reset();
@ -506,6 +522,8 @@ public ContainerReport getContainerReport(String containerIdStr)
logMessage(containerId3, "stdout")));
assertFalse(sysOutStream.toString().contains(
logMessage(containerId3, "stdout1234")));
assertFalse(sysOutStream.toString().contains(
createEmptyLog("empty")));
sysOutStream.reset();
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
@ -521,6 +539,8 @@ public ContainerReport getContainerReport(String containerIdStr)
logMessage(containerId3, "stdout")));
assertTrue(sysOutStream.toString().contains(
logMessage(containerId3, "stdout1234")));
assertFalse(sysOutStream.toString().contains(
createEmptyLog("empty")));
sysOutStream.reset();
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
@ -591,6 +611,15 @@ public ContainerReport getContainerReport(String containerIdStr)
(fullContextSize - fileContentSize - tailContentSize), 5));
sysOutStream.reset();
// specify how many bytes we should get from an empty log
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
"-containerId", containerId1.toString(), "-log_files", "empty",
"-size", "5"});
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toString().contains(
createEmptyLog("empty")));
sysOutStream.reset();
// specify a negative number, it would get the last n bytes from
// container log
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
@ -794,7 +823,8 @@ public void testGetRunningContainerLogs() throws Exception {
List<String> logTypes = new ArrayList<String>();
logTypes.add(fileName);
// create container logs in localLogDir
createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes);
createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes,
Collections.emptyList());
Path containerDirPath = new Path(appLogsDir, containerId1.toString());
Path logPath = new Path(containerDirPath, fileName);
@ -968,7 +998,8 @@ public void testFetchApplictionLogsAsAnotherUser() throws Exception {
logTypes.add("syslog");
// create container logs in localLogDir for app
createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes);
createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes,
Collections.emptyList());
// create the remote app dir for app but for a different user testUser
Path path = new Path(remoteLogRootDir + testUser +
@ -1547,7 +1578,8 @@ private void createContainerLogs(Configuration configuration,
logTypes.add("syslog");
// create container logs in localLogDir
for (ContainerId containerId : containerIds) {
createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes);
createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes,
Collections.emptyList());
}
Path path =
new Path(remoteLogRootDir + ugi.getShortUserName()
@ -1564,7 +1596,8 @@ private void createContainerLogs(Configuration configuration,
}
private static void createContainerLogInLocalDir(Path appLogsDir,
ContainerId containerId, FileSystem fs, List<String> logTypes) throws Exception {
ContainerId containerId, FileSystem fs, List<String> logTypes,
List<String> emptyLogTypes) throws Exception {
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
if (fs.exists(containerLogsDir)) {
fs.delete(containerLogsDir, true);
@ -1576,6 +1609,12 @@ private static void createContainerLogInLocalDir(Path appLogsDir,
writer.write(logMessage(containerId, logType));
writer.close();
}
for (String emptyLogType : emptyLogTypes) {
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), emptyLogType));
writer.write("");
writer.close();
}
}
private static String logMessage(ContainerId containerId, String logType) {
@ -1584,6 +1623,10 @@ private static String logMessage(ContainerId containerId, String logType) {
return sb.toString();
}
private static String createEmptyLog(String logType) {
return "LogContents:\n\nEnd of LogType:" + logType;
}
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {

View File

@ -43,6 +43,26 @@ private LogToolUtils() {}
public static final String CONTAINER_ON_NODE_PATTERN =
"Container: %s on %s";
/**
* Formats the header of an aggregated log file.
*/
private static byte[] formatContainerLogHeader(String containerId,
String nodeId, ContainerLogAggregationType logType, String fileName,
String lastModifiedTime, long fileLength) {
StringBuilder sb = new StringBuilder();
String containerStr = String.format(
LogToolUtils.CONTAINER_ON_NODE_PATTERN,
containerId, nodeId);
sb.append(containerStr + "\n")
.append("LogAggregationType: " + logType + "\n")
.append(StringUtils.repeat("=", containerStr.length()) + "\n")
.append("LogType:" + fileName + "\n")
.append("LogLastModifiedTime:" + lastModifiedTime + "\n")
.append("LogLength:" + fileLength + "\n")
.append("LogContents:\n");
return sb.toString().getBytes(Charset.forName("UTF-8"));
}
/**
* Output container log.
* @param containerId the containerId
@ -84,22 +104,10 @@ public static void outputContainerLog(String containerId, String nodeId,
: (int) pendingRead;
int len = fis.read(buf, 0, toRead);
boolean keepGoing = (len != -1 && curRead < totalBytesToRead);
if (keepGoing) {
StringBuilder sb = new StringBuilder();
String containerStr = String.format(
LogToolUtils.CONTAINER_ON_NODE_PATTERN,
containerId, nodeId);
sb.append(containerStr + "\n")
.append("LogAggregationType: " + logType + "\n")
.append(StringUtils.repeat("=", containerStr.length()) + "\n")
.append("LogType:" + fileName + "\n")
.append("LogLastModifiedTime:" + lastModifiedTime + "\n")
.append("LogLength:" + Long.toString(fileLength) + "\n")
.append("LogContents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
}
byte[] b = formatContainerLogHeader(containerId, nodeId, logType, fileName,
lastModifiedTime, fileLength);
os.write(b, 0, b.length);
while (keepGoing) {
os.write(buf, 0, len);
curRead += len;
@ -132,22 +140,12 @@ public static void outputContainerLogThroughZeroCopy(String containerId,
}
}
// output log summary
byte[] b = formatContainerLogHeader(containerId, nodeId, logType, fileName,
lastModifiedTime, fileLength);
os.write(b, 0, b.length);
if (totalBytesToRead > 0) {
// output log summary
StringBuilder sb = new StringBuilder();
String containerStr = String.format(
LogToolUtils.CONTAINER_ON_NODE_PATTERN,
containerId, nodeId);
sb.append(containerStr + "\n")
.append("LogAggregationType: " + logType + "\n")
.append(StringUtils.repeat("=", containerStr.length()) + "\n")
.append("LogType:" + fileName + "\n")
.append("LogLastModifiedTime:" + lastModifiedTime + "\n")
.append("LogLength:" + Long.toString(fileLength) + "\n")
.append("LogContents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
// output log content
FileChannel inputChannel = fis.getChannel();
WritableByteChannel outputChannel = Channels.newChannel(os);

View File

@ -67,7 +67,6 @@ public class TestAggregatedLogFormat {
private static final File testWorkDir = new File("target",
"TestAggregatedLogFormat");
private static final Configuration conf = new Configuration();
private static final FileSystem fs;
private static final char filler = 'x';
private static final Logger LOG = LoggerFactory
@ -75,7 +74,7 @@ public class TestAggregatedLogFormat {
static {
try {
fs = FileSystem.get(conf);
fs = FileSystem.get(new Configuration());
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -282,6 +281,45 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
Assert.assertEquals(expectedLength, s.length());
}
@Test
public void testZeroLengthLog() throws IOException {
Configuration conf = new Configuration();
File workDir = new File(testWorkDir, "testZeroLength");
Path remoteAppLogFile = new Path(workDir.getAbsolutePath(),
"aggregatedLogFile");
Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1);
Path t = new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
.getApplicationId().toString());
Path srcFilePath = new Path(t, testContainerId.toString());
// Create zero byte file
writeSrcFile(srcFilePath, "stdout", 0);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
try (LogWriter logWriter = new LogWriter()) {
logWriter.initialize(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName());
logWriter.append(logKey, logValue);
}
LogReader logReader = new LogReader(conf, remoteAppLogFile);
LogKey rLogKey = new LogKey();
DataInputStream dis = logReader.next(rLogKey);
Writer writer = new StringWriter();
LogReader.readAcontainerLogs(dis, writer);
Assert.assertEquals("LogType:stdout\n" +
"LogLength:0\n" +
"Log Contents:\n\n" +
"End of LogType:stdout\n\n", writer.toString());
}
@Test(timeout=10000)
public void testContainerLogsFileAccess() throws IOException {
// This test will run only if NativeIO is enabled as SecureIOUtils

View File

@ -85,6 +85,7 @@ public class TestLogAggregationIndexedFileController
.createImmutable((short) (0777));
private static final UserGroupInformation USER_UGI = UserGroupInformation
.createRemoteUser("testUser");
private static final String ZERO_FILE = "zero";
private FileSystem fs;
private ApplicationId appId;
private ContainerId containerId;
@ -153,6 +154,8 @@ public void testLogAggregationIndexFileFormat() throws Exception {
logType);
files.add(file);
}
files.add(createZeroLocalLogFile(appLogsDir));
LogValue value = mock(LogValue.class);
when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
@ -212,12 +215,13 @@ public boolean isRollover(final FileContext fc,
for (ContainerLogMeta log : meta) {
assertEquals(containerId.toString(), log.getContainerId());
assertEquals(nodeId.toString(), log.getNodeId());
assertEquals(3, log.getContainerLogMeta().size());
assertEquals(4, log.getContainerLogMeta().size());
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(logTypes);
fileNames.remove(ZERO_FILE);
assertTrue(fileNames.isEmpty());
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
@ -226,6 +230,7 @@ public boolean isRollover(final FileContext fc,
assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
assertZeroFileIsContained(sysOutStream.toString());
sysOutStream.reset();
Configuration factoryConf = new Configuration(getConf());
@ -297,12 +302,13 @@ public boolean isRollover(final FileContext fc,
for (ContainerLogMeta log : meta) {
assertEquals(containerId.toString(), log.getContainerId());
assertEquals(nodeId.toString(), log.getNodeId());
assertEquals(3, log.getContainerLogMeta().size());
assertEquals(4, log.getContainerLogMeta().size());
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(logTypes);
fileNames.remove(ZERO_FILE);
assertTrue(fileNames.isEmpty());
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
assertTrue(foundLogs);
@ -333,6 +339,7 @@ public boolean isRollover(final FileContext fc,
}
}
fileNames.removeAll(newLogTypes);
fileNames.remove(ZERO_FILE);
assertTrue(fileNames.isEmpty());
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
assertTrue(foundLogs);
@ -361,6 +368,7 @@ public boolean isRollover(final FileContext fc,
}
}
fileNames.removeAll(newLogTypes);
fileNames.remove(ZERO_FILE);
assertTrue(fileNames.isEmpty());
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
assertTrue(foundLogs);
@ -423,8 +431,25 @@ public void testFetchApplictionLogsHar() throws Exception {
sysOutStream.reset();
}
private void assertZeroFileIsContained(String outStream) {
assertTrue(outStream.contains(
"LogContents:\n" +
"\n" +
"End of LogType:zero"));
}
private File createZeroLocalLogFile(Path localLogDir) throws IOException {
return createAndWriteLocalLogFile(localLogDir, ZERO_FILE, "");
}
private File createAndWriteLocalLogFile(ContainerId containerId,
Path localLogDir, String logType) throws IOException {
return createAndWriteLocalLogFile(localLogDir, logType,
logMessage(containerId, logType));
}
private File createAndWriteLocalLogFile(Path localLogDir, String logType,
String message) throws IOException {
File file = new File(localLogDir.toString(), logType);
if (file.exists()) {
file.delete();
@ -433,7 +458,7 @@ private File createAndWriteLocalLogFile(ContainerId containerId,
Writer writer = null;
try {
writer = new FileWriter(file);
writer.write(logMessage(containerId, logType));
writer.write(message);
writer.close();
return file;
} finally {

View File

@ -153,6 +153,7 @@
public class TestLogAggregationService extends BaseContainerManagerTest {
private Map<ApplicationAccessType, String> acls = createAppAcls();
private static final String[] EMPTY_FILES = new String[] {"zero"};
static {
LOG = LoggerFactory.getLogger(TestLogAggregationService.class);
@ -219,7 +220,7 @@ private void verifyLocalFileDeletion(
ContainerId container11 = ContainerId.newContainerId(appAttemptId, 1);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11, new String[] { "stdout",
"stderr", "syslog" });
"stderr", "syslog" }, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container11,
ContainerType.APPLICATION_MASTER, 0));
@ -342,7 +343,7 @@ public void testNoLogsUploadedOnAppFinish() throws Exception {
BuilderUtils.newApplicationAttemptId(app, 1);
ContainerId cont = ContainerId.newContainerId(appAttemptId, 1);
writeContainerLogs(appLogDir, cont, new String[] { "stdout",
"stderr", "syslog" });
"stderr", "syslog" }, EMPTY_FILES);
logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont,
ContainerType.APPLICATION_MASTER, 0));
logAggregationService.handle(new LogHandlerAppFinishedEvent(app));
@ -432,7 +433,7 @@ public void testMultipleAppsLogAggregation() throws Exception {
ContainerId container11 = ContainerId.newContainerId(appAttemptId1, 1);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11, fileNames);
writeContainerLogs(app1LogDir, container11, fileNames, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container11,
ContainerType.APPLICATION_MASTER, 0));
@ -454,14 +455,14 @@ public void testMultipleAppsLogAggregation() throws Exception {
ContainerId container21 = ContainerId.newContainerId(appAttemptId2, 1);
writeContainerLogs(app2LogDir, container21, fileNames);
writeContainerLogs(app2LogDir, container21, fileNames, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container21,
ContainerType.APPLICATION_MASTER, 0));
ContainerId container12 = ContainerId.newContainerId(appAttemptId1, 2);
writeContainerLogs(app1LogDir, container12, fileNames);
writeContainerLogs(app1LogDir, container12, fileNames, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container12,
ContainerType.TASK, 0));
@ -497,25 +498,25 @@ public void testMultipleAppsLogAggregation() throws Exception {
reset(appEventHandler);
ContainerId container31 = ContainerId.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container31, fileNames);
writeContainerLogs(app3LogDir, container31, fileNames, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container31,
ContainerType.APPLICATION_MASTER, 0));
ContainerId container32 = ContainerId.newContainerId(appAttemptId3, 2);
writeContainerLogs(app3LogDir, container32, fileNames);
writeContainerLogs(app3LogDir, container32, fileNames, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container32,
ContainerType.TASK, 1)); // Failed
ContainerId container22 = ContainerId.newContainerId(appAttemptId2, 2);
writeContainerLogs(app2LogDir, container22, fileNames);
writeContainerLogs(app2LogDir, container22, fileNames, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container22,
ContainerType.TASK, 0));
ContainerId container33 = ContainerId.newContainerId(appAttemptId3, 3);
writeContainerLogs(app3LogDir, container33, fileNames);
writeContainerLogs(app3LogDir, container33, fileNames, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container33,
ContainerType.TASK, 0));
@ -531,13 +532,15 @@ public void testMultipleAppsLogAggregation() throws Exception {
assertEquals(0, logAggregationService.getNumAggregators());
verifyContainerLogs(logAggregationService, application1,
new ContainerId[] { container11, container12 }, fileNames, 3, false);
new ContainerId[] {container11, container12}, fileNames, 4, false,
EMPTY_FILES);
verifyContainerLogs(logAggregationService, application2,
new ContainerId[] { container21 }, fileNames, 3, false);
new ContainerId[] {container21}, fileNames, 4, false, EMPTY_FILES);
verifyContainerLogs(logAggregationService, application3,
new ContainerId[] { container31, container32 }, fileNames, 3, false);
new ContainerId[] {container31, container32}, fileNames, 4, false,
EMPTY_FILES);
dispatcher.await();
@ -935,7 +938,7 @@ public LogAggregationFileController getLogAggregationFileController(
}
private void writeContainerLogs(File appLogDir, ContainerId containerId,
String[] fileName) throws IOException {
String[] fileName, String[] emptyFiles) throws IOException {
// ContainerLogDir should be created
String containerStr = containerId.toString();
File containerLogDir = new File(appLogDir, containerStr);
@ -947,17 +950,22 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId,
writer11.write(containerStr + " Hello " + fileType + "!");
writer11.close();
}
for (String emptyFile : emptyFiles) {
Writer writer11 = new FileWriter(new File(containerLogDir, emptyFile));
writer11.write("");
writer11.close();
}
}
private LogFileStatusInLastCycle verifyContainerLogs(
LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
String[] logFiles, int numOfLogsPerContainer,
boolean multiLogs) throws IOException {
boolean multiLogs, String[] zeroLengthFiles) throws IOException {
return verifyContainerLogs(logAggregationService, appId,
expectedContainerIds, expectedContainerIds.length,
expectedContainerIds.length, logFiles, numOfLogsPerContainer,
multiLogs);
multiLogs, zeroLengthFiles);
}
// expectedContainerIds is the minimal set of containers to check.
@ -968,7 +976,8 @@ private LogFileStatusInLastCycle verifyContainerLogs(
LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
int minNumOfContainers, int maxNumOfContainers,
String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
String[] logFiles, int numOfLogsPerContainer, boolean multiLogs,
String[] zeroLengthLogFiles)
throws IOException {
Path appLogDir = logAggregationService.getLogAggregationFileController(
conf).getRemoteAppLogDir(appId, this.user);
@ -1089,6 +1098,11 @@ private LogFileStatusInLastCycle verifyContainerLogs(
+ " not present in aggregated log-file!", foundValue);
Assert.assertEquals(expectedValue, foundValue);
}
for (String emptyFile : zeroLengthLogFiles) {
String foundValue = thisContainerMap.remove(emptyFile);
String expectedValue = "\nEnd of LogType:" + emptyFile;
Assert.assertEquals(expectedValue, foundValue);
}
Assert.assertEquals(0, thisContainerMap.size());
}
Assert.assertTrue("number of remaining containers should be at least " +
@ -1584,7 +1598,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
// Simulate log-file creation
writeContainerLogs(appLogDir1, container1, new String[] { "stdout",
"stderr", "syslog" });
"stderr", "syslog" }, EMPTY_FILES);
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
container1, ContainerType.APPLICATION_MASTER, 0));
@ -1605,7 +1619,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
ContainerId container2 = ContainerId.newContainerId(appAttemptId2, 1);
writeContainerLogs(app2LogDir, container2, new String[] { "stdout",
"stderr", "syslog" });
"stderr", "syslog" }, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container2,
ContainerType.APPLICATION_MASTER, 0));
@ -1629,7 +1643,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
this.user, null, this.acls, context1));
ContainerId container3 = ContainerId.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container3, new String[] { "stdout",
"sys.log", "std.log", "out.log", "err.log", "log" });
"sys.log", "std.log", "out.log", "err.log", "log" }, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container3,
ContainerType.APPLICATION_MASTER, 0));
@ -1654,7 +1668,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
this.user, null, this.acls, context2));
ContainerId container4 = ContainerId.newContainerId(appAttemptId4, 1);
writeContainerLogs(app4LogDir, container4, new String[] { "stdout",
"sys.log", "std.log", "out.log", "err.log", "log" });
"sys.log", "std.log", "out.log", "err.log", "log" }, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container4,
ContainerType.APPLICATION_MASTER, 0));
@ -1682,19 +1696,19 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
String[] logFiles = new String[] { "stdout", "syslog" };
verifyContainerLogs(logAggregationService, application1,
new ContainerId[] { container1 }, logFiles, 2, false);
new ContainerId[] {container1}, logFiles, 2, false, new String[] {});
logFiles = new String[] { "stderr" };
verifyContainerLogs(logAggregationService, application2,
new ContainerId[] { container2 }, logFiles, 1, false);
new ContainerId[] {container2}, logFiles, 2, false, EMPTY_FILES);
logFiles = new String[] { "out.log", "err.log" };
verifyContainerLogs(logAggregationService, application3,
new ContainerId[] { container3 }, logFiles, 2, false);
new ContainerId[] {container3}, logFiles, 2, false, new String[] {});
logFiles = new String[] { "sys.log" };
verifyContainerLogs(logAggregationService, application4,
new ContainerId[] { container4 }, logFiles, 1, false);
new ContainerId[] {container4}, logFiles, 1, false, new String[] {});
dispatcher.await();
@ -1721,8 +1735,8 @@ public void testLogAggregationServiceWithPatternsAndIntervals()
// When the app is running, we only aggregate the log with
// the name stdout. After the app finishes, we only aggregate
// the log with the name std_final.
logAggregationContext.setRolledLogsIncludePattern("stdout");
logAggregationContext.setIncludePattern("std_final");
logAggregationContext.setRolledLogsIncludePattern("stdout|zero");
logAggregationContext.setIncludePattern("std_final|empty_final");
this.conf.set(
YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
//configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
@ -1767,7 +1781,8 @@ public void testLogAggregationServiceWithPatternsAndIntervals()
// until the app finishes.
String[] logFilesWithFinalLog =
new String[] {"stdout", "std_final"};
writeContainerLogs(appLogDir, container, logFilesWithFinalLog);
String[] zeroFiles = new String[] {"zero", "empty_final"};
writeContainerLogs(appLogDir, container, logFilesWithFinalLog, zeroFiles);
// Do log aggregation
AppLogAggregatorImpl aggregator =
@ -1781,7 +1796,7 @@ public void testLogAggregationServiceWithPatternsAndIntervals()
String[] logFiles = new String[] { "stdout" };
verifyContainerLogs(logAggregationService, application,
new ContainerId[] {container}, logFiles, 1, true);
new ContainerId[] {container}, logFiles, 2, true, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container,
@ -1800,8 +1815,9 @@ public void testLogAggregationServiceWithPatternsAndIntervals()
// This container finishes.
// The log "std_final" should be aggregated this time.
String[] logFinalLog = new String[] {"std_final"};
String[] emptyFinalLog = new String[] {"empty_final"};
verifyContainerLogs(logAggregationService, application,
new ContainerId[] {container}, logFinalLog, 1, true);
new ContainerId[] {container}, logFinalLog, 2, true, emptyFinalLog);
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
@ -1823,7 +1839,7 @@ public void testNoneContainerPolicy() throws Exception {
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] {container1}, logFiles, 0, false);
new ContainerId[] {container1}, logFiles, 0, false, EMPTY_FILES);
verifyLogAggFinishEvent(appId);
}
@ -1847,7 +1863,7 @@ public void testFailedContainerPolicy() throws Exception {
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1 }, logFiles, 1, false);
new ContainerId[] {container1}, logFiles, 2, false, EMPTY_FILES);
verifyLogAggFinishEvent(appId);
}
@ -1871,7 +1887,8 @@ public void testAMOrFailedContainerPolicy() throws Exception {
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1, container2 }, logFiles, 1, false);
new ContainerId[] {container1, container2}, logFiles, 2, false,
EMPTY_FILES);
verifyLogAggFinishEvent(appId);
}
@ -1895,7 +1912,8 @@ public void testFailedOrKilledContainerPolicy() throws Exception {
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container2, container3 }, logFiles, 1, false);
new ContainerId[] {container2, container3}, logFiles, 2, false,
EMPTY_FILES);
verifyLogAggFinishEvent(appId);
}
@ -1931,7 +1949,7 @@ public void testAMOnlyContainerPolicy() throws Exception {
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1 }, logFiles, 1, false);
new ContainerId[] {container1}, logFiles, 2, false, EMPTY_FILES);
verifyLogAggFinishEvent(appId);
}
@ -2080,7 +2098,7 @@ private void verifyDefaultPolicy(ApplicationId appId,
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1, container2, container3 },
logFiles, 1, false);
logFiles, 2, false, EMPTY_FILES);
verifyLogAggFinishEvent(appId);
}
@ -2162,7 +2180,7 @@ private void setupAndTestSampleContainerPolicy(int successfulContainers,
verifyContainerLogs(logAggregationService, appId,
containerIds.toArray(new ContainerId[containerIds.size()]),
minOfContainersWithLogs, maxOfContainersWithLogs,
logFiles, 1, false);
logFiles, 2, false, EMPTY_FILES);
verifyLogAggFinishEvent(appId);
}
@ -2240,7 +2258,7 @@ private ContainerId finishContainer(ApplicationId application1,
File appLogDir1 =
new File(localLogDir, application1.toString());
appLogDir1.mkdir();
writeContainerLogs(appLogDir1, containerId, logFiles);
writeContainerLogs(appLogDir1, containerId, logFiles, EMPTY_FILES);
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
containerId, containerType, exitCode));
@ -2361,7 +2379,8 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
String[] logFiles1WithFinalLog =
new String[] { "stdout", "stderr", "syslog", "std_final" };
String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
writeContainerLogs(appLogDir, container, logFiles1WithFinalLog,
EMPTY_FILES);
// Do log aggregation
AppLogAggregatorImpl aggregator =
@ -2378,7 +2397,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
}
// Container logs should be uploaded
logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles1, 3, true);
new ContainerId[] {container}, logFiles1, 4, true, EMPTY_FILES);
for(String logFile : logFiles1) {
Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
.contains(logFile));
@ -2403,7 +2422,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
// Do log aggregation
String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
writeContainerLogs(appLogDir, container, logFiles2);
writeContainerLogs(appLogDir, container, logFiles2, EMPTY_FILES);
aggregator.doLogAggregationOutOfBand();
@ -2416,7 +2435,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
}
// Container logs should be uploaded
logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles2, 3, true);
new ContainerId[] {container}, logFiles2, 4, true, EMPTY_FILES);
for(String logFile : logFiles2) {
Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
@ -2430,7 +2449,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
// create another logs
String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
writeContainerLogs(appLogDir, container, logFiles3);
writeContainerLogs(appLogDir, container, logFiles3, EMPTY_FILES);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container,
@ -2450,7 +2469,8 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
String[] logFiles3WithFinalLog =
new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
new ContainerId[] {container}, logFiles3WithFinalLog, 5, true,
EMPTY_FILES);
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
}

View File

@ -28,6 +28,7 @@
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@ -118,6 +119,7 @@ public class TestNMWebServices extends JerseyTestBase {
private static LocalDirsHandlerService dirsHandler;
private static WebApp nmWebApp;
private static final String LOGSERVICEWSADDR = "test:1234";
private static final String LOG_MESSAGE = "log message\n";
private static final File testRootDir = new File("target",
TestNMWebServices.class.getSimpleName());
@ -441,20 +443,26 @@ public void testSingleNodesXML() throws JSONException, Exception {
@Test (timeout = 5000)
public void testContainerLogsWithNewAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containers")
.path(containerId.toString()).path("logs");
testContainerLogs(r, containerId);
ContainerId containerId0 = BuilderUtils.newContainerId(0, 0, 0, 0);
WebResource r0 = resource();
r0 = r0.path("ws").path("v1").path("node").path("containers")
.path(containerId0.toString()).path("logs");
testContainerLogs(r0, containerId0, LOG_MESSAGE);
ContainerId containerId1 = BuilderUtils.newContainerId(0, 0, 0, 1);
WebResource r1 = resource();
r1 = r1.path("ws").path("v1").path("node").path("containers")
.path(containerId1.toString()).path("logs");
testContainerLogs(r1, containerId1, "");
}
@Test (timeout = 5000)
public void testContainerLogsWithOldAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1);
final ContainerId containerId2 = BuilderUtils.newContainerId(1, 1, 0, 2);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containerlogs")
.path(containerId.toString());
testContainerLogs(r, containerId);
.path(containerId2.toString());
testContainerLogs(r, containerId2, LOG_MESSAGE);
}
@Test (timeout = 10000)
@ -583,15 +591,14 @@ public void testGetYarnGpuResourceInfo()
2, json.getJSONArray("assignedGpuDevices").length());
}
private void testContainerLogs(WebResource r, ContainerId containerId)
throws Exception {
private void testContainerLogs(WebResource r, ContainerId containerId,
String logMessage) throws Exception {
final String containerIdStr = containerId.toString();
final ApplicationAttemptId appAttemptId = containerId
.getApplicationAttemptId();
final ApplicationId appId = appAttemptId.getApplicationId();
final String appIdStr = appId.toString();
final String filename = "logfile1";
final String logMessage = "log message\n";
nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
appId, null, nmContext));
@ -607,6 +614,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
File logFile = new File(path.toUri().getPath());
logFile.deleteOnExit();
if (logFile.getParentFile().exists()) {
FileUtils.deleteDirectory(logFile.getParentFile());
}
assertTrue("Failed to create log dir", logFile.getParentFile().mkdirs());
PrintWriter pw = new PrintWriter(logFile);
pw.print(logMessage);
@ -628,8 +638,10 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
responseLogMessage = getLogContext(responseText);
assertEquals(5, responseLogMessage.getBytes().length);
assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage);
int truncatedLength = Math.min(5, logMessage.getBytes().length);
assertEquals(truncatedLength, responseLogMessage.getBytes().length);
assertEquals(new String(logMessage.getBytes(), 0, truncatedLength),
responseLogMessage);
assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
// specify the bytes which is larger than the actual file size,
@ -649,9 +661,10 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
responseLogMessage = getLogContext(responseText);
assertEquals(5, responseLogMessage.getBytes().length);
assertEquals(truncatedLength, responseLogMessage.getBytes().length);
assertEquals(new String(logMessage.getBytes(),
logMessage.getBytes().length - 5, 5), responseLogMessage);
logMessage.getBytes().length - truncatedLength, truncatedLength),
responseLogMessage);
assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
response = r.path(filename)