YARN-9525. IFile format is not working against s3a remote folder. Contributed by Adam Antal

This commit is contained in:
Szilard Nemeth 2020-01-20 12:36:55 +01:00
parent 8b3ee2f7e9
commit 6d52bbbfcf

View File

@ -44,6 +44,7 @@
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -110,6 +111,7 @@ public class LogAggregationIndexedFileController
"indexedFile.fs.retry-interval-ms"; "indexedFile.fs.retry-interval-ms";
private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB = private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
"indexedFile.log.roll-over.max-file-size-gb"; "indexedFile.log.roll-over.max-file-size-gb";
private static final int LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT = 10;
@VisibleForTesting @VisibleForTesting
public static final String CHECK_SUM_FILE_SUFFIX = "-checksum"; public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
@ -182,9 +184,16 @@ public Object run() throws Exception {
indexedLogsMeta.setCompressName(compressName); indexedLogsMeta.setCompressName(compressName);
} }
Path aggregatedLogFile = null; Path aggregatedLogFile = null;
Pair<Path, Boolean> initializationResult = null;
boolean createdNew;
if (context.isLogAggregationInRolling()) { if (context.isLogAggregationInRolling()) {
aggregatedLogFile = initializeWriterInRolling( // In rolling log aggregation we need special initialization
// done in initializeWriterInRolling.
initializationResult = initializeWriterInRolling(
remoteLogFile, appId, nodeId); remoteLogFile, appId, nodeId);
aggregatedLogFile = initializationResult.getLeft();
createdNew = initializationResult.getRight();
} else { } else {
aggregatedLogFile = remoteLogFile; aggregatedLogFile = remoteLogFile;
fsDataOStream = fc.create(remoteLogFile, fsDataOStream = fc.create(remoteLogFile,
@ -195,22 +204,30 @@ public Object run() throws Exception {
} }
fsDataOStream.write(uuid); fsDataOStream.write(uuid);
fsDataOStream.flush(); fsDataOStream.flush();
createdNew = true;
} }
long aggregatedLogFileLength = fc.getFileStatus( // If we have created a new file, we know that the offset is zero.
aggregatedLogFile).getLen(); // Otherwise we should get this information through getFileStatus.
// append a simple character("\n") to move the writer cursor, so if (createdNew) {
// we could get the correct position when we call
// fsOutputStream.getStartPos()
final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
fsDataOStream.write(dummyBytes);
fsDataOStream.flush();
if (fsDataOStream.getPos() >= (aggregatedLogFileLength
+ dummyBytes.length)) {
currentOffSet = 0; currentOffSet = 0;
} else { } else {
currentOffSet = aggregatedLogFileLength; long aggregatedLogFileLength = fc.getFileStatus(
aggregatedLogFile).getLen();
// append a simple character("\n") to move the writer cursor, so
// we could get the correct position when we call
// fsOutputStream.getStartPos()
final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
fsDataOStream.write(dummyBytes);
fsDataOStream.flush();
if (fsDataOStream.getPos() < (aggregatedLogFileLength
+ dummyBytes.length)) {
currentOffSet = fc.getFileStatus(
aggregatedLogFile).getLen();
} else {
currentOffSet = 0;
}
} }
return null; return null;
} }
@ -220,8 +237,22 @@ public Object run() throws Exception {
} }
} }
private Path initializeWriterInRolling(final Path remoteLogFile, /**
final ApplicationId appId, final String nodeId) throws Exception { * Initializes the writer for the log aggregation controller in the
* rolling case. It sets up / modifies checksum and meta files if needed.
*
* @param remoteLogFile the Path of the remote log file
* @param appId the application id
* @param nodeId the node id
* @return a Pair of Path and Boolean - the Path is path of the
* aggregated log file, while the Boolean is whether a new
* file was created or not
* @throws Exception
*/
private Pair<Path, Boolean> initializeWriterInRolling(
final Path remoteLogFile, final ApplicationId appId,
final String nodeId) throws Exception {
boolean createdNew = false;
Path aggregatedLogFile = null; Path aggregatedLogFile = null;
// check uuid // check uuid
// if we can not find uuid, we would load the uuid // if we can not find uuid, we would load the uuid
@ -281,6 +312,7 @@ private Path initializeWriterInRolling(final Path remoteLogFile,
// writes the uuid // writes the uuid
fsDataOStream.write(uuid); fsDataOStream.write(uuid);
fsDataOStream.flush(); fsDataOStream.flush();
createdNew = true;
} else { } else {
aggregatedLogFile = currentRemoteLogFile; aggregatedLogFile = currentRemoteLogFile;
fsDataOStream = fc.create(currentRemoteLogFile, fsDataOStream = fc.create(currentRemoteLogFile,
@ -289,8 +321,13 @@ private Path initializeWriterInRolling(final Path remoteLogFile,
} }
// recreate checksum file if needed before aggregate the logs // recreate checksum file if needed before aggregate the logs
if (overwriteCheckSum) { if (overwriteCheckSum) {
final long currentAggregatedLogFileLength = fc long currentAggregatedLogFileLength;
.getFileStatus(aggregatedLogFile).getLen(); if (createdNew) {
currentAggregatedLogFileLength = 0;
} else {
currentAggregatedLogFileLength = fc
.getFileStatus(aggregatedLogFile).getLen();
}
FSDataOutputStream checksumFileOutputStream = null; FSDataOutputStream checksumFileOutputStream = null;
try { try {
checksumFileOutputStream = fc.create(remoteLogCheckSumFile, checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
@ -307,7 +344,8 @@ private Path initializeWriterInRolling(final Path remoteLogFile,
IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream); IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
} }
} }
return aggregatedLogFile;
return Pair.of(aggregatedLogFile, createdNew);
} }
@Override @Override
@ -572,7 +610,6 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
return findLogs; return findLogs;
} }
// TODO: fix me if the remote file system does not support append operation.
@Override @Override
public List<ContainerLogMeta> readAggregatedLogsMeta( public List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException { ContainerLogsRequest logRequest) throws IOException {
@ -1144,7 +1181,8 @@ public long getRollOverLogMaxSize(Configuration conf) {
} }
if (supportAppend) { if (supportAppend) {
return 1024L * 1024 * 1024 * conf.getInt( return 1024L * 1024 * 1024 * conf.getInt(
LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10); LOG_ROLL_OVER_MAX_FILE_SIZE_GB,
LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT);
} else { } else {
return 0L; return 0L;
} }