diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7c766887c4..cfa072716c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -733,6 +733,9 @@ Release 2.6.0 - UNRELEASED YARN-2732. Fixed syntax error in SecureContainer.apt.vm. (Jian He via zjshen) + YARN-2724. Skipped uploading a local log file to HDFS if exception is raised + when opening it. (Xuan Gong via zjshen) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index e1d1e00e99..46e39ccb39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -211,6 +212,16 @@ public void write(DataOutputStream out, Set pendingUploadFiles) Collections.sort(fileList); for (File logFile : fileList) { + + FileInputStream in = null; + try { + in = secureOpenFile(logFile); + } catch (IOException e) { + logErrorMessage(logFile, e); + IOUtils.cleanup(LOG, in); + continue; + } + final long fileLength = logFile.length(); // Write the logFile Type out.writeUTF(logFile.getName()); @@ -219,9 +230,7 @@ public void write(DataOutputStream out, Set pendingUploadFiles) out.writeUTF(String.valueOf(fileLength)); // Write the log itself - FileInputStream in = null; try { - in = SecureIOUtils.openForRead(logFile, getUser(), null); byte[] buf = new byte[65535]; int len = 0; long bytesLeft = fileLength; @@ -244,18 +253,26 @@ public void write(DataOutputStream out, Set pendingUploadFiles) } this.uploadedFiles.add(logFile); } catch (IOException e) { - String message = "Error aggregating log file. Log file : " - + logFile.getAbsolutePath() + e.getMessage(); - LOG.error(message, e); + String message = logErrorMessage(logFile, e); out.write(message.getBytes()); } finally { - if (in != null) { - in.close(); - } + IOUtils.cleanup(LOG, in); } } } + @VisibleForTesting + public FileInputStream secureOpenFile(File logFile) throws IOException { + return SecureIOUtils.openForRead(logFile, getUser(), null); + } + + private static String logErrorMessage(File logFile, Exception e) { + String message = "Error aggregating log file. Log file : " + + logFile.getAbsolutePath() + ". " + e.getMessage(); + LOG.error(message, e); + return message; + } + // Added for testing purpose. public String getUser() { return user; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 676a156ff7..0fae77d863 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; import java.io.BufferedReader; import java.io.DataInputStream; @@ -37,7 +38,6 @@ import java.util.concurrent.CountDownLatch; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -194,6 +194,8 @@ public void testReadAcontainerLogs1() throws Exception { int numChars = 80000; + // create file stderr and stdout in containerLogDir + writeSrcFile(srcFilePath, "stderr", numChars); writeSrcFile(srcFilePath, "stdout", numChars); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); @@ -204,7 +206,14 @@ public void testReadAcontainerLogs1() throws Exception { new LogValue(Collections.singletonList(srcFileRoot.toString()), testContainerId, ugi.getShortUserName()); - logWriter.append(logKey, logValue); + // When we try to open FileInputStream for stderr, it will throw out an IOException. + // Skip the log aggregation for stderr. + LogValue spyLogValue = spy(logValue); + File errorFile = new File((new Path(srcFilePath, "stderr")).toString()); + doThrow(new IOException("Mock can not open FileInputStream")).when( + spyLogValue).secureOpenFile(errorFile); + + logWriter.append(logKey, spyLogValue); logWriter.close(); // make sure permission are correct on the file @@ -218,11 +227,15 @@ public void testReadAcontainerLogs1() throws Exception { Writer writer = new StringWriter(); LogReader.readAcontainerLogs(dis, writer); + // We should only do the log aggregation for stdout. + // Since we could not open the fileInputStream for stderr, this file is not + // aggregated. String s = writer.toString(); int expectedLength = "\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length() + "\nLog Contents:\n".length() + numChars; Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); + Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr")); Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars)); Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));