From 5512c9f92464d9c7d83a79d2460efc2537fc4c3f Mon Sep 17 00:00:00 2001
From: Syed Shameerur Rahman
Date: Wed, 20 Sep 2023 15:56:42 +0530
Subject: [PATCH] HADOOP-18797. Support Concurrent Writes With S3A Magic Committer (#6006)

Jobs which commit their work to S3 through the magic committer now use
a unique magic path containing the job ID: __magic_job-${jobid}

This allows multiple jobs to write to the same destination simultaneously.

Contributed by Syed Shameerur Rahman
---
 .../apache/hadoop/fs/s3a/S3AFileSystem.java | 4 +-
 .../hadoop/fs/s3a/commit/CommitConstants.java | 6 ++-
 .../commit/InternalCommitterConstants.java | 4 +-
 .../fs/s3a/commit/MagicCommitPaths.java | 35 ++++++++--------
 .../fs/s3a/commit/impl/CommitUtilsWithMR.java | 19 +++++----
 .../commit/magic/MagicS3GuardCommitter.java | 3 +-
 .../hadoop-aws/committer_architecture.md | 42 +++++++++----------
 .../markdown/tools/hadoop-aws/committers.md | 15 ++++---
 .../s3a/commit/AbstractITCommitProtocol.java | 6 ++-
 .../s3a/commit/AbstractYarnClusterITest.java | 3 +-
 .../fs/s3a/commit/CommitterTestHelper.java | 6 ++-
 .../s3a/commit/ITestCommitOperationCost.java | 11 ++---
 .../fs/s3a/commit/ITestCommitOperations.java | 6 +--
 .../fs/s3a/commit/TestMagicCommitPaths.java | 26 ++++++------
 .../integration/ITestS3ACommitterMRJob.java | 16 +++----
 .../magic/ITestMagicCommitProtocol.java | 18 ++++----
 .../magic/ITestS3AHugeMagicCommits.java | 9 ++--
 .../ITestStagingCommitProtocol.java | 3 +-
 .../fs/s3a/impl/TestHeaderProcessing.java | 2 +-
 19 files changed, 125 insertions(+), 109 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9307c4c265..fa7de69140 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -3669,7 +3669,7 @@ public UserGroupInformation getOwner() {
  * directories. Has the semantics of Unix {@code 'mkdir -p'}.
  * Existence of the directory hierarchy is not an error.
  * Parent elements are scanned to see if any are a file,
- * except under __magic paths.
+ * except under "MAGIC PATH" paths.
  * There the FS assumes that the destination directory creation
  * did that scan and that paths in job/task attempts are all
  * "well formed"
@@ -4769,7 +4769,7 @@ public boolean isMagicCommitPath(Path path) {
 /**
  * Predicate: is a path under a magic commit path?
- * True if magic commit is enabled and the path is under __magic,
+ * True if magic commit is enabled and the path is under "MAGIC PATH",
  * irrespective of file type.
  * @param path path to examine
  * @return true if the path is in a magic dir and the FS has magic writes enabled.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 6e2a5d8c9f..52df58d6a4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -39,6 +39,8 @@ private CommitConstants() {
  * {@value}.
*/
 public static final String MAGIC = "__magic";
+ public static final String JOB_ID_PREFIX = "job-";
+ public static final String MAGIC_PATH_PREFIX = MAGIC + "_" + JOB_ID_PREFIX;
 /**
  * Marker of the start of a directory tree for calculating
@@ -280,10 +282,12 @@ private CommitConstants() {
 /**
  * Default configuration value for
  * {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
+ * It is disabled by default to support concurrent writes to the same
+ * parent directory but to different partitions/subdirectories.
  * Value: {@value}.
  */
 public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
-      true;
+      false;
 /**
  * The limit to the number of committed objects tracked during
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
index b2d4bfaeea..ee07e652be 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
@@ -25,8 +25,8 @@
 import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
 import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 /**
  * These are internal constants not intended for public use.
@@ -108,7 +108,7 @@ private InternalCommitterConstants() {
 /** Error message for a path without a magic element in the list: {@value}. */
 public static final String E_NO_MAGIC_PATH_ELEMENT
-    = "No " + MAGIC + " element in path";
+    = "No " + MAGIC_PATH_PREFIX + " element in path";
 /**
  * The UUID for jobs: {@value}.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java index 18bb90da82..011d811fc0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java @@ -21,13 +21,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT; /** @@ -76,7 +78,8 @@ public static List splitPathToElements(Path path) { * @return true if a path is considered magic */ public static boolean isMagicPath(List elements) { - return elements.contains(MAGIC); + return elements.stream() + .anyMatch(element -> element.startsWith(MAGIC_PATH_PREFIX)); } /** @@ -96,9 +99,16 @@ public static boolean containsBasePath(List elements) { * @throws IllegalArgumentException if there is no magic element */ public static int magicElementIndex(List elements) { - int index = elements.indexOf(MAGIC); - checkArgument(index >= 0, E_NO_MAGIC_PATH_ELEMENT); - return index; + Optional index = IntStream.range(0, elements.size()) + .filter(i -> elements.get(i).startsWith(MAGIC_PATH_PREFIX)) + .boxed() + .findFirst(); + + if (index.isPresent()) { + return index.get(); + } else { + throw new IllegalArgumentException(E_NO_MAGIC_PATH_ELEMENT); + } } /** @@ -182,18 +192,9 @@ public static String lastElement(List strings) { return strings.get(strings.size() - 1); } - /** - * Get the magic subdirectory of a destination directory. - * @param destDir the destination directory - * @return a new path. - */ - public static Path magicSubdir(Path destDir) { - return new Path(destDir, MAGIC); - } - /** * Calculates the final destination of a file. - * This is the parent of any {@code __magic} element, and the filename + * This is the parent of any "MAGIC PATH" element, and the filename * of the path. That is: all intermediate child path elements are discarded. * Why so? paths under the magic path include job attempt and task attempt * subdirectories, which need to be skipped. 
@@ -208,8 +209,8 @@ public static List finalDestination(List elements) { if (isMagicPath(elements)) { List destDir = magicPathParents(elements); List children = magicPathChildren(elements); - checkArgument(!children.isEmpty(), "No path found under " + - MAGIC); + checkArgument(!children.isEmpty(), "No path found under the prefix " + + MAGIC_PATH_PREFIX); ArrayList dest = new ArrayList<>(destDir); if (containsBasePath(children)) { // there's a base marker in the path diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitUtilsWithMR.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitUtilsWithMR.java index c38ab2e9ba..bccfa7523b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitUtilsWithMR.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitUtilsWithMR.java @@ -26,9 +26,11 @@ import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.Preconditions; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.JOB_ID_PREFIX; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA; /** @@ -49,10 +51,13 @@ private CommitUtilsWithMR() { /** * Get the location of magic job attempts. * @param out the base output directory. + * @param jobUUID unique Job ID. * @return the location of magic job attempts. */ - public static Path getMagicJobAttemptsPath(Path out) { - return new Path(out, MAGIC); + public static Path getMagicJobAttemptsPath(Path out, String jobUUID) { + Preconditions.checkArgument(jobUUID != null && !(jobUUID.isEmpty()), + "Invalid job ID: %s", jobUUID); + return new Path(out, MAGIC_PATH_PREFIX + jobUUID); } /** @@ -76,7 +81,7 @@ public static Path getMagicJobAttemptPath(String jobUUID, int appAttemptId, Path dest) { return new Path( - getMagicJobAttemptsPath(dest), + getMagicJobAttemptsPath(dest, jobUUID), formatAppAttemptDir(jobUUID, appAttemptId)); } @@ -88,9 +93,7 @@ public static Path getMagicJobAttemptPath(String jobUUID, */ public static Path getMagicJobPath(String jobUUID, Path dest) { - return new Path( - getMagicJobAttemptsPath(dest), - formatJobDir(jobUUID)); + return getMagicJobAttemptsPath(dest, jobUUID); } /** @@ -102,7 +105,7 @@ public static Path getMagicJobPath(String jobUUID, */ public static String formatJobDir( String jobUUID) { - return String.format("job-%s", jobUUID); + return JOB_ID_PREFIX + jobUUID; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 9ded64eedc..518831b7d4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -105,7 +105,6 @@ public void setupJob(JobContext context) throws IOException { Path jobPath = getJobPath(); final FileSystem destFS = getDestinationFS(jobPath, context.getConfiguration()); - destFS.delete(jobPath, true); destFS.mkdirs(jobPath); } } @@ -132,7 +131,7 @@ protected 
ActiveCommit listPendingUploadsToCommit( */ public void cleanupStagingDirs() { final Path out = getOutputPath(); - Path path = magicSubdir(out); + Path path = getMagicJobPath(getUUID(), out); try(DurationInfo ignored = new DurationInfo(LOG, true, "Deleting magic directory %s", path)) { Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(), diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md index 30fcf15786..29ca6ff578 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md @@ -1308,12 +1308,12 @@ so returning the special new stream. -This is done with a "magic" temporary directory name, `__magic`, to indicate that all files +This is done with a "MAGIC PATH" (where the filesystem knows to remap paths with prefix `__magic_job-${jobId}`) temporary directory name to indicate that all files created under this path are not to be completed during the stream write process. Directories created under the path will still be created —this allows job- and task-specific directories to be created for individual job and task attempts. -For example, the pattern `__magic/${jobID}/${taskId}` could be used to +For example, the pattern `${MAGIC PATH}/${jobID}/${taskId}` could be used to store pending commits to the final directory for that specific task. If that task is committed, all pending commit files stored in that path will be loaded and used to commit the final uploads. @@ -1322,19 +1322,19 @@ Consider a job with the final directory `/results/latest` The intermediate directory for the task 01 attempt 01 of job `job_400_1` would be - /results/latest/__magic/job_400_1/_task_01_01 + /results/latest/__magic_job-400/job_400_1/_task_01_01 This would be returned as the temp directory. When a client attempted to create the file -`/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo` , the S3A FS would initiate +`/results/latest/__magic_job-400/job_400_1/task_01_01/latest.orc.lzo` , the S3A FS would initiate a multipart request with the final destination of `/results/latest/latest.orc.lzo`. As data was written to the output stream, it would be incrementally uploaded as individual multipart PUT operations On `close()`, summary data would be written to the file -`/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`. +`/results/latest/__magic_job-400/job400_1/task_01_01/latest.orc.lzo.pending`. This would contain the upload ID and all the parts and etags of uploaded data. A marker file is also created, so that code which verifies that a newly created file @@ -1358,7 +1358,7 @@ to the job attempt. 1. These are merged into to a single `Pendingset` structure. 1. Which is saved to a `.pendingset` file in the job attempt directory. 1. Finally, the task attempt directory is deleted. In the example, this -would be to `/results/latest/__magic/job400_1/task_01_01.pendingset`; +would be to `/results/latest/__magic_job-400/job400_1/task_01_01.pendingset`; A failure to load any of the single pending upload files (i.e. the file @@ -1386,9 +1386,9 @@ file. To allow tasks to generate data in subdirectories, a special filename `__base` will be used to provide an extra cue as to the final path. 
When mapping an output -path `/results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending` +path `/results/latest/__magic_job-400/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending` to a final destination path, the path will become `/results/latest/2017/2017-01-01.orc.lzo`. -That is: all directories between `__magic` and `__base` inclusive will be ignored. +That is: all directories between `__magic_job-400` and `__base` inclusive will be ignored. **Issues** @@ -1479,16 +1479,16 @@ Job drivers themselves may be preempted. One failure case is that the entire execution framework failed; a new process must identify outstanding jobs with pending work, and abort them, then delete -the appropriate `__magic` directories. +the appropriate `"MAGIC PATH"` directories. -This can be done either by scanning the directory tree for `__magic` directories +This can be done either by scanning the directory tree for `"MAGIC PATH"` directories and scanning underneath them, or by using the `listMultipartUploads()` call to list multipart uploads under a path, then cancel them. The most efficient solution may be to use `listMultipartUploads` to identify all outstanding request, and use that -to identify which requests to cancel, and where to scan for `__magic` directories. +to identify which requests to cancel, and where to scan for `"MAGIC PATH"` directories. This strategy should address scalability problems when working with repositories with many millions of objects —rather than list all keys searching for those -with `/__magic/**/*.pending` in their name, work backwards from the active uploads to +with `/${MAGIC PATH}/**/*.pending` in their name, work backwards from the active uploads to the directories with the data. We may also want to consider having a cleanup operation in the S3 CLI to @@ -1569,11 +1569,11 @@ a directory, then it is not going to work: the existing data will not be cleaned up. A cleanup operation would need to be included in the job commit, deleting all files in the destination directory which where not being overwritten. -1. It requires a path element, such as `__magic` which cannot be used +1. It requires a path element, such as `"MAGIC PATH"` which cannot be used for any purpose other than for the storage of pending commit data. 1. Unless extra code is added to every FS operation, it will still be possible -to manipulate files under the `__magic` tree. That's not bad, just potentially +to manipulate files under the `"MAGIC PATH"` tree. That's not bad, just potentially confusing. 1. As written data is not materialized until the commit, it will not be possible @@ -1692,9 +1692,9 @@ must be used, which means: the V2 classes. #### Resolved issues -**Magic Committer: Name of directory** +**Magic Committer: Directory Naming** -The design proposes the name `__magic` for the directory. HDFS and +The design proposes `__magic_job-` as the prefix for the magic paths of different jobs for the directory. HDFS and the various scanning routines always treat files and directories starting with `_` as temporary/excluded data. @@ -1705,14 +1705,14 @@ It is legal to create subdirectories in a task work directory, which will then be moved into the destination directory, retaining that directory tree. 
-That is, a if the task working dir is `dest/__magic/app1/task1/`, all files -under `dest/__magic/app1/task1/part-0000/` must end up under the path +That is, a if the task working dir is `dest/${MAGIC PATH}/app1/task1/`, all files +under `dest/${MAGIC PATH}/app1/task1/part-0000/` must end up under the path `dest/part-0000/`. This behavior is relied upon for the writing of intermediate map data in an MR job. -This means it is not simply enough to strip off all elements of under `__magic`, +This means it is not simply enough to strip off all elements of under ``"MAGIC PATH"``, it is critical to determine the base path. Proposed: use the special name `__base` as a marker of the base element for @@ -1918,9 +1918,9 @@ bandwidth and the data upload bandwidth. No use is made of the cluster filesystem; there are no risks there. -A malicious user with write access to the `__magic` directory could manipulate +A malicious user with write access to the ``"MAGIC PATH"`` directory could manipulate or delete the metadata of pending uploads, or potentially inject new work int -the commit. Having access to the `__magic` directory implies write access +the commit. Having access to the ``"MAGIC PATH"`` directory implies write access to the parent destination directory: a malicious user could just as easily manipulate the final output, without needing to attack the committer's intermediate files. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md index 4dd69e8efe..4c14921c4b 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md @@ -507,7 +507,7 @@ performance. ### FileSystem client setup -The S3A connector can recognize files created under paths with `__magic/` as a parent directory. +The S3A connector can recognize files created under paths with `${MAGIC PATH}/` as a parent directory. This allows it to handle those files in a special way, such as uploading to a different location and storing the information needed to complete pending multipart uploads. @@ -686,7 +686,7 @@ The examples below shows how these options can be configured in XML. ### Disabling magic committer path rewriting -The magic committer recognizes when files are created under paths with `__magic/` as a parent directory +The magic committer recognizes when files are created under paths with `${MAGIC PATH}/` as a parent directory and redirects the upload to a different location, adding the information needed to complete the upload in the job commit operation. @@ -708,10 +708,12 @@ You will not be able to use the Magic Committer if this option is disabled. ## Concurrent Jobs writing to the same destination -It is sometimes possible for multiple jobs to simultaneously write to the same destination path. +It is sometimes possible for multiple jobs to simultaneously write to the same destination path. To support +such use case, The "MAGIC PATH" for each job is unique of the format `__magic_job-${jobId}` so that +multiple job running simultaneously do not step into each other. Before attempting this, the committers must be set to not delete all incomplete uploads on job commit, -by setting `fs.s3a.committer.abort.pending.uploads` to `false` +by setting `fs.s3a.committer.abort.pending.uploads` to `false`. 
This is set to `false`by default ```xml @@ -740,9 +742,6 @@ For example, for any job executed through Hadoop MapReduce, the Job ID can be us ``` -Even with these settings, the outcome of concurrent jobs to the same destination is -inherently nondeterministic -use with caution. - ## Troubleshooting ### `Filesystem does not have support for 'magic' committer` @@ -805,7 +804,7 @@ files which are actually written to a different destination than their stated pa This message should not appear through the committer itself —it will fail with the error message in the previous section, but may arise -if other applications are attempting to create files under the path `/__magic/`. +if other applications are attempting to create files under the path `/${MAGIC PATH}/`. ### `FileOutputCommitter` appears to be still used (from logs or delays in commits) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 258c34b5cb..67c88039aa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -1419,7 +1419,7 @@ public void testOutputFormatIntegration() throws Throwable { recordWriter.write(iw, iw); long expectedLength = 4; Path dest = recordWriter.getDest(); - validateTaskAttemptPathDuringWrite(dest, expectedLength); + validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID()); recordWriter.close(tContext); // at this point validateTaskAttemptPathAfterWrite(dest, expectedLength); @@ -1833,10 +1833,12 @@ public void testS3ACommitterFactoryBinding() throws Throwable { * itself. * @param p path * @param expectedLength + * @param jobId job id * @throws IOException IO failure */ protected void validateTaskAttemptPathDuringWrite(Path p, - final long expectedLength) throws IOException { + final long expectedLength, + String jobId) throws IOException { } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java index aa44c171d7..39265f1d8e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java @@ -330,10 +330,11 @@ protected void applyCustomConfigOptions(JobConf jobConf) throws IOException { * called after the base assertions have all passed. 
* @param destPath destination of work * @param successData loaded success data + * @param jobId job id * @throws Exception failure */ protected void customPostExecutionValidation(Path destPath, - SuccessData successData) + SuccessData successData, String jobId) throws Exception { } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterTestHelper.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterTestHelper.java index cd703f96da..573dd23b18 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterTestHelper.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterTestHelper.java @@ -34,7 +34,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER; import static org.apache.hadoop.fs.s3a.commit.impl.CommitOperations.extractMagicFileLength; @@ -52,6 +52,8 @@ public class CommitterTestHelper { */ private final S3AFileSystem fileSystem; + public static final String JOB_ID = "123"; + /** * Constructor. * @param fileSystem filesystem to work with. @@ -108,7 +110,7 @@ public void assertFileLacksMarkerHeader(Path path) throws IOException { */ public static Path makeMagic(Path destFile) { return new Path(destFile.getParent(), - MAGIC + '/' + BASE + "/" + destFile.getName()); + MAGIC_PATH_PREFIX + JOB_ID + '/' + BASE + "/" + destFile.getName()); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index d0c86b738c..fbe1a0a312 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -51,7 +51,8 @@ import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; +import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.JOB_ID; import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.assertIsMagicStream; import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.makeMagic; import static org.apache.hadoop.fs.s3a.performance.OperationCost.LIST_FILES_LIST_OP; @@ -123,12 +124,12 @@ protected String fileSystemIOStats() { @Test public void testMagicMkdir() throws Throwable { - describe("Mkdirs __magic always skips dir marker deletion"); + describe("Mkdirs 'MAGIC PATH' always skips dir marker deletion"); S3AFileSystem fs = getFileSystem(); Path baseDir = methodPath(); // create dest dir marker, always fs.mkdirs(baseDir); - Path magicDir = new Path(baseDir, MAGIC); + Path magicDir = new Path(baseDir, MAGIC_PATH_PREFIX + JOB_ID); verifyMetrics(() -> { 
fs.mkdirs(magicDir); return fileSystemIOStats(); @@ -151,10 +152,10 @@ public void testMagicMkdir() throws Throwable { */ @Test public void testMagicMkdirs() throws Throwable { - describe("Mkdirs __magic/subdir always skips dir marker deletion"); + describe("Mkdirs __magic_job-/subdir always skips dir marker deletion"); S3AFileSystem fs = getFileSystem(); Path baseDir = methodPath(); - Path magicDir = new Path(baseDir, MAGIC); + Path magicDir = new Path(baseDir, MAGIC_PATH_PREFIX + JOB_ID); fs.delete(baseDir, true); Path magicSubdir = new Path(magicDir, "subdir"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java index a76a65be8b..64def00fd2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -207,7 +207,7 @@ private CommitOperations newCommitOperations() */ private static Path makeMagic(Path destFile) { return new Path(destFile.getParent(), - MAGIC + '/' + destFile.getName()); + MAGIC_PATH_PREFIX + JOB_ID + '/' + destFile.getName()); } @Test @@ -279,7 +279,7 @@ public void testBaseRelativePath() throws Throwable { S3AFileSystem fs = getFileSystem(); Path destDir = methodSubPath("testBaseRelativePath"); fs.delete(destDir, true); - Path pendingBaseDir = new Path(destDir, MAGIC + "/child/" + BASE); + Path pendingBaseDir = new Path(destDir, MAGIC_PATH_PREFIX + JOB_ID + "/child/" + BASE); String child = "subdir/child.txt"; Path pendingChildPath = new Path(pendingBaseDir, child); Path expectedDestPath = new Path(destDir, child); @@ -334,7 +334,7 @@ public void testMarkerFileRename() /** * Create a file through the magic commit mechanism. - * @param filename file to create (with __magic path.) + * @param filename file to create (with "MAGIC PATH".) 
* @param data data to write * @throws Exception failure */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java index fdc4ec8058..610491867f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java @@ -38,16 +38,16 @@ public class TestMagicCommitPaths extends Assert { private static final List MAGIC_AT_ROOT = - list(MAGIC); + list(MAGIC_PATH_PREFIX); private static final List MAGIC_AT_ROOT_WITH_CHILD = - list(MAGIC, "child"); + list(MAGIC_PATH_PREFIX, "child"); private static final List MAGIC_WITH_CHILD = - list("parent", MAGIC, "child"); + list("parent", MAGIC_PATH_PREFIX, "child"); private static final List MAGIC_AT_WITHOUT_CHILD = - list("parent", MAGIC); + list("parent", MAGIC_PATH_PREFIX); private static final List DEEP_MAGIC = - list("parent1", "parent2", MAGIC, "child1", "child2"); + list("parent1", "parent2", MAGIC_PATH_PREFIX, "child1", "child2"); public static final String[] EMPTY = {}; @@ -161,40 +161,40 @@ public void testFinalDestinationNoMagic() { @Test public void testFinalDestinationMagic1() { assertEquals(l("first", "2"), - finalDestination(l("first", MAGIC, "2"))); + finalDestination(l("first", MAGIC_PATH_PREFIX, "2"))); } @Test public void testFinalDestinationMagic2() { assertEquals(l("first", "3.txt"), - finalDestination(l("first", MAGIC, "2", "3.txt"))); + finalDestination(l("first", MAGIC_PATH_PREFIX, "2", "3.txt"))); } @Test public void testFinalDestinationRootMagic2() { assertEquals(l("3.txt"), - finalDestination(l(MAGIC, "2", "3.txt"))); + finalDestination(l(MAGIC_PATH_PREFIX, "2", "3.txt"))); } @Test(expected = IllegalArgumentException.class) public void testFinalDestinationMagicNoChild() { - finalDestination(l(MAGIC)); + finalDestination(l(MAGIC_PATH_PREFIX)); } @Test public void testFinalDestinationBaseDirectChild() { - finalDestination(l(MAGIC, BASE, "3.txt")); + finalDestination(l(MAGIC_PATH_PREFIX, BASE, "3.txt")); } @Test(expected = IllegalArgumentException.class) public void testFinalDestinationBaseNoChild() { - assertEquals(l(), finalDestination(l(MAGIC, BASE))); + assertEquals(l(), finalDestination(l(MAGIC_PATH_PREFIX, BASE))); } @Test public void testFinalDestinationBaseSubdirsChild() { assertEquals(l("2", "3.txt"), - finalDestination(l(MAGIC, "4", BASE, "2", "3.txt"))); + finalDestination(l(MAGIC_PATH_PREFIX, "4", BASE, "2", "3.txt"))); } /** @@ -203,7 +203,7 @@ public void testFinalDestinationBaseSubdirsChild() { @Test public void testFinalDestinationIgnoresBaseBeforeMagic() { assertEquals(l(BASE, "home", "3.txt"), - finalDestination(l(BASE, "home", MAGIC, "2", "3.txt"))); + finalDestination(l(BASE, "home", MAGIC_PATH_PREFIX, "2", "3.txt"))); } /** varargs to array. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java index 622ead2617..71be373b40 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java @@ -75,7 +75,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID; import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; @@ -332,7 +332,7 @@ public void test_200_execute() throws Exception { assertPathDoesNotExist("temporary dir should only be from" + " classic file committers", new Path(outputPath, CommitConstants.TEMPORARY)); - customPostExecutionValidation(outputPath, successData); + customPostExecutionValidation(outputPath, successData, jobID); } @Override @@ -343,8 +343,8 @@ protected void applyCustomConfigOptions(final JobConf jobConf) @Override protected void customPostExecutionValidation(final Path destPath, - final SuccessData successData) throws Exception { - committerTestBinding.validateResult(destPath, successData); + final SuccessData successData, String jobId) throws Exception { + committerTestBinding.validateResult(destPath, successData, jobId); } /** @@ -482,7 +482,7 @@ protected void applyCustomConfigOptions(JobConf jobConf) * @throws Exception failure */ protected void validateResult(Path destPath, - SuccessData successData) + SuccessData successData, String jobId) throws Exception { } @@ -576,7 +576,7 @@ private MagicCommitterTestBinding() { } /** - * The result validation here is that there isn't a __magic directory + * The result validation here is that there isn't a "MAGIC PATH" directory * any more. 
* @param destPath destination of work * @param successData loaded success data @@ -584,9 +584,9 @@ private MagicCommitterTestBinding() { */ @Override protected void validateResult(final Path destPath, - final SuccessData successData) + final SuccessData successData, final String jobId) throws Exception { - Path magicDir = new Path(destPath, MAGIC); + Path magicDir = new Path(destPath, MAGIC_PATH_PREFIX + jobId); // if an FNFE isn't raised on getFileStatus, list out the directory // tree diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index 32cab03770..fa963a4b97 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -42,6 +42,7 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath; import static org.apache.hadoop.util.functional.RemoteIterators.toList; /** @@ -74,7 +75,7 @@ public void setup() throws Exception { public void assertJobAbortCleanedUp(JobData jobData) throws Exception { // special handling of magic directory; harmless in staging - Path magicDir = new Path(getOutDir(), MAGIC); + Path magicDir = getMagicJobPath(jobData.getCommitter().getUUID(), getOutDir()); ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "magic dir ", magicDir); super.assertJobAbortCleanedUp(jobData); @@ -94,11 +95,12 @@ public MagicS3GuardCommitter createFailingCommitter( } protected void validateTaskAttemptPathDuringWrite(Path p, - final long expectedLength) throws IOException { + final long expectedLength, + String jobId) throws IOException { String pathStr = p.toString(); Assertions.assertThat(pathStr) .describedAs("Magic path") - .contains(MAGIC); + .contains("/" + MAGIC_PATH_PREFIX + jobId + "/"); assertPathDoesNotExist("task attempt visible", p); } @@ -129,7 +131,7 @@ protected void validateTaskAttemptPathAfterWrite(Path marker, /** * The magic committer paths are always on S3, and always have - * "__magic" in the path. + * "MAGIC PATH" in the path. * @param committer committer instance * @param context task attempt context * @throws IOException IO failure @@ -143,11 +145,11 @@ protected void validateTaskAttemptWorkingDirectory( + " with committer " + committer, "s3a", wd.getScheme()); Assertions.assertThat(wd.getPath()) - .contains('/' + CommitConstants.MAGIC + '/'); + .contains("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/"); } /** - * Verify that the __magic path for the application/tasks use the + * Verify that the "MAGIC PATH" for the application/tasks use the * committer UUID to ensure uniqueness in the case of more than * one job writing to the same destination path. 
*/ @@ -164,7 +166,7 @@ public void testCommittersPathsHaveUUID() throws Throwable { Assertions.assertThat(taskAttemptPath.toString()) .describedAs("task path of %s", committer) .contains(committer.getUUID()) - .contains(MAGIC) + .contains("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/") .doesNotContain(TEMP_DATA) .endsWith(BASE) .contains(ta0); @@ -176,7 +178,7 @@ public void testCommittersPathsHaveUUID() throws Throwable { .describedAs("Temp task path of %s", committer) .contains(committer.getUUID()) .contains(TEMP_DATA) - .doesNotContain(MAGIC) + .doesNotContain("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/") .doesNotContain(BASE) .contains(ta0); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java index e1440a497b..613fab1928 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java @@ -33,7 +33,6 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.commit.CommitUtils; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; @@ -59,6 +58,8 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles { ITestS3AHugeMagicCommits.class); private static final int COMMITTER_THREADS = 64; + private static final String JOB_ID = "123"; + private Path magicDir; private Path jobDir; @@ -100,8 +101,8 @@ public void setup() throws Exception { // set up the paths for the commit operation finalDirectory = new Path(getScaleTestDir(), "commit"); - magicDir = new Path(finalDirectory, MAGIC); - jobDir = new Path(magicDir, "job_001"); + magicDir = new Path(finalDirectory, MAGIC_PATH_PREFIX + JOB_ID); + jobDir = new Path(magicDir, "job_" + JOB_ID); String filename = "commit.bin"; setHugefile(new Path(finalDirectory, filename)); magicOutputFile = new Path(jobDir, filename); @@ -141,7 +142,7 @@ public void test_030_postCreationAssertions() throws Throwable { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); CommitOperations operations = new CommitOperations(fs); Path destDir = getHugefile().getParent(); - assertPathExists("Magic dir", new Path(destDir, CommitConstants.MAGIC)); + assertPathExists("Magic dir", new Path(destDir, MAGIC_PATH_PREFIX + JOB_ID)); String destDirKey = fs.pathToKey(destDir); Assertions.assertThat(listMultipartUploads(fs, destDirKey)) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java index dd62064f86..81c3af812a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java @@ -111,7 +111,8 @@ protected void expectJobCommitToFail(JobContext jContext, } protected void validateTaskAttemptPathDuringWrite(Path p, - final long expectedLength) throws 
IOException { + final long expectedLength, + String jobId) throws IOException { // this is expected to be local FS ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java index 81bd8a5efe..f6be1f75cf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java @@ -74,7 +74,7 @@ public class TestHeaderProcessing extends HadoopTestBase { private HeaderProcessing headerProcessing; private static final String MAGIC_KEY - = "dest/__magic/job1/ta1/__base/output.csv"; + = "dest/__magic_job-1/job1/ta1/__base/output.csv"; private static final String MAGIC_FILE = "s3a://bucket/" + MAGIC_KEY;
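
To illustrate the path layout this patch introduces, here is a minimal sketch (not part of the change itself). It assumes hadoop-common and hadoop-aws with this patch applied are on the classpath; the bucket name, job UUID and task-attempt directory names are invented purely for the example.

```java
// Minimal sketch, not part of the patch: assumes hadoop-aws (with this change
// applied) on the classpath. Bucket name, job UUID and the task directory
// layout below are invented purely for illustration.
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
import org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR;

public class MagicPathLayoutSketch {
  public static void main(String[] args) {
    Path dest = new Path("s3a://example-bucket/results/latest");
    String jobUUID = "400";

    // Per-job magic directory: .../results/latest/__magic_job-400
    // (previously all jobs shared .../results/latest/__magic).
    Path magicJobPath = CommitUtilsWithMR.getMagicJobAttemptsPath(dest, jobUUID);
    System.out.println("magic job path: " + magicJobPath);

    // A file written by a task attempt under the magic path; __base marks
    // the directory tree to be preserved under the final destination.
    Path taskFile = new Path(magicJobPath,
        "01/tasks/task_01_01/__base/2017/2017-01-01.orc");

    // Any element starting with __magic_job- (MAGIC_PATH_PREFIX) is treated
    // as magic; everything up to and including __base is stripped, so this
    // file's final destination is .../results/latest/2017/2017-01-01.orc.
    List<String> elements = MagicCommitPaths.splitPathToElements(taskFile);
    System.out.println("is magic: " + MagicCommitPaths.isMagicPath(elements));
    System.out.println("final: " + MagicCommitPaths.finalDestination(elements));
  }
}
```

Because each job now has its own magic directory, cleanup and abort can be scoped to that directory alone, which is why `fs.s3a.committer.abort.pending.uploads` can default to `false` without one job's commit discarding another concurrent job's in-flight uploads.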