HADOOP-16357. TeraSort Job failing on S3 DirectoryStagingCommitter: destination path exists.
Contributed by Steve Loughran. This patch * changes the default for the staging committer to append, as we get for the classic FileOutputFormat committer * adds a check for the dest path being a file not a dir * adds tests for this * changes AbstractCommitTerasortIT to not use the simple partitioner, so it fails if the file is present. Change-Id: Id53742958ed1cf321ff96c9063505d64f3254f53
This commit is contained in:
parent
5747f6cff5
commit
6a3433bffd
@ -1747,7 +1747,7 @@
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.committer.staging.conflict-mode</name>
|
||||
<value>fail</value>
|
||||
<value>append</value>
|
||||
<description>
|
||||
Staging committer conflict resolution policy.
|
||||
Supported: "fail", "append", "replace".
|
||||
|
@ -198,7 +198,7 @@ public final class CommitConstants {
|
||||
public static final String CONFLICT_MODE_REPLACE = "replace";
|
||||
|
||||
/** Default conflict mode: {@value}. */
|
||||
public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_FAIL;
|
||||
public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_APPEND;
|
||||
|
||||
/**
|
||||
* Number of threads in committers for parallel operations on files
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit.staging;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
@ -25,8 +26,11 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathExistsException;
|
||||
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
|
||||
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
@ -65,11 +69,31 @@ public class DirectoryStagingCommitter extends StagingCommitter {
|
||||
super.setupJob(context);
|
||||
Path outputPath = getOutputPath();
|
||||
FileSystem fs = getDestFS();
|
||||
if (getConflictResolutionMode(context, fs.getConf())
|
||||
== ConflictResolution.FAIL
|
||||
&& fs.exists(outputPath)) {
|
||||
throw failDestinationExists(outputPath,
|
||||
"Setting job as " + getRole());
|
||||
ConflictResolution conflictResolution = getConflictResolutionMode(
|
||||
context, fs.getConf());
|
||||
LOG.info("Conflict Resolution mode is {}", conflictResolution);
|
||||
try {
|
||||
final FileStatus status = fs.getFileStatus(outputPath);
|
||||
|
||||
// if it is not a directory, fail fast for all conflict options.
|
||||
if (!status.isDirectory()) {
|
||||
throw new PathExistsException(outputPath.toString(),
|
||||
"output path is not a directory: "
|
||||
+ InternalCommitterConstants.E_DEST_EXISTS);
|
||||
}
|
||||
switch(conflictResolution) {
|
||||
case FAIL:
|
||||
throw failDestinationExists(outputPath,
|
||||
"Setting job as " + getRole());
|
||||
case APPEND:
|
||||
case REPLACE:
|
||||
LOG.debug("Destination directory exists; conflict policy permits this");
|
||||
}
|
||||
} catch (FileNotFoundException ignored) {
|
||||
// there is no destination path, hence, no conflict.
|
||||
// make the destination directory, which also triggers a recursive directory
|
||||
// creation operation
|
||||
fs.mkdirs(outputPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -842,7 +842,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
|
||||
Configuration fsConf) {
|
||||
if (conflictResolution == null) {
|
||||
this.conflictResolution = ConflictResolution.valueOf(
|
||||
getConfictModeOption(context, fsConf));
|
||||
getConfictModeOption(context, fsConf, DEFAULT_CONFLICT_MODE));
|
||||
}
|
||||
return conflictResolution;
|
||||
}
|
||||
@ -889,14 +889,15 @@ public class StagingCommitter extends AbstractS3ACommitter {
|
||||
* Get the conflict mode option string.
|
||||
* @param context context with the config
|
||||
* @param fsConf filesystem config
|
||||
* @param defVal default value.
|
||||
* @return the trimmed configuration option, upper case.
|
||||
*/
|
||||
public static String getConfictModeOption(JobContext context,
|
||||
Configuration fsConf) {
|
||||
Configuration fsConf, String defVal) {
|
||||
return getConfigurationOption(context,
|
||||
fsConf,
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
|
||||
DEFAULT_CONFLICT_MODE).toUpperCase(Locale.ENGLISH);
|
||||
defVal).toUpperCase(Locale.ENGLISH);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -173,7 +173,7 @@ This then is the problem which the S3A committers address:
|
||||
*How to safely and reliably commit work to Amazon S3 or compatible object store*
|
||||
|
||||
|
||||
## Meet the S3A Commmitters
|
||||
## Meet the S3A Committers
|
||||
|
||||
Since Hadoop 3.1, the S3A FileSystem has been accompanied by classes
|
||||
designed to integrate with the Hadoop and Spark job commit protocols, classes
|
||||
@ -226,8 +226,8 @@ it is committed through the standard "v1" commit algorithm.
|
||||
When the Job is committed, the Job Manager reads the lists of pending writes from its
|
||||
HDFS Job destination directory and completes those uploads.
|
||||
|
||||
Cancelling a task is straightforward: the local directory is deleted with
|
||||
its staged data. Cancelling a job is achieved by reading in the lists of
|
||||
Canceling a task is straightforward: the local directory is deleted with
|
||||
its staged data. Canceling a job is achieved by reading in the lists of
|
||||
pending writes from the HDFS job attempt directory, and aborting those
|
||||
uploads. For extra safety, all outstanding multipart writes to the destination directory
|
||||
are aborted.
|
||||
@ -537,9 +537,8 @@ Conflict management is left to the execution engine itself.
|
||||
|--------|-------|-----------|-------------|---------|---------|
|
||||
| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a `_SUCCESS` file at the end of each job | `true` |
|
||||
| `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for parallel operations on files. | 8 |
|
||||
| `fs.s3a.committer.staging.conflict-mode` | | X | X | Conflict resolution: `fail`, `abort` or `overwrite`| `fail` |
|
||||
| `fs.s3a.committer.staging.conflict-mode` | | X | X | Conflict resolution: `fail`, `append` or `replace`| `append` |
|
||||
| `fs.s3a.committer.staging.unique-filenames` | | X | X | Generate unique filenames | `true` |
|
||||
|
||||
| `fs.s3a.committer.magic.enabled` | X | | | Enable "magic committer" support in the filesystem | `false` |
|
||||
|
||||
|
||||
@ -607,7 +606,7 @@ Conflict management is left to the execution engine itself.
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.committer.staging.conflict-mode</name>
|
||||
<value>fail</value>
|
||||
<value>append</value>
|
||||
<description>
|
||||
Staging committer conflict resolution policy.
|
||||
Supported: "fail", "append", "replace".
|
||||
|
@ -42,6 +42,8 @@ import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
/**
|
||||
* Relays FS calls to the mocked FS, allows for some extra logging with
|
||||
* stack traces to be included, stubbing out other methods
|
||||
@ -240,6 +242,12 @@ public class MockS3AFileSystem extends S3AFileSystem {
|
||||
mock.setWorkingDirectory(newDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f) throws IOException {
|
||||
event("mkdirs(%s)", f);
|
||||
return mock.mkdirs(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
event("mkdirs(%s)", f);
|
||||
@ -249,7 +257,8 @@ public class MockS3AFileSystem extends S3AFileSystem {
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
event("getFileStatus(%s)", f);
|
||||
return mock.getFileStatus(f);
|
||||
return checkNotNull(mock.getFileStatus(f),
|
||||
"Mock getFileStatus(%s) returned null", f);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,10 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -53,7 +51,6 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
||||
|
||||
/**
|
||||
@ -109,6 +106,13 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
Configuration conf = super.createConfiguration();
|
||||
String bucketName = getTestBucketName(conf);
|
||||
removeBucketOverrides(bucketName, conf,
|
||||
MAGIC_COMMITTER_ENABLED,
|
||||
S3A_COMMITTER_FACTORY_KEY,
|
||||
FS_S3A_COMMITTER_NAME,
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
|
||||
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
|
||||
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
|
||||
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
|
||||
@ -355,25 +359,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
|
||||
* @throws IOException IO Failure
|
||||
*/
|
||||
protected SuccessData verifySuccessMarker(Path dir) throws IOException {
|
||||
assertPathExists("Success marker",
|
||||
new Path(dir, _SUCCESS));
|
||||
SuccessData successData = loadSuccessMarker(dir);
|
||||
log().info("Success data {}", successData.toString());
|
||||
log().info("Metrics\n{}",
|
||||
successData.dumpMetrics(" ", " = ", "\n"));
|
||||
log().info("Diagnostics\n{}",
|
||||
successData.dumpDiagnostics(" ", " = ", "\n"));
|
||||
return successData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the success marker and return the data inside it.
|
||||
* @param dir directory containing the marker
|
||||
* @return the loaded data
|
||||
* @throws IOException on any failure to load or validate the data
|
||||
*/
|
||||
protected SuccessData loadSuccessMarker(Path dir) throws IOException {
|
||||
return SuccessData.load(getFileSystem(), new Path(dir, _SUCCESS));
|
||||
return validateSuccessFile(dir, "", getFileSystem(), "query");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -447,32 +433,18 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
|
||||
/**
|
||||
* Load in the success data marker: this guarantees that an S3A
|
||||
* committer was used.
|
||||
* @param fs filesystem
|
||||
* @param outputPath path of job
|
||||
* @param committerName name of committer to match
|
||||
* @param committerName name of committer to match, or ""
|
||||
* @param fs filesystem
|
||||
* @param origin origin (e.g. "teragen" for messages)
|
||||
* @return the success data
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public static SuccessData validateSuccessFile(final S3AFileSystem fs,
|
||||
final Path outputPath, final String committerName) throws IOException {
|
||||
SuccessData successData = null;
|
||||
try {
|
||||
successData = loadSuccessFile(fs, outputPath);
|
||||
} catch (FileNotFoundException e) {
|
||||
// either the output path is missing or, if its the success file,
|
||||
// somehow the relevant committer wasn't picked up.
|
||||
String dest = outputPath.toString();
|
||||
LOG.error("No _SUCCESS file found under {}", dest);
|
||||
List<String> files = new ArrayList<>();
|
||||
applyLocatedFiles(fs.listFiles(outputPath, true),
|
||||
(status) -> {
|
||||
files.add(status.getPath().toString());
|
||||
LOG.error("{} {}", status.getPath(), status.getLen());
|
||||
});
|
||||
throw new AssertionError("No _SUCCESS file in " + dest
|
||||
+ "; found : " + files.stream().collect(Collectors.joining("\n")),
|
||||
e);
|
||||
}
|
||||
public static SuccessData validateSuccessFile(final Path outputPath,
|
||||
final String committerName,
|
||||
final S3AFileSystem fs,
|
||||
final String origin) throws IOException {
|
||||
SuccessData successData = loadSuccessFile(fs, outputPath, origin);
|
||||
String commitDetails = successData.toString();
|
||||
LOG.info("Committer name " + committerName + "\n{}",
|
||||
commitDetails);
|
||||
@ -480,8 +452,10 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
|
||||
successData.dumpMetrics(" ", " = ", "\n"));
|
||||
LOG.info("Diagnostics\n{}",
|
||||
successData.dumpDiagnostics(" ", " = ", "\n"));
|
||||
assertEquals("Wrong committer in " + commitDetails,
|
||||
committerName, successData.getCommitter());
|
||||
if (!committerName.isEmpty()) {
|
||||
assertEquals("Wrong committer in " + commitDetails,
|
||||
committerName, successData.getCommitter());
|
||||
}
|
||||
return successData;
|
||||
}
|
||||
|
||||
@ -489,16 +463,29 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
|
||||
* Load a success file; fail if the file is empty/nonexistent.
|
||||
* @param fs filesystem
|
||||
* @param outputPath directory containing the success file.
|
||||
* @param origin origin of the file
|
||||
* @return the loaded file.
|
||||
* @throws IOException failure to find/load the file
|
||||
* @throws AssertionError file is 0-bytes long
|
||||
* @throws AssertionError file is 0-bytes long or is not a file
|
||||
*/
|
||||
public static SuccessData loadSuccessFile(final S3AFileSystem fs,
|
||||
final Path outputPath) throws IOException {
|
||||
final Path outputPath, final String origin) throws IOException {
|
||||
ContractTestUtils.assertPathExists(fs,
|
||||
"Output directory " + outputPath
|
||||
+ " from " + origin
|
||||
+ " not found: Job may not have executed",
|
||||
outputPath);
|
||||
Path success = new Path(outputPath, _SUCCESS);
|
||||
ContractTestUtils.assertIsFile(fs, success);
|
||||
FileStatus status = fs.getFileStatus(success);
|
||||
assertTrue("0 byte success file - not a s3guard committer " + success,
|
||||
FileStatus status = ContractTestUtils.verifyPathExists(fs,
|
||||
"job completion marker " + success
|
||||
+ " from " + origin
|
||||
+ " not found: Job may have failed",
|
||||
success);
|
||||
assertTrue("_SUCCESS outout from " + origin + " is not a file " + status,
|
||||
status.isFile());
|
||||
assertTrue("0 byte success file "
|
||||
+ success + " from " + origin
|
||||
+ "; a s3guard committer was not used",
|
||||
status.getLen() > 0);
|
||||
return SuccessData.load(fs, success);
|
||||
}
|
||||
|
@ -147,8 +147,8 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
|
||||
}
|
||||
Collections.sort(actualFiles);
|
||||
|
||||
SuccessData successData = validateSuccessFile(fs, outputPath,
|
||||
committerName());
|
||||
SuccessData successData = validateSuccessFile(outputPath, committerName(),
|
||||
fs, "MR job");
|
||||
List<String> successFiles = successData.getFilenames();
|
||||
String commitData = successData.toString();
|
||||
assertTrue("No filenames in " + commitData,
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.fs.s3a.commit.staging;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Serializable;
|
||||
@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -196,9 +198,34 @@ public class StagingTestBase {
|
||||
when(mockS3.exists(path)).thenReturn(true);
|
||||
}
|
||||
|
||||
public static void pathIsDirectory(FileSystem mockS3, Path path)
|
||||
throws IOException {
|
||||
hasFileStatus(mockS3, path,
|
||||
new FileStatus(0, true, 0, 0, 0, path));
|
||||
}
|
||||
|
||||
public static void pathIsFile(FileSystem mockS3, Path path)
|
||||
throws IOException {
|
||||
pathExists(mockS3, path);
|
||||
hasFileStatus(mockS3, path,
|
||||
new FileStatus(0, false, 0, 0, 0, path));
|
||||
}
|
||||
|
||||
public static void pathDoesNotExist(FileSystem mockS3, Path path)
|
||||
throws IOException {
|
||||
when(mockS3.exists(path)).thenReturn(false);
|
||||
when(mockS3.getFileStatus(path)).thenThrow(
|
||||
new FileNotFoundException("mock fnfe of " + path));
|
||||
}
|
||||
|
||||
public static void hasFileStatus(FileSystem mockS3,
|
||||
Path path, FileStatus status) throws IOException {
|
||||
when(mockS3.getFileStatus(path)).thenReturn(status);
|
||||
}
|
||||
|
||||
public static void mkdirsHasOutcome(FileSystem mockS3,
|
||||
Path path, boolean outcome) throws IOException {
|
||||
when(mockS3.mkdirs(path)).thenReturn(outcome);
|
||||
}
|
||||
|
||||
public static void canDelete(FileSystem mockS3, String... children)
|
||||
@ -221,7 +248,12 @@ public class StagingTestBase {
|
||||
|
||||
public static void verifyExistenceChecked(FileSystem mockS3, Path path)
|
||||
throws IOException {
|
||||
verify(mockS3).exists(path);
|
||||
verify(mockS3).getFileStatus(path);
|
||||
}
|
||||
|
||||
public static void verifyMkdirsInvoked(FileSystem mockS3, Path path)
|
||||
throws IOException {
|
||||
verify(mockS3).mkdirs(path);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -117,10 +117,10 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
||||
|
||||
/**
|
||||
* Test array for parameterized test runs: how many threads and
|
||||
* how many files to use.
|
||||
* whether or not filenames are unique.
|
||||
* @return a list of parameter tuples.
|
||||
*/
|
||||
@Parameterized.Parameters
|
||||
@Parameterized.Parameters(name="threads-{0}-unique-{1}")
|
||||
public static Collection<Object[]> params() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{0, false},
|
||||
|
@ -18,9 +18,15 @@
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit.staging;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.PathExistsException;
|
||||
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
|
||||
@ -34,6 +40,9 @@ import static org.mockito.Mockito.*;
|
||||
public class TestStagingDirectoryOutputCommitter
|
||||
extends StagingTestBase.JobCommitterTest<DirectoryStagingCommitter> {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestStagingDirectoryOutputCommitter.class);
|
||||
|
||||
@Override
|
||||
DirectoryStagingCommitter newJobCommitter() throws Exception {
|
||||
return new DirectoryStagingCommitter(outputPath,
|
||||
@ -53,8 +62,10 @@ public class TestStagingDirectoryOutputCommitter
|
||||
public void testDefaultConflictResolution() throws Exception {
|
||||
getJob().getConfiguration().unset(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
verifyFailureConflictOutcome();
|
||||
pathIsDirectory(getMockS3A(), outputPath);
|
||||
verifyJobSetupAndCommit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailConflictResolution() throws Exception {
|
||||
getJob().getConfiguration().set(
|
||||
@ -63,8 +74,7 @@ public class TestStagingDirectoryOutputCommitter
|
||||
}
|
||||
|
||||
protected void verifyFailureConflictOutcome() throws Exception {
|
||||
FileSystem mockS3 = getMockS3A();
|
||||
pathExists(mockS3, outputPath);
|
||||
pathIsDirectory(getMockS3A(), outputPath);
|
||||
final DirectoryStagingCommitter committer = newJobCommitter();
|
||||
|
||||
// this should fail
|
||||
@ -76,32 +86,36 @@ public class TestStagingDirectoryOutputCommitter
|
||||
// but there are no checks in job commit (HADOOP-15469)
|
||||
committer.commitJob(getJob());
|
||||
|
||||
reset(mockS3);
|
||||
pathDoesNotExist(mockS3, outputPath);
|
||||
reset((FileSystem) getMockS3A());
|
||||
pathDoesNotExist(getMockS3A(), outputPath);
|
||||
|
||||
committer.setupJob(getJob());
|
||||
verifyExistenceChecked(mockS3, outputPath);
|
||||
verifyNoMoreInteractions(mockS3);
|
||||
verifyExistenceChecked(getMockS3A(), outputPath);
|
||||
verifyMkdirsInvoked(getMockS3A(), outputPath);
|
||||
verifyNoMoreInteractions((FileSystem) getMockS3A());
|
||||
|
||||
reset(mockS3);
|
||||
pathDoesNotExist(mockS3, outputPath);
|
||||
reset((FileSystem) getMockS3A());
|
||||
pathDoesNotExist(getMockS3A(), outputPath);
|
||||
committer.commitJob(getJob());
|
||||
verifyCompletion(mockS3);
|
||||
verifyCompletion(getMockS3A());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendConflictResolution() throws Exception {
|
||||
FileSystem mockS3 = getMockS3A();
|
||||
|
||||
pathExists(mockS3, outputPath);
|
||||
|
||||
getJob().getConfiguration().set(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
|
||||
FileSystem mockS3 = getMockS3A();
|
||||
pathIsDirectory(mockS3, outputPath);
|
||||
verifyJobSetupAndCommit();
|
||||
}
|
||||
|
||||
protected void verifyJobSetupAndCommit()
|
||||
throws Exception {
|
||||
final DirectoryStagingCommitter committer = newJobCommitter();
|
||||
|
||||
committer.setupJob(getJob());
|
||||
verifyNoMoreInteractions(mockS3);
|
||||
FileSystem mockS3 = getMockS3A();
|
||||
|
||||
Mockito.reset(mockS3);
|
||||
pathExists(mockS3, outputPath);
|
||||
@ -114,7 +128,7 @@ public class TestStagingDirectoryOutputCommitter
|
||||
public void testReplaceConflictResolution() throws Exception {
|
||||
FileSystem mockS3 = getMockS3A();
|
||||
|
||||
pathExists(mockS3, outputPath);
|
||||
pathIsDirectory(mockS3, outputPath);
|
||||
|
||||
getJob().getConfiguration().set(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
|
||||
@ -122,7 +136,6 @@ public class TestStagingDirectoryOutputCommitter
|
||||
final DirectoryStagingCommitter committer = newJobCommitter();
|
||||
|
||||
committer.setupJob(getJob());
|
||||
verifyNoMoreInteractions(mockS3);
|
||||
|
||||
Mockito.reset(mockS3);
|
||||
pathExists(mockS3, outputPath);
|
||||
@ -133,4 +146,49 @@ public class TestStagingDirectoryOutputCommitter
|
||||
verifyCompletion(mockS3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplaceConflictFailsIfDestIsFile() throws Exception {
|
||||
pathIsFile(getMockS3A(), outputPath);
|
||||
|
||||
getJob().getConfiguration().set(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
|
||||
|
||||
intercept(PathExistsException.class,
|
||||
InternalCommitterConstants.E_DEST_EXISTS,
|
||||
"Expected a PathExistsException as the destination"
|
||||
+ " was a file",
|
||||
() -> {
|
||||
newJobCommitter().setupJob(getJob());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendConflictFailsIfDestIsFile() throws Exception {
|
||||
pathIsFile(getMockS3A(), outputPath);
|
||||
|
||||
getJob().getConfiguration().set(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
|
||||
|
||||
intercept(PathExistsException.class,
|
||||
InternalCommitterConstants.E_DEST_EXISTS,
|
||||
"Expected a PathExistsException as a the destination"
|
||||
+ " is a file",
|
||||
() -> {
|
||||
newJobCommitter().setupJob(getJob());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateDefaultConflictMode() throws Throwable {
|
||||
Configuration baseConf = new Configuration(true);
|
||||
String[] sources = baseConf.getPropertySources(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
String sourceStr = Arrays.stream(sources)
|
||||
.collect(Collectors.joining(","));
|
||||
LOG.info("source of conflict mode {}", sourceStr);
|
||||
String baseConfVal = baseConf
|
||||
.getTrimmed(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
assertEquals("conflict mode in core config from " + sourceStr,
|
||||
CONFLICT_MODE_APPEND, baseConfVal);
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,6 @@ import org.junit.Test;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathExistsException;
|
||||
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
||||
@ -80,48 +79,13 @@ public class TestStagingPartitionedTaskCommit
|
||||
|
||||
@Test
|
||||
public void testDefault() throws Exception {
|
||||
FileSystem mockS3 = getMockS3A();
|
||||
|
||||
JobContext job = getJob();
|
||||
job.getConfiguration().unset(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
final PartitionedStagingCommitter committer = newTaskCommitter();
|
||||
|
||||
committer.setupTask(getTAC());
|
||||
assertConflictResolution(committer, job, ConflictResolution.FAIL);
|
||||
createTestOutputFiles(relativeFiles,
|
||||
committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration());
|
||||
|
||||
// test failure when one partition already exists
|
||||
reset(mockS3);
|
||||
Path exists = new Path(outputPath, relativeFiles.get(0)).getParent();
|
||||
pathExists(mockS3, exists);
|
||||
|
||||
intercept(PathExistsException.class,
|
||||
InternalCommitterConstants.E_DEST_EXISTS,
|
||||
"Expected a PathExistsException as a partition"
|
||||
+ " already exists:" + exists,
|
||||
() -> {
|
||||
committer.commitTask(getTAC());
|
||||
mockS3.getFileStatus(exists);
|
||||
});
|
||||
|
||||
// test success
|
||||
reset(mockS3);
|
||||
|
||||
committer.commitTask(getTAC());
|
||||
Set<String> files = Sets.newHashSet();
|
||||
for (InitiateMultipartUploadRequest request :
|
||||
getMockResults().getRequests().values()) {
|
||||
assertEquals(BUCKET, request.getBucketName());
|
||||
files.add(request.getKey());
|
||||
}
|
||||
assertEquals("Should have the right number of uploads",
|
||||
relativeFiles.size(), files.size());
|
||||
|
||||
Set<String> expected = buildExpectedList(committer);
|
||||
|
||||
assertEquals("Should have correct paths", expected, files);
|
||||
assertConflictResolution(committer, job, ConflictResolution.APPEND);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -19,7 +19,12 @@
|
||||
package org.apache.hadoop.fs.s3a.commit.staging.integration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||
@ -30,6 +35,9 @@ import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.JobStatus;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
|
||||
|
||||
/** ITest of the low level protocol methods. */
|
||||
public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
|
||||
|
||||
@ -57,6 +65,32 @@ public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
|
||||
return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is here because somehow test runs were failing with
|
||||
* the conflict mode being fail. Unsetting per-bucket options
|
||||
* in setup made this go away; it is retained for regression
|
||||
* testing
|
||||
*/
|
||||
@Test
|
||||
public void testValidateDefaultConflictMode() throws Throwable {
|
||||
describe("Checking default conflict mode adoption");
|
||||
Configuration baseConf = new Configuration(true);
|
||||
String[] sources = baseConf.getPropertySources(
|
||||
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
String sourceStr = Arrays.stream(sources)
|
||||
.collect(Collectors.joining(","));
|
||||
String baseConfVal = baseConf
|
||||
.getTrimmed(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
assertEquals("conflict mode in core config from "+ sourceStr,
|
||||
CONFLICT_MODE_APPEND, baseConfVal);
|
||||
|
||||
Configuration fsConf = getFileSystem().getConf();
|
||||
String conflictModeDefVal = fsConf
|
||||
.getTrimmed(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
|
||||
assertEquals("conflict mode in filesystem",
|
||||
CONFLICT_MODE_APPEND, conflictModeDefVal);
|
||||
}
|
||||
|
||||
/**
|
||||
* The class provides an overridden implementation of commitJobInternal which
|
||||
* causes the commit to fail the first time and then succeed.
|
||||
|
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.fs.s3a.commit.terasort;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@ -27,13 +29,13 @@ import org.junit.runners.MethodSorters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.examples.terasort.TeraGen;
|
||||
import org.apache.hadoop.examples.terasort.TeraSort;
|
||||
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
|
||||
import org.apache.hadoop.examples.terasort.TeraValidate;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.util.DurationInfo;
|
||||
@ -108,6 +110,9 @@ public abstract class AbstractCommitTerasortIT extends
|
||||
yarnConfig.setBoolean(
|
||||
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
|
||||
true);
|
||||
yarnConfig.setBoolean(
|
||||
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
|
||||
false);
|
||||
terasortPath = new Path("/terasort-" + getClass().getSimpleName())
|
||||
.makeQualified(getFileSystem());
|
||||
sortInput = new Path(terasortPath, "sortin");
|
||||
@ -143,7 +148,7 @@ public abstract class AbstractCommitTerasortIT extends
|
||||
assertEquals(stage
|
||||
+ "(" + StringUtils.join(", ", args) + ")"
|
||||
+ " failed", 0, result);
|
||||
validateSuccessFile(getFileSystem(), dest, committerName());
|
||||
validateSuccessFile(dest, committerName(), getFileSystem(), stage);
|
||||
return Optional.of(d);
|
||||
}
|
||||
|
||||
@ -161,6 +166,7 @@ public abstract class AbstractCommitTerasortIT extends
|
||||
@Test
|
||||
public void test_110_teragen() throws Throwable {
|
||||
describe("Teragen to %s", sortInput);
|
||||
getFileSystem().delete(sortInput, true);
|
||||
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
@ -174,7 +180,9 @@ public abstract class AbstractCommitTerasortIT extends
|
||||
@Test
|
||||
public void test_120_terasort() throws Throwable {
|
||||
describe("Terasort from %s to %s", sortInput, sortOutput);
|
||||
loadSuccessFile(getFileSystem(), sortInput);
|
||||
getFileSystem().delete(sortOutput, true);
|
||||
|
||||
loadSuccessFile(getFileSystem(), sortInput, "previous teragen stage");
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
// this job adds some data, so skip it.
|
||||
@ -189,7 +197,8 @@ public abstract class AbstractCommitTerasortIT extends
|
||||
@Test
|
||||
public void test_130_teravalidate() throws Throwable {
|
||||
describe("TeraValidate from %s to %s", sortOutput, sortValidate);
|
||||
loadSuccessFile(getFileSystem(), sortOutput);
|
||||
getFileSystem().delete(sortValidate, true);
|
||||
loadSuccessFile(getFileSystem(), sortOutput, "previous terasort stage");
|
||||
JobConf jobConf = newJobConf();
|
||||
patchConfigurationForCommitter(jobConf);
|
||||
teravalidateStageDuration = executeStage("TeraValidate",
|
||||
@ -224,9 +233,9 @@ public abstract class AbstractCommitTerasortIT extends
|
||||
stage.accept("Validate", teravalidateStageDuration);
|
||||
stage.accept("Completed", terasortDuration);
|
||||
String text = results.toString();
|
||||
Path path = new Path(terasortPath, "results.csv");
|
||||
LOG.info("Results are in {}\n{}", path, text);
|
||||
ContractTestUtils.writeTextFile(getFileSystem(), path, text, true);
|
||||
File resultsFile = File.createTempFile("results", ".csv");
|
||||
FileUtils.write(resultsFile, text, Charset.forName("UTF-8"));
|
||||
LOG.info("Results are in {}\n{}", resultsFile, text);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user