HADOOP-18793. S3A StagingCommitter does not clean up staging-uploads directory (#5818)

Contributed by Harunobu Daikoku
This commit is contained in:
Harunobu Daikoku 2023-07-08 18:53:54 +07:00 committed by GitHub
parent a36d8adfd1
commit e3683a954f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 128 additions and 11 deletions

View File

@ -227,6 +227,19 @@ public final class Paths {
MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
}
/**
* Build a qualified parent path for the temporary multipart upload commit
* directory built by {@link #getMultipartUploadCommitsDirectory(Configuration, String)}.
* @param conf configuration defining default FS.
* @param uuid uuid of job
* @return a path which can be used for temporary work
* @throws IOException on an IO failure.
*/
public static Path getStagingUploadsParentDirectory(Configuration conf,
String uuid) throws IOException {
return getMultipartUploadCommitsDirectory(conf, uuid).getParent();
}
/**
* Build a qualified temporary path for the multipart upload commit
* information in the cluster filesystem.

View File

@ -501,8 +501,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
/**
* Staging committer cleanup includes calling wrapped committer's
* cleanup method, and removing all destination paths in the final
* filesystem.
* cleanup method, and removing staging uploads path and all
* destination paths in the final filesystem.
* @param commitContext commit context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException IO failures if exceptions are not suppressed.
@ -515,6 +515,9 @@ public class StagingCommitter extends AbstractS3ACommitter {
maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
() -> wrappedCommitter.cleanupJob(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete staging uploads path",
() -> deleteStagingUploadsParentDirectory(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete destination paths",
() -> deleteDestinationPaths(
commitContext.getJobContext()));
@ -543,11 +546,26 @@ public class StagingCommitter extends AbstractS3ACommitter {
}
}
/**
* Delete the multipart upload staging directory.
* @param context job context
* @throws IOException IO failure
*/
protected void deleteStagingUploadsParentDirectory(JobContext context)
throws IOException {
Path stagingUploadsPath = Paths.getStagingUploadsParentDirectory(
context.getConfiguration(), getUUID());
ignoreIOExceptions(LOG,
"Deleting staging uploads path", stagingUploadsPath.toString(),
() -> deleteWithWarning(
stagingUploadsPath.getFileSystem(getConf()),
stagingUploadsPath,
true));
}
/**
* Delete the working paths of a job.
* <ol>
* <li>The job attempt path</li>
* <li>{@code $dest/__temporary}</li>
* <li>the local working directory for staged files</li>
* </ol>
@ -556,14 +574,6 @@ public class StagingCommitter extends AbstractS3ACommitter {
* @throws IOException IO failure
*/
protected void deleteDestinationPaths(JobContext context) throws IOException {
Path attemptPath = getJobAttemptPath(context);
ignoreIOExceptions(LOG,
"Deleting Job attempt Path", attemptPath.toString(),
() -> deleteWithWarning(
getJobAttemptFileSystem(context),
attemptPath,
true));
// delete the __temporary directory. This will cause problems
// if there is >1 task targeting the same dest dir
deleteWithWarning(getDestFS(),

View File

@ -403,6 +403,30 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
this.committer = committer;
conf = job.getConfiguration();
}
public Job getJob() {
return job;
}
public JobContext getJContext() {
return jContext;
}
public TaskAttemptContext getTContext() {
return tContext;
}
public AbstractS3ACommitter getCommitter() {
return committer;
}
public Configuration getConf() {
return conf;
}
public Path getWrittenTextPath() {
return writtenTextPath;
}
}
/**

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import java.util.UUID;
import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -141,6 +143,74 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
assertEquals("file", wd.toUri().getScheme());
}
@Test
public void testStagingUploadsDirectoryCleanedUp() throws Exception {
describe("Assert that the staging uploads directory is cleaned up after successful commit");
JobData jobData = startJob(false);
JobContext jContext = jobData.getJContext();
TaskAttemptContext tContext = jobData.getTContext();
StagingCommitter committer = (StagingCommitter) jobData.getCommitter();
Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
jContext.getConfiguration(),
committer.getUUID());
ContractTestUtils.assertPathExists(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must exist after setupJob",
stagingUploadsDir
);
// write output
writeTextOutput(tContext);
// do commit
committer.commitTask(tContext);
commitJob(committer, jContext);
ContractTestUtils.assertPathDoesNotExist(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must not exist after commitJob",
stagingUploadsDir
);
}
@Test
public void testStagingUploadsDirectoryCleanedUpWithFailure() throws Exception {
describe("Assert that the staging uploads directory is cleaned up after failed commit");
JobData jobData = startJob(new FailingCommitterFactory(), false);
JobContext jContext = jobData.getJContext();
TaskAttemptContext tContext = jobData.getTContext();
StagingCommitter committer = (StagingCommitter) jobData.getCommitter();
Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
jContext.getConfiguration(),
committer.getUUID());
ContractTestUtils.assertPathExists(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must exist after setupJob",
stagingUploadsDir
);
// do commit
committer.commitTask(tContext);
// now fail job
expectSimulatedFailureOnJobCommit(jContext, committer);
commitJob(committer, jContext);
expectJobCommitToFail(jContext, committer);
ContractTestUtils.assertPathDoesNotExist(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must not exist after commitJob",
stagingUploadsDir
);
}
/**
* The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed.