From c24af4b0d6fc32938b076161b5a8c86d38e3e0a1 Mon Sep 17 00:00:00 2001
From: Rohith Sharma K S
Date: Tue, 12 Mar 2019 20:57:27 +0530
Subject: [PATCH] YARN-9336. JobHistoryServer leaks CLOSE_WAIT tcp connections
 when using LogAggregationIndexedFileController. Contributed by Tarun Parimi.

---
 .../ifile/IndexedFileAggregatedLogsBlock.java | 60 +++++++++----------
 1 file changed, 30 insertions(+), 30 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
index eb9634bc31..c49d3726be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java
@@ -202,38 +202,38 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
       Decompressor decompressor = compressName.getDecompressor();
       FileContext fileContext = FileContext.getFileContext(
           thisNodeFile.getPath().toUri(), conf);
-      FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
-      int bufferSize = 65536;
-      for (IndexedFileLogMeta candidate : candidates) {
-        if (candidate.getLastModifiedTime() < startTime
-            || candidate.getLastModifiedTime() > endTime) {
-          continue;
-        }
-        byte[] cbuf = new byte[bufferSize];
-        InputStream in = null;
-        try {
-          in = compressName.createDecompressionStream(
-              new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(),
-                  candidate.getFileCompressedSize()), decompressor,
-              LogAggregationIndexedFileController.getFSInputBufferSize(conf));
-          long logLength = candidate.getFileSize();
-          html.pre().__("\n\n").__();
-          html.p().__("Log Type: " + candidate.getFileName()).__();
-          html.p().__(
-              "Log Upload Time: " + Times.format(candidate.getLastModifiedTime()))
-              .__();
-          html.p().__("Log Length: " + Long.toString(logLength)).__();
+      try (FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath())) {
+        int bufferSize = 65536;
+        for (IndexedFileLogMeta candidate : candidates) {
+          if (candidate.getLastModifiedTime() < startTime
+              || candidate.getLastModifiedTime() > endTime) {
+            continue;
+          }
+          byte[] cbuf = new byte[bufferSize];
+          InputStream in = null;
+          try {
+            in = compressName.createDecompressionStream(
+                new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(),
+                    candidate.getFileCompressedSize()), decompressor,
+                LogAggregationIndexedFileController.getFSInputBufferSize(conf));
+            long logLength = candidate.getFileSize();
+            html.pre().__("\n\n").__();
+            html.p().__("Log Type: " + candidate.getFileName()).__();
+            html.p().__("Log Upload Time: "
+                + Times.format(candidate.getLastModifiedTime())).__();
+            html.p().__("Log Length: " + Long.toString(logLength)).__();
 
-          long[] range = checkParseRange(html, start, end, startTime, endTime,
-              logLength, candidate.getFileName());
-          processContainerLog(html, range, in, bufferSize, cbuf);
+            long[] range = checkParseRange(html, start, end, startTime, endTime,
+                logLength, candidate.getFileName());
+            processContainerLog(html, range, in, bufferSize, cbuf);
 
-          foundLog = true;
-        } catch (Exception ex) {
-          LOG.error("Error getting logs for " + logEntity, ex);
-          continue;
-        } finally {
-          IOUtils.closeQuietly(in);
+            foundLog = true;
+          } catch (Exception ex) {
+            LOG.error("Error getting logs for " + logEntity, ex);
+            continue;
+          } finally {
+            IOUtils.closeQuietly(in);
+          }
         }
       }
       return foundLog;
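
The substance of the change is opening the FSDataInputStream inside a try-with-resources block: previously only the per-log decompression stream was closed in the finally block, so the underlying fsin handles were never closed and accumulated as CLOSE_WAIT TCP connections on the JobHistoryServer. For illustration only, here is a minimal standalone sketch of that pattern, not code from the patch; the class name and log path below are hypothetical.

// TryWithResourcesSketch.java - a minimal sketch of the pattern the patch applies:
// open a Hadoop FSDataInputStream with try-with-resources so the stream (and its
// underlying connection) is closed even when processing throws part-way through.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

public class TryWithResourcesSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical aggregated-log path, not one used by the patch.
    Path logPath = new Path("hdfs:///tmp/aggregated-log.example");
    FileContext fileContext = FileContext.getFileContext(logPath.toUri(), conf);

    // Before the fix, the equivalent stream was opened outside any try/finally,
    // so an exception while reading leaked the open stream.
    try (FSDataInputStream fsin = fileContext.open(logPath)) {
      byte[] buf = new byte[65536];
      int read;
      while ((read = fsin.read(buf)) != -1) {
        System.out.write(buf, 0, read);
      }
    } // fsin.close() runs here on both the success and the exception path
  }
}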