diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java index 6e7a99f50e..cbbe5fdc60 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java @@ -51,9 +51,10 @@ public PathOutputCommitter createOutputCommitter(Path outputPath, throw new PathCommitException(outputPath, "Filesystem not supported by this committer"); } - LOG.info("Using Committer {} for {}", + LOG.info("Using Committer {} for {} created by {}", outputCommitter, - outputPath); + outputPath, + this); return outputCommitter; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java index 36d0af187d..7f5455b609 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java @@ -113,11 +113,14 @@ private AbstractS3ACommitterFactory chooseCommitterFactory( // job/task configurations. 
Configuration fsConf = fileSystem.getConf(); - String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); + String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, ""); + LOG.debug("Committer from filesystems \"{}\"", name); + name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name); - LOG.debug("Committer option is {}", name); + LOG.debug("Committer option is \"{}\"", name); switch (name) { case COMMITTER_NAME_FILE: + case "": factory = null; break; case COMMITTER_NAME_DIRECTORY: diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java index 2ad2568d5c..2561a69f60 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java @@ -19,15 +19,24 @@ package org.apache.hadoop.fs.s3a.commit; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -35,20 +44,24 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.security.UserGroupInformation; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** - * Tests for some aspects of the committer factory. - * All tests are grouped into one single test so that only one - * S3A FS client is set up and used for the entire run. - * Saves time and money. + * Tests for the committer factory creation/override process. */ -public class ITestS3ACommitterFactory extends AbstractCommitITest { - - - protected static final String INVALID_NAME = "invalid-name"; +@RunWith(Parameterized.class) +public final class ITestS3ACommitterFactory extends AbstractCommitITest { + private static final Logger LOG = LoggerFactory.getLogger( + ITestS3ACommitterFactory.class); + /** + * Name for invalid committer: {@value}. + */ + private static final String INVALID_NAME = "invalid-name"; /** * Counter to guarantee that even in parallel test runs, no job has the same @@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends AbstractCommitITest { * Parameterized list of bindings of committer name in config file to * expected class instantiated. 
*/ - private static final Object[][] bindings = { - {COMMITTER_NAME_FILE, FileOutputCommitter.class}, - {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class}, - {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class}, - {InternalCommitterConstants.COMMITTER_NAME_STAGING, - StagingCommitter.class}, - {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class} + private static final Object[][] BINDINGS = { + {"", "", FileOutputCommitter.class, "Default Binding"}, + {COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in FS"}, + {COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class, + "partitoned committer in FS"}, + {COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer in FS"}, + {COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer in FS"}, + {COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir committer in FS"}, + {INVALID_NAME, "", null, "invalid committer in FS"}, + + {"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in task"}, + {"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class, + "partioned committer in task"}, + {"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer in task"}, + {"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer in task"}, + {"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir committer in task"}, + {"", INVALID_NAME, null, "invalid committer in task"}, }; /** - * This is a ref to the FS conf, so changes here are visible - * to callers querying the FS config. + * Test array for parameterized test runs. + * + * @return the committer binding for this run. */ - private Configuration filesystemConfRef; + @Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]") + public static Collection params() { + return Arrays.asList(BINDINGS); + } - private Configuration taskConfRef; + /** + * Name of committer to set in filesystem config. If "" do not set one. 
+ */ + private final String fsCommitterName; + + /** + * Name of committer to set in job config. + */ + private final String jobCommitterName; + + /** + * Expected committer class. + * If null: an exception is expected + */ + private final Class committerClass; + + /** + * Description from parameters, simply for thread names to be more informative. + */ + private final String description; + + /** + * Create a parameterized instance. + * @param fsCommitterName committer to set in filesystem config + * @param jobCommitterName committer to set in job config + * @param committerClass expected committer class + * @param description debug text for thread names. + */ + public ITestS3ACommitterFactory( + final String fsCommitterName, + final String jobCommitterName, + final Class committerClass, + final String description) { + this.fsCommitterName = fsCommitterName; + this.jobCommitterName = jobCommitterName; + this.committerClass = committerClass; + this.description = description; + } + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + // keep the filesystem cache enabled (disable-cache = false): we want the committer + // to pick up the cached fs with its fs-specific configuration + conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false); + removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME); + maybeSetCommitterName(conf, fsCommitterName); + return conf; + } + + /** + * Set a committer name in a configuration. + * @param conf configuration to patch. + * @param name name. If "" the option is unset. + */ + private static void maybeSetCommitterName(final Configuration conf, final String name) { + if (!name.isEmpty()) { + conf.set(FS_S3A_COMMITTER_NAME, name); + } else { + conf.unset(FS_S3A_COMMITTER_NAME); + } + } @Override public void setup() throws Exception { + // destroy all filesystems from previous runs. 
+ FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); super.setup(); jobId = randomJobId(); attempt0 = "attempt_" + jobId + "_m_000000_0"; taskAttempt0 = TaskAttemptID.forName(attempt0); - outDir = path(getMethodName()); + outDir = methodPath(); factory = new S3ACommitterFactory(); - Configuration conf = new Configuration(); - conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString()); - conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); - filesystemConfRef = getFileSystem().getConf(); - tContext = new TaskAttemptContextImpl(conf, taskAttempt0); - taskConfRef = tContext.getConfiguration(); + final Configuration fsConf = getConfiguration(); + JobConf jobConf = new JobConf(fsConf); + jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString()); + jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); + jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); + maybeSetCommitterName(jobConf, jobCommitterName); + tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0); + + LOG.info("{}: Filesystem Committer='{}'; task='{}'", + description, + fsConf.get(FS_S3A_COMMITTER_NAME), + jobConf.get(FS_S3A_COMMITTER_NAME)); } - @Test - public void testEverything() throws Throwable { - testImplicitFileBinding(); - testBindingsInTask(); - testBindingsInFSConfig(); - testInvalidFileBinding(); - testInvalidTaskBinding(); + + @Override + protected void deleteTestDirInTeardown() { + // no-op } /** * Verify that if all config options are unset, the FileOutputCommitter - * * is returned. */ - public void testImplicitFileBinding() throws Throwable { - taskConfRef.unset(FS_S3A_COMMITTER_NAME); - filesystemConfRef.unset(FS_S3A_COMMITTER_NAME); - assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); - } - - /** - * Verify that task bindings are picked up. - */ - public void testBindingsInTask() throws Throwable { - // set this to an invalid value to be confident it is not - // being checked. 
- filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID"); - taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); - assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); - for (Object[] binding : bindings) { - taskConfRef.set(FS_S3A_COMMITTER_NAME, - (String) binding[0]); - assertFactoryCreatesExpectedCommitter((Class) binding[1]); - } - } - - /** - * Verify that FS bindings are picked up. - */ - public void testBindingsInFSConfig() throws Throwable { - taskConfRef.unset(FS_S3A_COMMITTER_NAME); - filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); - assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); - for (Object[] binding : bindings) { - taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]); - assertFactoryCreatesExpectedCommitter((Class) binding[1]); - } - } - - /** - * Create an invalid committer via the FS binding. - */ - public void testInvalidFileBinding() throws Throwable { - taskConfRef.unset(FS_S3A_COMMITTER_NAME); - filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME); - LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME, - () -> createCommitter()); - } - - /** - * Create an invalid committer via the task attempt. - */ - public void testInvalidTaskBinding() throws Throwable { - filesystemConfRef.unset(FS_S3A_COMMITTER_NAME); - taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME); - LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME, - () -> createCommitter()); + @Test + public void testBinding() throws Throwable { + assertFactoryCreatesExpectedCommitter(committerClass); } /** * Assert that the factory creates the expected committer. + * If a null committer is passed in, a {@link PathIOException} + * is expected. * @param expected expected committer class. - * @throws IOException IO failure. + * @throws Exception IO failure. 
*/ - protected void assertFactoryCreatesExpectedCommitter( + private void assertFactoryCreatesExpectedCommitter( final Class expected) - throws IOException { - assertEquals("Wrong Committer from factory", - expected, - createCommitter().getClass()); + throws Exception { + describe("Creating committer: expected class \"%s\"", expected); + if (expected != null) { + assertEquals("Wrong Committer from factory", + expected, + createCommitter().getClass()); + } else { + intercept(PathCommitException.class, this::createCommitter); + } } /** diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 25247aaaab..7442a357f9 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -49,6 +49,8 @@ log4j.logger.org.apache.hadoop.mapred.ShuffleHandler=WARN log4j.logger.org.apache.hadoop.ipc.Server=WARN #log4j.logger.=WARN +# information about origin of committers +log4j.logger.org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory=DEBUG # for debugging low level S3a operations, uncomment these lines # Log all S3A classes