HADOOP-16058. S3A tests to include Terasort.

Contributed by Steve Loughran.

This includes
 - HADOOP-15890. Some S3A committer tests don't match ITest* pattern; don't run in maven
 - MAPREDUCE-7090. BigMapOutput example doesn't work with paths off cluster fs
 - MAPREDUCE-7091. Terasort on S3A to switch to new committers
 - MAPREDUCE-7092. MR examples to work better against cloud stores
This commit is contained in:
Steve Loughran 2019-03-21 11:15:37 +00:00
parent 60cdd4cac1
commit 9f1c017f44
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
17 changed files with 889 additions and 172 deletions

View File

@ -128,17 +128,20 @@ public int run(String[] args) throws Exception {
usage();
}
}
if (bigMapInput == null || outputPath == null) {
// report usage and exit
usage();
// this stops IDEs warning about unset local variables.
return -1;
}
FileSystem fs = FileSystem.get(getConf());
JobConf jobConf = new JobConf(getConf(), BigMapOutput.class);
jobConf.setJobName("BigMapOutput");
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, bigMapInput);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
outputPath.getFileSystem(jobConf).delete(outputPath, true);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
@ -146,7 +149,10 @@ public int run(String[] args) throws Exception {
jobConf.setOutputValueClass(BytesWritable.class);
if (createInput) {
createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
createBigMapInputFile(jobConf,
bigMapInput.getFileSystem(jobConf),
bigMapInput,
fileSizeInMB);
}
Date startTime = new Date();

View File

@ -284,7 +284,7 @@ public int run(String[] args) throws Exception {
}
JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
FileSystem fs = FileSystem.get(jobConf);
FileSystem fs = BASE_DIR.getFileSystem(jobConf);
Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt");
generateTextFile(fs, inputFile, inputLines, inputSortOrder);

View File

@ -30,10 +30,8 @@
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.slf4j.Logger;
@ -45,7 +43,6 @@
public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
private static final Logger LOG =
LoggerFactory.getLogger(TeraOutputFormat.class);
private OutputCommitter committer = null;
/**
* Set the requirement for a final sync before the stream is closed.
@ -145,12 +142,4 @@ public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job
return new TeraRecordWriter(fileOut, job);
}
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
}

View File

@ -321,7 +321,7 @@ public int run(String[] args) throws Exception {
try {
TeraInputFormat.writePartitionFile(job, partitionFile);
} catch (Throwable e) {
LOG.error(e.getMessage());
LOG.error("{}", e.getMessage(), e);
return -1;
}
job.addCacheFile(partitionUri);

View File

@ -61,7 +61,7 @@ private void runTeraGen(Configuration conf, Path sortInput)
String[] genArgs = {NUM_ROWS, sortInput.toString()};
// Run TeraGen
assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0);
assertEquals(0, ToolRunner.run(conf, new TeraGen(), genArgs));
}
private void runTeraSort(Configuration conf,
@ -71,7 +71,7 @@ private void runTeraSort(Configuration conf,
String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
// Run Sort
assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0);
assertEquals(0, ToolRunner.run(conf, new TeraSort(), sortArgs));
}
private void runTeraValidator(Configuration job,
@ -80,7 +80,7 @@ private void runTeraValidator(Configuration job,
String[] svArgs = {sortOutput.toString(), valOutput.toString()};
// Run Tera-Validator
assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0);
assertEquals(0, ToolRunner.run(job, new TeraValidate(), svArgs));
}
@Test

View File

@ -186,6 +186,7 @@
<exclude>**/ITestS3AHuge*.java</exclude>
<!-- this sets out to overload DynamoDB, so must be run standalone -->
<exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
<exclude>**/ITestTerasort*.java</exclude>
</excludes>
</configuration>
</execution>
@ -220,6 +221,9 @@
<include>**/ITestS3AEncryptionSSEC*.java</include>
<!-- this sets out to overload DynamoDB, so must be run standalone -->
<include>**/ITestDynamoDBMetadataStoreScale.java</include>
<!-- the terasort tests both work with a file in the same path in -->
<!-- the local FS. Running them sequentially guarantees isolation -->
<include>**/ITestTerasort*.java</include>
</includes>
</configuration>
</execution>

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.fs.s3a.commit;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@ -30,6 +32,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -50,6 +53,7 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
/**
@ -75,6 +79,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
private InconsistentAmazonS3Client inconsistentClient;
/**
* Should the inconsistent S3A client be used?
* Default value: true.
@ -436,4 +441,63 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId,
jContext.getConfiguration(),
TypeConverter.fromYarn(attemptID));
}
/**
 * Load the success data marker under a job's output path and verify that
 * it was written by the expected committer: this guarantees that an S3A
 * committer was used.
 *
 * If the marker cannot be found, the contents of the output path are
 * listed (best effort) and included in the assertion failure to help
 * diagnose which committer actually ran.
 * @param fs filesystem
 * @param outputPath path of job
 * @param committerName name of committer to match
 * @return the success data
 * @throws IOException IO failure other than a missing success file
 * @throws AssertionError if the success file is missing, is 0 bytes long,
 * or names a different committer.
 */
public static SuccessData validateSuccessFile(final S3AFileSystem fs,
    final Path outputPath, final String committerName) throws IOException {
  SuccessData successData;
  try {
    successData = loadSuccessFile(fs, outputPath);
  } catch (FileNotFoundException e) {
    // either the output path is missing or, if it's the success file,
    // somehow the relevant committer wasn't picked up.
    String dest = outputPath.toString();
    LOG.error("No _SUCCESS file found under {}", dest);
    List<String> files = new ArrayList<>();
    try {
      applyLocatedFiles(fs.listFiles(outputPath, true),
          (status) -> {
            files.add(status.getPath().toString());
            LOG.error("{} {}", status.getPath(), status.getLen());
          });
    } catch (IOException listingFailure) {
      // The listing is diagnostics only. If the output path itself is
      // missing the listing fails too; that must not replace the
      // assertion below or discard the original exception.
      LOG.error("Failed to list {}", dest, listingFailure);
      e.addSuppressed(listingFailure);
    }
    throw new AssertionError("No _SUCCESS file in " + dest
        + "; found : " + String.join("\n", files),
        e);
  }
  String commitDetails = successData.toString();
  LOG.info("Committer name {}\n{}", committerName, commitDetails);
  LOG.info("Committer statistics: \n{}",
      successData.dumpMetrics(" ", " = ", "\n"));
  LOG.info("Diagnostics\n{}",
      successData.dumpDiagnostics(" ", " = ", "\n"));
  assertEquals("Wrong committer in " + commitDetails,
      committerName, successData.getCommitter());
  return successData;
}
/**
 * Read the {@code _SUCCESS} marker of a job from its output directory.
 * The marker must exist and must not be empty.
 * @param fs filesystem
 * @param outputPath directory containing the success file.
 * @return the loaded file.
 * @throws IOException failure to find/load the file
 * @throws AssertionError file is 0-bytes long
 */
public static SuccessData loadSuccessFile(final S3AFileSystem fs,
    final Path outputPath) throws IOException {
  final Path marker = new Path(outputPath, _SUCCESS);
  // the status probe raises the IOException if the marker is absent
  final long markerLength = fs.getFileStatus(marker).getLen();
  assertTrue("0 byte success file - not a s3guard committer " + marker,
      markerLength > 0);
  return SuccessData.load(fs, marker);
}
}

View File

@ -30,22 +30,17 @@
import java.util.UUID;
import com.google.common.collect.Sets;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@ -54,98 +49,36 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
/**
* Test for an MR Job with all the different committers.
*/
public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
/** Full integration test of an MR job. */
public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractITCommitMRJob.class);
private static final int TEST_FILE_COUNT = 2;
private static final int SCALE_TEST_FILE_COUNT = 20;
private static MiniDFSClusterService hdfs;
private static MiniMRYarnCluster yarn = null;
private static JobConf conf = null;
private boolean uniqueFilenames = false;
private boolean scaleTest;
protected static FileSystem getDFS() {
return hdfs.getClusterFS();
}
@BeforeClass
public static void setupClusters() throws IOException {
// the HDFS and YARN clusters share the same configuration, so
// the HDFS cluster binding is implicitly propagated to YARN
conf = new JobConf();
conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
hdfs = deployService(conf, new MiniDFSClusterService());
yarn = deployService(conf,
new MiniMRYarnCluster("ITCommitMRJob", 2));
}
@SuppressWarnings("ThrowableNotThrown")
@AfterClass
public static void teardownClusters() throws IOException {
conf = null;
yarn = terminateService(yarn);
hdfs = terminateService(hdfs);
}
public static MiniDFSCluster getHdfs() {
return hdfs.getCluster();
}
public static FileSystem getLocalFS() {
return hdfs.getLocalFS();
}
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
/**
* The name of the committer as returned by
* {@link AbstractS3ACommitter#getName()} and used for committer construction.
*/
protected abstract String committerName();
@Override
public void setup() throws Exception {
super.setup();
scaleTest = getTestPropertyBool(
getConfiguration(),
KEY_SCALE_TESTS_ENABLED,
DEFAULT_SCALE_TESTS_ENABLED);
}
@Override
protected int getTestTimeoutMillis() {
return SCALE_TEST_TIMEOUT_SECONDS * 1000;
}
@Test
public void testMRJob() throws Exception {
describe("Run a simple MR Job");
S3AFileSystem fs = getFileSystem();
// final dest is in S3A
Path outputPath = path("testMRJob");
Path outputPath = path(getMethodName());
String commitUUID = UUID.randomUUID().toString();
String suffix = uniqueFilenames ? ("-" + commitUUID) : "";
String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
int numFiles = getTestFileCount();
List<String> expectedFiles = new ArrayList<>(numFiles);
Set<String> expectedKeys = Sets.newHashSet();
for (int i = 0; i < numFiles; i += 1) {
File file = temp.newFile(String.valueOf(i) + ".text");
File file = temp.newFile(i + ".text");
try (FileOutputStream out = new FileOutputStream(file)) {
out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
}
@ -156,17 +89,8 @@ public void testMRJob() throws Exception {
}
Collections.sort(expectedFiles);
Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job");
Job mrJob = createJob();
JobConf jobConf = (JobConf) mrJob.getConfiguration();
jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
uniqueFilenames);
bindCommitter(jobConf,
CommitConstants.S3A_COMMITTER_FACTORY,
committerName());
// pass down the scale test flag
jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
FileOutputFormat.setOutputPath(mrJob, outputPath);
@ -200,7 +124,7 @@ public void testMRJob() throws Exception {
mrJob.setMaxMapAttempts(1);
mrJob.submit();
try (DurationInfo d = new DurationInfo(LOG, "Job Execution")) {
try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
boolean succeeded = mrJob.waitForCompletion(true);
assertTrue("MR job failed", succeeded);
}
@ -219,24 +143,11 @@ public void testMRJob() throws Exception {
}
Collections.sort(actualFiles);
// load in the success data marker: this guarantees that a s3guard
// committer was used
Path success = new Path(outputPath, _SUCCESS);
FileStatus status = fs.getFileStatus(success);
assertTrue("0 byte success file - not a s3guard committer " + success,
status.getLen() > 0);
SuccessData successData = SuccessData.load(fs, success);
String commitDetails = successData.toString();
LOG.info("Committer name " + committerName() + "\n{}",
commitDetails);
LOG.info("Committer statistics: \n{}",
successData.dumpMetrics(" ", " = ", "\n"));
LOG.info("Diagnostics\n{}",
successData.dumpDiagnostics(" ", " = ", "\n"));
assertEquals("Wrong committer in " + commitDetails,
committerName(), successData.getCommitter());
SuccessData successData = validateSuccessFile(fs, outputPath,
committerName());
List<String> successFiles = successData.getFilenames();
assertTrue("No filenames in " + commitDetails,
String commitData = successData.toString();
assertTrue("No filenames in " + commitData,
!successFiles.isEmpty());
assertEquals("Should commit the expected files",
@ -245,41 +156,12 @@ public void testMRJob() throws Exception {
Set<String> summaryKeys = Sets.newHashSet();
summaryKeys.addAll(successFiles);
assertEquals("Summary keyset doesn't list the the expected paths "
+ commitDetails, expectedKeys, summaryKeys);
+ commitData, expectedKeys, summaryKeys);
assertPathDoesNotExist("temporary dir",
new Path(outputPath, CommitConstants.TEMPORARY));
customPostExecutionValidation(outputPath, successData);
}
/**
* Get the file count for the test.
* @return the number of mappers to create.
*/
public int getTestFileCount() {
return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
}
/**
* Override point to let implementations tune the MR Job conf.
* @param jobConf configuration
*/
protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
}
/**
* Override point for any committer specific validation operations;
* called after the base assertions have all passed.
* @param destPath destination of work
* @param successData loaded success data
* @throws Exception failure
*/
protected void customPostExecutionValidation(Path destPath,
SuccessData successData)
throws Exception {
}
/**
* Test Mapper.
* This is executed in separate process, and must not make any assumptions
@ -301,7 +183,7 @@ protected void setup(Context context)
org.apache.log4j.BasicConfigurator.configure();
boolean scaleMap = context.getConfiguration()
.getBoolean(KEY_SCALE_TESTS_ENABLED, false);
operations = scaleMap ? 1000 : 10;
operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
id = context.getTaskAttemptID().toString();
}

View File

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.IOException;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
/**
* Full integration test MR jobs.
*
* This is all done on shared static mini YARN and HDFS clusters, set up before
* any of the test methods run.
*
* To isolate tests properly for parallel test runs, that static state
* needs to be stored in the final classes implementing the tests, and
* exposed to the base class, with the cluster setup in the
* specific test suites creating the clusters with unique names.
*
* This is "hard" to do in Java, unlike, say, Scala.
*
* Note: this turns out not to be the root cause of ordering problems
* with the Terasort tests (that is hard coded use of a file in the local FS),
* but this design here does make it clear that the before and after class
* operations are explicitly called in the subclasses.
* If two subclasses of this class are instantiated in the same JVM, in order,
* they are guaranteed to be isolated.
*
* History: this is a superclass extracted from
* {@link AbstractITCommitMRJob} while adding support for testing terasorting.
*
*/
public abstract class AbstractYarnClusterITest extends AbstractCommitITest {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractYarnClusterITest.class);
/** File count returned by {@link #getTestFileCount()} for normal runs. */
private static final int TEST_FILE_COUNT = 2;
/** File count returned by {@link #getTestFileCount()} for scale runs. */
private static final int SCALE_TEST_FILE_COUNT = 20;
/** Key/operation count for tests when the scale test flag is set. */
public static final int SCALE_TEST_KEYS = 1000;
/** Key/operation count for tests when the scale test flag is not set. */
public static final int BASE_TEST_KEYS = 10;
/** Are scale tests enabled? Read from the test configuration in setup. */
private boolean scaleTest;
/**
* Value passed down to jobs as
* {@code FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES}.
*/
private boolean uniqueFilenames = false;
/**
* This is the cluster binding which every subclass must create.
* It pairs the mini HDFS service with the mini YARN cluster.
*/
protected static final class ClusterBinding {
private final MiniDFSClusterService hdfs;
private final MiniMRYarnCluster yarn;
/**
* Create the binding; both arguments must be non-null.
* @param hdfs HDFS cluster service
* @param yarn YARN cluster
*/
public ClusterBinding(
final MiniDFSClusterService hdfs,
final MiniMRYarnCluster yarn) {
this.hdfs = checkNotNull(hdfs);
this.yarn = checkNotNull(yarn);
}
/**
* @return the HDFS cluster service.
*/
public MiniDFSClusterService getHdfs() {
return hdfs;
}
/**
* @return the YARN cluster.
*/
public MiniMRYarnCluster getYarn() {
return yarn;
}
/**
* @return the configuration of the YARN cluster.
*/
public Configuration getConf() {
return getYarn().getConfig();
}
/**
* Terminate both services: the YARN cluster first, then HDFS.
*/
public void terminate() {
terminateService(getYarn());
terminateService(getHdfs());
}
}
/**
* Create the cluster binding. This must be done in
* class setup of the (final) subclass.
* The HDFS and YARN clusters share the same configuration, so
* the HDFS cluster binding is implicitly propagated to YARN.
* The configuration passed in is modified: the history cleaner is
* disabled and the filesystem DU interval is set to the maximum value.
* @param conf configuration to start with.
* @return the cluster binding.
* @throws IOException failure.
*/
protected static ClusterBinding createCluster(JobConf conf)
throws IOException {
conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
// create a unique cluster name.
String clusterName = "yarn-" + UUID.randomUUID();
// HDFS is deployed before YARN, using the same configuration instance.
MiniDFSClusterService miniDFSClusterService = deployService(conf,
new MiniDFSClusterService());
MiniMRYarnCluster yarnCluster = deployService(conf,
new MiniMRYarnCluster(clusterName, 2));
return new ClusterBinding(miniDFSClusterService, yarnCluster);
}
/**
* Get the cluster binding for this subclass.
* @return the cluster binding created in the subclass's class setup.
*/
protected abstract ClusterBinding getClusterBinding();
/**
* @return the HDFS cluster service of the cluster binding.
*/
protected MiniDFSClusterService getHdfs() {
return getClusterBinding().getHdfs();
}
/**
* @return the YARN cluster of the cluster binding.
*/
protected MiniMRYarnCluster getYarn() {
return getClusterBinding().getYarn();
}
/**
* @return the local filesystem as served by the HDFS cluster service.
*/
public FileSystem getLocalFS() {
return getHdfs().getLocalFS();
}
/**
* @return the cluster filesystem of the HDFS cluster service.
*/
protected FileSystem getDFS() {
return getHdfs().getClusterFS();
}
/**
* The name of the committer as returned by
* {@link AbstractS3ACommitter#getName()} and used for committer construction.
*/
protected abstract String committerName();
/**
* Set up the test case: verify that the subclass has bound a cluster,
* then read the scale test flag from the test configuration.
* @throws Exception setup failure
*/
@Override
public void setup() throws Exception {
super.setup();
assertNotNull("cluster is not bound",
getClusterBinding());
scaleTest = getTestPropertyBool(
getConfiguration(),
KEY_SCALE_TESTS_ENABLED,
DEFAULT_SCALE_TESTS_ENABLED);
}
/**
* Use the scale test timeout for every test case.
* @return the scale test timeout, converted from seconds to milliseconds.
*/
@Override
protected int getTestTimeoutMillis() {
return SCALE_TEST_TIMEOUT_SECONDS * 1000;
}
/**
* Create a new job configuration from the YARN cluster's configuration.
* @return a new job configuration.
*/
protected JobConf newJobConf() {
return new JobConf(getYarn().getConfig());
}
/**
* Create a job named after the current test method, built from the
* cluster configuration and patched for the committer under test.
* @return the new job.
* @throws IOException failure to create the job.
*/
protected Job createJob() throws IOException {
Job mrJob = Job.getInstance(getClusterBinding().getConf(),
getMethodName());
patchConfigurationForCommitter(mrJob.getConfiguration());
return mrJob;
}
/**
* Patch a job configuration in place: set the unique filenames option,
* bind the committer named by {@link #committerName()} and pass down
* the scale test flag.
* @param jobConf configuration to patch.
* @return the same configuration instance.
*/
protected Configuration patchConfigurationForCommitter(
final Configuration jobConf) {
jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
uniqueFilenames);
bindCommitter(jobConf,
CommitConstants.S3A_COMMITTER_FACTORY,
committerName());
// pass down the scale test flag
jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
return jobConf;
}
/**
* Get the file count for the test.
* @return the number of mappers to create.
*/
public int getTestFileCount() {
return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
}
/**
* Override point to let implementations tune the MR Job conf.
* The base implementation does nothing.
* @param jobConf configuration
* @throws IOException failure in a subclass implementation.
*/
protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
}
/**
* Override point for any committer specific validation operations;
* called after the base assertions have all passed.
* The base implementation does nothing.
* @param destPath destination of work
* @param successData loaded success data
* @throws Exception failure
*/
protected void customPostExecutionValidation(Path destPath,
SuccessData successData)
throws Exception {
}
/**
* Assume that scale tests are enabled.
*/
protected void requireScaleTestsEnabled() {
assume("Scale test disabled: to enable set property " +
KEY_SCALE_TESTS_ENABLED,
isScaleTest());
}
/**
* @return true if the scale test flag was set when the test was set up.
*/
public boolean isScaleTest() {
return scaleTest;
}
/**
* @return the unique filenames flag passed down to jobs.
*/
public boolean isUniqueFilenames() {
return uniqueFilenames;
}
}

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
@ -33,7 +38,30 @@
* the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
* passed down to these processes.
*/
public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
/**
 * Terminate the clusters created for this suite.
 * The binding is null if class setup failed before it was assigned;
 * in that case there is nothing to terminate, and raising an NPE here
 * would only obscure the original setup failure.
 * @throws IOException declared for subclass/JUnit signature compatibility.
 */
@AfterClass
public static void teardownClusters() throws IOException {
  if (clusterBinding != null) {
    clusterBinding.terminate();
  }
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
/**
* Need consistency here.

View File

@ -18,13 +18,41 @@
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.mapred.JobConf;
/**
* Full integration test for the directory committer.
*/
public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf()); }
/**
 * Terminate the clusters created for this suite.
 * The binding is null if class setup failed before it was assigned;
 * in that case there is nothing to terminate, and raising an NPE here
 * would only obscure the original setup failure.
 * @throws IOException declared for subclass/JUnit signature compatibility.
 */
@AfterClass
public static void teardownClusters() throws IOException {
  if (clusterBinding != null) {
    clusterBinding.terminate();
  }
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {

View File

@ -18,13 +18,42 @@
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.mapred.JobConf;
/**
* Full integration test for the partition committer.
*/
public class ITPartitionCommitMRJob extends AbstractITCommitMRJob {
public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
/**
 * Terminate the clusters created for this suite.
 * The binding is null if class setup failed before it was assigned;
 * in that case there is nothing to terminate, and raising an NPE here
 * would only obscure the original setup failure.
 * @throws IOException declared for subclass/JUnit signature compatibility.
 */
@AfterClass
public static void teardownClusters() throws IOException {
  if (clusterBinding != null) {
    clusterBinding.terminate();
  }
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {

View File

@ -18,25 +18,53 @@
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import org.junit.Test;
import java.io.IOException;
import org.hamcrest.core.StringContains;
import org.hamcrest.core.StringEndsWith;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
/**
* Full integration test for the staging committer.
*/
public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
/**
 * Terminate the clusters created for this suite.
 * The binding is null if class setup failed before it was assigned;
 * in that case there is nothing to terminate, and raising an NPE here
 * would only obscure the original setup failure.
 * @throws IOException declared for subclass/JUnit signature compatibility.
 */
@AfterClass
public static void teardownClusters() throws IOException {
  if (clusterBinding != null) {
    clusterBinding.terminate();
  }
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
@ -51,12 +79,12 @@ protected String committerName() {
public void testStagingDirectory() throws Throwable {
FileSystem hdfs = getDFS();
Configuration conf = hdfs.getConf();
conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH,
"private");
conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private");
Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
assertThat(dir.toString(), StringEndsWith.endsWith(
"UUID/"
+ StagingCommitterConstants.STAGING_UPLOADS));
assertThat("Directory " + dir + " path is wrong",
dir.toString(),
StringEndsWith.endsWith("UUID/"
+ STAGING_UPLOADS));
assertTrue("path unqualified", dir.isAbsolute());
String self = UserGroupInformation.getCurrentUser().getShortUserName();
assertThat(dir.toString(),

View File

@ -20,6 +20,9 @@
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
@ -33,7 +36,30 @@
* This is a test to verify that the committer will fail if the destination
* directory exists, and that this happens in job setup.
*/
public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
/**
* The static cluster binding with the lifecycle of this test; served
* through instance-level methods for sharing across methods in the
* suite.
*/
@SuppressWarnings("StaticNonFinalField")
private static ClusterBinding clusterBinding;
@BeforeClass
public static void setupClusters() throws IOException {
clusterBinding = createCluster(new JobConf());
}
/**
 * Terminate the clusters created for this suite.
 * The binding is null if class setup failed before it was assigned;
 * in that case there is nothing to terminate, and raising an NPE here
 * would only obscure the original setup failure.
 * @throws IOException declared for subclass/JUnit signature compatibility.
 */
@AfterClass
public static void teardownClusters() throws IOException {
  if (clusterBinding != null) {
    clusterBinding.terminate();
  }
}
@Override
public ClusterBinding getClusterBinding() {
return clusterBinding;
}
@Override
protected String committerName() {
@ -59,4 +85,5 @@ public void testMRJob() throws Exception {
"Output directory",
super::testMRJob);
}
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.terasort;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.terasort.TeraGen;
import org.apache.hadoop.examples.terasort.TeraSort;
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
import org.apache.hadoop.examples.terasort.TeraValidate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import static java.util.Optional.empty;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
/**
* Runs Terasort against S3A.
*
* This is all done on a shared mini YARN and HDFS clusters, set up before
* any of the tests methods run.
*
* The tests run in sequence, so each operation is isolated.
* This also means that the test paths deleted in test
* teardown; shared variables must all be static.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@SuppressWarnings("StaticNonFinalField")
public abstract class AbstractCommitTerasortIT extends
AbstractYarnClusterITest {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractCommitTerasortIT.class);
// all the durations are optional as they only get filled in when
// a test run successfully completes. Failed tests don't have numbers.
private static Optional<DurationInfo> terasortDuration = empty();
private static Optional<DurationInfo> teragenStageDuration = empty();
private static Optional<DurationInfo> terasortStageDuration = empty();
private static Optional<DurationInfo> teravalidateStageDuration = empty();
private Path terasortPath;
private Path sortInput;
private Path sortOutput;
private Path sortValidate;
/**
* Not using special paths here.
* @return false
*/
@Override
public boolean useInconsistentClient() {
return false;
}
@Override
public void setup() throws Exception {
super.setup();
requireScaleTestsEnabled();
prepareToTerasort();
}
/**
* Set up for terasorting by initializing paths.
* The paths used must be unique across parallel runs.
*/
private void prepareToTerasort() {
// small sample size for faster runs
Configuration yarnConfig = getYarn().getConfig();
yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
yarnConfig.setBoolean(
TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
true);
terasortPath = new Path("/terasort-" + getClass().getSimpleName())
.makeQualified(getFileSystem());
sortInput = new Path(terasortPath, "sortin");
sortOutput = new Path(terasortPath, "sortout");
sortValidate = new Path(terasortPath, "validate");
if (!terasortDuration.isPresent()) {
terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort"));
}
}
/**
* Execute a single stage in the terasort,
* @param stage Stage name for messages/assertions.
* @param jobConf job conf
* @param dest destination directory -the _SUCCESS File will be expected here.
* @param tool tool to run.
* @param args args for the tool.
* @throws Exception any failure
*/
private Optional<DurationInfo> executeStage(
final String stage,
final JobConf jobConf,
final Path dest,
final Tool tool,
final String[] args) throws Exception {
int result;
DurationInfo d = new DurationInfo(LOG, stage);
try {
result = ToolRunner.run(jobConf, tool, args);
} finally {
d.close();
}
assertEquals(stage
+ "(" + StringUtils.join(", ", args) + ")"
+ " failed", 0, result);
validateSuccessFile(getFileSystem(), dest, committerName());
return Optional.of(d);
}
/**
* Set up terasort by cleaning out the destination, and note the initial
* time before any of the jobs are executed.
*/
@Test
public void test_100_terasort_setup() throws Throwable {
describe("Setting up for a terasort");
getFileSystem().delete(terasortPath, true);
}
@Test
public void test_110_teragen() throws Throwable {
describe("Teragen to %s", sortInput);
JobConf jobConf = newJobConf();
patchConfigurationForCommitter(jobConf);
teragenStageDuration = executeStage("Teragen",
jobConf,
sortInput,
new TeraGen(),
new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()});
}
@Test
public void test_120_terasort() throws Throwable {
describe("Terasort from %s to %s", sortInput, sortOutput);
loadSuccessFile(getFileSystem(), sortInput);
JobConf jobConf = newJobConf();
patchConfigurationForCommitter(jobConf);
// this job adds some data, so skip it.
jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
terasortStageDuration = executeStage("TeraSort",
jobConf,
sortOutput,
new TeraSort(),
new String[]{sortInput.toString(), sortOutput.toString()});
}
@Test
public void test_130_teravalidate() throws Throwable {
describe("TeraValidate from %s to %s", sortOutput, sortValidate);
loadSuccessFile(getFileSystem(), sortOutput);
JobConf jobConf = newJobConf();
patchConfigurationForCommitter(jobConf);
teravalidateStageDuration = executeStage("TeraValidate",
jobConf,
sortValidate,
new TeraValidate(),
new String[]{sortOutput.toString(), sortValidate.toString()});
}
/**
* Print the results, and save to the base dir as a CSV file.
* Why there? Makes it easy to list and compare.
*/
@Test
public void test_140_teracomplete() throws Throwable {
terasortDuration.get().close();
final StringBuilder results = new StringBuilder();
results.append("\"Operation\"\t\"Duration\"\n");
// this is how you dynamically create a function in a method
// for use afterwards.
// Works because there's no IOEs being raised in this sequence.
BiConsumer<String, Optional<DurationInfo>> stage =
(s, od) ->
results.append(String.format("\"%s\"\t\"%s\"\n",
s,
od.map(DurationInfo::getDurationString).orElse("")));
stage.accept("Generate", teragenStageDuration);
stage.accept("Terasort", terasortStageDuration);
stage.accept("Validate", teravalidateStageDuration);
stage.accept("Completed", terasortDuration);
String text = results.toString();
Path path = new Path(terasortPath, "results.csv");
LOG.info("Results are in {}\n{}", path, text);
ContractTestUtils.writeTextFile(getFileSystem(), path, text, true);
}
/**
* Reset the duration so if two committer tests are run sequentially.
* Without this the total execution time is reported as from the start of
* the first test suite to the end of the second.
*/
@Test
public void test_150_teracleanup() throws Throwable {
terasortDuration = Optional.empty();
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.terasort;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.mapred.JobConf;
/**
 * Terasort with the directory committer.
 */
public final class ITestTerasortDirectoryCommitter
    extends AbstractCommitTerasortIT {

  /**
   * The static cluster binding with the lifecycle of this test; served
   * through instance-level methods for sharing across methods in the
   * suite.
   */
  @SuppressWarnings("StaticNonFinalField")
  private static ClusterBinding clusterBinding;

  /**
   * Bind the suite to a cluster before any test method runs.
   * @throws IOException failure raised by {@code createCluster}
   */
  @BeforeClass
  public static void setupClusters() throws IOException {
    clusterBinding = createCluster(new JobConf());
  }

  /**
   * Terminate the shared cluster binding once all tests have run.
   * @throws IOException failure raised while terminating the binding
   */
  @AfterClass
  public static void teardownClusters() throws IOException {
    // JUnit still runs @AfterClass methods when a @BeforeClass method threw,
    // in which case the binding was never assigned: skip the termination
    // rather than raise an NPE on top of the original failure.
    if (clusterBinding != null) {
      clusterBinding.terminate();
    }
  }

  /**
   * Serve the static, suite-wide cluster binding.
   * @return the binding assigned in {@link #setupClusters()}.
   */
  @Override
  public ClusterBinding getClusterBinding() {
    return clusterBinding;
  }

  /**
   * The committer under test.
   * @return the name of the directory staging committer.
   */
  @Override
  protected String committerName() {
    return DirectoryStagingCommitter.NAME;
  }
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.terasort;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.mapred.JobConf;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
/**
 * Terasort with the magic committer.
 */
public final class ITestTerasortMagicCommitter
    extends AbstractCommitTerasortIT {

  /**
   * The static cluster binding with the lifecycle of this test; served
   * through instance-level methods for sharing across methods in the
   * suite.
   */
  @SuppressWarnings("StaticNonFinalField")
  private static ClusterBinding clusterBinding;

  /**
   * Bind the suite to a cluster before any test method runs.
   * @throws IOException failure raised by {@code createCluster}
   */
  @BeforeClass
  public static void setupClusters() throws IOException {
    clusterBinding = createCluster(new JobConf());
  }

  /**
   * Terminate the shared cluster binding once all tests have run.
   * @throws IOException failure raised while terminating the binding
   */
  @AfterClass
  public static void teardownClusters() throws IOException {
    // JUnit still runs @AfterClass methods when a @BeforeClass method threw,
    // in which case the binding was never assigned: skip the termination
    // rather than raise an NPE on top of the original failure.
    if (clusterBinding != null) {
      clusterBinding.terminate();
    }
  }

  /**
   * Serve the static, suite-wide cluster binding.
   * @return the binding assigned in {@link #setupClusters()}.
   */
  @Override
  public ClusterBinding getClusterBinding() {
    return clusterBinding;
  }

  /**
   * The committer under test.
   * @return the name of the magic committer.
   */
  @Override
  protected String committerName() {
    return MagicS3GuardCommitter.NAME;
  }

  /**
   * Turn on the magic commit support for the FS, else nothing will work.
   * @param conf configuration
   */
  @Override
  protected void applyCustomConfigOptions(JobConf conf) {
    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
  }
}