YARN-5251. Yarn CLI to obtain App logs for last 'n' bytes fails. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2016-06-17 08:24:24 -07:00
parent 09e82acaf9
commit c35fa4a0e5
6 changed files with 27 additions and 25 deletions

View File

@ -400,7 +400,7 @@ public ContainerReport getContainerReport(String containerIdStr)
String logMessage = "Hello container_0_0001_01_000003 in stdout!"; String logMessage = "Hello container_0_0001_01_000003 in stdout!";
int fileContentSize = logMessage.getBytes().length; int fileContentSize = logMessage.getBytes().length;
int tailContentSize = "End of LogType:syslog\n\n".getBytes().length; int tailContentSize = "\nEnd of LogType:syslog\n\n".getBytes().length;
// specify how many bytes we should get from logs // specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from // specify a position number, it would get the first n bytes from

View File

@ -787,20 +787,19 @@ private static void readContainerLogs(DataInputStream valueStream,
long toSkip = 0; long toSkip = 0;
long totalBytesToRead = fileLength; long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (bytes < 0) { if (bytes < 0) {
long absBytes = Math.abs(bytes); long absBytes = Math.abs(bytes);
if (absBytes < fileLength) { if (absBytes < fileLength) {
toSkip = fileLength - absBytes; toSkip = fileLength - absBytes;
totalBytesToRead = absBytes; totalBytesToRead = absBytes;
} }
long skippedBytes = valueStream.skip(toSkip); org.apache.hadoop.io.IOUtils.skipFully(
if (skippedBytes != toSkip) { valueStream, toSkip);
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else { } else {
if (bytes < fileLength) { if (bytes < fileLength) {
totalBytesToRead = bytes; totalBytesToRead = bytes;
skipAfterRead = fileLength - bytes;
} }
} }
@ -818,7 +817,9 @@ private static void readContainerLogs(DataInputStream valueStream,
pendingRead > buf.length ? buf.length : (int) pendingRead; pendingRead > buf.length ? buf.length : (int) pendingRead;
len = valueStream.read(buf, 0, toRead); len = valueStream.read(buf, 0, toRead);
} }
out.println("End of LogType:" + fileType); org.apache.hadoop.io.IOUtils.skipFully(
valueStream, skipAfterRead);
out.println("\nEnd of LogType:" + fileType);
out.println(""); out.println("");
} }
@ -913,20 +914,19 @@ public static int readContainerLogsForALogType(
long toSkip = 0; long toSkip = 0;
long totalBytesToRead = fileLength; long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (bytes < 0) { if (bytes < 0) {
long absBytes = Math.abs(bytes); long absBytes = Math.abs(bytes);
if (absBytes < fileLength) { if (absBytes < fileLength) {
toSkip = fileLength - absBytes; toSkip = fileLength - absBytes;
totalBytesToRead = absBytes; totalBytesToRead = absBytes;
} }
long skippedBytes = valueStream.skip(toSkip); org.apache.hadoop.io.IOUtils.skipFully(
if (skippedBytes != toSkip) { valueStream, toSkip);
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else { } else {
if (bytes < fileLength) { if (bytes < fileLength) {
totalBytesToRead = bytes; totalBytesToRead = bytes;
skipAfterRead = fileLength - bytes;
} }
} }
@ -942,7 +942,9 @@ public static int readContainerLogsForALogType(
toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
len = valueStream.read(buf, 0, toRead); len = valueStream.read(buf, 0, toRead);
} }
out.println("End of LogType:" + fileType); org.apache.hadoop.io.IOUtils.skipFully(
valueStream, skipAfterRead);
out.println("\nEnd of LogType:" + fileType);
out.println(""); out.println("");
return 0; return 0;
} else { } else {

View File

@ -258,7 +258,7 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
.currentTimeMillis())).length() : 0) .currentTimeMillis())).length() : 0)
+ ("\nLogLength:" + numChars).length() + ("\nLogLength:" + numChars).length()
+ "\nLog Contents:\n".length() + numChars + "\n".length() + "\nLog Contents:\n".length() + numChars + "\n".length()
+ "End of LogType:stdout\n".length(); + "\nEnd of LogType:stdout\n".length();
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr")); Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs")); Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs"));

