From 1b081ca27e05e97d8b7d284ca24200d43763e481 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 6 Apr 2017 16:24:36 -0500 Subject: [PATCH] YARN-6288. Exceptions during aggregated log writes are mishandled. Contributed by Akira Ajisaka --- .../hadoop/yarn/client/cli/TestLogsCLI.java | 54 ++++---- .../logaggregation/AggregatedLogFormat.java | 30 +++-- .../TestAggregatedLogFormat.java | 115 +++++++++--------- .../TestAggregatedLogsBlock.java | 21 ++-- .../TestContainerLogsUtils.java | 15 +-- .../logaggregation/AppLogAggregatorImpl.java | 70 ++++++----- .../TestAppLogAggregatorImpl.java | 8 +- 7 files changed, 161 insertions(+), 152 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 05993d5dd9..37c859cf5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -1345,18 +1345,18 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + System.currentTimeMillis()); - AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); + try (AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter()) { + writer.initialize(configuration, path, ugi); + writer.writeApplicationOwner(ugi.getUserName()); - Map appAcls = - new HashMap(); - appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); - writer.append(new AggregatedLogFormat.LogKey(containerId), - new AggregatedLogFormat.LogValue(rootLogDirs, containerId, - UserGroupInformation.getCurrentUser().getShortUserName())); - writer.close(); + Map appAcls = new HashMap<>(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); + writer.append(new AggregatedLogFormat.LogKey(containerId), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + UserGroupInformation.getCurrentUser().getShortUserName())); + } } private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi, @@ -1365,23 +1365,23 @@ private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ug Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + System.currentTimeMillis()); - AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); + try (AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter()) { + writer.initialize(configuration, path, ugi); + writer.writeApplicationOwner(ugi.getUserName()); - Map appAcls = - new HashMap(); - appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); - DataOutputStream out = writer.getWriter().prepareAppendKey(-1); - new AggregatedLogFormat.LogKey(containerId).write(out); - out.close(); - out = writer.getWriter().prepareAppendValue(-1); - new AggregatedLogFormat.LogValue(rootLogDirs, containerId, - UserGroupInformation.getCurrentUser().getShortUserName()).write(out, - new HashSet()); - out.close(); - writer.close(); + Map appAcls = new HashMap<>(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); + DataOutputStream out = writer.getWriter().prepareAppendKey(-1); + new AggregatedLogFormat.LogKey(containerId).write(out); + out.close(); + out = writer.getWriter().prepareAppendValue(-1); + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + UserGroupInformation.getCurrentUser().getShortUserName()).write(out, + new HashSet<>()); + out.close(); + } } private YarnClient createMockYarnClient(YarnApplicationState appState, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 1b46007ebb..8d86967215 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -446,14 +446,23 @@ public boolean shouldRetainLog() { * The writer that writes out the aggregated logs. */ @Private - public static class LogWriter { + public static class LogWriter implements AutoCloseable { - private final FSDataOutputStream fsDataOStream; - private final TFile.Writer writer; + private FSDataOutputStream fsDataOStream; + private TFile.Writer writer; private FileContext fc; - public LogWriter(final Configuration conf, final Path remoteAppLogFile, - UserGroupInformation userUgi) throws IOException { + /** + * Initialize the LogWriter. + * Must be called just after the instance is created. + * @param conf Configuration + * @param remoteAppLogFile remote log file path + * @param userUgi Ugi of the user + * @throws IOException Failed to initialize + */ + public void initialize(final Configuration conf, + final Path remoteAppLogFile, + UserGroupInformation userUgi) throws IOException { try { this.fsDataOStream = userUgi.doAs(new PrivilegedExceptionAction() { @@ -530,11 +539,14 @@ public void append(LogKey logKey, LogValue logValue) throws IOException { } } + @Override public void close() { - try { - this.writer.close(); - } catch (IOException e) { - LOG.warn("Exception closing writer", e); + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.warn("Exception closing writer", e); + } } IOUtils.closeStream(fsDataOStream); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 8cbec1065b..efbaa4c44c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -140,44 +140,44 @@ private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long l final int ch = filler; UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(new Configuration(), remoteAppLogFile, - ugi); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(new Configuration(), remoteAppLogFile, ugi); - LogKey logKey = new LogKey(testContainerId); - LogValue logValue = - spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), - testContainerId, ugi.getShortUserName())); + LogKey logKey = new LogKey(testContainerId); + LogValue logValue = + spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId, ugi.getShortUserName())); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); - Thread t = new Thread() { - public void run() { - try { - for(int i=0; i < length/3; i++) { + Thread t = new Thread() { + public void run() { + try { + for (int i = 0; i < length / 3; i++) { osw.write(ch); - } + } - latch.countDown(); + latch.countDown(); - for(int i=0; i < (2*length)/3; i++) { - osw.write(ch); + for (int i = 0; i < (2 * length) / 3; i++) { + osw.write(ch); + } + osw.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } - osw.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } - } - }; - t.start(); + }; + t.start(); - //Wait till the osw is partially written - //aggregation starts once the ows has completed 1/3rd of its work - latch.await(); + //Wait till the osw is partially written + //aggregation starts once the ows has completed 1/3rd of its work + latch.await(); - //Aggregate The Logs - logWriter.append(logKey, logValue); - logWriter.close(); + //Aggregate The Logs + logWriter.append(logKey, logValue); + } } @Test @@ -216,22 +216,23 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception { writeSrcFile(srcFilePath, "stdout", numChars); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(conf, remoteAppLogFile, ugi); - LogKey logKey = new LogKey(testContainerId); - LogValue logValue = - new LogValue(Collections.singletonList(srcFileRoot.toString()), - testContainerId, ugi.getShortUserName()); + LogKey logKey = new LogKey(testContainerId); + LogValue logValue = + new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId, ugi.getShortUserName()); - // When we try to open FileInputStream for stderr, it will throw out an IOException. - // Skip the log aggregation for stderr. - LogValue spyLogValue = spy(logValue); - File errorFile = new File((new Path(srcFilePath, "stderr")).toString()); - doThrow(new IOException("Mock can not open FileInputStream")).when( - spyLogValue).secureOpenFile(errorFile); + // When we try to open FileInputStream for stderr, it will throw out an + // IOException. Skip the log aggregation for stderr. + LogValue spyLogValue = spy(logValue); + File errorFile = new File((new Path(srcFilePath, "stderr")).toString()); + doThrow(new IOException("Mock can not open FileInputStream")).when( + spyLogValue).secureOpenFile(errorFile); - logWriter.append(logKey, spyLogValue); - logWriter.close(); + logWriter.append(logKey, spyLogValue); + } // make sure permission are correct on the file FileStatus fsStatus = fs.getFileStatus(remoteAppLogFile); @@ -311,24 +312,24 @@ public void testContainerLogsFileAccess() throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(conf, remoteAppLogFile, ugi); - LogKey logKey = new LogKey(testContainerId1); - String randomUser = "randomUser"; - LogValue logValue = - spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), - testContainerId1, randomUser)); - - // It is trying simulate a situation where first log file is owned by - // different user (probably symlink) and second one by the user itself. - // The first file should not be aggregated. Because this log file has the invalid - // user name. - when(logValue.getUser()).thenReturn(randomUser).thenReturn( - ugi.getShortUserName()); - logWriter.append(logKey, logValue); + LogKey logKey = new LogKey(testContainerId1); + String randomUser = "randomUser"; + LogValue logValue = + spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId1, randomUser)); + + // It is trying simulate a situation where first log file is owned by + // different user (probably symlink) and second one by the user itself. + // The first file should not be aggregated. Because this log file has + // the invalid user name. + when(logValue.getUser()).thenReturn(randomUser).thenReturn( + ugi.getShortUserName()); + logWriter.append(logKey, logValue); + } - logWriter.close(); - BufferedReader in = new BufferedReader(new FileReader(new File(remoteAppLogFile .toUri().getRawPath()))); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 594f18644f..1e71b3cdca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -295,17 +295,20 @@ private void writeLog(Configuration configuration, String user) List rootLogDirs = Arrays.asList("target/logs/logs"); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter( - configuration, new Path(path), ugi); - writer.writeApplicationOwner(ugi.getUserName()); + try (AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter()) { + writer.initialize(configuration, new Path(path), ugi); + writer.writeApplicationOwner(ugi.getUserName()); - Map appAcls = new HashMap(); - appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); + Map appAcls = new HashMap<>(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); - writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"), - new AggregatedLogFormat.LogValue(rootLogDirs, containerId,UserGroupInformation.getCurrentUser().getShortUserName())); - writer.close(); + writer.append( + new AggregatedLogFormat.LogKey("container_0_0001_01_000001"), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + UserGroupInformation.getCurrentUser().getShortUserName())); + } } private void writeLogs(String dirName) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index c6841c90f7..8b665e03f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -110,13 +110,14 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, ContainerId containerId, Path appDir, FileSystem fs) throws IOException { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); - AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); + try (AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter()) { + writer.initialize(configuration, path, ugi); + writer.writeApplicationOwner(ugi.getUserName()); - writer.append(new AggregatedLogFormat.LogKey(containerId), - new AggregatedLogFormat.LogValue(rootLogDirs, containerId, - ugi.getShortUserName())); - writer.close(); + writer.append(new AggregatedLogFormat.LogKey(containerId), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + ugi.getShortUserName())); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index d70acc940e..f465534b66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -295,18 +295,18 @@ private void uploadLogsForContainers(boolean appFinished) { } } - LogWriter writer = null; + if (pendingContainerInThisCycle.isEmpty()) { + sendLogAggregationReport(true, "", appFinished); + return; + } + + logAggregationTimes++; String diagnosticMessage = ""; boolean logAggregationSucceedInThisCycle = true; - try { - if (pendingContainerInThisCycle.isEmpty()) { - return; - } - - logAggregationTimes++; - + try (LogWriter writer = createLogWriter()) { try { - writer = createLogWriter(); + writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp, + this.userUgi); // Write ACLs once when the writer is created. writer.writeApplicationACLs(appAcls); writer.writeApplicationOwner(this.userUgi.getShortUserName()); @@ -351,11 +351,6 @@ private void uploadLogsForContainers(boolean appFinished) { cleanupOldLogTimes++; } - if (writer != null) { - writer.close(); - writer = null; - } - long currentTime = System.currentTimeMillis(); final Path renamedPath = this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp : new Path( @@ -396,34 +391,37 @@ public Object run() throws Exception { logAggregationSucceedInThisCycle = false; } } finally { - LogAggregationStatus logAggregationStatus = - logAggregationSucceedInThisCycle - ? LogAggregationStatus.RUNNING - : LogAggregationStatus.RUNNING_WITH_FAILURE; - sendLogAggregationReport(logAggregationStatus, diagnosticMessage); - if (appFinished) { - // If the app is finished, one extra final report with log aggregation - // status SUCCEEDED/FAILED will be sent to RM to inform the RM - // that the log aggregation in this NM is completed. - LogAggregationStatus finalLogAggregationStatus = - renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle - ? LogAggregationStatus.FAILED - : LogAggregationStatus.SUCCEEDED; - sendLogAggregationReport(finalLogAggregationStatus, ""); - } - - if (writer != null) { - writer.close(); - } + sendLogAggregationReport(logAggregationSucceedInThisCycle, + diagnosticMessage, appFinished); } } - protected LogWriter createLogWriter() throws IOException { - return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); + @VisibleForTesting + protected LogWriter createLogWriter() { + return new LogWriter(); } private void sendLogAggregationReport( + boolean logAggregationSucceedInThisCycle, String diagnosticMessage, + boolean appFinished) { + LogAggregationStatus logAggregationStatus = + logAggregationSucceedInThisCycle + ? LogAggregationStatus.RUNNING + : LogAggregationStatus.RUNNING_WITH_FAILURE; + sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage); + if (appFinished) { + // If the app is finished, one extra final report with log aggregation + // status SUCCEEDED/FAILED will be sent to RM to inform the RM + // that the log aggregation in this NM is completed. + LogAggregationStatus finalLogAggregationStatus = + renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle + ? LogAggregationStatus.FAILED + : LogAggregationStatus.SUCCEEDED; + sendLogAggregationReportInternal(finalLogAggregationStatus, ""); + } + } + + private void sendLogAggregationReportInternal( LogAggregationStatus logAggregationStatus, String diagnosticMessage) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index 17d527a7a5..097146b1e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -416,8 +416,7 @@ public AppLogAggregatorInTest(Dispatcher dispatcher, logAggregationContext, context, lfs, -1, recoveredLogInitedTime); this.applicationId = appId; this.deletionService = deletionService; - - this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp); + this.logWriter = spy(new LogWriter()); this.logValue = ArgumentCaptor.forClass(LogValue.class); } @@ -425,10 +424,5 @@ public AppLogAggregatorInTest(Dispatcher dispatcher, protected LogWriter createLogWriter() { return this.logWriter; } - - private LogWriter getSpiedLogWriter(Configuration conf, - UserGroupInformation ugi, Path remoteAppLogFile) throws IOException { - return spy(new LogWriter(conf, remoteAppLogFile, ugi)); - } } }