YARN-2724. Skipped uploading a local log file to HDFS if exception is raised when opening it. Contributed by Xuan Gong.

This commit is contained in:
Zhijie Shen 2014-10-24 11:13:44 -07:00
parent b3d8a642a9
commit e31f0a6558
3 changed files with 43 additions and 10 deletions

View File

@ -733,6 +733,9 @@ Release 2.6.0 - UNRELEASED
YARN-2732. Fixed syntax error in SecureContainer.apt.vm. (Jian He via zjshen) YARN-2732. Fixed syntax error in SecureContainer.apt.vm. (Jian He via zjshen)
YARN-2724. Skipped uploading a local log file to HDFS if exception is raised
when opening it. (Xuan Gong via zjshen)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -211,6 +212,16 @@ public void write(DataOutputStream out, Set<File> pendingUploadFiles)
Collections.sort(fileList); Collections.sort(fileList);
for (File logFile : fileList) { for (File logFile : fileList) {
FileInputStream in = null;
try {
in = secureOpenFile(logFile);
} catch (IOException e) {
logErrorMessage(logFile, e);
IOUtils.cleanup(LOG, in);
continue;
}
final long fileLength = logFile.length(); final long fileLength = logFile.length();
// Write the logFile Type // Write the logFile Type
out.writeUTF(logFile.getName()); out.writeUTF(logFile.getName());
@ -219,9 +230,7 @@ public void write(DataOutputStream out, Set<File> pendingUploadFiles)
out.writeUTF(String.valueOf(fileLength)); out.writeUTF(String.valueOf(fileLength));
// Write the log itself // Write the log itself
FileInputStream in = null;
try { try {
in = SecureIOUtils.openForRead(logFile, getUser(), null);
byte[] buf = new byte[65535]; byte[] buf = new byte[65535];
int len = 0; int len = 0;
long bytesLeft = fileLength; long bytesLeft = fileLength;
@ -244,16 +253,24 @@ public void write(DataOutputStream out, Set<File> pendingUploadFiles)
} }
this.uploadedFiles.add(logFile); this.uploadedFiles.add(logFile);
} catch (IOException e) { } catch (IOException e) {
String message = "Error aggregating log file. Log file : " String message = logErrorMessage(logFile, e);
+ logFile.getAbsolutePath() + e.getMessage();
LOG.error(message, e);
out.write(message.getBytes()); out.write(message.getBytes());
} finally { } finally {
if (in != null) { IOUtils.cleanup(LOG, in);
in.close();
} }
} }
} }
@VisibleForTesting
public FileInputStream secureOpenFile(File logFile) throws IOException {
return SecureIOUtils.openForRead(logFile, getUser(), null);
}
private static String logErrorMessage(File logFile, Exception e) {
String message = "Error aggregating log file. Log file : "
+ logFile.getAbsolutePath() + ". " + e.getMessage();
LOG.error(message, e);
return message;
} }
// Added for testing purpose. // Added for testing purpose.

View File

@ -20,6 +20,7 @@
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -37,7 +38,6 @@
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -194,6 +194,8 @@ public void testReadAcontainerLogs1() throws Exception {
int numChars = 80000; int numChars = 80000;
// create file stderr and stdout in containerLogDir
writeSrcFile(srcFilePath, "stderr", numChars);
writeSrcFile(srcFilePath, "stdout", numChars); writeSrcFile(srcFilePath, "stdout", numChars);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@ -204,7 +206,14 @@ public void testReadAcontainerLogs1() throws Exception {
new LogValue(Collections.singletonList(srcFileRoot.toString()), new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName()); testContainerId, ugi.getShortUserName());
logWriter.append(logKey, logValue); // When we try to open FileInputStream for stderr, it will throw out an IOException.
// Skip the log aggregation for stderr.
LogValue spyLogValue = spy(logValue);
File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
doThrow(new IOException("Mock can not open FileInputStream")).when(
spyLogValue).secureOpenFile(errorFile);
logWriter.append(logKey, spyLogValue);
logWriter.close(); logWriter.close();
// make sure permission are correct on the file // make sure permission are correct on the file
@ -218,11 +227,15 @@ public void testReadAcontainerLogs1() throws Exception {
Writer writer = new StringWriter(); Writer writer = new StringWriter();
LogReader.readAcontainerLogs(dis, writer); LogReader.readAcontainerLogs(dis, writer);
// We should only do the log aggregation for stdout.
// Since we could not open the fileInputStream for stderr, this file is not
// aggregated.
String s = writer.toString(); String s = writer.toString();
int expectedLength = int expectedLength =
"\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length() "\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length()
+ "\nLog Contents:\n".length() + numChars; + "\nLog Contents:\n".length() + numChars;
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars)); Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars));
Assert.assertTrue("Log Contents not matched", s.contains("Log Contents")); Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));