HADOOP-19189. ITestS3ACommitterFactory failing (#6857)
* parameterize the test run rather than do it from within the test suite. * log what the committer factory is up to (and improve its logging) * close all filesystems, then create the test filesystem with cache enabled. The cache is critical, we want the fs from cache to be used when querying filesystem properties, rather than one created from the committer jobconf, which will have the same options as the task committer, so not actually validate the override logic. Contributed by Steve Loughran
This commit is contained in:
parent
bbb17e76a7
commit
01d257d5aa
@ -51,9 +51,10 @@ public PathOutputCommitter createOutputCommitter(Path outputPath,
|
|||||||
throw new PathCommitException(outputPath,
|
throw new PathCommitException(outputPath,
|
||||||
"Filesystem not supported by this committer");
|
"Filesystem not supported by this committer");
|
||||||
}
|
}
|
||||||
LOG.info("Using Committer {} for {}",
|
LOG.info("Using Committer {} for {} created by {}",
|
||||||
outputCommitter,
|
outputCommitter,
|
||||||
outputPath);
|
outputPath,
|
||||||
|
this);
|
||||||
return outputCommitter;
|
return outputCommitter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,11 +113,14 @@ private AbstractS3ACommitterFactory chooseCommitterFactory(
|
|||||||
// job/task configurations.
|
// job/task configurations.
|
||||||
Configuration fsConf = fileSystem.getConf();
|
Configuration fsConf = fileSystem.getConf();
|
||||||
|
|
||||||
String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
|
String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, "");
|
||||||
|
LOG.debug("Committer from filesystems \"{}\"", name);
|
||||||
|
|
||||||
name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
|
name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
|
||||||
LOG.debug("Committer option is {}", name);
|
LOG.debug("Committer option is \"{}\"", name);
|
||||||
switch (name) {
|
switch (name) {
|
||||||
case COMMITTER_NAME_FILE:
|
case COMMITTER_NAME_FILE:
|
||||||
|
case "":
|
||||||
factory = null;
|
factory = null;
|
||||||
break;
|
break;
|
||||||
case COMMITTER_NAME_DIRECTORY:
|
case COMMITTER_NAME_DIRECTORY:
|
||||||
|
@ -19,15 +19,24 @@
|
|||||||
package org.apache.hadoop.fs.s3a.commit;
|
package org.apache.hadoop.fs.s3a.commit;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathIOException;
|
||||||
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
|
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
|
||||||
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
|
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
|
||||||
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
|
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
|
||||||
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
|
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
@ -35,20 +44,24 @@
|
|||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
|
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
|
||||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for some aspects of the committer factory.
|
* Tests for the committer factory creation/override process.
|
||||||
* All tests are grouped into one single test so that only one
|
|
||||||
* S3A FS client is set up and used for the entire run.
|
|
||||||
* Saves time and money.
|
|
||||||
*/
|
*/
|
||||||
public class ITestS3ACommitterFactory extends AbstractCommitITest {
|
@RunWith(Parameterized.class)
|
||||||
|
public final class ITestS3ACommitterFactory extends AbstractCommitITest {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
protected static final String INVALID_NAME = "invalid-name";
|
ITestS3ACommitterFactory.class);
|
||||||
|
/**
|
||||||
|
* Name for invalid committer: {@value}.
|
||||||
|
*/
|
||||||
|
private static final String INVALID_NAME = "invalid-name";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Counter to guarantee that even in parallel test runs, no job has the same
|
* Counter to guarantee that even in parallel test runs, no job has the same
|
||||||
@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends AbstractCommitITest {
|
|||||||
* Parameterized list of bindings of committer name in config file to
|
* Parameterized list of bindings of committer name in config file to
|
||||||
* expected class instantiated.
|
* expected class instantiated.
|
||||||
*/
|
*/
|
||||||
private static final Object[][] bindings = {
|
private static final Object[][] BINDINGS = {
|
||||||
{COMMITTER_NAME_FILE, FileOutputCommitter.class},
|
{"", "", FileOutputCommitter.class, "Default Binding"},
|
||||||
{COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
|
{COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in FS"},
|
||||||
{COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
|
{COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
|
||||||
{InternalCommitterConstants.COMMITTER_NAME_STAGING,
|
"partitoned committer in FS"},
|
||||||
StagingCommitter.class},
|
{COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer in FS"},
|
||||||
{COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
|
{COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer in FS"},
|
||||||
|
{COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir committer in FS"},
|
||||||
|
{INVALID_NAME, "", null, "invalid committer in FS"},
|
||||||
|
|
||||||
|
{"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in task"},
|
||||||
|
{"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
|
||||||
|
"partioned committer in task"},
|
||||||
|
{"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer in task"},
|
||||||
|
{"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer in task"},
|
||||||
|
{"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir committer in task"},
|
||||||
|
{"", INVALID_NAME, null, "invalid committer in task"},
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a ref to the FS conf, so changes here are visible
|
* Test array for parameterized test runs.
|
||||||
* to callers querying the FS config.
|
*
|
||||||
|
* @return the committer binding for this run.
|
||||||
*/
|
*/
|
||||||
private Configuration filesystemConfRef;
|
@Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
|
||||||
|
public static Collection<Object[]> params() {
|
||||||
|
return Arrays.asList(BINDINGS);
|
||||||
|
}
|
||||||
|
|
||||||
private Configuration taskConfRef;
|
/**
|
||||||
|
* Name of committer to set in filesystem config. If "" do not set one.
|
||||||
|
*/
|
||||||
|
private final String fsCommitterName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Name of committer to set in job config.
|
||||||
|
*/
|
||||||
|
private final String jobCommitterName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Expected committer class.
|
||||||
|
* If null: an exception is expected
|
||||||
|
*/
|
||||||
|
private final Class<? extends AbstractS3ACommitter> committerClass;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description from parameters, simply for thread names to be more informative.
|
||||||
|
*/
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a parameterized instance.
|
||||||
|
* @param fsCommitterName committer to set in filesystem config
|
||||||
|
* @param jobCommitterName committer to set in job config
|
||||||
|
* @param committerClass expected committer class
|
||||||
|
* @param description debug text for thread names.
|
||||||
|
*/
|
||||||
|
public ITestS3ACommitterFactory(
|
||||||
|
final String fsCommitterName,
|
||||||
|
final String jobCommitterName,
|
||||||
|
final Class<? extends AbstractS3ACommitter> committerClass,
|
||||||
|
final String description) {
|
||||||
|
this.fsCommitterName = fsCommitterName;
|
||||||
|
this.jobCommitterName = jobCommitterName;
|
||||||
|
this.committerClass = committerClass;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration createConfiguration() {
|
||||||
|
final Configuration conf = super.createConfiguration();
|
||||||
|
// do not cache, because we want the committer one to pick up
|
||||||
|
// the fs with fs-specific configuration
|
||||||
|
conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
|
||||||
|
removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
|
||||||
|
maybeSetCommitterName(conf, fsCommitterName);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a committer name in a configuration.
|
||||||
|
* @param conf configuration to patch.
|
||||||
|
* @param name name. If "" the option is unset.
|
||||||
|
*/
|
||||||
|
private static void maybeSetCommitterName(final Configuration conf, final String name) {
|
||||||
|
if (!name.isEmpty()) {
|
||||||
|
conf.set(FS_S3A_COMMITTER_NAME, name);
|
||||||
|
} else {
|
||||||
|
conf.unset(FS_S3A_COMMITTER_NAME);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
|
// destroy all filesystems from previous runs.
|
||||||
|
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
|
||||||
super.setup();
|
super.setup();
|
||||||
jobId = randomJobId();
|
jobId = randomJobId();
|
||||||
attempt0 = "attempt_" + jobId + "_m_000000_0";
|
attempt0 = "attempt_" + jobId + "_m_000000_0";
|
||||||
taskAttempt0 = TaskAttemptID.forName(attempt0);
|
taskAttempt0 = TaskAttemptID.forName(attempt0);
|
||||||
|
|
||||||
outDir = path(getMethodName());
|
outDir = methodPath();
|
||||||
factory = new S3ACommitterFactory();
|
factory = new S3ACommitterFactory();
|
||||||
Configuration conf = new Configuration();
|
final Configuration fsConf = getConfiguration();
|
||||||
conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
|
JobConf jobConf = new JobConf(fsConf);
|
||||||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
|
jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
|
||||||
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
|
jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
|
||||||
filesystemConfRef = getFileSystem().getConf();
|
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
|
||||||
tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
|
maybeSetCommitterName(jobConf, jobCommitterName);
|
||||||
taskConfRef = tContext.getConfiguration();
|
tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
|
||||||
|
|
||||||
|
LOG.info("{}: Filesystem Committer='{}'; task='{}'",
|
||||||
|
description,
|
||||||
|
fsConf.get(FS_S3A_COMMITTER_NAME),
|
||||||
|
jobConf.get(FS_S3A_COMMITTER_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEverything() throws Throwable {
|
@Override
|
||||||
testImplicitFileBinding();
|
protected void deleteTestDirInTeardown() {
|
||||||
testBindingsInTask();
|
// no-op
|
||||||
testBindingsInFSConfig();
|
|
||||||
testInvalidFileBinding();
|
|
||||||
testInvalidTaskBinding();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify that if all config options are unset, the FileOutputCommitter
|
* Verify that if all config options are unset, the FileOutputCommitter
|
||||||
*
|
|
||||||
* is returned.
|
* is returned.
|
||||||
*/
|
*/
|
||||||
public void testImplicitFileBinding() throws Throwable {
|
@Test
|
||||||
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
|
public void testBinding() throws Throwable {
|
||||||
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
|
assertFactoryCreatesExpectedCommitter(committerClass);
|
||||||
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verify that task bindings are picked up.
|
|
||||||
*/
|
|
||||||
public void testBindingsInTask() throws Throwable {
|
|
||||||
// set this to an invalid value to be confident it is not
|
|
||||||
// being checked.
|
|
||||||
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
|
|
||||||
taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
|
|
||||||
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
|
|
||||||
for (Object[] binding : bindings) {
|
|
||||||
taskConfRef.set(FS_S3A_COMMITTER_NAME,
|
|
||||||
(String) binding[0]);
|
|
||||||
assertFactoryCreatesExpectedCommitter((Class) binding[1]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verify that FS bindings are picked up.
|
|
||||||
*/
|
|
||||||
public void testBindingsInFSConfig() throws Throwable {
|
|
||||||
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
|
|
||||||
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
|
|
||||||
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
|
|
||||||
for (Object[] binding : bindings) {
|
|
||||||
taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
|
|
||||||
assertFactoryCreatesExpectedCommitter((Class) binding[1]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create an invalid committer via the FS binding.
|
|
||||||
*/
|
|
||||||
public void testInvalidFileBinding() throws Throwable {
|
|
||||||
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
|
|
||||||
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
|
|
||||||
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
|
|
||||||
() -> createCommitter());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create an invalid committer via the task attempt.
|
|
||||||
*/
|
|
||||||
public void testInvalidTaskBinding() throws Throwable {
|
|
||||||
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
|
|
||||||
taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
|
|
||||||
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
|
|
||||||
() -> createCommitter());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert that the factory creates the expected committer.
|
* Assert that the factory creates the expected committer.
|
||||||
|
* If a null committer is passed in, a {@link PathIOException}
|
||||||
|
* is expected.
|
||||||
* @param expected expected committer class.
|
* @param expected expected committer class.
|
||||||
* @throws IOException IO failure.
|
* @throws Exception IO failure.
|
||||||
*/
|
*/
|
||||||
protected void assertFactoryCreatesExpectedCommitter(
|
private void assertFactoryCreatesExpectedCommitter(
|
||||||
final Class expected)
|
final Class expected)
|
||||||
throws IOException {
|
throws Exception {
|
||||||
assertEquals("Wrong Committer from factory",
|
describe("Creating committer: expected class \"%s\"", expected);
|
||||||
expected,
|
if (expected != null) {
|
||||||
createCommitter().getClass());
|
assertEquals("Wrong Committer from factory",
|
||||||
|
expected,
|
||||||
|
createCommitter().getClass());
|
||||||
|
} else {
|
||||||
|
intercept(PathCommitException.class, this::createCommitter);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -49,6 +49,8 @@ log4j.logger.org.apache.hadoop.mapred.ShuffleHandler=WARN
|
|||||||
log4j.logger.org.apache.hadoop.ipc.Server=WARN
|
log4j.logger.org.apache.hadoop.ipc.Server=WARN
|
||||||
#log4j.logger.=WARN
|
#log4j.logger.=WARN
|
||||||
|
|
||||||
|
# information about origin of committers
|
||||||
|
log4j.logger.org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory=DEBUG
|
||||||
|
|
||||||
# for debugging low level S3a operations, uncomment these lines
|
# for debugging low level S3a operations, uncomment these lines
|
||||||
# Log all S3A classes
|
# Log all S3A classes
|
||||||
|
Loading…
Reference in New Issue
Block a user