View File

@ -404,20 +404,19 @@ public void write(OutputStream os) throws IOException,
long toSkip = 0; long toSkip = 0;
long totalBytesToRead = fileLength; long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (bytes < 0) { if (bytes < 0) {
long absBytes = Math.abs(bytes); long absBytes = Math.abs(bytes);
if (absBytes < fileLength) { if (absBytes < fileLength) {
toSkip = fileLength - absBytes; toSkip = fileLength - absBytes;
totalBytesToRead = absBytes; totalBytesToRead = absBytes;
} }
long skippedBytes = valueStream.skip(toSkip); org.apache.hadoop.io.IOUtils.skipFully(
if (skippedBytes != toSkip) { valueStream, toSkip);
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else { } else {
if (bytes < fileLength) { if (bytes < fileLength) {
totalBytesToRead = bytes; totalBytesToRead = bytes;
skipAfterRead = fileLength - bytes;
} }
} }
@ -435,6 +434,8 @@ public void write(OutputStream os) throws IOException,
: (int) pendingRead; : (int) pendingRead;
len = valueStream.read(buf, 0, toRead); len = valueStream.read(buf, 0, toRead);
} }
org.apache.hadoop.io.IOUtils.skipFully(
valueStream, skipAfterRead);
sb = new StringBuilder(); sb = new StringBuilder();
sb.append("\nEnd of LogType:" + fileType + "\n"); sb.append("\nEnd of LogType:" + fileType + "\n");
b = sb.toString().getBytes(Charset.forName("UTF-8")); b = sb.toString().getBytes(Charset.forName("UTF-8"));

View File

@ -264,20 +264,18 @@ public void write(OutputStream os) throws IOException,
byte[] buf = new byte[bufferSize]; byte[] buf = new byte[bufferSize];
long toSkip = 0; long toSkip = 0;
long totalBytesToRead = fileLength; long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (bytes < 0) { if (bytes < 0) {
long absBytes = Math.abs(bytes); long absBytes = Math.abs(bytes);
if (absBytes < fileLength) { if (absBytes < fileLength) {
toSkip = fileLength - absBytes; toSkip = fileLength - absBytes;
totalBytesToRead = absBytes; totalBytesToRead = absBytes;
} }
long skippedBytes = fis.skip(toSkip); org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are different "
+ "from the caller requested");
}
} else { } else {
if (bytes < fileLength) { if (bytes < fileLength) {
totalBytesToRead = bytes; totalBytesToRead = bytes;
skipAfterRead = fileLength - bytes;
} }
} }
@ -295,6 +293,7 @@ public void write(OutputStream os) throws IOException,
: (int) pendingRead; : (int) pendingRead;
len = fis.read(buf, 0, toRead); len = fis.read(buf, 0, toRead);
} }
org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
os.flush(); os.flush();
} finally { } finally {
IOUtils.closeQuietly(fis); IOUtils.closeQuietly(fis);

View File

@ -948,7 +948,7 @@ private LogFileStatusInLastCycle verifyContainerLogs(
Assert.assertEquals(numOfLogsPerContainer, thisContainerMap.size()); Assert.assertEquals(numOfLogsPerContainer, thisContainerMap.size());
for (String fileType : logFiles) { for (String fileType : logFiles) {
String expectedValue = String expectedValue =
containerStr + " Hello " + fileType + "!End of LogType:" containerStr + " Hello " + fileType + "!\nEnd of LogType:"
+ fileType; + fileType;
LOG.info("Expected log-content : " + new String(expectedValue)); LOG.info("Expected log-content : " + new String(expectedValue));
String foundValue = thisContainerMap.remove(fileType); String foundValue = thisContainerMap.remove(fileType);