From 6d52bbbfcfd7750b7e547abdcd0d14632d6ed9b6 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Mon, 20 Jan 2020 12:36:55 +0100 Subject: [PATCH] YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal --- .../LogAggregationIndexedFileController.java | 78 ++++++++++++++----- 1 file changed, 58 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 7af58d7a83..605997f3ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -44,6 +44,7 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.commons.lang3.SerializationUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -110,6 +111,7 @@ public class LogAggregationIndexedFileController "indexedFile.fs.retry-interval-ms"; private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB = "indexedFile.log.roll-over.max-file-size-gb"; + private static final int LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT = 10; @VisibleForTesting public static final String CHECK_SUM_FILE_SUFFIX = "-checksum"; @@ -182,9 +184,16 @@ public Object run() throws Exception { indexedLogsMeta.setCompressName(compressName); } Path aggregatedLogFile = null; + Pair initializationResult = null; + boolean createdNew; + if (context.isLogAggregationInRolling()) { - aggregatedLogFile = initializeWriterInRolling( + // In rolling log aggregation we need special initialization + // done in initializeWriterInRolling. + initializationResult = initializeWriterInRolling( remoteLogFile, appId, nodeId); + aggregatedLogFile = initializationResult.getLeft(); + createdNew = initializationResult.getRight(); } else { aggregatedLogFile = remoteLogFile; fsDataOStream = fc.create(remoteLogFile, @@ -195,22 +204,30 @@ public Object run() throws Exception { } fsDataOStream.write(uuid); fsDataOStream.flush(); + createdNew = true; } - long aggregatedLogFileLength = fc.getFileStatus( - aggregatedLogFile).getLen(); - // append a simple character("\n") to move the writer cursor, so - // we could get the correct position when we call - // fsOutputStream.getStartPos() - final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8")); - fsDataOStream.write(dummyBytes); - fsDataOStream.flush(); - - if (fsDataOStream.getPos() >= (aggregatedLogFileLength - + dummyBytes.length)) { + // If we have created a new file, we know that the offset is zero. + // Otherwise we should get this information through getFileStatus. + if (createdNew) { currentOffSet = 0; } else { - currentOffSet = aggregatedLogFileLength; + long aggregatedLogFileLength = fc.getFileStatus( + aggregatedLogFile).getLen(); + // append a simple character("\n") to move the writer cursor, so + // we could get the correct position when we call + // fsOutputStream.getStartPos() + final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8")); + fsDataOStream.write(dummyBytes); + fsDataOStream.flush(); + + if (fsDataOStream.getPos() < (aggregatedLogFileLength + + dummyBytes.length)) { + currentOffSet = fc.getFileStatus( + aggregatedLogFile).getLen(); + } else { + currentOffSet = 0; + } } return null; } @@ -220,8 +237,22 @@ public Object run() throws Exception { } } - private Path initializeWriterInRolling(final Path remoteLogFile, - final ApplicationId appId, final String nodeId) throws Exception { + /** + * Initializes the write for the log aggregation controller in the + * rolling case. It sets up / modifies checksum and meta files if needed. + * + * @param remoteLogFile the Path of the remote log file + * @param appId the application id + * @param nodeId the node id + * @return a Pair of Path and Boolean - the Path is path of the + * aggregated log file, while the Boolean is whether a new + * file was created or not + * @throws Exception + */ + private Pair initializeWriterInRolling( + final Path remoteLogFile, final ApplicationId appId, + final String nodeId) throws Exception { + boolean createdNew = false; Path aggregatedLogFile = null; // check uuid // if we can not find uuid, we would load the uuid @@ -281,6 +312,7 @@ private Path initializeWriterInRolling(final Path remoteLogFile, // writes the uuid fsDataOStream.write(uuid); fsDataOStream.flush(); + createdNew = true; } else { aggregatedLogFile = currentRemoteLogFile; fsDataOStream = fc.create(currentRemoteLogFile, @@ -289,8 +321,13 @@ private Path initializeWriterInRolling(final Path remoteLogFile, } // recreate checksum file if needed before aggregate the logs if (overwriteCheckSum) { - final long currentAggregatedLogFileLength = fc - .getFileStatus(aggregatedLogFile).getLen(); + long currentAggregatedLogFileLength; + if (createdNew) { + currentAggregatedLogFileLength = 0; + } else { + currentAggregatedLogFileLength = fc + .getFileStatus(aggregatedLogFile).getLen(); + } FSDataOutputStream checksumFileOutputStream = null; try { checksumFileOutputStream = fc.create(remoteLogCheckSumFile, @@ -307,7 +344,8 @@ private Path initializeWriterInRolling(final Path remoteLogFile, IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream); } } - return aggregatedLogFile; + + return Pair.of(aggregatedLogFile, createdNew); } @Override @@ -572,7 +610,6 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, return findLogs; } - // TODO: fix me if the remote file system does not support append operation. @Override public List readAggregatedLogsMeta( ContainerLogsRequest logRequest) throws IOException { @@ -1144,7 +1181,8 @@ public long getRollOverLogMaxSize(Configuration conf) { } if (supportAppend) { return 1024L * 1024 * 1024 * conf.getInt( - LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10); + LOG_ROLL_OVER_MAX_FILE_SIZE_GB, + LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT); } else { return 0L; }