From 0ac443b1f8c9412b9778a9b163bdde9b84d1e8c5 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Mon, 19 Jul 2021 10:11:20 +0800 Subject: [PATCH] YARN-10855. yarn logs cli fails to retrieve logs if any TFile is corrupt or empty. Contributed by Jim Brennan. --- .../hadoop/yarn/client/cli/TestLogsCLI.java | 30 +++++++++++++++++ .../logaggregation/AggregatedLogFormat.java | 2 +- .../tfile/LogAggregationTFileController.java | 33 +++++++------------ 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index f7512cf935..6ec8549be9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -396,7 +396,9 @@ public void testFetchFinishedApplictionLogs() throws Exception { ContainerId containerId1 = ContainerId.newContainerId(appAttemptId1, 1); ContainerId containerId2 = ContainerId.newContainerId(appAttemptId1, 2); ContainerId containerId3 = ContainerId.newContainerId(appAttemptId2, 3); + ContainerId containerId4 = ContainerId.newContainerId(appAttemptId2, 4); final NodeId nodeId = NodeId.newInstance("localhost", 1234); + final NodeId badNodeId = NodeId.newInstance("badhost", 5678); // create local logs String rootLogDir = "target/LocalLogs"; @@ -449,6 +451,8 @@ public void testFetchFinishedApplictionLogs() throws Exception { containerId2, path, fs); uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirs, nodeId, containerId3, path, fs); + uploadTruncatedTFileIntoRemoteDir(ugi, conf, badNodeId, + containerId4, fs); YarnClient mockYarnClient = createMockYarnClient( @@ -801,6 +805,17 @@ public ContainerReport getContainerReport(String containerIdStr) "Invalid ContainerId specified")); sysErrStream.reset(); + // Uploaded the empty log for container4. We should see a message + // showing the log for container4 is not present. + exitCode = + cli.run(new String[] {"-applicationId", appId.toString(), + "-nodeAddress", badNodeId.toString(), "-containerId", + containerId4.toString()}); + assertTrue(exitCode == -1); + assertTrue(sysErrStream.toString().contains( + "Can not find any log file matching the pattern")); + sysErrStream.reset(); + fs.delete(new Path(remoteLogRootDir), true); fs.delete(new Path(rootLogDir), true); } @@ -1820,6 +1835,21 @@ private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ug } } + private static void uploadTruncatedTFileIntoRemoteDir( + UserGroupInformation ugi, Configuration configuration, + NodeId nodeId, ContainerId containerId, + FileSystem fs) throws Exception { + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(configuration); + LogAggregationFileController fileFormat = factory + .getFileControllerForWrite(); + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + Path path = fileFormat.getRemoteNodeLogFileForApp( + appId, ugi.getCurrentUser().getShortUserName(), nodeId); + fs.create(path, true).close(); + } + private LogsCLI createCli() throws IOException, YarnException { YarnClient mockYarnClient = createMockYarnClient(YarnApplicationState.FINISHED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 9f4bdb6c03..0c6c3f02de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -565,7 +565,7 @@ public void close() throws DSQuotaExceededException { @Public @Evolving - public static class LogReader { + public static class LogReader implements AutoCloseable { private final FSDataInputStream fsDataIStream; private final TFile.Reader.Scanner scanner; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index b365424de8..e1e2c9aeaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; @@ -193,10 +192,7 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, if ((nodeId == null || nodeName.contains(LogAggregationUtils .getNodeString(nodeId))) && !nodeName.endsWith( LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - try { - reader = new AggregatedLogFormat.LogReader(conf, - thisNodeFile.getPath()); + try (LogReader reader = new LogReader(conf, thisNodeFile.getPath())) { DataInputStream valueStream; LogKey key = new LogKey(); valueStream = reader.next(key); @@ -251,10 +247,10 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, key = new LogKey(); valueStream = reader.next(key); } - } finally { - if (reader != null) { - reader.close(); - } + } catch (IOException ex) { + LOG.error("Skipping empty or corrupt file " + + thisNodeFile.getPath(), ex); + continue; // skip empty or corrupt files } } } @@ -268,10 +264,7 @@ public Map> getLogMetaFilesOfNode( Map> logMetaFiles = new HashMap<>(); Path nodePath = currentNodeFile.getPath(); - LogReader reader = - new LogReader(conf, - nodePath); - try { + try (LogReader reader = new LogReader(conf, nodePath)) { DataInputStream valueStream; LogKey key = new LogKey(); valueStream = reader.next(key); @@ -286,8 +279,6 @@ public Map> getLogMetaFilesOfNode( key = new LogKey(); valueStream = reader.next(key); } - } finally { - reader.close(); } return logMetaFiles; } @@ -349,10 +340,8 @@ public List readAggregatedLogsMeta( } if (!thisNodeFile.getPath().getName() .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(conf, - thisNodeFile.getPath()); - try { + try (LogReader reader = new LogReader(conf, + thisNodeFile.getPath())) { DataInputStream valueStream; LogKey key = new LogKey(); valueStream = reader.next(key); @@ -383,8 +372,10 @@ public List readAggregatedLogsMeta( key = new LogKey(); valueStream = reader.next(key); } - } finally { - reader.close(); + } catch (IOException ex) { + LOG.error("Skipping empty or corrupt file " + + thisNodeFile.getPath(), ex); + continue; // skip empty or corrupt files } } }