diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 2127006ec0..bbe636fdc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -88,6 +88,7 @@ public class LogsCLI extends Configured implements Tool { private static final String SHOW_META_INFO = "show_meta_info"; private static final String LIST_NODES_OPTION = "list_nodes"; private static final String OUT_OPTION = "out"; + private static final String SIZE_OPTION = "size"; public static final String HELP_CMD = "help"; @Override @@ -113,6 +114,7 @@ public int run(String[] args) throws Exception { String[] logFiles = null; List amContainersList = new ArrayList(); String localDir = null; + long bytes = Long.MAX_VALUE; try { CommandLine commandLine = parser.parse(opts, args, true); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); @@ -134,6 +136,9 @@ public int run(String[] args) throws Exception { if (commandLine.hasOption(CONTAINER_LOG_FILES)) { logFiles = commandLine.getOptionValues(CONTAINER_LOG_FILES); } + if (commandLine.hasOption(SIZE_OPTION)) { + bytes = Long.parseLong(commandLine.getOptionValue(SIZE_OPTION)); + } } catch (ParseException e) { System.err.println("options parsing failed: " + e.getMessage()); printHelpMessage(printOpts); @@ -195,7 +200,7 @@ public int run(String[] args) throws Exception { ContainerLogsRequest request = new ContainerLogsRequest(appId, isApplicationFinished(appState), appOwner, nodeAddress, null, - containerIdStr, localDir, logs); + containerIdStr, localDir, logs, bytes); if (showMetaInfo) { return showMetaInfo(request, logCliHelper); @@ -402,6 +407,7 @@ public void 
printContainerLogsFromRunningApplication(Configuration conf, ClientResponse response = webResource.path("ws").path("v1").path("node") .path("containerlogs").path(containerIdStr).path(logFile) + .queryParam("size", Long.toString(request.getBytes())) .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); out.println(response.getEntity(String.class)); out.println("End of LogType:" + logFile); @@ -442,7 +448,9 @@ private int printContainerLogsForFinishedApplicationWithoutNodeId( newOptions); } - private ContainerReport getContainerReport(String containerIdStr) + @Private + @VisibleForTesting + public ContainerReport getContainerReport(String containerIdStr) throws YarnException, IOException { YarnClient yarnClient = createYarnClient(); try { @@ -636,12 +644,16 @@ private Options createCommandOpts() { opts.addOption(OUT_OPTION, true, "Local directory for storing individual " + "container logs. The container logs will be stored based on the " + "node the container ran on."); + opts.addOption(SIZE_OPTION, true, "Prints the log file's first 'n' bytes " + + "or the last 'n' bytes. 
Use negative values as bytes to read from " + + "the end and positive values as bytes to read from the beginning."); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner"); opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers"); opts.getOption(OUT_OPTION).setArgName("Local Directory"); + opts.getOption(SIZE_OPTION).setArgName("size"); return opts; } @@ -656,6 +668,7 @@ private Options createPrintOpts(Options commandOpts) { printOpts.addOption(commandOpts.getOption(SHOW_META_INFO)); printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION)); printOpts.addOption(commandOpts.getOption(OUT_OPTION)); + printOpts.addOption(commandOpts.getOption(SIZE_OPTION)); return printOpts; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 2639a0a8af..db40b50e14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -203,6 +203,11 @@ public void testHelpMessage() throws Exception { pw.println(" for all the containers on the specific"); pw.println(" NodeManager. Currently, this option can"); pw.println(" only be used for finished applications."); + pw.println(" -size Prints the log file's first 'n' bytes or"); + pw.println(" the last 'n' bytes. 
Use negative values"); + pw.println(" as bytes to read from the end and"); + pw.println(" positive values as bytes to read from the"); + pw.println(" beginning."); pw.close(); String appReportStr = baos.toString("UTF-8"); Assert.assertEquals(appReportStr, sysOutStream.toString()); @@ -227,7 +232,7 @@ public void testFetchApplictionLogs() throws Exception { ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2); ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); - NodeId nodeId = NodeId.newInstance("localhost", 1234); + final NodeId nodeId = NodeId.newInstance("localhost", 1234); // create local logs String rootLogDir = "target/LocalLogs"; @@ -281,7 +286,16 @@ public void testFetchApplictionLogs() throws Exception { YarnClient mockYarnClient = createMockYarnClient( YarnApplicationState.FINISHED, ugi.getShortUserName()); - LogsCLI cli = new LogsCLIForTest(mockYarnClient); + LogsCLI cli = new LogsCLIForTest(mockYarnClient) { + @Override + public ContainerReport getContainerReport(String containerIdStr) + throws YarnException, IOException { + ContainerReport mockReport = mock(ContainerReport.class); + doReturn(nodeId).when(mockReport).getAssignedNode(); + doReturn("http://localhost:2345").when(mockReport).getNodeHttpAddress(); + return mockReport; + } + }; cli.setConf(configuration); int exitCode = cli.run(new String[] { "-applicationId", appId.toString() }); @@ -307,6 +321,7 @@ public void testFetchApplictionLogs() throws Exception { "Hello container_0_0001_01_000003 in syslog!")); assertTrue(sysOutStream.toString().contains( "Hello container_0_0001_01_000003 in stdout!")); + int fullSize = sysOutStream.toByteArray().length; sysOutStream.reset(); exitCode = cli.run(new String[] {"-applicationId", appId.toString(), @@ -329,6 +344,14 @@ public void testFetchApplictionLogs() throws Exception { "Can not find any log file matching the pattern: [123]")); 
sysErrStream.reset(); + // specify the bytes which is larger than the actual file size, + // we would get the full logs + exitCode = cli.run(new String[] {"-applicationId", appId.toString(), + "-logFiles", ".*", "-size", "10000" }); + assertTrue(exitCode == 0); + assertTrue(sysOutStream.toByteArray().length == fullSize); + sysOutStream.reset(); + // uploaded two logs for container1. The first log is empty. // The second one is not empty. // We can still successfully read logs for container1. @@ -345,6 +368,49 @@ public void testFetchApplictionLogs() throws Exception { + " are not present in this log-file.")); sysOutStream.reset(); + exitCode = cli.run(new String[] {"-applicationId", appId.toString(), + "-containerId", containerId3.toString(), "-logFiles", "stdout" }); + assertTrue(exitCode == 0); + int fullContextSize = sysOutStream.toByteArray().length; + String fullContext = sysOutStream.toString(); + sysOutStream.reset(); + + String logMessage = "Hello container_0_0001_01_000003 in stdout!"; + int fileContentSize = logMessage.getBytes().length; + int tailContentSize = "End of LogType:syslog\n\n".getBytes().length; + + // specify how many bytes we should get from logs + // specify a positive number, it would get the first n bytes from + // container log + exitCode = cli.run(new String[] {"-applicationId", appId.toString(), + "-containerId", containerId3.toString(), "-logFiles", "stdout", + "-size", "5"}); + assertTrue(exitCode == 0); + Assert.assertEquals(new String(logMessage.getBytes(), 0, 5), + new String(sysOutStream.toByteArray(), + (fullContextSize - fileContentSize - tailContentSize), 5)); + sysOutStream.reset(); + + // specify a negative number, it would get the last n bytes from + // container log + exitCode = cli.run(new String[] {"-applicationId", appId.toString(), + "-containerId", containerId3.toString(), "-logFiles", "stdout", + "-size", "-5"}); + assertTrue(exitCode == 0); + Assert.assertEquals(new String(logMessage.getBytes(), + 
logMessage.getBytes().length - 5, 5), + new String(sysOutStream.toByteArray(), + (fullContextSize - fileContentSize - tailContentSize), 5)); + sysOutStream.reset(); + + long negative = (fullContextSize + 1000) * (-1); + exitCode = cli.run(new String[] {"-applicationId", appId.toString(), + "-containerId", containerId3.toString(), "-logFiles", "stdout", + "-size", Long.toString(negative)}); + assertTrue(exitCode == 0); + Assert.assertEquals(fullContext, sysOutStream.toString()); + sysOutStream.reset(); + // Uploaded the empty log for container0. // We should see the message showing the log for container0 // are not present. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index d636200975..98ffce16f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -733,7 +733,7 @@ public static void readAcontainerLogs(DataInputStream valueStream, ps = new PrintStream(os); while (true) { try { - readContainerLogs(valueStream, ps, logUploadedTime); + readContainerLogs(valueStream, ps, logUploadedTime, Long.MAX_VALUE); } catch (EOFException e) { // EndOfFile return; @@ -757,7 +757,8 @@ public static void readAcontainerLogs(DataInputStream valueStream, } private static void readContainerLogs(DataInputStream valueStream, - PrintStream out, long logUploadedTime) throws IOException { + PrintStream out, long logUploadedTime, long bytes) + throws IOException { byte[] buf = new byte[65535]; String fileType = valueStream.readUTF(); @@ -773,16 +774,35 @@ private static void readContainerLogs(DataInputStream valueStream, out.println(fileLengthStr); 
out.println("Log Contents:"); + long toSkip = 0; + long totalBytesToRead = fileLength; + if (bytes < 0) { + long absBytes = Math.abs(bytes); + if (absBytes < fileLength) { + toSkip = fileLength - absBytes; + totalBytesToRead = absBytes; + } + long skippedBytes = valueStream.skip(toSkip); + if (skippedBytes != toSkip) { + throw new IOException("The bytes were skipped are " + + "different from the caller requested"); + } + } else { + if (bytes < fileLength) { + totalBytesToRead = bytes; + } + } + long curRead = 0; - long pendingRead = fileLength - curRead; + long pendingRead = totalBytesToRead - curRead; int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; int len = valueStream.read(buf, 0, toRead); - while (len != -1 && curRead < fileLength) { + while (len != -1 && curRead < totalBytesToRead) { out.write(buf, 0, len); curRead += len; - pendingRead = fileLength - curRead; + pendingRead = totalBytesToRead - curRead; toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; len = valueStream.read(buf, 0, toRead); @@ -803,7 +823,23 @@ private static void readContainerLogs(DataInputStream valueStream, public static void readAContainerLogsForALogType( DataInputStream valueStream, PrintStream out, long logUploadedTime) throws IOException { - readContainerLogs(valueStream, out, logUploadedTime); + readContainerLogs(valueStream, out, logUploadedTime, Long.MAX_VALUE); + } + + /** + * Keep calling this till you get a {@link EOFException} for getting logs of + * all types for a single container for the specific bytes. 
+ * + * @param valueStream + * @param out + * @param logUploadedTime + * @param bytes + * @throws IOException + */ + public static void readAContainerLogsForALogType( + DataInputStream valueStream, PrintStream out, long logUploadedTime, + long bytes) throws IOException { + readContainerLogs(valueStream, out, logUploadedTime, bytes); } /** @@ -832,6 +868,22 @@ public static void readAContainerLogsForALogType( public static int readContainerLogsForALogType( DataInputStream valueStream, PrintStream out, long logUploadedTime, List logType) throws IOException { + return readContainerLogsForALogType(valueStream, out, logUploadedTime, + logType, Long.MAX_VALUE); + } + + /** + * Keep calling this till you get a {@link EOFException} for getting logs of + * the specific types for a single container. + * @param valueStream + * @param out + * @param logUploadedTime + * @param logType + * @throws IOException + */ + public static int readContainerLogsForALogType( + DataInputStream valueStream, PrintStream out, long logUploadedTime, + List logType, long bytes) throws IOException { byte[] buf = new byte[65535]; String fileType = valueStream.readUTF(); @@ -848,15 +900,34 @@ public static int readContainerLogsForALogType( out.println(fileLengthStr); out.println("Log Contents:"); + long toSkip = 0; + long totalBytesToRead = fileLength; + if (bytes < 0) { + long absBytes = Math.abs(bytes); + if (absBytes < fileLength) { + toSkip = fileLength - absBytes; + totalBytesToRead = absBytes; + } + long skippedBytes = valueStream.skip(toSkip); + if (skippedBytes != toSkip) { + throw new IOException("The bytes were skipped are " + + "different from the caller requested"); + } + } else { + if (bytes < fileLength) { + totalBytesToRead = bytes; + } + } + long curRead = 0; - long pendingRead = fileLength - curRead; + long pendingRead = totalBytesToRead - curRead; int toRead = pendingRead > buf.length ? 
buf.length : (int) pendingRead; int len = valueStream.read(buf, 0, toRead); - while (len != -1 && curRead < fileLength) { + while (len != -1 && curRead < totalBytesToRead) { out.write(buf, 0, len); curRead += len; - pendingRead = fileLength - curRead; + pendingRead = totalBytesToRead - curRead; toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; len = valueStream.read(buf, 0, toRead); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java index b0a7fdc3ec..f32285cc96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java @@ -30,6 +30,7 @@ public class ContainerLogsRequest { private boolean appFinished; private String outputLocalDir; private List logTypes; + private long bytes; public ContainerLogsRequest() {} @@ -42,12 +43,13 @@ public ContainerLogsRequest(ContainerLogsRequest request) { this.setContainerId(request.getContainerId()); this.setOutputLocalDir(request.getOutputLocalDir()); this.setLogTypes(request.getLogTypes()); + this.setBytes(request.getBytes()); } public ContainerLogsRequest(ApplicationId applicationId, boolean isAppFinished, String owner, String address, String httpAddress, String container, String localDir, - List logs) { + List logs, long bytes) { this.setAppId(applicationId); this.setAppFinished(isAppFinished); this.setAppOwner(owner); @@ -56,6 +58,7 @@ public ContainerLogsRequest(ApplicationId applicationId, this.setContainerId(container); this.setOutputLocalDir(localDir); this.setLogTypes(logs); + this.setBytes(bytes); } public ApplicationId getAppId() { @@ -121,4 +124,12 @@ public List 
getLogTypes() { public void setLogTypes(List logTypes) { this.logTypes = logTypes; } + + public long getBytes() { + return bytes; + } + + public void setBytes(long bytes) { + this.bytes = bytes; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 22147aed55..3811054a45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -65,6 +65,7 @@ public int dumpAContainersLogs(String appId, String containerId, options.setAppOwner(jobOwner); List logs = new ArrayList(); options.setLogTypes(logs); + options.setBytes(Long.MAX_VALUE); return dumpAContainersLogsForALogType(options, false); } @@ -160,12 +161,13 @@ public int dumpAContainersLogsForALogType(ContainerLogsRequest options, thisNodeFile.getPath()); if (logType == null || logType.isEmpty()) { if (dumpAContainerLogs(containerId, reader, out, - thisNodeFile.getModificationTime()) > -1) { + thisNodeFile.getModificationTime(), options.getBytes()) > -1) { foundContainerLogs = true; } } else { if (dumpAContainerLogsForALogType(containerId, reader, out, - thisNodeFile.getModificationTime(), logType) > -1) { + thisNodeFile.getModificationTime(), logType, + options.getBytes()) > -1) { foundContainerLogs = true; } } @@ -222,12 +224,13 @@ public int dumpAContainersLogsForALogTypeWithoutNodeId( out.println(StringUtils.repeat("=", containerId.length())); if (logType == null || logType.isEmpty()) { if (dumpAContainerLogs(containerId, reader, out, - thisNodeFile.getModificationTime()) > -1) { + thisNodeFile.getModificationTime(), options.getBytes()) > -1) { foundContainerLogs = true; } } else { if 
(dumpAContainerLogsForALogType(containerId, reader, out, - thisNodeFile.getModificationTime(), logType) > -1) { + thisNodeFile.getModificationTime(), logType, + options.getBytes()) > -1) { foundContainerLogs = true; } } @@ -249,7 +252,7 @@ public int dumpAContainersLogsForALogTypeWithoutNodeId( @Private public int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, - long logUploadedTime) throws IOException { + long logUploadedTime, long bytes) throws IOException { DataInputStream valueStream = getContainerLogsStream( containerIdStr, reader); @@ -261,7 +264,7 @@ public int dumpAContainerLogs(String containerIdStr, while (true) { try { LogReader.readAContainerLogsForALogType(valueStream, out, - logUploadedTime); + logUploadedTime, bytes); foundContainerLogs = true; } catch (EOFException eof) { break; @@ -290,7 +293,8 @@ private DataInputStream getContainerLogsStream(String containerIdStr, @Private public int dumpAContainerLogsForALogType(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, - long logUploadedTime, List logType) throws IOException { + long logUploadedTime, List logType, long bytes) + throws IOException { DataInputStream valueStream = getContainerLogsStream( containerIdStr, reader); if (valueStream == null) { @@ -301,7 +305,7 @@ public int dumpAContainerLogsForALogType(String containerIdStr, while (true) { try { int result = LogReader.readContainerLogsForALogType( - valueStream, out, logUploadedTime, logType); + valueStream, out, logUploadedTime, logType, bytes); if (result == 0) { foundContainerLogs = true; } @@ -361,12 +365,13 @@ public int dumpAllContainersLogs(ContainerLogsRequest options) try { if (logTypes == null || logTypes.isEmpty()) { LogReader.readAContainerLogsForALogType(valueStream, out, - thisNodeFile.getModificationTime()); + thisNodeFile.getModificationTime(), + options.getBytes()); foundAnyLogs = true; } else { int result = LogReader.readContainerLogsForALogType( 
valueStream, out, thisNodeFile.getModificationTime(), - logTypes); + logTypes, options.getBytes()); if (result == 0) { foundAnyLogs = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index e422c35372..d91ae5506d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -213,7 +213,8 @@ public Response getLogs(@Context HttpServletRequest req, @Context HttpServletResponse res, @PathParam("containerid") String containerIdStr, @PathParam("filename") String filename, - @QueryParam("download") String download) { + @QueryParam("download") String download, + @QueryParam("size") String size) { init(res); ContainerId containerId; try { @@ -225,6 +226,9 @@ public Response getLogs(@Context HttpServletRequest req, boolean downloadFile = parseBooleanParam(download); + + final long length = parseLongParam(size); + ApplicationId appId = containerId.getApplicationAttemptId() .getApplicationId(); AppInfo appInfo; @@ -233,7 +237,7 @@ public Response getLogs(@Context HttpServletRequest req, } catch (Exception ex) { // directly find logs from HDFS. 
return sendStreamOutputResponse(appId, null, null, containerIdStr, - filename, downloadFile); + filename, downloadFile, length); } String appOwner = appInfo.getUser(); @@ -247,7 +251,7 @@ public Response getLogs(@Context HttpServletRequest req, if (isFinishedState(appInfo.getAppState())) { // directly find logs from HDFS. return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, - filename, downloadFile); + filename, downloadFile, length); } return createBadResponse(Status.INTERNAL_SERVER_ERROR, "Can not get ContainerInfo for the container: " + containerId); @@ -267,7 +271,7 @@ public Response getLogs(@Context HttpServletRequest req, return response.build(); } else if (isFinishedState(appInfo.getAppState())) { return sendStreamOutputResponse(appId, appOwner, nodeId, - containerIdStr, filename, downloadFile); + containerIdStr, filename, downloadFile, length); } else { return createBadResponse(Status.NOT_FOUND, "The application is not at Running or Finished State."); @@ -296,11 +300,11 @@ private boolean parseBooleanParam(String param) { private Response sendStreamOutputResponse(ApplicationId appId, String appOwner, String nodeId, String containerIdStr, - String fileName, boolean downloadFile) { + String fileName, boolean downloadFile, long bytes) { StreamingOutput stream = null; try { stream = getStreamingOutput(appId, appOwner, nodeId, - containerIdStr, fileName); + containerIdStr, fileName, bytes); } catch (Exception ex) { return createBadResponse(Status.INTERNAL_SERVER_ERROR, ex.getMessage()); @@ -318,7 +322,7 @@ private Response sendStreamOutputResponse(ApplicationId appId, private StreamingOutput getStreamingOutput(ApplicationId appId, String appOwner, final String nodeId, final String containerIdStr, - final String logFile) throws IOException{ + final String logFile, final long bytes) throws IOException{ String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path( 
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, @@ -391,16 +395,35 @@ public void write(OutputStream os) throws IOException, byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); os.write(b, 0, b.length); + long toSkip = 0; + long totalBytesToRead = fileLength; + if (bytes < 0) { + long absBytes = Math.abs(bytes); + if (absBytes < fileLength) { + toSkip = fileLength - absBytes; + totalBytesToRead = absBytes; + } + long skippedBytes = valueStream.skip(toSkip); + if (skippedBytes != toSkip) { + throw new IOException("The bytes were skipped are " + + "different from the caller requested"); + } + } else { + if (bytes < fileLength) { + totalBytesToRead = bytes; + } + } + long curRead = 0; - long pendingRead = fileLength - curRead; + long pendingRead = totalBytesToRead - curRead; int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; int len = valueStream.read(buf, 0, toRead); - while (len != -1 && curRead < fileLength) { + while (len != -1 && curRead < totalBytesToRead) { os.write(buf, 0, len); curRead += len; - pendingRead = fileLength - curRead; + pendingRead = totalBytesToRead - curRead; toRead = pendingRead > buf.length ? 
buf.length : (int) pendingRead; len = valueStream.read(buf, 0, toRead); @@ -433,4 +456,11 @@ public void write(OutputStream os) throws IOException, }; return stream; } + + private long parseLongParam(String bytes) { + if (bytes == null || bytes.isEmpty()) { + return Long.MAX_VALUE; + } + return Long.parseLong(bytes); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java index f985fe4cb8..71b02757c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -601,6 +601,72 @@ public void testContainerLogsForFinishedApps() throws Exception { .get(ClientResponse.class); responseText = response.getEntity(String.class); assertTrue(responseText.contains("Hello." + containerId1ForApp100)); + int fullTextSize = responseText.getBytes().length; + int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length; + + String logMessage = "Hello." 
+ containerId1ForApp100; + int fileContentSize = logMessage.getBytes().length; + // specify how many bytes we should get from logs + // if we specify a positive number, it would get the first n bytes from + // container log + r = resource(); + response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1ForApp100.toString()).path(fileName) + .queryParam("user.name", user) + .queryParam("size", "5") + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(responseText.getBytes().length, + (fullTextSize - fileContentSize) + 5); + assertTrue(fullTextSize >= responseText.getBytes().length); + assertEquals(new String(responseText.getBytes(), + (fullTextSize - fileContentSize - tailTextSize), 5), + new String(logMessage.getBytes(), 0, 5)); + + // specify how many bytes we should get from logs + // if we specify a negative number, it would get the last n bytes from + // container log + r = resource(); + response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1ForApp100.toString()).path(fileName) + .queryParam("user.name", user) + .queryParam("size", "-5") + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(responseText.getBytes().length, + (fullTextSize - fileContentSize) + 5); + assertTrue(fullTextSize >= responseText.getBytes().length); + assertEquals(new String(responseText.getBytes(), + (fullTextSize - fileContentSize - tailTextSize), 5), + new String(logMessage.getBytes(), fileContentSize - 5, 5)); + + // specify the bytes which is larger than the actual file size, + // we would get the full logs + r = resource(); + response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1ForApp100.toString()).path(fileName) + .queryParam("user.name", user) + .queryParam("size", "10000") + 
.accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(responseText.getBytes().length, fullTextSize); + + r = resource(); + response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1ForApp100.toString()).path(fileName) + .queryParam("user.name", user) + .queryParam("size", "-10000") + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(responseText.getBytes().length, fullTextSize); } private static void createContainerLogInLocalDir(Path appLogsDir, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java index 5c66511c5e..e13baa7af6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java @@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; - import com.google.inject.Inject; import com.google.inject.Singleton; @@ -217,7 +216,8 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context @Unstable public Response getLogs(@PathParam("containerid") String containerIdStr, @PathParam("filename") String filename, - @QueryParam("download") String download) { + @QueryParam("download") String download, + @QueryParam("size") String size) { ContainerId containerId; try { containerId = 
ConverterUtils.toContainerId(containerIdStr); @@ -235,19 +235,51 @@ public Response getLogs(@PathParam("containerid") String containerIdStr, return Response.serverError().entity(ex.getMessage()).build(); } boolean downloadFile = parseBooleanParam(download); + final long bytes = parseLongParam(size); + try { final FileInputStream fis = ContainerLogsUtils.openLogFileForRead( containerIdStr, logFile, nmContext); - + final long fileLength = logFile.length(); + StreamingOutput stream = new StreamingOutput() { @Override public void write(OutputStream os) throws IOException, WebApplicationException { int bufferSize = 65536; byte[] buf = new byte[bufferSize]; - int len; - while ((len = fis.read(buf, 0, bufferSize)) > 0) { + long toSkip = 0; + long totalBytesToRead = fileLength; + if (bytes < 0) { + long absBytes = Math.abs(bytes); + if (absBytes < fileLength) { + toSkip = fileLength - absBytes; + totalBytesToRead = absBytes; + } + long skippedBytes = fis.skip(toSkip); + if (skippedBytes != toSkip) { + throw new IOException("The bytes were skipped are different " + + "from the caller requested"); + } + } else { + if (bytes < fileLength) { + totalBytesToRead = bytes; + } + } + + long curRead = 0; + long pendingRead = totalBytesToRead - curRead; + int toRead = pendingRead > buf.length ? buf.length + : (int) pendingRead; + int len = fis.read(buf, 0, toRead); + while (len != -1 && curRead < totalBytesToRead) { os.write(buf, 0, len); + curRead += len; + + pendingRead = totalBytesToRead - curRead; + toRead = pendingRead > buf.length ? 
buf.length + : (int) pendingRead; + len = fis.read(buf, 0, toRead); } os.flush(); } @@ -268,4 +300,11 @@ private boolean parseBooleanParam(String param) { } return false; } + + private long parseLongParam(String bytes) { + if (bytes == null || bytes.isEmpty()) { + return Long.MAX_VALUE; + } + return Long.parseLong(bytes); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index ce1b309b36..4e2feee0d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -26,17 +26,14 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringReader; - import javax.ws.rs.core.MediaType; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.NodeHealthScriptRunner; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -313,7 +310,7 @@ public void testSingleNodesXML() throws JSONException, Exception { assertEquals("incorrect number of elements", 1, nodes.getLength()); verifyNodesXML(nodes); } - + @Test public void testContainerLogs() throws IOException { WebResource r = resource(); @@ -351,6 +348,49 @@ public void testContainerLogs() throws IOException { 
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); String responseText = response.getEntity(String.class); assertEquals(logMessage, responseText); + int fullTextSize = responseText.getBytes().length; + + // specify how many bytes we should get from logs + // specify a position number, it would get the first n bytes from + // container log + response = r.path("ws").path("v1").path("node") + .path("containerlogs").path(containerIdStr).path(filename) + .queryParam("size", "5") + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(5, responseText.getBytes().length); + assertEquals(new String(logMessage.getBytes(), 0, 5), responseText); + assertTrue(fullTextSize >= responseText.getBytes().length); + + // specify the bytes which is larger than the actual file size, + // we would get the full logs + response = r.path("ws").path("v1").path("node") + .path("containerlogs").path(containerIdStr).path(filename) + .queryParam("size", "10000") + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(fullTextSize, responseText.getBytes().length); + assertEquals(logMessage, responseText); + + // specify a negative number, it would get the last n bytes from + // container log + response = r.path("ws").path("v1").path("node") + .path("containerlogs").path(containerIdStr).path(filename) + .queryParam("size", "-5") + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(5, responseText.getBytes().length); + assertEquals(new String(logMessage.getBytes(), + logMessage.getBytes().length - 5, 5), responseText); + assertTrue(fullTextSize >= responseText.getBytes().length); + + response = r.path("ws").path("v1").path("node") + .path("containerlogs").path(containerIdStr).path(filename) + .queryParam("size", "-10000") + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + 
responseText = response.getEntity(String.class); + assertEquals(fullTextSize, responseText.getBytes().length); + assertEquals(logMessage, responseText); // ask and download it response = r.path("ws").path("v1").path("node").path("containerlogs")