MAPREDUCE-6956 FileOutputCommitter to gain abstract superclass PathOutputCommitter.

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2017-09-15 16:59:04 +01:00
parent 78bdf10ae4
commit 11390c2d11
No known key found for this signature in database
GPG Key ID: 950CC3E032B79CA2
6 changed files with 553 additions and 20 deletions

View File

@ -39,13 +39,14 @@
import org.apache.hadoop.mapreduce.TaskAttemptID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/** An {@link OutputCommitter} that commits files specified
* in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
**/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileOutputCommitter extends OutputCommitter {
public class FileOutputCommitter extends PathOutputCommitter {
private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
/**
@ -101,8 +102,11 @@ public class FileOutputCommitter extends OutputCommitter {
public FileOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
this(outputPath, (JobContext)context);
if (outputPath != null) {
workPath = getTaskAttemptPath(context, outputPath);
if (getOutputPath() != null) {
workPath = Preconditions.checkNotNull(
getTaskAttemptPath(context, getOutputPath()),
"Null task attempt path in %s and output path %s",
context, outputPath);
}
}
@ -116,6 +120,7 @@ public FileOutputCommitter(Path outputPath,
@Private
public FileOutputCommitter(Path outputPath,
JobContext context) throws IOException {
super(outputPath, context);
Configuration conf = context.getConfiguration();
algorithmVersion =
conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
@ -705,4 +710,18 @@ public void recoverTask(TaskAttemptContext context)
LOG.warn("Output Path is null in recoverTask()");
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"FileOutputCommitter{");
sb.append(super.toString()).append("; ");
sb.append("outputPath=").append(outputPath);
sb.append(", workPath=").append(workPath);
sb.append(", algorithmVersion=").append(algorithmVersion);
sb.append(", skipCleanup=").append(skipCleanup);
sb.append(", ignoreCleanupFailures=").append(ignoreCleanupFailures);
sb.append('}');
return sb.toString();
}
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.text.NumberFormat;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -38,11 +39,15 @@
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
private static final Logger LOG =
LoggerFactory.getLogger(FileOutputFormat.class);
/** Construct output file names so that, when an output directory listing is
* sorted lexicographically, positions correspond to output partitions.*/
@ -53,12 +58,25 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
}
private FileOutputCommitter committer = null;
public static final String COMPRESS ="mapreduce.output.fileoutputformat.compress";
public static final String COMPRESS_CODEC =
"mapreduce.output.fileoutputformat.compress.codec";
public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";
private PathOutputCommitter committer = null;
/** Configuration option: should output be compressed? {@value}. */
public static final String COMPRESS =
"mapreduce.output.fileoutputformat.compress";
/** If compression is enabled, name of codec: {@value}. */
public static final String COMPRESS_CODEC =
"mapreduce.output.fileoutputformat.compress.codec";
/**
* Type of compression {@value}: NONE, RECORD, BLOCK.
* Generally only used in {@code SequenceFileOutputFormat}.
*/
public static final String COMPRESS_TYPE =
"mapreduce.output.fileoutputformat.compress.type";
/** Destination directory of work: {@value}. */
public static final String OUTDIR =
"mapreduce.output.fileoutputformat.outputdir";
@Deprecated
public enum Counter {
@ -110,14 +128,14 @@ public static boolean getCompressOutput(JobContext job) {
*/
public static Class<? extends CompressionCodec>
getOutputCompressorClass(JobContext job,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
Configuration conf = job.getConfiguration();
String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
if (name != null) {
try {
codecClass =
conf.getClassByName(name).asSubclass(CompressionCodec.class);
codecClass =
conf.getClassByName(name).asSubclass(CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name +
" was not found.", e);
@ -219,9 +237,11 @@ public static Path getOutputPath(JobContext job) {
public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
) throws IOException,
InterruptedException {
FileOutputCommitter committer = (FileOutputCommitter)
PathOutputCommitter committer = (PathOutputCommitter)
context.getOutputCommitter();
return committer.getWorkPath();
Path workPath = committer.getWorkPath();
LOG.debug("Work path is {}", workPath);
return workPath;
}
/**
@ -281,10 +301,17 @@ public synchronized static String getUniqueFile(TaskAttemptContext context,
*/
public Path getDefaultWorkFile(TaskAttemptContext context,
String extension) throws IOException{
FileOutputCommitter committer =
(FileOutputCommitter) getOutputCommitter(context);
return new Path(committer.getWorkPath(), getUniqueFile(context,
getOutputName(context), extension));
OutputCommitter c = getOutputCommitter(context);
Preconditions.checkState(c instanceof PathOutputCommitter,
"Committer %s is not a PathOutputCommitter", c);
Path workPath = ((PathOutputCommitter) c).getWorkPath();
Preconditions.checkNotNull(workPath,
"Null workPath returned by committer %s", c);
Path workFile = new Path(workPath,
getUniqueFile(context, getOutputName(context), extension));
LOG.debug("Work file for {} extension '{}' is {}",
context, extension, workFile);
return workFile;
}
/**

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* A committer which somehow commits data written to a working directory
* to the final directory during the commit process. The reference
* implementation of this is the {@link FileOutputCommitter}.
*
* There are two constructors, both of which do nothing but long and
* validate their arguments.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class PathOutputCommitter extends OutputCommitter {
private static final Logger LOG =
LoggerFactory.getLogger(PathOutputCommitter.class);
private final JobContext context;
/**
* Constructor for a task attempt.
* Subclasses should provide a public constructor with this signature.
* @param outputPath output path: may be null
* @param context task context
* @throws IOException IO problem
*/
protected PathOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context");
LOG.debug("Creating committer with output path {} and task context"
+ " {}", outputPath, context);
}
/**
* Constructor for a job attempt.
* Subclasses should provide a public constructor with this signature.
* @param outputPath output path: may be null
* @param context task context
* @throws IOException IO problem
*/
protected PathOutputCommitter(Path outputPath,
JobContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context");
LOG.debug("Creating committer with output path {} and job context"
+ " {}", outputPath, context);
}
/**
* Get the directory that the task should write results into.
* Warning: there's no guarantee that this work path is on the same
* FS as the final output, or that it's visible across machines.
* @return the work directory
* @throws IOException IO problem
*/
public abstract Path getWorkPath() throws IOException;
@Override
public String toString() {
return "PathOutputCommitter{context=" + context + '}';
}
}

View File

@ -451,5 +451,13 @@ public String getUser() {
public Credentials getCredentials() {
return credentials;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"JobContextImpl{");
sb.append("jobId=").append(jobId);
sb.append('}');
return sb.toString();
}
}

View File

@ -118,4 +118,15 @@ public float getProgress() {
public float getProgress() {
return reporter.getProgress();
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"TaskAttemptContextImpl{");
sb.append(super.toString());
sb.append("; taskId=").append(taskId);
sb.append(", status='").append(status).append('\'');
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,377 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
import java.net.URI;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.security.Credentials;
/**
* Test the path output committer binding to FileOutputFormat.
*/
public class TestPathOutputCommitter extends Assert {
@Test
public void testFileOutputCommitterOverrride() throws Throwable {
TaskContext context = new TaskContext();
Path workPath = new Path("file:///work");
context.setOutputCommitter(
new SimpleCommitter(new Path("/"), context, workPath));
assertEquals(workPath, FileOutputFormat.getWorkOutputPath(context));
}
@Test
public void testFileOutputCommitterNullWorkPath() throws Throwable {
TaskContext context = new TaskContext();
context.setOutputCommitter(
new SimpleCommitter(new Path("/"), context, null));
assertNull(FileOutputFormat.getWorkOutputPath(context));
}
private static class SimpleCommitter extends PathOutputCommitter {
private final Path workPath;
SimpleCommitter(Path outputPath,
TaskAttemptContext context, Path workPath) throws IOException {
super(outputPath, context);
this.workPath = workPath;
}
SimpleCommitter(Path outputPath,
JobContext context, Path workPath) throws IOException {
super(outputPath, context);
this.workPath = workPath;
}
@Override
public Path getWorkPath() throws IOException {
return workPath;
}
@Override
public void setupJob(JobContext jobContext) throws IOException {
}
@Override
public void setupTask(TaskAttemptContext taskContext) throws IOException {
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext)
throws IOException {
return false;
}
@Override
public void commitTask(TaskAttemptContext taskContext) throws IOException {
}
@Override
public void abortTask(TaskAttemptContext taskContext) throws IOException {
}
}
/**
* Stub task context.
*/
public class TaskContext
implements TaskInputOutputContext<String, String, String, String> {
private OutputCommitter outputCommitter;
public void setOutputCommitter(OutputCommitter outputCommitter) {
this.outputCommitter = outputCommitter;
}
@Override
public OutputCommitter getOutputCommitter() {
return outputCommitter;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return false;
}
@Override
public String getCurrentKey() throws IOException, InterruptedException {
return null;
}
@Override
public String getCurrentValue() throws IOException, InterruptedException {
return null;
}
@Override
public void write(String key, String value)
throws IOException, InterruptedException {
}
@Override
public TaskAttemptID getTaskAttemptID() {
return null;
}
@Override
public void setStatus(String msg) {
}
@Override
public String getStatus() {
return null;
}
@Override
public float getProgress() {
return 0;
}
@Override
public Counter getCounter(Enum<?> counterName) {
return null;
}
@Override
public Counter getCounter(String groupName, String counterName) {
return null;
}
@Override
public Configuration getConfiguration() {
return null;
}
@Override
public Credentials getCredentials() {
return null;
}
@Override
public JobID getJobID() {
return null;
}
@Override
public int getNumReduceTasks() {
return 0;
}
@Override
public Path getWorkingDirectory() throws IOException {
return null;
}
@Override
public Class<?> getOutputKeyClass() {
return null;
}
@Override
public Class<?> getOutputValueClass() {
return null;
}
@Override
public Class<?> getMapOutputKeyClass() {
return null;
}
@Override
public Class<?> getMapOutputValueClass() {
return null;
}
@Override
public String getJobName() {
return null;
}
@Override
public Class<? extends InputFormat<?, ?>> getInputFormatClass()
throws ClassNotFoundException {
return null;
}
@Override
public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
throws ClassNotFoundException {
return null;
}
@Override
public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
throws ClassNotFoundException {
return null;
}
@Override
public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
throws ClassNotFoundException {
return null;
}
@Override
public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
throws ClassNotFoundException {
return null;
}
@Override
public Class<? extends Partitioner<?, ?>> getPartitionerClass()
throws ClassNotFoundException {
return null;
}
@Override
public RawComparator<?> getSortComparator() {
return null;
}
@Override
public String getJar() {
return null;
}
@Override
public RawComparator<?> getCombinerKeyGroupingComparator() {
return null;
}
@Override
public RawComparator<?> getGroupingComparator() {
return null;
}
@Override
public boolean getJobSetupCleanupNeeded() {
return false;
}
@Override
public boolean getTaskCleanupNeeded() {
return false;
}
@Override
public boolean getProfileEnabled() {
return false;
}
@Override
public String getProfileParams() {
return null;
}
@Override
public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
return null;
}
@Override
public String getUser() {
return null;
}
@Override
public boolean getSymlink() {
return false;
}
@Override
public Path[] getArchiveClassPaths() {
return new Path[0];
}
@Override
public URI[] getCacheArchives() throws IOException {
return new URI[0];
}
@Override
public URI[] getCacheFiles() throws IOException {
return new URI[0];
}
@Override
public Path[] getLocalCacheArchives() throws IOException {
return new Path[0];
}
@Override
public Path[] getLocalCacheFiles() throws IOException {
return new Path[0];
}
@Override
public Path[] getFileClassPaths() {
return new Path[0];
}
@Override
public String[] getArchiveTimestamps() {
return new String[0];
}
@Override
public String[] getFileTimestamps() {
return new String[0];
}
@Override
public int getMaxMapAttempts() {
return 0;
}
@Override
public int getMaxReduceAttempts() {
return 0;
}
@Override
public void progress() {
}
}
}