HADOOP-15469. S3A directory committer commit job fails if _temporary directory created under dest.
Contributed by Steve Loughran. (cherry picked from commit 170f1040d46f9b1a084e6637def91e9864446acc)
This commit is contained in:
parent
c0ec061e28
commit
cc3600aabd
@ -93,11 +93,8 @@ protected void preCommitJob(JobContext context,
|
|||||||
Configuration fsConf = fs.getConf();
|
Configuration fsConf = fs.getConf();
|
||||||
switch (getConflictResolutionMode(context, fsConf)) {
|
switch (getConflictResolutionMode(context, fsConf)) {
|
||||||
case FAIL:
|
case FAIL:
|
||||||
// this was checked in setupJob, but this avoids some cases where
|
// this was checked in setupJob; temporary files may have been
|
||||||
// output was created while the job was processing
|
// created, so do not check again.
|
||||||
if (fs.exists(outputPath)) {
|
|
||||||
throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case APPEND:
|
case APPEND:
|
||||||
// do nothing
|
// do nothing
|
||||||
|
@ -67,15 +67,14 @@ protected void verifyFailureConflictOutcome() throws Exception {
|
|||||||
pathExists(mockS3, OUTPUT_PATH);
|
pathExists(mockS3, OUTPUT_PATH);
|
||||||
final DirectoryStagingCommitter committer = newJobCommitter();
|
final DirectoryStagingCommitter committer = newJobCommitter();
|
||||||
|
|
||||||
|
// this should fail
|
||||||
intercept(PathExistsException.class,
|
intercept(PathExistsException.class,
|
||||||
InternalCommitterConstants.E_DEST_EXISTS,
|
InternalCommitterConstants.E_DEST_EXISTS,
|
||||||
"Should throw an exception because the path exists",
|
"Should throw an exception because the path exists",
|
||||||
() -> committer.setupJob(getJob()));
|
() -> committer.setupJob(getJob()));
|
||||||
|
|
||||||
intercept(PathExistsException.class,
|
// but there are no checks in job commit (HADOOP-15469)
|
||||||
InternalCommitterConstants.E_DEST_EXISTS,
|
committer.commitJob(getJob());
|
||||||
"Should throw an exception because the path exists",
|
|
||||||
() -> committer.commitJob(getJob()));
|
|
||||||
|
|
||||||
reset(mockS3);
|
reset(mockS3);
|
||||||
pathDoesNotExist(mockS3, OUTPUT_PATH);
|
pathDoesNotExist(mockS3, OUTPUT_PATH);
|
||||||
@ -87,7 +86,6 @@ protected void verifyFailureConflictOutcome() throws Exception {
|
|||||||
reset(mockS3);
|
reset(mockS3);
|
||||||
pathDoesNotExist(mockS3, OUTPUT_PATH);
|
pathDoesNotExist(mockS3, OUTPUT_PATH);
|
||||||
committer.commitJob(getJob());
|
committer.commitJob(getJob());
|
||||||
verifyExistenceChecked(mockS3, OUTPUT_PATH);
|
|
||||||
verifyCompletion(mockS3);
|
verifyCompletion(mockS3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user