From 032796a0fb21b432b8d873902fe00b44afa2496c Mon Sep 17 00:00:00 2001 From: Syed Shameerur Rahman Date: Tue, 26 Mar 2024 20:59:35 +0530 Subject: [PATCH] HADOOP-19047: S3A: Support in-memory tracking of Magic Commit data (#6468) If the option fs.s3a.committer.magic.track.commits.in.memory.enabled is set to true, then rather than save data about in-progress uploads to S3, this information is cached in memory. If the number of files being committed is low, this will save network IO in both the generation of .pending and marker files, and in the scanning of task attempt directory trees during task commit. Contributed by Syed Shameerur Rahman --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 18 +++ .../hadoop/fs/s3a/commit/CommitConstants.java | 16 ++ .../fs/s3a/commit/MagicCommitIntegration.java | 20 ++- .../magic/InMemoryMagicCommitTracker.java | 146 ++++++++++++++++++ .../s3a/commit/magic/MagicCommitTracker.java | 106 ++++--------- .../commit/magic/MagicCommitTrackerUtils.java | 64 ++++++++ .../commit/magic/MagicS3GuardCommitter.java | 108 ++++++++++--- .../commit/magic/S3MagicCommitTracker.java | 125 +++++++++++++++ .../markdown/tools/hadoop-aws/committers.md | 7 + .../s3a/commit/AbstractITCommitProtocol.java | 15 +- .../commit/TestMagicCommitTrackerUtils.java | 64 ++++++++ .../magic/ITestMagicCommitProtocol.java | 30 ++++ .../commit/terasort/ITestTerasortOnS3A.java | 17 +- 13 files changed, 626 insertions(+), 110 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a404fc1c21..0e2ae0f74d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -117,6 +117,7 @@ import org.apache.hadoop.fs.s3a.auth.SignerManager; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; +import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; import org.apache.hadoop.fs.s3a.impl.AWSCannedACL; import org.apache.hadoop.fs.s3a.impl.AWSHeaders; import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler; @@ -231,6 +232,8 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE; @@ -3900,6 +3903,21 @@ public void access(final Path f, final FsAction mode) @Retries.RetryTranslated public FileStatus getFileStatus(final Path f) throws IOException { Path path = qualify(f); + if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) { + // Some downstream apps might call getFileStatus for a magic path to get the file size. + // when commit data is stored in memory construct the dummy S3AFileStatus with correct + // file size fetched from the memory. + if (InMemoryMagicCommitTracker.getPathToBytesWritten().containsKey(path)) { + long len = InMemoryMagicCommitTracker.getPathToBytesWritten().get(path); + return new S3AFileStatus(len, + 0L, + path, + getDefaultBlockSize(path), + username, + MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME, + null); + } + } return trackDurationAndSpan( INVOCATION_GET_FILE_STATUS, path, () -> innerGetFileStatus(path, false, StatusProbeEnum.ALL)); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index 52df58d6a4..4f00055099 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -58,6 +58,10 @@ private CommitConstants() { */ public static final String PENDINGSET_SUFFIX = ".pendingset"; + /** + * Etag name to be returned on non-committed S3 object: {@value}. + */ + public static final String MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME = "pending"; /** * Prefix to use for config options: {@value}. @@ -242,6 +246,18 @@ private CommitConstants() { */ public static final int DEFAULT_COMMITTER_THREADS = 32; + /** + * Should Magic committer track all the pending commits in memory? + */ + public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED = + "fs.s3a.committer.magic.track.commits.in.memory.enabled"; + + /** + * Default value for {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}. + */ + public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT = + false; + /** * Path in the cluster filesystem for temporary data: {@value}. * This is for HDFS, not the local filesystem. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java index e6524c9196..ba1dd400f6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java @@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.Statistic; -import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker; +import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; +import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker; import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation; import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; /** * Adds the code needed for S3A to support magic committers. @@ -105,13 +107,15 @@ public PutTracker createTracker(Path path, String key, String pendingsetPath = key + CommitConstants.PENDING_SUFFIX; getStoreContext().incrementStatistic( Statistic.COMMITTER_MAGIC_FILES_CREATED); - tracker = new MagicCommitTracker(path, - getStoreContext().getBucket(), - key, - destKey, - pendingsetPath, - owner.getWriteOperationHelper(), - trackerStatistics); + if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) { + tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(), + key, destKey, pendingsetPath, owner.getWriteOperationHelper(), + trackerStatistics); + } else { + tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(), + key, destKey, pendingsetPath, owner.getWriteOperationHelper(), + trackerStatistics); + } LOG.debug("Created {}", tracker); } else { LOG.warn("File being created has a \"magic\" path, but the filesystem" diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java new file mode 100644 index 0000000000..8e36b1e485 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import software.amazon.awssdk.services.s3.model.CompletedPart; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.util.Preconditions; + +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath; + +/** + * InMemoryMagicCommitTracker stores the commit data in memory. + * The commit data and related data stores are flushed out from + * the memory when the task is committed or aborted. + */ +public class InMemoryMagicCommitTracker extends MagicCommitTracker { + + /** + * Map to store taskAttemptId, and it's corresponding list of pending commit data. + * The entries in the Map gets removed when a task commits or aborts. + */ + private final static Map> TASK_ATTEMPT_ID_TO_MPU_METADATA = new ConcurrentHashMap<>(); + + /** + * Map to store path of the file, and it's corresponding size. + * The entries in the Map gets removed when a task commits or aborts. + */ + private final static Map PATH_TO_BYTES_WRITTEN = new ConcurrentHashMap<>(); + + /** + * Map to store taskAttemptId, and list of paths to files written by it. + * The entries in the Map gets removed when a task commits or aborts. + */ + private final static Map> TASK_ATTEMPT_ID_TO_PATH = new ConcurrentHashMap<>(); + + public InMemoryMagicCommitTracker(Path path, + String bucket, + String originalDestKey, + String destKey, + String pendingsetKey, + WriteOperationHelper writer, + PutTrackerStatistics trackerStatistics) { + super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics); + } + + @Override + public boolean aboutToComplete(String uploadId, + List parts, + long bytesWritten, + final IOStatistics iostatistics) + throws IOException { + Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), + "empty/null upload ID: " + uploadId); + Preconditions.checkArgument(parts != null, "No uploaded parts list"); + Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save"); + + // build the commit summary + SinglePendingCommit commitData = new SinglePendingCommit(); + commitData.touch(System.currentTimeMillis()); + commitData.setDestinationKey(getDestKey()); + commitData.setBucket(getBucket()); + commitData.setUri(getPath().toUri().toString()); + commitData.setUploadId(uploadId); + commitData.setText(""); + commitData.setLength(bytesWritten); + commitData.bindCommitData(parts); + commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics)); + + // extract the taskAttemptId from the path + String taskAttemptId = extractTaskAttemptIdFromPath(getPath()); + + // store the commit data with taskAttemptId as the key + TASK_ATTEMPT_ID_TO_MPU_METADATA.computeIfAbsent(taskAttemptId, + k -> Collections.synchronizedList(new ArrayList<>())).add(commitData); + + // store the byteswritten(length) for the corresponding file + PATH_TO_BYTES_WRITTEN.put(getPath(), bytesWritten); + + // store the mapping between taskAttemptId and path + // This information is used for removing entries from + // the map once the taskAttempt is completed/committed. + TASK_ATTEMPT_ID_TO_PATH.computeIfAbsent(taskAttemptId, + k -> Collections.synchronizedList(new ArrayList<>())).add(getPath()); + + LOG.info("commit metadata for {} parts in {}. size: {} byte(s) " + + "for the taskAttemptId: {} is stored in memory", + parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId); + LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}", + getPath(), getPendingPartKey(), commitData); + + return false; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "InMemoryMagicCommitTracker{"); + sb.append(", Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METADATA.size()); + sb.append(", Number of files=").append(PATH_TO_BYTES_WRITTEN.size()); + sb.append('}'); + return sb.toString(); + } + + + public static Map> getTaskAttemptIdToMpuMetadata() { + return TASK_ATTEMPT_ID_TO_MPU_METADATA; + } + + public static Map getPathToBytesWritten() { + return PATH_TO_BYTES_WRITTEN; + } + + public static Map> getTaskAttemptIdToPath() { + return TASK_ATTEMPT_ID_TO_PATH; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java index b2e703e1b0..62151658b5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java @@ -18,37 +18,22 @@ package org.apache.hadoop.fs.s3a.commit.magic; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; import java.util.List; -import java.util.Map; import software.amazon.awssdk.services.s3.model.CompletedPart; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.Retries; -import org.apache.hadoop.fs.s3a.S3ADataBlocks; import org.apache.hadoop.fs.s3a.WriteOperationHelper; import org.apache.hadoop.fs.s3a.commit.PutTracker; -import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; -import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; -import org.apache.hadoop.util.Preconditions; import static java.util.Objects.requireNonNull; -import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; /** * Put tracker for Magic commits. @@ -56,7 +41,7 @@ * uses any datatype in hadoop-mapreduce. */ @InterfaceAudience.Private -public class MagicCommitTracker extends PutTracker { +public abstract class MagicCommitTracker extends PutTracker { public static final Logger LOG = LoggerFactory.getLogger( MagicCommitTracker.class); @@ -65,7 +50,7 @@ public class MagicCommitTracker extends PutTracker { private final Path path; private final WriteOperationHelper writer; private final String bucket; - private static final byte[] EMPTY = new byte[0]; + protected static final byte[] EMPTY = new byte[0]; private final PutTrackerStatistics trackerStatistics; /** @@ -127,68 +112,11 @@ public boolean outputImmediatelyVisible() { * @throws IllegalArgumentException bad argument */ @Override - public boolean aboutToComplete(String uploadId, + public abstract boolean aboutToComplete(String uploadId, List parts, long bytesWritten, - final IOStatistics iostatistics) - throws IOException { - Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), - "empty/null upload ID: "+ uploadId); - Preconditions.checkArgument(parts != null, - "No uploaded parts list"); - Preconditions.checkArgument(!parts.isEmpty(), - "No uploaded parts to save"); - - // put a 0-byte file with the name of the original under-magic path - // Add the final file length as a header - // this is done before the task commit, so its duration can be - // included in the statistics - Map headers = new HashMap<>(); - headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten)); - PutObjectRequest originalDestPut = writer.createPutObjectRequest( - originalDestKey, - 0, - new PutObjectOptions(true, null, headers), false); - upload(originalDestPut, new ByteArrayInputStream(EMPTY)); - - // build the commit summary - SinglePendingCommit commitData = new SinglePendingCommit(); - commitData.touch(System.currentTimeMillis()); - commitData.setDestinationKey(getDestKey()); - commitData.setBucket(bucket); - commitData.setUri(path.toUri().toString()); - commitData.setUploadId(uploadId); - commitData.setText(""); - commitData.setLength(bytesWritten); - commitData.bindCommitData(parts); - commitData.setIOStatistics( - new IOStatisticsSnapshot(iostatistics)); - - byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer()); - LOG.info("Uncommitted data pending to file {};" - + " commit metadata for {} parts in {}. size: {} byte(s)", - path.toUri(), parts.size(), pendingPartKey, bytesWritten); - LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}", - path, pendingPartKey, commitData); - PutObjectRequest put = writer.createPutObjectRequest( - pendingPartKey, - bytes.length, null, false); - upload(put, new ByteArrayInputStream(bytes)); - return false; - - } - /** - * PUT an object. - * @param request the request - * @param inputStream input stream of data to be uploaded - * @throws IOException on problems - */ - @Retries.RetryTranslated - private void upload(PutObjectRequest request, InputStream inputStream) throws IOException { - trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(), - () -> writer.putObject(request, PutObjectOptions.keepingDirs(), - new S3ADataBlocks.BlockUploadData(inputStream), false, null)); - } + IOStatistics iostatistics) + throws IOException; @Override public String toString() { @@ -201,4 +129,28 @@ public String toString() { sb.append('}'); return sb.toString(); } + + public String getOriginalDestKey() { + return originalDestKey; + } + + public String getPendingPartKey() { + return pendingPartKey; + } + + public Path getPath() { + return path; + } + + public String getBucket() { + return bucket; + } + + public WriteOperationHelper getWriter() { + return writer; + } + + public PutTrackerStatistics getTrackerStatistics() { + return trackerStatistics; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java new file mode 100644 index 0000000000..2ceac1c8e0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths; + +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Utility class for the class {@link MagicCommitTracker} and its subclasses. + */ +public final class MagicCommitTrackerUtils { + + private MagicCommitTrackerUtils() { + } + + /** + * The magic path is of the following format. + * "s3://bucket-name/table-path/__magic_jobId/job-id/taskAttempt/id/tasks/taskAttemptId" + * So the third child from the "__magic" path will give the task attempt id. + * @param path Path + * @return taskAttemptId + */ + public static String extractTaskAttemptIdFromPath(Path path) { + List elementsInPath = MagicCommitPaths.splitPathToElements(path); + List childrenOfMagicPath = MagicCommitPaths.magicPathChildren(elementsInPath); + + checkArgument(childrenOfMagicPath.size() >= 3, "Magic Path is invalid"); + // 3rd child of the magic path is the taskAttemptId + return childrenOfMagicPath.get(3); + } + + /** + * Is tracking of magic commit data in-memory enabled. + * @param conf Configuration + * @return true if in memory tracking of commit data is enabled. + */ + public static boolean isTrackMagicCommitsInMemoryEnabled(Configuration conf) { + return conf.getBoolean( + CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, + CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 518831b7d4..5ed1a3abd4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.commit.magic; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -48,8 +49,8 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; -import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; /** @@ -192,23 +193,9 @@ public void commitTask(TaskAttemptContext context) throws IOException { */ private PendingSet innerCommitTask( TaskAttemptContext context) throws IOException { - Path taskAttemptPath = getTaskAttemptPath(context); // load in all pending commits. - CommitOperations actions = getCommitOperations(); - PendingSet pendingSet; + PendingSet pendingSet = loadPendingCommits(context); try (CommitContext commitContext = initiateTaskOperation(context)) { - Pair>> - loaded = actions.loadSinglePendingCommits( - taskAttemptPath, true, commitContext); - pendingSet = loaded.getKey(); - List> failures = loaded.getValue(); - if (!failures.isEmpty()) { - // At least one file failed to load - // revert all which did; report failure with first exception - LOG.error("At least one commit file could not be read: failing"); - abortPendingUploads(commitContext, pendingSet.getCommits(), true); - throw failures.get(0).getValue(); - } // patch in IDs String jobId = getUUID(); String taskId = String.valueOf(context.getTaskAttemptID()); @@ -248,6 +235,84 @@ private PendingSet innerCommitTask( return pendingSet; } + /** + * Loads pending commits from either memory or from the remote store (S3) based on the config. + * @param context TaskAttemptContext + * @return All pending commit data for the given TaskAttemptContext + * @throws IOException + * if there is an error trying to read the commit data + */ + protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException { + PendingSet pendingSet = new PendingSet(); + if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) { + // load from memory + List pendingCommits = loadPendingCommitsFromMemory(context); + + for (SinglePendingCommit singleCommit : pendingCommits) { + // aggregate stats + pendingSet.getIOStatistics() + .aggregate(singleCommit.getIOStatistics()); + // then clear so they aren't marshalled again. + singleCommit.getIOStatistics().clear(); + } + pendingSet.setCommits(pendingCommits); + } else { + // Load from remote store + CommitOperations actions = getCommitOperations(); + Path taskAttemptPath = getTaskAttemptPath(context); + try (CommitContext commitContext = initiateTaskOperation(context)) { + Pair>> loaded = + actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext); + pendingSet = loaded.getKey(); + List> failures = loaded.getValue(); + if (!failures.isEmpty()) { + // At least one file failed to load + // revert all which did; report failure with first exception + LOG.error("At least one commit file could not be read: failing"); + abortPendingUploads(commitContext, pendingSet.getCommits(), true); + throw failures.get(0).getValue(); + } + } + } + return pendingSet; + } + + /** + * Loads the pending commits from the memory data structure for a given taskAttemptId. + * @param context TaskContext + * @return list of pending commits + */ + private List loadPendingCommitsFromMemory(TaskAttemptContext context) { + String taskAttemptId = String.valueOf(context.getTaskAttemptID()); + // get all the pending commit metadata associated with the taskAttemptId. + // This will also remove the entry from the map. + List pendingCommits = + InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetadata().remove(taskAttemptId); + // get all the path/files associated with the taskAttemptId. + // This will also remove the entry from the map. + List pathsAssociatedWithTaskAttemptId = + InMemoryMagicCommitTracker.getTaskAttemptIdToPath().remove(taskAttemptId); + + // for each of the path remove the entry from map, + // This is done so that there is no memory leak. + if (pathsAssociatedWithTaskAttemptId != null) { + for (Path path : pathsAssociatedWithTaskAttemptId) { + boolean cleared = + InMemoryMagicCommitTracker.getPathToBytesWritten().remove(path) != null; + LOG.debug("Removing path: {} from the memory isSuccess: {}", path, cleared); + } + } else { + LOG.debug("No paths to remove for taskAttemptId: {}", taskAttemptId); + } + + if (pendingCommits == null || pendingCommits.isEmpty()) { + LOG.info("No commit data present for the taskAttemptId: {} in the memory", taskAttemptId); + return new ArrayList<>(); + } + + return pendingCommits; + } + /** * Abort a task. Attempt load then abort all pending files, * then try to delete the task attempt path. @@ -264,9 +329,14 @@ public void abortTask(TaskAttemptContext context) throws IOException { try (DurationInfo d = new DurationInfo(LOG, "Abort task %s", context.getTaskAttemptID()); CommitContext commitContext = initiateTaskOperation(context)) { - getCommitOperations().abortAllSinglePendingCommits(attemptPath, - commitContext, - true); + if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) { + List pendingCommits = loadPendingCommitsFromMemory(context); + for (SinglePendingCommit singleCommit : pendingCommits) { + commitContext.abortSingleCommit(singleCommit); + } + } else { + getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true); + } } finally { deleteQuietly( attemptPath.getFileSystem(context.getConfiguration()), diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java new file mode 100644 index 0000000000..0ab3cee520 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.util.Preconditions; + +import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; + +/** + * Stores the commit data under the magic path. + */ +public class S3MagicCommitTracker extends MagicCommitTracker { + + public S3MagicCommitTracker(Path path, + String bucket, + String originalDestKey, + String destKey, + String pendingsetKey, + WriteOperationHelper writer, + PutTrackerStatistics trackerStatistics) { + super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics); + } + + @Override + public boolean aboutToComplete(String uploadId, + List parts, + long bytesWritten, + final IOStatistics iostatistics) + throws IOException { + Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), + "empty/null upload ID: "+ uploadId); + Preconditions.checkArgument(parts != null, + "No uploaded parts list"); + Preconditions.checkArgument(!parts.isEmpty(), + "No uploaded parts to save"); + + // put a 0-byte file with the name of the original under-magic path + // Add the final file length as a header + // this is done before the task commit, so its duration can be + // included in the statistics + Map headers = new HashMap<>(); + headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten)); + PutObjectRequest originalDestPut = getWriter().createPutObjectRequest( + getOriginalDestKey(), + 0, + new PutObjectOptions(true, null, headers), false); + upload(originalDestPut, new ByteArrayInputStream(EMPTY)); + + // build the commit summary + SinglePendingCommit commitData = new SinglePendingCommit(); + commitData.touch(System.currentTimeMillis()); + commitData.setDestinationKey(getDestKey()); + commitData.setBucket(getBucket()); + commitData.setUri(getPath().toUri().toString()); + commitData.setUploadId(uploadId); + commitData.setText(""); + commitData.setLength(bytesWritten); + commitData.bindCommitData(parts); + commitData.setIOStatistics( + new IOStatisticsSnapshot(iostatistics)); + + byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer()); + LOG.info("Uncommitted data pending to file {};" + + " commit metadata for {} parts in {}. size: {} byte(s)", + getPath().toUri(), parts.size(), getPendingPartKey(), bytesWritten); + LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}", + getPath(), getPendingPartKey(), commitData); + PutObjectRequest put = getWriter().createPutObjectRequest( + getPendingPartKey(), + bytes.length, null, false); + upload(put, new ByteArrayInputStream(bytes)); + return false; + } + + /** + * PUT an object. + * @param request the request + * @param inputStream input stream of data to be uploaded + * @throws IOException on problems + */ + @Retries.RetryTranslated + private void upload(PutObjectRequest request, InputStream inputStream) throws IOException { + trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(), + () -> getWriter().putObject(request, PutObjectOptions.keepingDirs(), + new S3ADataBlocks.BlockUploadData(inputStream), false, null)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md index fb42d507b2..8958154449 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md @@ -362,6 +362,13 @@ the magic directory path rewriting is enabled by default. The Magic Committer has not been field tested to the extent of Netflix's committer; consider it the least mature of the committers. +When there are less number of files to be written, The Magic committer has an option to store the commit data in-memory which can speed up the TaskCommit operation as well as save S3 cost. This can be enabled by the following property +```xml + + fs.s3a.committer.magic.track.commits.in.memory.enabled + true + +``` ### Which Committer to Use? diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 67c88039aa..3a7cceb236 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -82,6 +82,7 @@ import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_TASKS_SUCCEEDED; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -906,7 +907,14 @@ public void testCommitterWithDuplicatedCommit() throws Exception { assertNoMultipartUploadsPending(outDir); // commit task to fail on retry - expectFNFEonTaskCommit(committer, tContext); + // FNFE is not thrown in case of Magic committer when + // in memory commit data is enabled and hence skip the check. + boolean skipExpectFNFE = committer instanceof MagicS3GuardCommitter && + isTrackMagicCommitsInMemoryEnabled(tContext.getConfiguration()); + + if (!skipExpectFNFE) { + expectFNFEonTaskCommit(committer, tContext); + } } /** @@ -1422,7 +1430,10 @@ public void testOutputFormatIntegration() throws Throwable { validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID()); recordWriter.close(tContext); // at this point - validateTaskAttemptPathAfterWrite(dest, expectedLength); + // Skip validation when commit data is stored in memory + if (!isTrackMagicCommitsInMemoryEnabled(conf)) { + validateTaskAttemptPathAfterWrite(dest, expectedLength); + } assertTrue("Committer does not have data to commit " + committer, committer.needsTaskCommit(tContext)); commitTask(committer, tContext); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java new file mode 100644 index 0000000000..a08f8d2d34 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.fs.s3a.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR; +import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.Before; +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; +import static org.apache.hadoop.fs.s3a.commit.AbstractCommitITest.randomJobId; + +/** + * Class to test {@link MagicCommitTrackerUtils}. + */ +public final class TestMagicCommitTrackerUtils { + + private String jobId; + private String attemptId; + private TaskAttemptID taskAttemptId; + private static final Path DEST_PATH = new Path("s3://dummyBucket/dummyTable"); + + + @Before + public void setup() throws Exception { + jobId = randomJobId(); + attemptId = "attempt_" + jobId + "_m_000000_0"; + taskAttemptId = TaskAttemptID.forName(attemptId); + } + + @Test + public void testExtractTaskAttemptIdFromPath() { + TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl( + new Configuration(), + taskAttemptId); + Path path = CommitUtilsWithMR + .getBaseMagicTaskAttemptPath(taskAttemptContext, "00001", DEST_PATH); + assertEquals("TaskAttemptId didn't match", attemptId, + MagicCommitTrackerUtils.extractTaskAttemptIdFromPath(path)); + + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index fa963a4b97..cbfc23a2a2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -20,8 +20,11 @@ import java.io.IOException; import java.net.URI; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -39,7 +42,10 @@ import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath; @@ -48,8 +54,11 @@ /** * Test the magic committer's commit protocol. */ +@RunWith(Parameterized.class) public class ITestMagicCommitProtocol extends AbstractITCommitProtocol { + private final boolean trackCommitsInMemory; + @Override protected String suitename() { return "ITestMagicCommitProtocol"; @@ -71,6 +80,27 @@ public void setup() throws Exception { CommitUtils.verifyIsMagicCommitFS(getFileSystem()); } + @Parameterized.Parameters(name = "track-commit-in-memory-{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {false}, + {true} + }); + } + + public ITestMagicCommitProtocol(boolean trackCommitsInMemory) { + this.trackCommitsInMemory = trackCommitsInMemory; + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED); + conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory); + + return conf; + } + @Override public void assertJobAbortCleanedUp(JobData jobData) throws Exception { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java index d28ee5172b..be52220833 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java @@ -44,6 +44,7 @@ import org.apache.hadoop.examples.terasort.TeraValidate; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; import org.apache.hadoop.mapred.JobConf; @@ -97,6 +98,9 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest { /** Name of the committer for this run. */ private final String committerName; + /** Should Magic committer track pending commits in-memory. */ + private final boolean trackCommitsInMemory; + /** Base path for all the terasort input and output paths. */ private Path terasortPath; @@ -117,12 +121,14 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest { @Parameterized.Parameters(name = "{0}") public static Collection params() { return Arrays.asList(new Object[][]{ - {DirectoryStagingCommitter.NAME}, - {MagicS3GuardCommitter.NAME}}); + {DirectoryStagingCommitter.NAME, false}, + {MagicS3GuardCommitter.NAME, false}, + {MagicS3GuardCommitter.NAME, true}}); } - public ITestTerasortOnS3A(final String committerName) { + public ITestTerasortOnS3A(final String committerName, final boolean trackCommitsInMemory) { this.committerName = committerName; + this.trackCommitsInMemory = trackCommitsInMemory; } @Override @@ -152,6 +158,9 @@ protected void applyCustomConfigOptions(JobConf conf) { conf.setBoolean( TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false); + conf.setBoolean( + CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, + trackCommitsInMemory); } private int getExpectedPartitionCount() { @@ -173,7 +182,7 @@ protected int getRowCount() { */ private void prepareToTerasort() { // small sample size for faster runs - terasortPath = new Path("/terasort-" + committerName) + terasortPath = new Path("/terasort-" + committerName + "-" + trackCommitsInMemory) .makeQualified(getFileSystem()); sortInput = new Path(terasortPath, "sortin"); sortOutput = new Path(terasortPath, "sortout");