HADOOP-18797. Support Concurrent Writes With S3A Magic Committer (#6006)

Jobs which commit their work to S3 thr
magic committer now use a unique magic
containing the job ID:
 __magic_job-${jobid}

This allows for multiple jobs to write
to the same destination simultaneously.

Contributed by Syed Shameerur Rahman
This commit is contained in:
Syed Shameerur Rahman 2023-09-20 15:56:42 +05:30 committed by GitHub
parent f24b73e5f3
commit 5512c9f924
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 125 additions and 109 deletions

View File

@ -3669,7 +3669,7 @@ public UserGroupInformation getOwner() {
* directories. Has the semantics of Unix {@code 'mkdir -p'}. * directories. Has the semantics of Unix {@code 'mkdir -p'}.
* Existence of the directory hierarchy is not an error. * Existence of the directory hierarchy is not an error.
* Parent elements are scanned to see if any are a file, * Parent elements are scanned to see if any are a file,
* <i>except under __magic</i> paths. * <i>except under "MAGIC PATH"</i> paths.
* There the FS assumes that the destination directory creation * There the FS assumes that the destination directory creation
* did that scan and that paths in job/task attempts are all * did that scan and that paths in job/task attempts are all
* "well formed" * "well formed"
@ -4769,7 +4769,7 @@ public boolean isMagicCommitPath(Path path) {
/** /**
* Predicate: is a path under a magic commit path? * Predicate: is a path under a magic commit path?
* True if magic commit is enabled and the path is under __magic, * True if magic commit is enabled and the path is under "MAGIC PATH",
* irrespective of file type. * irrespective of file type.
* @param path path to examine * @param path path to examine
* @return true if the path is in a magic dir and the FS has magic writes enabled. * @return true if the path is in a magic dir and the FS has magic writes enabled.

View File

@ -39,6 +39,8 @@ private CommitConstants() {
* {@value}. * {@value}.
*/ */
public static final String MAGIC = "__magic"; public static final String MAGIC = "__magic";
public static final String JOB_ID_PREFIX = "job-";
public static final String MAGIC_PATH_PREFIX = MAGIC + "_" + JOB_ID_PREFIX;
/** /**
* Marker of the start of a directory tree for calculating * Marker of the start of a directory tree for calculating
@ -280,10 +282,12 @@ private CommitConstants() {
/** /**
* Default configuration value for * Default configuration value for
* {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}. * {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
* It is disabled by default to support concurrent writes on the same
* parent directory but different partition/sub directory.
* Value: {@value}. * Value: {@value}.
*/ */
public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS = public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
true; false;
/** /**
* The limit to the number of committed objects tracked during * The limit to the number of committed objects tracked during

View File

@ -25,8 +25,8 @@
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory; import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory; import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
/** /**
* These are internal constants not intended for public use. * These are internal constants not intended for public use.
@ -108,7 +108,7 @@ private InternalCommitterConstants() {
/** Error message for a path without a magic element in the list: {@value}. */ /** Error message for a path without a magic element in the list: {@value}. */
public static final String E_NO_MAGIC_PATH_ELEMENT public static final String E_NO_MAGIC_PATH_ELEMENT
= "No " + MAGIC + " element in path"; = "No " + MAGIC_PATH_PREFIX + " element in path";
/** /**
* The UUID for jobs: {@value}. * The UUID for jobs: {@value}.

View File

@ -21,13 +21,15 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT;
/** /**
@ -76,7 +78,8 @@ public static List<String> splitPathToElements(Path path) {
* @return true if a path is considered magic * @return true if a path is considered magic
*/ */
public static boolean isMagicPath(List<String> elements) { public static boolean isMagicPath(List<String> elements) {
return elements.contains(MAGIC); return elements.stream()
.anyMatch(element -> element.startsWith(MAGIC_PATH_PREFIX));
} }
/** /**
@ -96,9 +99,16 @@ public static boolean containsBasePath(List<String> elements) {
* @throws IllegalArgumentException if there is no magic element * @throws IllegalArgumentException if there is no magic element
*/ */
public static int magicElementIndex(List<String> elements) { public static int magicElementIndex(List<String> elements) {
int index = elements.indexOf(MAGIC); Optional<Integer> index = IntStream.range(0, elements.size())
checkArgument(index >= 0, E_NO_MAGIC_PATH_ELEMENT); .filter(i -> elements.get(i).startsWith(MAGIC_PATH_PREFIX))
return index; .boxed()
.findFirst();
if (index.isPresent()) {
return index.get();
} else {
throw new IllegalArgumentException(E_NO_MAGIC_PATH_ELEMENT);
}
} }
/** /**
@ -182,18 +192,9 @@ public static String lastElement(List<String> strings) {
return strings.get(strings.size() - 1); return strings.get(strings.size() - 1);
} }
/**
* Get the magic subdirectory of a destination directory.
* @param destDir the destination directory
* @return a new path.
*/
public static Path magicSubdir(Path destDir) {
return new Path(destDir, MAGIC);
}
/** /**
* Calculates the final destination of a file. * Calculates the final destination of a file.
* This is the parent of any {@code __magic} element, and the filename * This is the parent of any "MAGIC PATH" element, and the filename
* of the path. That is: all intermediate child path elements are discarded. * of the path. That is: all intermediate child path elements are discarded.
* Why so? paths under the magic path include job attempt and task attempt * Why so? paths under the magic path include job attempt and task attempt
* subdirectories, which need to be skipped. * subdirectories, which need to be skipped.
@ -208,8 +209,8 @@ public static List<String> finalDestination(List<String> elements) {
if (isMagicPath(elements)) { if (isMagicPath(elements)) {
List<String> destDir = magicPathParents(elements); List<String> destDir = magicPathParents(elements);
List<String> children = magicPathChildren(elements); List<String> children = magicPathChildren(elements);
checkArgument(!children.isEmpty(), "No path found under " + checkArgument(!children.isEmpty(), "No path found under the prefix " +
MAGIC); MAGIC_PATH_PREFIX);
ArrayList<String> dest = new ArrayList<>(destDir); ArrayList<String> dest = new ArrayList<>(destDir);
if (containsBasePath(children)) { if (containsBasePath(children)) {
// there's a base marker in the path // there's a base marker in the path

View File

@ -26,9 +26,11 @@
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Preconditions;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.JOB_ID_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
/** /**
@ -49,10 +51,13 @@ private CommitUtilsWithMR() {
/** /**
* Get the location of magic job attempts. * Get the location of magic job attempts.
* @param out the base output directory. * @param out the base output directory.
* @param jobUUID unique Job ID.
* @return the location of magic job attempts. * @return the location of magic job attempts.
*/ */
public static Path getMagicJobAttemptsPath(Path out) { public static Path getMagicJobAttemptsPath(Path out, String jobUUID) {
return new Path(out, MAGIC); Preconditions.checkArgument(jobUUID != null && !(jobUUID.isEmpty()),
"Invalid job ID: %s", jobUUID);
return new Path(out, MAGIC_PATH_PREFIX + jobUUID);
} }
/** /**
@ -76,7 +81,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
int appAttemptId, int appAttemptId,
Path dest) { Path dest) {
return new Path( return new Path(
getMagicJobAttemptsPath(dest), getMagicJobAttemptsPath(dest, jobUUID),
formatAppAttemptDir(jobUUID, appAttemptId)); formatAppAttemptDir(jobUUID, appAttemptId));
} }
@ -88,9 +93,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
*/ */
public static Path getMagicJobPath(String jobUUID, public static Path getMagicJobPath(String jobUUID,
Path dest) { Path dest) {
return new Path( return getMagicJobAttemptsPath(dest, jobUUID);
getMagicJobAttemptsPath(dest),
formatJobDir(jobUUID));
} }
/** /**
@ -102,7 +105,7 @@ public static Path getMagicJobPath(String jobUUID,
*/ */
public static String formatJobDir( public static String formatJobDir(
String jobUUID) { String jobUUID) {
return String.format("job-%s", jobUUID); return JOB_ID_PREFIX + jobUUID;
} }
/** /**

View File

@ -105,7 +105,6 @@ public void setupJob(JobContext context) throws IOException {
Path jobPath = getJobPath(); Path jobPath = getJobPath();
final FileSystem destFS = getDestinationFS(jobPath, final FileSystem destFS = getDestinationFS(jobPath,
context.getConfiguration()); context.getConfiguration());
destFS.delete(jobPath, true);
destFS.mkdirs(jobPath); destFS.mkdirs(jobPath);
} }
} }
@ -132,7 +131,7 @@ protected ActiveCommit listPendingUploadsToCommit(
*/ */
public void cleanupStagingDirs() { public void cleanupStagingDirs() {
final Path out = getOutputPath(); final Path out = getOutputPath();
Path path = magicSubdir(out); Path path = getMagicJobPath(getUUID(), out);
try(DurationInfo ignored = new DurationInfo(LOG, true, try(DurationInfo ignored = new DurationInfo(LOG, true,
"Deleting magic directory %s", path)) { "Deleting magic directory %s", path)) {
Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(), Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),

View File

@ -1308,12 +1308,12 @@ so returning the special new stream.
This is done with a "magic" temporary directory name, `__magic`, to indicate that all files This is done with a "MAGIC PATH" (where the filesystem knows to remap paths with prefix `__magic_job-${jobId}`) temporary directory name to indicate that all files
created under this path are not to be completed during the stream write process. created under this path are not to be completed during the stream write process.
Directories created under the path will still be created —this allows job- and Directories created under the path will still be created —this allows job- and
task-specific directories to be created for individual job and task attempts. task-specific directories to be created for individual job and task attempts.
For example, the pattern `__magic/${jobID}/${taskId}` could be used to For example, the pattern `${MAGIC PATH}/${jobID}/${taskId}` could be used to
store pending commits to the final directory for that specific task. If that store pending commits to the final directory for that specific task. If that
task is committed, all pending commit files stored in that path will be loaded task is committed, all pending commit files stored in that path will be loaded
and used to commit the final uploads. and used to commit the final uploads.
@ -1322,19 +1322,19 @@ Consider a job with the final directory `/results/latest`
The intermediate directory for the task 01 attempt 01 of job `job_400_1` would be The intermediate directory for the task 01 attempt 01 of job `job_400_1` would be
/results/latest/__magic/job_400_1/_task_01_01 /results/latest/__magic_job-400/job_400_1/_task_01_01
This would be returned as the temp directory. This would be returned as the temp directory.
When a client attempted to create the file When a client attempted to create the file
`/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo` , the S3A FS would initiate `/results/latest/__magic_job-400/job_400_1/task_01_01/latest.orc.lzo` , the S3A FS would initiate
a multipart request with the final destination of `/results/latest/latest.orc.lzo`. a multipart request with the final destination of `/results/latest/latest.orc.lzo`.
As data was written to the output stream, it would be incrementally uploaded as As data was written to the output stream, it would be incrementally uploaded as
individual multipart PUT operations individual multipart PUT operations
On `close()`, summary data would be written to the file On `close()`, summary data would be written to the file
`/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`. `/results/latest/__magic_job-400/job400_1/task_01_01/latest.orc.lzo.pending`.
This would contain the upload ID and all the parts and etags of uploaded data. This would contain the upload ID and all the parts and etags of uploaded data.
A marker file is also created, so that code which verifies that a newly created file A marker file is also created, so that code which verifies that a newly created file
@ -1358,7 +1358,7 @@ to the job attempt.
1. These are merged into to a single `Pendingset` structure. 1. These are merged into to a single `Pendingset` structure.
1. Which is saved to a `.pendingset` file in the job attempt directory. 1. Which is saved to a `.pendingset` file in the job attempt directory.
1. Finally, the task attempt directory is deleted. In the example, this 1. Finally, the task attempt directory is deleted. In the example, this
would be to `/results/latest/__magic/job400_1/task_01_01.pendingset`; would be to `/results/latest/__magic_job-400/job400_1/task_01_01.pendingset`;
A failure to load any of the single pending upload files (i.e. the file A failure to load any of the single pending upload files (i.e. the file
@ -1386,9 +1386,9 @@ file.
To allow tasks to generate data in subdirectories, a special filename `__base` To allow tasks to generate data in subdirectories, a special filename `__base`
will be used to provide an extra cue as to the final path. When mapping an output will be used to provide an extra cue as to the final path. When mapping an output
path `/results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending` path `/results/latest/__magic_job-400/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending`
to a final destination path, the path will become `/results/latest/2017/2017-01-01.orc.lzo`. to a final destination path, the path will become `/results/latest/2017/2017-01-01.orc.lzo`.
That is: all directories between `__magic` and `__base` inclusive will be ignored. That is: all directories between `__magic_job-400` and `__base` inclusive will be ignored.
**Issues** **Issues**
@ -1479,16 +1479,16 @@ Job drivers themselves may be preempted.
One failure case is that the entire execution framework failed; a new process One failure case is that the entire execution framework failed; a new process
must identify outstanding jobs with pending work, and abort them, then delete must identify outstanding jobs with pending work, and abort them, then delete
the appropriate `__magic` directories. the appropriate `"MAGIC PATH"` directories.
This can be done either by scanning the directory tree for `__magic` directories This can be done either by scanning the directory tree for `"MAGIC PATH"` directories
and scanning underneath them, or by using the `listMultipartUploads()` call to and scanning underneath them, or by using the `listMultipartUploads()` call to
list multipart uploads under a path, then cancel them. The most efficient solution list multipart uploads under a path, then cancel them. The most efficient solution
may be to use `listMultipartUploads` to identify all outstanding request, and use that may be to use `listMultipartUploads` to identify all outstanding request, and use that
to identify which requests to cancel, and where to scan for `__magic` directories. to identify which requests to cancel, and where to scan for `"MAGIC PATH"` directories.
This strategy should address scalability problems when working with repositories This strategy should address scalability problems when working with repositories
with many millions of objects —rather than list all keys searching for those with many millions of objects —rather than list all keys searching for those
with `/__magic/**/*.pending` in their name, work backwards from the active uploads to with `/${MAGIC PATH}/**/*.pending` in their name, work backwards from the active uploads to
the directories with the data. the directories with the data.
We may also want to consider having a cleanup operation in the S3 CLI to We may also want to consider having a cleanup operation in the S3 CLI to
@ -1569,11 +1569,11 @@ a directory, then it is not going to work: the existing data will not be cleaned
up. A cleanup operation would need to be included in the job commit, deleting up. A cleanup operation would need to be included in the job commit, deleting
all files in the destination directory which where not being overwritten. all files in the destination directory which where not being overwritten.
1. It requires a path element, such as `__magic` which cannot be used 1. It requires a path element, such as `"MAGIC PATH"` which cannot be used
for any purpose other than for the storage of pending commit data. for any purpose other than for the storage of pending commit data.
1. Unless extra code is added to every FS operation, it will still be possible 1. Unless extra code is added to every FS operation, it will still be possible
to manipulate files under the `__magic` tree. That's not bad, just potentially to manipulate files under the `"MAGIC PATH"` tree. That's not bad, just potentially
confusing. confusing.
1. As written data is not materialized until the commit, it will not be possible 1. As written data is not materialized until the commit, it will not be possible
@ -1692,9 +1692,9 @@ must be used, which means: the V2 classes.
#### Resolved issues #### Resolved issues
**Magic Committer: Name of directory** **Magic Committer: Directory Naming**
The design proposes the name `__magic` for the directory. HDFS and The design proposes `__magic_job-` as the prefix for the magic paths of different jobs for the directory. HDFS and
the various scanning routines always treat files and directories starting with `_` the various scanning routines always treat files and directories starting with `_`
as temporary/excluded data. as temporary/excluded data.
@ -1705,14 +1705,14 @@ It is legal to create subdirectories in a task work directory, which
will then be moved into the destination directory, retaining that directory will then be moved into the destination directory, retaining that directory
tree. tree.
That is, a if the task working dir is `dest/__magic/app1/task1/`, all files That is, a if the task working dir is `dest/${MAGIC PATH}/app1/task1/`, all files
under `dest/__magic/app1/task1/part-0000/` must end up under the path under `dest/${MAGIC PATH}/app1/task1/part-0000/` must end up under the path
`dest/part-0000/`. `dest/part-0000/`.
This behavior is relied upon for the writing of intermediate map data in an MR This behavior is relied upon for the writing of intermediate map data in an MR
job. job.
This means it is not simply enough to strip off all elements of under `__magic`, This means it is not simply enough to strip off all elements of under ``"MAGIC PATH"``,
it is critical to determine the base path. it is critical to determine the base path.
Proposed: use the special name `__base` as a marker of the base element for Proposed: use the special name `__base` as a marker of the base element for
@ -1918,9 +1918,9 @@ bandwidth and the data upload bandwidth.
No use is made of the cluster filesystem; there are no risks there. No use is made of the cluster filesystem; there are no risks there.
A malicious user with write access to the `__magic` directory could manipulate A malicious user with write access to the ``"MAGIC PATH"`` directory could manipulate
or delete the metadata of pending uploads, or potentially inject new work int or delete the metadata of pending uploads, or potentially inject new work int
the commit. Having access to the `__magic` directory implies write access the commit. Having access to the ``"MAGIC PATH"`` directory implies write access
to the parent destination directory: a malicious user could just as easily to the parent destination directory: a malicious user could just as easily
manipulate the final output, without needing to attack the committer's intermediate manipulate the final output, without needing to attack the committer's intermediate
files. files.

View File

@ -507,7 +507,7 @@ performance.
### FileSystem client setup ### FileSystem client setup
The S3A connector can recognize files created under paths with `__magic/` as a parent directory. The S3A connector can recognize files created under paths with `${MAGIC PATH}/` as a parent directory.
This allows it to handle those files in a special way, such as uploading to a different location This allows it to handle those files in a special way, such as uploading to a different location
and storing the information needed to complete pending multipart uploads. and storing the information needed to complete pending multipart uploads.
@ -686,7 +686,7 @@ The examples below shows how these options can be configured in XML.
### Disabling magic committer path rewriting ### Disabling magic committer path rewriting
The magic committer recognizes when files are created under paths with `__magic/` as a parent directory The magic committer recognizes when files are created under paths with `${MAGIC PATH}/` as a parent directory
and redirects the upload to a different location, adding the information needed to complete the upload and redirects the upload to a different location, adding the information needed to complete the upload
in the job commit operation. in the job commit operation.
@ -708,10 +708,12 @@ You will not be able to use the Magic Committer if this option is disabled.
## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination ## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination
It is sometimes possible for multiple jobs to simultaneously write to the same destination path. It is sometimes possible for multiple jobs to simultaneously write to the same destination path. To support
such use case, The "MAGIC PATH" for each job is unique of the format `__magic_job-${jobId}` so that
multiple job running simultaneously do not step into each other.
Before attempting this, the committers must be set to not delete all incomplete uploads on job commit, Before attempting this, the committers must be set to not delete all incomplete uploads on job commit,
by setting `fs.s3a.committer.abort.pending.uploads` to `false` by setting `fs.s3a.committer.abort.pending.uploads` to `false`. This is set to `false`by default
```xml ```xml
<property> <property>
@ -740,9 +742,6 @@ For example, for any job executed through Hadoop MapReduce, the Job ID can be us
</property> </property>
``` ```
Even with these settings, the outcome of concurrent jobs to the same destination is
inherently nondeterministic -use with caution.
## Troubleshooting ## Troubleshooting
### `Filesystem does not have support for 'magic' committer` ### `Filesystem does not have support for 'magic' committer`
@ -805,7 +804,7 @@ files which are actually written to a different destination than their stated pa
This message should not appear through the committer itself &mdash;it will This message should not appear through the committer itself &mdash;it will
fail with the error message in the previous section, but may arise fail with the error message in the previous section, but may arise
if other applications are attempting to create files under the path `/__magic/`. if other applications are attempting to create files under the path `/${MAGIC PATH}/`.
### `FileOutputCommitter` appears to be still used (from logs or delays in commits) ### `FileOutputCommitter` appears to be still used (from logs or delays in commits)

View File

@ -1419,7 +1419,7 @@ public void testOutputFormatIntegration() throws Throwable {
recordWriter.write(iw, iw); recordWriter.write(iw, iw);
long expectedLength = 4; long expectedLength = 4;
Path dest = recordWriter.getDest(); Path dest = recordWriter.getDest();
validateTaskAttemptPathDuringWrite(dest, expectedLength); validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID());
recordWriter.close(tContext); recordWriter.close(tContext);
// at this point // at this point
validateTaskAttemptPathAfterWrite(dest, expectedLength); validateTaskAttemptPathAfterWrite(dest, expectedLength);
@ -1833,10 +1833,12 @@ public void testS3ACommitterFactoryBinding() throws Throwable {
* itself. * itself.
* @param p path * @param p path
* @param expectedLength * @param expectedLength
* @param jobId job id
* @throws IOException IO failure * @throws IOException IO failure
*/ */
protected void validateTaskAttemptPathDuringWrite(Path p, protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength) throws IOException { final long expectedLength,
String jobId) throws IOException {
} }

View File

@ -330,10 +330,11 @@ protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
* called after the base assertions have all passed. * called after the base assertions have all passed.
* @param destPath destination of work * @param destPath destination of work
* @param successData loaded success data * @param successData loaded success data
* @param jobId job id
* @throws Exception failure * @throws Exception failure
*/ */
protected void customPostExecutionValidation(Path destPath, protected void customPostExecutionValidation(Path destPath,
SuccessData successData) SuccessData successData, String jobId)
throws Exception { throws Exception {
} }

View File

@ -34,7 +34,7 @@
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitOperations.extractMagicFileLength; import static org.apache.hadoop.fs.s3a.commit.impl.CommitOperations.extractMagicFileLength;
@ -52,6 +52,8 @@ public class CommitterTestHelper {
*/ */
private final S3AFileSystem fileSystem; private final S3AFileSystem fileSystem;
public static final String JOB_ID = "123";
/** /**
* Constructor. * Constructor.
* @param fileSystem filesystem to work with. * @param fileSystem filesystem to work with.
@ -108,7 +110,7 @@ public void assertFileLacksMarkerHeader(Path path) throws IOException {
*/ */
public static Path makeMagic(Path destFile) { public static Path makeMagic(Path destFile) {
return new Path(destFile.getParent(), return new Path(destFile.getParent(),
MAGIC + '/' + BASE + "/" + destFile.getName()); MAGIC_PATH_PREFIX + JOB_ID + '/' + BASE + "/" + destFile.getName());
} }
/** /**

View File

@ -51,7 +51,8 @@
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.JOB_ID;
import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.assertIsMagicStream; import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.assertIsMagicStream;
import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.makeMagic; import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.makeMagic;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.LIST_FILES_LIST_OP; import static org.apache.hadoop.fs.s3a.performance.OperationCost.LIST_FILES_LIST_OP;
@ -123,12 +124,12 @@ protected String fileSystemIOStats() {
@Test @Test
public void testMagicMkdir() throws Throwable { public void testMagicMkdir() throws Throwable {
describe("Mkdirs __magic always skips dir marker deletion"); describe("Mkdirs 'MAGIC PATH' always skips dir marker deletion");
S3AFileSystem fs = getFileSystem(); S3AFileSystem fs = getFileSystem();
Path baseDir = methodPath(); Path baseDir = methodPath();
// create dest dir marker, always // create dest dir marker, always
fs.mkdirs(baseDir); fs.mkdirs(baseDir);
Path magicDir = new Path(baseDir, MAGIC); Path magicDir = new Path(baseDir, MAGIC_PATH_PREFIX + JOB_ID);
verifyMetrics(() -> { verifyMetrics(() -> {
fs.mkdirs(magicDir); fs.mkdirs(magicDir);
return fileSystemIOStats(); return fileSystemIOStats();
@ -151,10 +152,10 @@ public void testMagicMkdir() throws Throwable {
*/ */
@Test @Test
public void testMagicMkdirs() throws Throwable { public void testMagicMkdirs() throws Throwable {
describe("Mkdirs __magic/subdir always skips dir marker deletion"); describe("Mkdirs __magic_job-<jobId>/subdir always skips dir marker deletion");
S3AFileSystem fs = getFileSystem(); S3AFileSystem fs = getFileSystem();
Path baseDir = methodPath(); Path baseDir = methodPath();
Path magicDir = new Path(baseDir, MAGIC); Path magicDir = new Path(baseDir, MAGIC_PATH_PREFIX + JOB_ID);
fs.delete(baseDir, true); fs.delete(baseDir, true);
Path magicSubdir = new Path(magicDir, "subdir"); Path magicSubdir = new Path(magicDir, "subdir");

View File

@ -207,7 +207,7 @@ private CommitOperations newCommitOperations()
*/ */
private static Path makeMagic(Path destFile) { private static Path makeMagic(Path destFile) {
return new Path(destFile.getParent(), return new Path(destFile.getParent(),
MAGIC + '/' + destFile.getName()); MAGIC_PATH_PREFIX + JOB_ID + '/' + destFile.getName());
} }
@Test @Test
@ -279,7 +279,7 @@ public void testBaseRelativePath() throws Throwable {
S3AFileSystem fs = getFileSystem(); S3AFileSystem fs = getFileSystem();
Path destDir = methodSubPath("testBaseRelativePath"); Path destDir = methodSubPath("testBaseRelativePath");
fs.delete(destDir, true); fs.delete(destDir, true);
Path pendingBaseDir = new Path(destDir, MAGIC + "/child/" + BASE); Path pendingBaseDir = new Path(destDir, MAGIC_PATH_PREFIX + JOB_ID + "/child/" + BASE);
String child = "subdir/child.txt"; String child = "subdir/child.txt";
Path pendingChildPath = new Path(pendingBaseDir, child); Path pendingChildPath = new Path(pendingBaseDir, child);
Path expectedDestPath = new Path(destDir, child); Path expectedDestPath = new Path(destDir, child);
@ -334,7 +334,7 @@ public void testMarkerFileRename()
/** /**
* Create a file through the magic commit mechanism. * Create a file through the magic commit mechanism.
* @param filename file to create (with __magic path.) * @param filename file to create (with "MAGIC PATH".)
* @param data data to write * @param data data to write
* @throws Exception failure * @throws Exception failure
*/ */

View File

@ -38,16 +38,16 @@
public class TestMagicCommitPaths extends Assert { public class TestMagicCommitPaths extends Assert {
private static final List<String> MAGIC_AT_ROOT = private static final List<String> MAGIC_AT_ROOT =
list(MAGIC); list(MAGIC_PATH_PREFIX);
private static final List<String> MAGIC_AT_ROOT_WITH_CHILD = private static final List<String> MAGIC_AT_ROOT_WITH_CHILD =
list(MAGIC, "child"); list(MAGIC_PATH_PREFIX, "child");
private static final List<String> MAGIC_WITH_CHILD = private static final List<String> MAGIC_WITH_CHILD =
list("parent", MAGIC, "child"); list("parent", MAGIC_PATH_PREFIX, "child");
private static final List<String> MAGIC_AT_WITHOUT_CHILD = private static final List<String> MAGIC_AT_WITHOUT_CHILD =
list("parent", MAGIC); list("parent", MAGIC_PATH_PREFIX);
private static final List<String> DEEP_MAGIC = private static final List<String> DEEP_MAGIC =
list("parent1", "parent2", MAGIC, "child1", "child2"); list("parent1", "parent2", MAGIC_PATH_PREFIX, "child1", "child2");
public static final String[] EMPTY = {}; public static final String[] EMPTY = {};
@ -161,40 +161,40 @@ public void testFinalDestinationNoMagic() {
@Test @Test
public void testFinalDestinationMagic1() { public void testFinalDestinationMagic1() {
assertEquals(l("first", "2"), assertEquals(l("first", "2"),
finalDestination(l("first", MAGIC, "2"))); finalDestination(l("first", MAGIC_PATH_PREFIX, "2")));
} }
@Test @Test
public void testFinalDestinationMagic2() { public void testFinalDestinationMagic2() {
assertEquals(l("first", "3.txt"), assertEquals(l("first", "3.txt"),
finalDestination(l("first", MAGIC, "2", "3.txt"))); finalDestination(l("first", MAGIC_PATH_PREFIX, "2", "3.txt")));
} }
@Test @Test
public void testFinalDestinationRootMagic2() { public void testFinalDestinationRootMagic2() {
assertEquals(l("3.txt"), assertEquals(l("3.txt"),
finalDestination(l(MAGIC, "2", "3.txt"))); finalDestination(l(MAGIC_PATH_PREFIX, "2", "3.txt")));
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testFinalDestinationMagicNoChild() { public void testFinalDestinationMagicNoChild() {
finalDestination(l(MAGIC)); finalDestination(l(MAGIC_PATH_PREFIX));
} }
@Test @Test
public void testFinalDestinationBaseDirectChild() { public void testFinalDestinationBaseDirectChild() {
finalDestination(l(MAGIC, BASE, "3.txt")); finalDestination(l(MAGIC_PATH_PREFIX, BASE, "3.txt"));
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testFinalDestinationBaseNoChild() { public void testFinalDestinationBaseNoChild() {
assertEquals(l(), finalDestination(l(MAGIC, BASE))); assertEquals(l(), finalDestination(l(MAGIC_PATH_PREFIX, BASE)));
} }
@Test @Test
public void testFinalDestinationBaseSubdirsChild() { public void testFinalDestinationBaseSubdirsChild() {
assertEquals(l("2", "3.txt"), assertEquals(l("2", "3.txt"),
finalDestination(l(MAGIC, "4", BASE, "2", "3.txt"))); finalDestination(l(MAGIC_PATH_PREFIX, "4", BASE, "2", "3.txt")));
} }
/** /**
@ -203,7 +203,7 @@ public void testFinalDestinationBaseSubdirsChild() {
@Test @Test
public void testFinalDestinationIgnoresBaseBeforeMagic() { public void testFinalDestinationIgnoresBaseBeforeMagic() {
assertEquals(l(BASE, "home", "3.txt"), assertEquals(l(BASE, "home", "3.txt"),
finalDestination(l(BASE, "home", MAGIC, "2", "3.txt"))); finalDestination(l(BASE, "home", MAGIC_PATH_PREFIX, "2", "3.txt")));
} }
/** varargs to array. */ /** varargs to array. */

View File

@ -75,7 +75,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
@ -332,7 +332,7 @@ public void test_200_execute() throws Exception {
assertPathDoesNotExist("temporary dir should only be from" assertPathDoesNotExist("temporary dir should only be from"
+ " classic file committers", + " classic file committers",
new Path(outputPath, CommitConstants.TEMPORARY)); new Path(outputPath, CommitConstants.TEMPORARY));
customPostExecutionValidation(outputPath, successData); customPostExecutionValidation(outputPath, successData, jobID);
} }
@Override @Override
@ -343,8 +343,8 @@ protected void applyCustomConfigOptions(final JobConf jobConf)
@Override @Override
protected void customPostExecutionValidation(final Path destPath, protected void customPostExecutionValidation(final Path destPath,
final SuccessData successData) throws Exception { final SuccessData successData, String jobId) throws Exception {
committerTestBinding.validateResult(destPath, successData); committerTestBinding.validateResult(destPath, successData, jobId);
} }
/** /**
@ -482,7 +482,7 @@ protected void applyCustomConfigOptions(JobConf jobConf)
* @throws Exception failure * @throws Exception failure
*/ */
protected void validateResult(Path destPath, protected void validateResult(Path destPath,
SuccessData successData) SuccessData successData, String jobId)
throws Exception { throws Exception {
} }
@ -576,7 +576,7 @@ private MagicCommitterTestBinding() {
} }
/** /**
* The result validation here is that there isn't a __magic directory * The result validation here is that there isn't a "MAGIC PATH" directory
* any more. * any more.
* @param destPath destination of work * @param destPath destination of work
* @param successData loaded success data * @param successData loaded success data
@ -584,9 +584,9 @@ private MagicCommitterTestBinding() {
*/ */
@Override @Override
protected void validateResult(final Path destPath, protected void validateResult(final Path destPath,
final SuccessData successData) final SuccessData successData, final String jobId)
throws Exception { throws Exception {
Path magicDir = new Path(destPath, MAGIC); Path magicDir = new Path(destPath, MAGIC_PATH_PREFIX + jobId);
// if an FNFE isn't raised on getFileStatus, list out the directory // if an FNFE isn't raised on getFileStatus, list out the directory
// tree // tree

View File

@ -42,6 +42,7 @@
import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter; import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath;
import static org.apache.hadoop.util.functional.RemoteIterators.toList; import static org.apache.hadoop.util.functional.RemoteIterators.toList;
/** /**
@ -74,7 +75,7 @@ public void setup() throws Exception {
public void assertJobAbortCleanedUp(JobData jobData) public void assertJobAbortCleanedUp(JobData jobData)
throws Exception { throws Exception {
// special handling of magic directory; harmless in staging // special handling of magic directory; harmless in staging
Path magicDir = new Path(getOutDir(), MAGIC); Path magicDir = getMagicJobPath(jobData.getCommitter().getUUID(), getOutDir());
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), ContractTestUtils.assertPathDoesNotExist(getFileSystem(),
"magic dir ", magicDir); "magic dir ", magicDir);
super.assertJobAbortCleanedUp(jobData); super.assertJobAbortCleanedUp(jobData);
@ -94,11 +95,12 @@ public MagicS3GuardCommitter createFailingCommitter(
} }
protected void validateTaskAttemptPathDuringWrite(Path p, protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength) throws IOException { final long expectedLength,
String jobId) throws IOException {
String pathStr = p.toString(); String pathStr = p.toString();
Assertions.assertThat(pathStr) Assertions.assertThat(pathStr)
.describedAs("Magic path") .describedAs("Magic path")
.contains(MAGIC); .contains("/" + MAGIC_PATH_PREFIX + jobId + "/");
assertPathDoesNotExist("task attempt visible", p); assertPathDoesNotExist("task attempt visible", p);
} }
@ -129,7 +131,7 @@ protected void validateTaskAttemptPathAfterWrite(Path marker,
/** /**
* The magic committer paths are always on S3, and always have * The magic committer paths are always on S3, and always have
* "__magic" in the path. * "MAGIC PATH" in the path.
* @param committer committer instance * @param committer committer instance
* @param context task attempt context * @param context task attempt context
* @throws IOException IO failure * @throws IOException IO failure
@ -143,11 +145,11 @@ protected void validateTaskAttemptWorkingDirectory(
+ " with committer " + committer, + " with committer " + committer,
"s3a", wd.getScheme()); "s3a", wd.getScheme());
Assertions.assertThat(wd.getPath()) Assertions.assertThat(wd.getPath())
.contains('/' + CommitConstants.MAGIC + '/'); .contains("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/");
} }
/** /**
* Verify that the __magic path for the application/tasks use the * Verify that the "MAGIC PATH" for the application/tasks use the
* committer UUID to ensure uniqueness in the case of more than * committer UUID to ensure uniqueness in the case of more than
* one job writing to the same destination path. * one job writing to the same destination path.
*/ */
@ -164,7 +166,7 @@ public void testCommittersPathsHaveUUID() throws Throwable {
Assertions.assertThat(taskAttemptPath.toString()) Assertions.assertThat(taskAttemptPath.toString())
.describedAs("task path of %s", committer) .describedAs("task path of %s", committer)
.contains(committer.getUUID()) .contains(committer.getUUID())
.contains(MAGIC) .contains("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/")
.doesNotContain(TEMP_DATA) .doesNotContain(TEMP_DATA)
.endsWith(BASE) .endsWith(BASE)
.contains(ta0); .contains(ta0);
@ -176,7 +178,7 @@ public void testCommittersPathsHaveUUID() throws Throwable {
.describedAs("Temp task path of %s", committer) .describedAs("Temp task path of %s", committer)
.contains(committer.getUUID()) .contains(committer.getUUID())
.contains(TEMP_DATA) .contains(TEMP_DATA)
.doesNotContain(MAGIC) .doesNotContain("/" + MAGIC_PATH_PREFIX + committer.getUUID() + "/")
.doesNotContain(BASE) .doesNotContain(BASE)
.contains(ta0); .contains(ta0);
} }

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.CommitUtils; import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
@ -59,6 +58,8 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles {
ITestS3AHugeMagicCommits.class); ITestS3AHugeMagicCommits.class);
private static final int COMMITTER_THREADS = 64; private static final int COMMITTER_THREADS = 64;
private static final String JOB_ID = "123";
private Path magicDir; private Path magicDir;
private Path jobDir; private Path jobDir;
@ -100,8 +101,8 @@ public void setup() throws Exception {
// set up the paths for the commit operation // set up the paths for the commit operation
finalDirectory = new Path(getScaleTestDir(), "commit"); finalDirectory = new Path(getScaleTestDir(), "commit");
magicDir = new Path(finalDirectory, MAGIC); magicDir = new Path(finalDirectory, MAGIC_PATH_PREFIX + JOB_ID);
jobDir = new Path(magicDir, "job_001"); jobDir = new Path(magicDir, "job_" + JOB_ID);
String filename = "commit.bin"; String filename = "commit.bin";
setHugefile(new Path(finalDirectory, filename)); setHugefile(new Path(finalDirectory, filename));
magicOutputFile = new Path(jobDir, filename); magicOutputFile = new Path(jobDir, filename);
@ -141,7 +142,7 @@ public void test_030_postCreationAssertions() throws Throwable {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
CommitOperations operations = new CommitOperations(fs); CommitOperations operations = new CommitOperations(fs);
Path destDir = getHugefile().getParent(); Path destDir = getHugefile().getParent();
assertPathExists("Magic dir", new Path(destDir, CommitConstants.MAGIC)); assertPathExists("Magic dir", new Path(destDir, MAGIC_PATH_PREFIX + JOB_ID));
String destDirKey = fs.pathToKey(destDir); String destDirKey = fs.pathToKey(destDir);
Assertions.assertThat(listMultipartUploads(fs, destDirKey)) Assertions.assertThat(listMultipartUploads(fs, destDirKey))

View File

@ -111,7 +111,8 @@ protected void expectJobCommitToFail(JobContext jContext,
} }
protected void validateTaskAttemptPathDuringWrite(Path p, protected void validateTaskAttemptPathDuringWrite(Path p,
final long expectedLength) throws IOException { final long expectedLength,
String jobId) throws IOException {
// this is expected to be local FS // this is expected to be local FS
ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p); ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
} }

View File

@ -74,7 +74,7 @@ public class TestHeaderProcessing extends HadoopTestBase {
private HeaderProcessing headerProcessing; private HeaderProcessing headerProcessing;
private static final String MAGIC_KEY private static final String MAGIC_KEY
= "dest/__magic/job1/ta1/__base/output.csv"; = "dest/__magic_job-1/job1/ta1/__base/output.csv";
private static final String MAGIC_FILE private static final String MAGIC_FILE
= "s3a://bucket/" + MAGIC_KEY; = "s3a://bucket/" + MAGIC_KEY;