HADOOP-19047: S3A: Support in-memory tracking of Magic Commit data (#6468)

If the option fs.s3a.committer.magic.track.commits.in.memory.enabled
is set to true, then rather than save data about in-progress uploads
to S3, this information is cached in memory.

If the number of files being committed is low, this will save network IO
in both the generation of .pending and marker files, and in the scanning
of task attempt directory trees during task commit.
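For example, a deployment can opt in through the standard S3A configuration
mechanism:

```xml
<property>
  <name>fs.s3a.committer.magic.track.commits.in.memory.enabled</name>
  <value>true</value>
</property>
```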

Contributed by Syed Shameerur Rahman
Syed Shameerur Rahman 2024-03-26 20:59:35 +05:30 committed by GitHub
parent 9fe371aa15
commit 032796a0fb
13 changed files with 626 additions and 110 deletions

S3AFileSystem.java

@@ -117,6 +117,7 @@
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
@@ -231,6 +232,8 @@
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
@@ -3900,6 +3903,21 @@ public void access(final Path f, final FsAction mode)
@Retries.RetryTranslated
public FileStatus getFileStatus(final Path f) throws IOException {
Path path = qualify(f);
if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
// Some downstream apps might call getFileStatus for a magic path to get the file size.
// When commit data is stored in memory, construct a dummy S3AFileStatus with the correct
// file size fetched from memory.
if (InMemoryMagicCommitTracker.getPathToBytesWritten().containsKey(path)) {
long len = InMemoryMagicCommitTracker.getPathToBytesWritten().get(path);
return new S3AFileStatus(len,
0L,
path,
getDefaultBlockSize(path),
username,
MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME,
null);
}
}
return trackDurationAndSpan(
INVOCATION_GET_FILE_STATUS, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.ALL));

CommitConstants.java

@@ -58,6 +58,10 @@ private CommitConstants() {
*/
public static final String PENDINGSET_SUFFIX = ".pendingset";
/**
* Etag name to be returned on non-committed S3 object: {@value}.
*/
public static final String MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME = "pending";
/**
* Prefix to use for config options: {@value}.
@@ -242,6 +246,18 @@ private CommitConstants() {
*/
public static final int DEFAULT_COMMITTER_THREADS = 32;
/**
* Should the magic committer track all pending commits in memory?
*/
public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
"fs.s3a.committer.magic.track.commits.in.memory.enabled";
/**
* Default value for {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}.
*/
public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
false;
/**
* Path in the cluster filesystem for temporary data: {@value}.
* This is for HDFS, not the local filesystem.

MagicCommitIntegration.java

@@ -26,11 +26,13 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
/**
* Adds the code needed for S3A to support magic committers.
@@ -105,13 +107,15 @@ public PutTracker createTracker(Path path, String key,
String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
getStoreContext().incrementStatistic(
Statistic.COMMITTER_MAGIC_FILES_CREATED);
if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) {
tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
trackerStatistics);
} else {
tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
trackerStatistics);
}
LOG.debug("Created {}", tracker);
} else {
LOG.warn("File being created has a \"magic\" path, but the filesystem"

InMemoryMagicCommitTracker.java

@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.util.Preconditions;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;
/**
* InMemoryMagicCommitTracker stores the commit data in memory.
* The commit data and related data stores are flushed out from
* the memory when the task is committed or aborted.
*/
public class InMemoryMagicCommitTracker extends MagicCommitTracker {
/**
* Map of taskAttemptId to its corresponding list of pending commit data.
* The entries in the map are removed when a task commits or aborts.
*/
private static final Map<String, List<SinglePendingCommit>> TASK_ATTEMPT_ID_TO_MPU_METADATA =
new ConcurrentHashMap<>();
/**
* Map of file path to its size in bytes.
* The entries in the map are removed when a task commits or aborts.
*/
private static final Map<Path, Long> PATH_TO_BYTES_WRITTEN = new ConcurrentHashMap<>();
/**
* Map of taskAttemptId to the list of paths to files written by it.
* The entries in the map are removed when a task commits or aborts.
*/
private static final Map<String, List<Path>> TASK_ATTEMPT_ID_TO_PATH = new ConcurrentHashMap<>();
public InMemoryMagicCommitTracker(Path path,
String bucket,
String originalDestKey,
String destKey,
String pendingsetKey,
WriteOperationHelper writer,
PutTrackerStatistics trackerStatistics) {
super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
}
@Override
public boolean aboutToComplete(String uploadId,
List<CompletedPart> parts,
long bytesWritten,
final IOStatistics iostatistics)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
"empty/null upload ID: " + uploadId);
Preconditions.checkArgument(parts != null, "No uploaded parts list");
Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save");
// build the commit summary
SinglePendingCommit commitData = new SinglePendingCommit();
commitData.touch(System.currentTimeMillis());
commitData.setDestinationKey(getDestKey());
commitData.setBucket(getBucket());
commitData.setUri(getPath().toUri().toString());
commitData.setUploadId(uploadId);
commitData.setText("");
commitData.setLength(bytesWritten);
commitData.bindCommitData(parts);
commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics));
// extract the taskAttemptId from the path
String taskAttemptId = extractTaskAttemptIdFromPath(getPath());
// store the commit data with taskAttemptId as the key
TASK_ATTEMPT_ID_TO_MPU_METADATA.computeIfAbsent(taskAttemptId,
k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);
// store the bytes written (length) for the corresponding file
PATH_TO_BYTES_WRITTEN.put(getPath(), bytesWritten);
// store the mapping between taskAttemptId and path
// This information is used for removing entries from
// the map once the taskAttempt is completed/committed.
TASK_ATTEMPT_ID_TO_PATH.computeIfAbsent(taskAttemptId,
k -> Collections.synchronizedList(new ArrayList<>())).add(getPath());
LOG.info("commit metadata for {} parts in {}. size: {} byte(s) "
+ "for the taskAttemptId: {} is stored in memory",
parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId);
LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
getPath(), getPendingPartKey(), commitData);
return false;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"InMemoryMagicCommitTracker{");
sb.append(", Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METADATA.size());
sb.append(", Number of files=").append(PATH_TO_BYTES_WRITTEN.size());
sb.append('}');
return sb.toString();
}
public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetadata() {
return TASK_ATTEMPT_ID_TO_MPU_METADATA;
}
public static Map<Path, Long> getPathToBytesWritten() {
return PATH_TO_BYTES_WRITTEN;
}
public static Map<String, List<Path>> getTaskAttemptIdToPath() {
return TASK_ATTEMPT_ID_TO_PATH;
}
}

MagicCommitTracker.java

@@ -18,37 +18,22 @@
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import java.util.List;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static java.util.Objects.requireNonNull;
/**
* Put tracker for Magic commits.
@@ -56,7 +41,7 @@
* uses any datatype in hadoop-mapreduce.
*/
@InterfaceAudience.Private
public abstract class MagicCommitTracker extends PutTracker {
public static final Logger LOG = LoggerFactory.getLogger(
MagicCommitTracker.class);
@@ -65,7 +50,7 @@ public class MagicCommitTracker extends PutTracker {
private final Path path;
private final WriteOperationHelper writer;
private final String bucket;
protected static final byte[] EMPTY = new byte[0];
private final PutTrackerStatistics trackerStatistics;
/**
@@ -127,68 +112,11 @@ public boolean outputImmediatelyVisible() {
* @throws IllegalArgumentException bad argument
*/
@Override
public abstract boolean aboutToComplete(String uploadId,
List<CompletedPart> parts,
long bytesWritten,
IOStatistics iostatistics)
throws IOException;
@Override
public String toString() {
@@ -201,4 +129,28 @@ public String toString() {
sb.append('}');
return sb.toString();
}
public String getOriginalDestKey() {
return originalDestKey;
}
public String getPendingPartKey() {
return pendingPartKey;
}
public Path getPath() {
return path;
}
public String getBucket() {
return bucket;
}
public WriteOperationHelper getWriter() {
return writer;
}
public PutTrackerStatistics getTrackerStatistics() {
return trackerStatistics;
}
}

MagicCommitTrackerUtils.java

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* Utility class for the class {@link MagicCommitTracker} and its subclasses.
*/
public final class MagicCommitTrackerUtils {
private MagicCommitTrackerUtils() {
}
/**
* The magic path is of the following format:
* "s3://bucket-name/table-path/__magic_jobId/job-id/taskAttempt/id/tasks/taskAttemptId"
* so the third child under the "__magic" directory gives the task attempt id.
* @param path Path
* @return taskAttemptId
*/
public static String extractTaskAttemptIdFromPath(Path path) {
List<String> elementsInPath = MagicCommitPaths.splitPathToElements(path);
List<String> childrenOfMagicPath = MagicCommitPaths.magicPathChildren(elementsInPath);
checkArgument(childrenOfMagicPath.size() > 3, "Magic Path is invalid");
// the 3rd child of the magic path (index 3) is the taskAttemptId
return childrenOfMagicPath.get(3);
}
/**
* Is in-memory tracking of magic commit data enabled?
* @param conf Configuration
* @return true if in-memory tracking of commit data is enabled.
*/
public static boolean isTrackMagicCommitsInMemoryEnabled(Configuration conf) {
return conf.getBoolean(
CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
}
}

MagicS3GuardCommitter.java

@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
@@ -48,8 +49,8 @@
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
/**
@@ -192,23 +193,9 @@ public void commitTask(TaskAttemptContext context) throws IOException {
*/
private PendingSet innerCommitTask(
TaskAttemptContext context) throws IOException {
// load in all pending commits.
PendingSet pendingSet = loadPendingCommits(context);
try (CommitContext commitContext = initiateTaskOperation(context)) {
// patch in IDs
String jobId = getUUID();
String taskId = String.valueOf(context.getTaskAttemptID());
@@ -248,6 +235,84 @@ private PendingSet innerCommitTask(
return pendingSet;
}
/**
* Loads pending commits from either memory or from the remote store (S3) based on the config.
* @param context TaskAttemptContext
* @return All pending commit data for the given TaskAttemptContext
* @throws IOException
* if there is an error trying to read the commit data
*/
protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
PendingSet pendingSet = new PendingSet();
if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
// load from memory
List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
for (SinglePendingCommit singleCommit : pendingCommits) {
// aggregate stats
pendingSet.getIOStatistics()
.aggregate(singleCommit.getIOStatistics());
// then clear so they aren't marshalled again.
singleCommit.getIOStatistics().clear();
}
pendingSet.setCommits(pendingCommits);
} else {
// Load from remote store
CommitOperations actions = getCommitOperations();
Path taskAttemptPath = getTaskAttemptPath(context);
try (CommitContext commitContext = initiateTaskOperation(context)) {
Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
pendingSet = loaded.getKey();
List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
if (!failures.isEmpty()) {
// At least one file failed to load
// revert all which did; report failure with first exception
LOG.error("At least one commit file could not be read: failing");
abortPendingUploads(commitContext, pendingSet.getCommits(), true);
throw failures.get(0).getValue();
}
}
}
return pendingSet;
}
/**
* Loads the pending commits from the memory data structure for a given taskAttemptId.
* @param context TaskAttemptContext
* @return list of pending commits
*/
private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context) {
String taskAttemptId = String.valueOf(context.getTaskAttemptID());
// get all the pending commit metadata associated with the taskAttemptId.
// This will also remove the entry from the map.
List<SinglePendingCommit> pendingCommits =
InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetadata().remove(taskAttemptId);
// get all the path/files associated with the taskAttemptId.
// This will also remove the entry from the map.
List<Path> pathsAssociatedWithTaskAttemptId =
InMemoryMagicCommitTracker.getTaskAttemptIdToPath().remove(taskAttemptId);
// for each path, remove the entry from the map
// so that there is no memory leak.
if (pathsAssociatedWithTaskAttemptId != null) {
for (Path path : pathsAssociatedWithTaskAttemptId) {
boolean cleared =
InMemoryMagicCommitTracker.getPathToBytesWritten().remove(path) != null;
LOG.debug("Removing path: {} from the memory isSuccess: {}", path, cleared);
}
} else {
LOG.debug("No paths to remove for taskAttemptId: {}", taskAttemptId);
}
if (pendingCommits == null || pendingCommits.isEmpty()) {
LOG.info("No commit data present for the taskAttemptId: {} in the memory", taskAttemptId);
return new ArrayList<>();
}
return pendingCommits;
}
/**
* Abort a task. Attempt load then abort all pending files,
* then try to delete the task attempt path.
@@ -264,9 +329,14 @@ public void abortTask(TaskAttemptContext context) throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"Abort task %s", context.getTaskAttemptID());
CommitContext commitContext = initiateTaskOperation(context)) {
if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
for (SinglePendingCommit singleCommit : pendingCommits) {
commitContext.abortSingleCommit(singleCommit);
}
} else {
getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);
}
} finally {
deleteQuietly(
attemptPath.getFileSystem(context.getConfiguration()),

S3MagicCommitTracker.java

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3ADataBlocks;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.util.Preconditions;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
/**
* Stores the commit data under the magic path.
*/
public class S3MagicCommitTracker extends MagicCommitTracker {
public S3MagicCommitTracker(Path path,
String bucket,
String originalDestKey,
String destKey,
String pendingsetKey,
WriteOperationHelper writer,
PutTrackerStatistics trackerStatistics) {
super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
}
@Override
public boolean aboutToComplete(String uploadId,
List<CompletedPart> parts,
long bytesWritten,
final IOStatistics iostatistics)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
"empty/null upload ID: "+ uploadId);
Preconditions.checkArgument(parts != null,
"No uploaded parts list");
Preconditions.checkArgument(!parts.isEmpty(),
"No uploaded parts to save");
// put a 0-byte file with the name of the original under-magic path
// Add the final file length as a header
// this is done before the task commit, so its duration can be
// included in the statistics
Map<String, String> headers = new HashMap<>();
headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
getOriginalDestKey(),
0,
new PutObjectOptions(true, null, headers), false);
upload(originalDestPut, new ByteArrayInputStream(EMPTY));
// build the commit summary
SinglePendingCommit commitData = new SinglePendingCommit();
commitData.touch(System.currentTimeMillis());
commitData.setDestinationKey(getDestKey());
commitData.setBucket(getBucket());
commitData.setUri(getPath().toUri().toString());
commitData.setUploadId(uploadId);
commitData.setText("");
commitData.setLength(bytesWritten);
commitData.bindCommitData(parts);
commitData.setIOStatistics(
new IOStatisticsSnapshot(iostatistics));
byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
LOG.info("Uncommitted data pending to file {};"
+ " commit metadata for {} parts in {}. size: {} byte(s)",
getPath().toUri(), parts.size(), getPendingPartKey(), bytesWritten);
LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
getPath(), getPendingPartKey(), commitData);
PutObjectRequest put = getWriter().createPutObjectRequest(
getPendingPartKey(),
bytes.length, null, false);
upload(put, new ByteArrayInputStream(bytes));
return false;
}
/**
* PUT an object.
* @param request the request
* @param inputStream input stream of data to be uploaded
* @throws IOException on problems
*/
@Retries.RetryTranslated
private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
() -> getWriter().putObject(request, PutObjectOptions.keepingDirs(),
new S3ADataBlocks.BlockUploadData(inputStream), false, null));
}
}

committers.md

@@ -362,6 +362,13 @@ the magic directory path rewriting is enabled by default.
The Magic Committer has not been field tested to the extent of Netflix's committer;
consider it the least mature of the committers.
When only a small number of files are written, the Magic committer can store the commit data in memory, which speeds up the task commit operation and saves S3 request costs. This can be enabled with the following property:
```xml
<property>
<name>fs.s3a.committer.magic.track.commits.in.memory.enabled</name>
<value>true</value>
</property>
```
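
The switch can also be set programmatically on the job's Hadoop configuration
before the committer is created. A minimal sketch using the `CommitConstants`
constant added in this patch (the helper class name is illustrative only):

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED;

/** Illustrative helper: enable in-memory commit tracking on a job configuration. */
public final class InMemoryTrackingExample {

  private InMemoryTrackingExample() {
  }

  public static Configuration withInMemoryTracking(Configuration base) {
    Configuration conf = new Configuration(base);
    // Keep pending commit metadata in the task's JVM rather than writing
    // .pending files back to S3; best suited to tasks which write only a
    // few files, as the metadata is held in memory until commit or abort.
    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, true);
    return conf;
  }
}
```

Because the tracker keeps its state in static maps, the pending commit data is
only visible within the JVM that wrote it: the task must be committed from the
same process which performed the writes.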
### Which Committer to Use?

AbstractITCommitProtocol.java

@@ -82,6 +82,7 @@
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_TASKS_SUCCEEDED;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -906,8 +907,15 @@ public void testCommitterWithDuplicatedCommit() throws Exception {
assertNoMultipartUploadsPending(outDir);
// commit task to fail on retry
// FNFE is not thrown by the Magic committer when in-memory
// commit data is enabled, hence skip the check.
boolean skipExpectFNFE = committer instanceof MagicS3GuardCommitter &&
isTrackMagicCommitsInMemoryEnabled(tContext.getConfiguration());
if (!skipExpectFNFE) {
expectFNFEonTaskCommit(committer, tContext);
}
}
/**
* HADOOP-17258. If a second task attempt is committed, it
@@ -1422,7 +1430,10 @@ public void testOutputFormatIntegration() throws Throwable {
validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID());
recordWriter.close(tContext);
// at this point
// Skip validation when commit data is stored in memory
if (!isTrackMagicCommitsInMemoryEnabled(conf)) {
validateTaskAttemptPathAfterWrite(dest, expectedLength);
}
assertTrue("Committer does not have data to commit " + committer, assertTrue("Committer does not have data to commit " + committer,
committer.needsTaskCommit(tContext)); committer.needsTaskCommit(tContext));
commitTask(committer, tContext); commitTask(committer, tContext);

TestMagicCommitTrackerUtils.java

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.Before;
import org.junit.Test;
import static junit.framework.TestCase.assertEquals;
import static org.apache.hadoop.fs.s3a.commit.AbstractCommitITest.randomJobId;
/**
* Class to test {@link MagicCommitTrackerUtils}.
*/
public final class TestMagicCommitTrackerUtils {
private String jobId;
private String attemptId;
private TaskAttemptID taskAttemptId;
private static final Path DEST_PATH = new Path("s3://dummyBucket/dummyTable");
@Before
public void setup() throws Exception {
jobId = randomJobId();
attemptId = "attempt_" + jobId + "_m_000000_0";
taskAttemptId = TaskAttemptID.forName(attemptId);
}
@Test
public void testExtractTaskAttemptIdFromPath() {
TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(
new Configuration(),
taskAttemptId);
Path path = CommitUtilsWithMR
.getBaseMagicTaskAttemptPath(taskAttemptContext, "00001", DEST_PATH);
assertEquals("TaskAttemptId didn't match", attemptId,
MagicCommitTrackerUtils.extractTaskAttemptIdFromPath(path));
}
}

ITestMagicCommitProtocol.java

@@ -20,8 +20,11 @@
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -39,7 +42,10 @@
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath;
@@ -48,8 +54,11 @@
/**
* Test the magic committer's commit protocol.
*/
@RunWith(Parameterized.class)
public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
private final boolean trackCommitsInMemory;
@Override
protected String suitename() {
return "ITestMagicCommitProtocol";
@@ -71,6 +80,27 @@ public void setup() throws Exception {
CommitUtils.verifyIsMagicCommitFS(getFileSystem());
}
@Parameterized.Parameters(name = "track-commit-in-memory-{0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{false},
{true}
});
}
public ITestMagicCommitProtocol(boolean trackCommitsInMemory) {
this.trackCommitsInMemory = trackCommitsInMemory;
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED);
conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory);
return conf;
}
@Override
public void assertJobAbortCleanedUp(JobData jobData)
throws Exception {

ITestTerasortOnS3A.java

@@ -44,6 +44,7 @@
import org.apache.hadoop.examples.terasort.TeraValidate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.mapred.JobConf;
@@ -97,6 +98,9 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
/** Name of the committer for this run. */
private final String committerName;
/** Should Magic committer track pending commits in-memory. */
private final boolean trackCommitsInMemory;
/** Base path for all the terasort input and output paths. */
private Path terasortPath;
@@ -117,12 +121,14 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{DirectoryStagingCommitter.NAME, false},
{MagicS3GuardCommitter.NAME, false},
{MagicS3GuardCommitter.NAME, true}});
}
public ITestTerasortOnS3A(final String committerName, final boolean trackCommitsInMemory) {
this.committerName = committerName;
this.trackCommitsInMemory = trackCommitsInMemory;
}
@Override
@@ -152,6 +158,9 @@ protected void applyCustomConfigOptions(JobConf conf) {
conf.setBoolean(
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
false);
conf.setBoolean(
CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
trackCommitsInMemory);
}
private int getExpectedPartitionCount() {
@@ -173,7 +182,7 @@ protected int getRowCount() {
*/
private void prepareToTerasort() {
// small sample size for faster runs
terasortPath = new Path("/terasort-" + committerName + "-" + trackCommitsInMemory)
.makeQualified(getFileSystem());
sortInput = new Path(terasortPath, "sortin");
sortOutput = new Path(terasortPath, "sortout");