From 2826a35469acd262c76fa26e730a21843ac6e856 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 21 Mar 2014 18:35:54 +0000 Subject: [PATCH] YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write more log-data than the log-length that it records. Contributed by Mit Desai. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580005 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../logaggregation/AggregatedLogFormat.java | 15 ++- .../TestAggregatedLogFormat.java | 91 +++++++++++++++++++ 3 files changed, 106 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e7d171cbdb..1b719701dd 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -536,6 +536,9 @@ Release 2.4.0 - UNRELEASED YARN-1811. Fixed AMFilters in YARN to correctly accept requests from either web-app proxy or the RMs when HA is enabled. (Robert Kanter via vinodkv) + YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write + more log-data than the log-length that it records. (Mit Desai via vinodk) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 6ec4ec2ed4..c4ec30f86b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -182,20 +182,29 @@ public void write(DataOutputStream out) throws IOException { Arrays.sort(logFiles); for (File logFile : logFiles) { + long fileLength = 0; + // Write the logFile Type out.writeUTF(logFile.getName()); // Write the log length as UTF so that it is printable - out.writeUTF(String.valueOf(logFile.length())); + out.writeUTF(String.valueOf(fileLength = logFile.length())); // Write the log itself FileInputStream in = null; try { in = SecureIOUtils.openForRead(logFile, getUser(), null); byte[] buf = new byte[65535]; + long curRead = 0; int len = 0; - while ((len = in.read(buf)) != -1) { + while ( ((len = in.read(buf)) != -1) && (curRead < fileLength) ) { out.write(buf, 0, len); + curRead += len; + } + long newLength = logFile.length(); + if(fileLength < newLength) { + LOG.warn("Aggregated Logs Truncated by "+ + (newLength-fileLength) +" bytes."); } } catch (IOException e) { String message = "Error aggregating log file. Log file : " @@ -553,7 +562,7 @@ public static void readAContainerLogsForALogType( out.println(fileLengthStr); out.println("Log Contents:"); - int curRead = 0; + long curRead = 0; long pendingRead = fileLength - curRead; int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index c10a7fff3c..676a156ff7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -34,6 +34,7 @@ import java.io.Writer; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import org.junit.Assert; @@ -87,6 +88,96 @@ public void cleanupTestDir() throws Exception { fs.delete(workDirPath, true); } + //Test for Corrupted AggregatedLogs. The Logs should not write more data + //if Logvalue.write() is called and the application is still + //appending to logs + + @Test + public void testForCorruptedAggregatedLogs() throws Exception { + Configuration conf = new Configuration(); + File workDir = new File(testWorkDir, "testReadAcontainerLogs1"); + Path remoteAppLogFile = + new Path(workDir.getAbsolutePath(), "aggregatedLogFile"); + Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles"); + ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1); + Path t = + new Path(srcFileRoot, testContainerId.getApplicationAttemptId() + .getApplicationId().toString()); + Path srcFilePath = new Path(t, testContainerId.toString()); + + long numChars = 950000; + + writeSrcFileAndALog(srcFilePath, "stdout", numChars, remoteAppLogFile, + srcFileRoot, testContainerId); + + LogReader logReader = new LogReader(conf, remoteAppLogFile); + LogKey rLogKey = new LogKey(); + DataInputStream dis = logReader.next(rLogKey); + Writer writer = new StringWriter(); + try { + LogReader.readAcontainerLogs(dis, writer); + } catch (Exception e) { + if(e.toString().contains("NumberFormatException")) { + Assert.fail("Aggregated logs are corrupted."); + } + } + } + + private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length, + Path remoteAppLogFile, Path srcFileRoot, ContainerId testContainerId) + throws Exception { + File dir = new File(srcFilePath.toString()); + if (!dir.exists()) { + if (!dir.mkdirs()) { + throw new IOException("Unable to create directory : " + dir); + } + } + + File outputFile = new File(new File(srcFilePath.toString()), fileName); + FileOutputStream os = new FileOutputStream(outputFile); + final OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8"); + final int ch = filler; + + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi); + + LogKey logKey = new LogKey(testContainerId); + LogValue logValue = + spy(new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId, ugi.getShortUserName())); + + final CountDownLatch latch = new CountDownLatch(1); + + Thread t = new Thread() { + public void run() { + try { + for(int i=0; i < length/3; i++) { + osw.write(ch); + } + + latch.countDown(); + + for(int i=0; i < (2*length)/3; i++) { + osw.write(ch); + } + osw.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }; + t.start(); + + //Wait till the osw is partially written + //aggregation starts once the ows has completed 1/3rd of its work + latch.await(); + + //Aggregate The Logs + logWriter.append(logKey, logValue); + logWriter.close(); + } + //Verify the output generated by readAContainerLogs(DataInputStream, Writer) @Test public void testReadAcontainerLogs1() throws Exception {