MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1422345 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-12-15 20:23:08 +00:00
parent 803e5155d1
commit 8329fae686
8 changed files with 609 additions and 65 deletions

View File

@ -14,6 +14,8 @@ Trunk (Unreleased)
MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
(Avner BenHanoch via acmurthy)
MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class IndexRecord {
public long startOffset;
public long rawLength;
public long partLength;
public IndexRecord() { }
public IndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Task.TaskReporter;
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public interface MapOutputCollector<K, V> {
public void init(Context context
) throws IOException, ClassNotFoundException;
public void collect(K key, V value, int partition
) throws IOException, InterruptedException;
public void close() throws IOException, InterruptedException;
public void flush() throws IOException, InterruptedException,
ClassNotFoundException;
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public static class Context {
private final MapTask mapTask;
private final JobConf jobConf;
private final TaskReporter reporter;
public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
this.mapTask = mapTask;
this.jobConf = jobConf;
this.reporter = reporter;
}
public MapTask getMapTask() {
return mapTask;
}
public JobConf getJobConf() {
return jobConf;
}
public TaskReporter getReporter() {
return reporter;
}
}
}

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
@ -342,6 +343,10 @@ public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
done(umbilical, reporter);
}
public Progress getSortPhase() {
return sortPhase;
}
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset)
throws IOException {
@ -370,6 +375,22 @@ private <T> T getSplitDetails(Path file, long offset)
return split;
}
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector<KEY, VALUE> collector
= (MapOutputCollector<KEY, VALUE>)
ReflectionUtils.newInstance(
job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
MapOutputBuffer.class, MapOutputCollector.class), job);
LOG.info("Map output collector class = " + collector.getClass().getName());
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
collector.init(context);
return collector;
}
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runOldMapper(final JobConf job,
@ -392,11 +413,14 @@ void runOldMapper(final JobConf job,
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
MapOutputCollector collector = null;
MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
if (numReduceTasks > 0) {
collector = new MapOutputBuffer(umbilical, job, reporter);
collector = createSortingCollector(job, reporter);
} else {
collector = new DirectMapOutputCollector(umbilical, job, reporter);
collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
collector.init(context);
}
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@ -642,7 +666,7 @@ private class NewOutputCollector<K,V>
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@ -738,17 +762,6 @@ void runNewMapper(final JobConf job,
output.close(mapperContext);
}
interface MapOutputCollector<K, V> {
public void collect(K key, V value, int partition
) throws IOException, InterruptedException;
public void close() throws IOException, InterruptedException;
public void flush() throws IOException, InterruptedException,
ClassNotFoundException;
}
class DirectMapOutputCollector<K, V>
implements MapOutputCollector<K, V> {
@ -756,14 +769,18 @@ class DirectMapOutputCollector<K, V>
private TaskReporter reporter = null;
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter fileOutputByteCounter;
private final List<Statistics> fsStats;
private Counters.Counter mapOutputRecordCounter;
private Counters.Counter fileOutputByteCounter;
private List<Statistics> fsStats;
public DirectMapOutputCollector() {
}
@SuppressWarnings("unchecked")
public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
JobConf job, TaskReporter reporter) throws IOException {
this.reporter = reporter;
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
this.reporter = context.getReporter();
JobConf job = context.getJobConf();
String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
@ -818,25 +835,27 @@ private long getOutputBytes(List<Statistics> stats) {
}
}
private class MapOutputBuffer<K extends Object, V extends Object>
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public static class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
final int partitions;
final JobConf job;
final TaskReporter reporter;
final Class<K> keyClass;
final Class<V> valClass;
final RawComparator<K> comparator;
final SerializationFactory serializationFactory;
final Serializer<K> keySerializer;
final Serializer<V> valSerializer;
final CombinerRunner<K,V> combinerRunner;
final CombineOutputCollector<K, V> combineCollector;
private int partitions;
private JobConf job;
private TaskReporter reporter;
private Class<K> keyClass;
private Class<V> valClass;
private RawComparator<K> comparator;
private SerializationFactory serializationFactory;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
private CombinerRunner<K,V> combinerRunner;
private CombineOutputCollector<K, V> combineCollector;
// Compression for map-outputs
final CompressionCodec codec;
private CompressionCodec codec;
// k/v accounting
final IntBuffer kvmeta; // metadata overlay on backing store
private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata
int kvend; // marks end of spill metadata
int kvindex; // marks end of fully serialized records
@ -860,15 +879,15 @@ private class MapOutputBuffer<K extends Object, V extends Object>
private static final int METASIZE = NMETA * 4; // size in bytes
// spill accounting
final int maxRec;
final int softLimit;
private int maxRec;
private int softLimit;
boolean spillInProgress;;
int bufferRemaining;
volatile Throwable sortSpillException = null;
int numSpills = 0;
final int minSpillsForCombine;
final IndexedSorter sorter;
private int minSpillsForCombine;
private IndexedSorter sorter;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
@ -876,12 +895,12 @@ private class MapOutputBuffer<K extends Object, V extends Object>
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
final FileSystem rfs;
private FileSystem rfs;
// Counters
final Counters.Counter mapOutputByteCounter;
final Counters.Counter mapOutputRecordCounter;
final Counters.Counter fileOutputByteCounter;
private Counters.Counter mapOutputByteCounter;
private Counters.Counter mapOutputRecordCounter;
private Counters.Counter fileOutputByteCounter;
final ArrayList<SpillRecord> indexCacheList =
new ArrayList<SpillRecord>();
@ -889,12 +908,23 @@ private class MapOutputBuffer<K extends Object, V extends Object>
private int indexCacheMemoryLimit;
private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
private MapTask mapTask;
private MapOutputFile mapOutputFile;
private Progress sortPhase;
private Counters.Counter spilledRecordsCounter;
public MapOutputBuffer() {
}
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
this.job = job;
this.reporter = reporter;
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
@ -971,7 +1001,7 @@ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
@ -1122,6 +1152,10 @@ public synchronized void collect(K key, V value, final int partition
}
}
private TaskAttemptID getTaskID() {
return mapTask.getTaskID();
}
/**
* Set the point from which meta and serialization data expand. The meta
* indices are aligned with the buffer, so metadata never spans the ends of
@ -1494,7 +1528,7 @@ private void checkSpillException() throws IOException {
if (lspillException instanceof Error) {
final String logMsg = "Task " + getTaskID() + " failed : " +
StringUtils.stringifyException(lspillException);
reportFatalError(getTaskID(), lspillException, logMsg);
mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
}
throw new IOException("Spill failed", lspillException);
}

View File

@ -26,6 +26,8 @@
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -34,7 +36,9 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.PureJavaCrc32;
class SpillRecord {
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class SpillRecord {
/** Backing store */
private final ByteBuffer buf;
@ -143,17 +147,3 @@ public void writeToFile(Path loc, JobConf job, Checksum crc)
}
}
class IndexRecord {
long startOffset;
long rawLength;
long partLength;
public IndexRecord() { }
public IndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
}
}

View File

@ -30,6 +30,9 @@ public interface MRJobConfig {
public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR
= "mapreduce.job.map.output.collector.class";
public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";

View File

@ -938,4 +938,12 @@
<value>jhs/_HOST@REALM.TLD</value>
</property>
<property>
<name>mapreduce.job.map.output.collector.class</name>
<value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
<description>
It defines the MapOutputCollector implementation to use.
</description>
</property>
</configuration>

View File

@ -0,0 +1,405 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.Task.TaskReporter;
import junit.framework.TestCase;
@SuppressWarnings(value={"unchecked", "deprecation"})
/**
* This test tests the support for a merge operation in Hadoop. The input files
* are already sorted on the key. This test implements an external
* MapOutputCollector implementation that just copies the records to different
* partitions while maintaining the sort order in each partition. The Hadoop
* framework's merge on the reduce side will merge the partitions created to
* generate the final output which is sorted on the key.
*/
public class TestMerge extends TestCase {
private static final int NUM_HADOOP_DATA_NODES = 2;
// Number of input files is same as the number of mappers.
private static final int NUM_MAPPERS = 10;
// Number of reducers.
private static final int NUM_REDUCERS = 4;
// Number of lines per input file.
private static final int NUM_LINES = 1000;
// Where MR job's input will reside.
private static final Path INPUT_DIR = new Path("/testplugin/input");
// Where output goes.
private static final Path OUTPUT = new Path("/testplugin/output");
public void testMerge() throws Exception {
MiniDFSCluster dfsCluster = null;
MiniMRClientCluster mrCluster = null;
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
// Start the mini-MR and mini-DFS clusters
dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_DATA_NODES, true, null);
fileSystem = dfsCluster.getFileSystem();
mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
NUM_HADOOP_DATA_NODES, conf);
// Generate input.
createInput(fileSystem);
// Run the test.
runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem);
} finally {
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (mrCluster != null) {
mrCluster.stop();
}
}
}
private void createInput(FileSystem fs) throws Exception {
fs.delete(INPUT_DIR, true);
for (int i = 0; i < NUM_MAPPERS; i++) {
OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
Writer writer = new OutputStreamWriter(os);
for (int j = 0; j < NUM_LINES; j++) {
// Create sorted key, value pairs.
int k = j + 1;
String formattedNumber = String.format("%09d", k);
writer.write(formattedNumber + " " + formattedNumber + "\n");
}
writer.close();
}
}
private void runMergeTest(JobConf job, FileSystem fileSystem)
throws Exception {
// Delete any existing output.
fileSystem.delete(OUTPUT, true);
job.setJobName("MergeTest");
JobClient client = new JobClient(job);
RunningJob submittedJob = null;
FileInputFormat.setInputPaths(job, INPUT_DIR);
FileOutputFormat.setOutputPath(job, OUTPUT);
job.set("mapreduce.output.textoutputformat.separator", " ");
job.setInputFormat(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setPartitionerClass(MyPartitioner.class);
job.setOutputFormat(TextOutputFormat.class);
job.setNumReduceTasks(NUM_REDUCERS);
job.set(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
MapOutputCopier.class.getName());
try {
submittedJob = client.submitJob(job);
try {
if (! client.monitorAndPrintJob(job, submittedJob)) {
throw new IOException("Job failed!");
}
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
} catch(IOException ioe) {
System.err.println("Job failed with: " + ioe);
} finally {
verifyOutput(submittedJob, fileSystem);
}
}
private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem)
throws Exception {
FSDataInputStream dis = null;
long numValidRecords = 0;
long numInvalidRecords = 0;
long numMappersLaunched = NUM_MAPPERS;
String prevKeyValue = "000000000";
Path[] fileList =
FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outFile : fileList) {
try {
dis = fileSystem.open(outFile);
String record;
while((record = dis.readLine()) != null) {
// Split the line into key and value.
int blankPos = record.indexOf(" ");
String keyString = record.substring(0, blankPos);
String valueString = record.substring(blankPos+1);
// Check for sorted output and correctness of record.
if (keyString.compareTo(prevKeyValue) >= 0
&& keyString.equals(valueString)) {
prevKeyValue = keyString;
numValidRecords++;
} else {
numInvalidRecords++;
}
}
} finally {
if (dis != null) {
dis.close();
dis = null;
}
}
}
// Make sure we got all input records in the output in sorted order.
assertEquals((long)(NUM_MAPPERS*NUM_LINES), numValidRecords);
// Make sure there is no extraneous invalid record.
assertEquals(0, numInvalidRecords);
}
/**
* A mapper implementation that assumes that key text contains valid integers
* in displayable form.
*/
public static class MyMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, Text> {
private Text keyText;
private Text valueText;
public MyMapper() {
keyText = new Text();
valueText = new Text();
}
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String record = value.toString();
int blankPos = record.indexOf(" ");
keyText.set(record.substring(0, blankPos));
valueText.set(record.substring(blankPos+1));
output.collect(keyText, valueText);
}
public void close() throws IOException {
}
}
/**
* Partitioner implementation to make sure that output is in total sorted
* order. We basically route key ranges to different reducers such that
* key values monotonically increase with the partition number. For example,
* in this test, the keys are numbers from 1 to 1000 in the form "000000001"
* to "000001000" in each input file. The keys "000000001" to "000000250" are
* routed to partition 0, "000000251" to "000000500" are routed to partition 1
* and so on since we have 4 reducers.
*/
static class MyPartitioner implements Partitioner<Text, Text> {
public MyPartitioner() {
}
public void configure(JobConf job) {
}
public int getPartition(Text key, Text value, int numPartitions) {
int keyValue = 0;
try {
keyValue = Integer.parseInt(key.toString());
} catch(NumberFormatException nfe) {
keyValue = 0;
}
int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/NUM_LINES;
return partitionNumber;
}
}
/**
* Implementation of map output copier(that avoids sorting) on the map side.
* It maintains keys in the input order within each partition created for
* reducers.
*/
static class MapOutputCopier<K, V>
implements MapOutputCollector<K, V> {
private static final int BUF_SIZE = 128*1024;
private MapTask mapTask;
private JobConf jobConf;
private TaskReporter reporter;
private int numberOfPartitions;
private Class<K> keyClass;
private Class<V> valueClass;
private KeyValueWriter<K, V> recordWriters[];
private ByteArrayOutputStream outStreams[];
public MapOutputCopier() {
}
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context)
throws IOException, ClassNotFoundException {
this.mapTask = context.getMapTask();
this.jobConf = context.getJobConf();
this.reporter = context.getReporter();
numberOfPartitions = jobConf.getNumReduceTasks();
keyClass = (Class<K>)jobConf.getMapOutputKeyClass();
valueClass = (Class<V>)jobConf.getMapOutputValueClass();
recordWriters = new KeyValueWriter[numberOfPartitions];
outStreams = new ByteArrayOutputStream[numberOfPartitions];
// Create output streams for partitions.
for (int i = 0; i < numberOfPartitions; i++) {
outStreams[i] = new ByteArrayOutputStream();
recordWriters[i] = new KeyValueWriter<K, V>(jobConf, outStreams[i],
keyClass, valueClass);
}
}
public synchronized void collect(K key, V value, int partitionNumber
) throws IOException, InterruptedException {
if (partitionNumber >= 0 && partitionNumber < numberOfPartitions) {
recordWriters[partitionNumber].write(key, value);
} else {
throw new IOException("Invalid partition number: " + partitionNumber);
}
reporter.progress();
}
public void close() throws IOException, InterruptedException {
long totalSize = 0;
for (int i = 0; i < numberOfPartitions; i++) {
recordWriters[i].close();
outStreams[i].close();
totalSize += outStreams[i].size();
}
MapOutputFile mapOutputFile = mapTask.getMapOutputFile();
Path finalOutput = mapOutputFile.getOutputFileForWrite(totalSize);
Path indexPath = mapOutputFile.getOutputIndexFileForWrite(
numberOfPartitions*mapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
// Copy partitions to final map output.
copyPartitions(finalOutput, indexPath);
}
public void flush() throws IOException, InterruptedException,
ClassNotFoundException {
}
private void copyPartitions(Path mapOutputPath, Path indexPath)
throws IOException {
FileSystem localFs = FileSystem.getLocal(jobConf);
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
FSDataOutputStream rawOutput = rfs.create(mapOutputPath, true, BUF_SIZE);
SpillRecord spillRecord = new SpillRecord(numberOfPartitions);
IndexRecord indexRecord = new IndexRecord();
for (int i = 0; i < numberOfPartitions; i++) {
indexRecord.startOffset = rawOutput.getPos();
byte buffer[] = outStreams[i].toByteArray();
IFileOutputStream checksumOutput = new IFileOutputStream(rawOutput);
checksumOutput.write(buffer);
// Write checksum.
checksumOutput.finish();
// Write index record
indexRecord.rawLength = (long)buffer.length;
indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset;
spillRecord.putIndex(indexRecord, i);
reporter.progress();
}
rawOutput.close();
spillRecord.writeToFile(indexPath, jobConf);
}
}
static class KeyValueWriter<K, V> {
private Class<K> keyClass;
private Class<V> valueClass;
private DataOutputBuffer dataBuffer;
private Serializer<K> keySerializer;
private Serializer<V> valueSerializer;
private DataOutputStream outputStream;
public KeyValueWriter(Configuration conf, OutputStream output,
Class<K> kyClass, Class<V> valClass
) throws IOException {
keyClass = kyClass;
valueClass = valClass;
dataBuffer = new DataOutputBuffer();
SerializationFactory serializationFactory
= new SerializationFactory(conf);
keySerializer
= (Serializer<K>)serializationFactory.getSerializer(keyClass);
keySerializer.open(dataBuffer);
valueSerializer
= (Serializer<V>)serializationFactory.getSerializer(valueClass);
valueSerializer.open(dataBuffer);
outputStream = new DataOutputStream(output);
}
public void write(K key, V value) throws IOException {
if (key.getClass() != keyClass) {
throw new IOException("wrong key class: "+ key.getClass()
+" is not "+ keyClass);
}
if (value.getClass() != valueClass) {
throw new IOException("wrong value class: "+ value.getClass()
+" is not "+ valueClass);
}
// Append the 'key'
keySerializer.serialize(key);
int keyLength = dataBuffer.getLength();
if (keyLength < 0) {
throw new IOException("Negative key-length not allowed: " + keyLength +
" for " + key);
}
// Append the 'value'
valueSerializer.serialize(value);
int valueLength = dataBuffer.getLength() - keyLength;
if (valueLength < 0) {
throw new IOException("Negative value-length not allowed: " +
valueLength + " for " + value);
}
// Write the record out
WritableUtils.writeVInt(outputStream, keyLength);
WritableUtils.writeVInt(outputStream, valueLength);
outputStream.write(dataBuffer.getData(), 0, dataBuffer.getLength());
// Reset
dataBuffer.reset();
}
public void close() throws IOException {
keySerializer.close();
valueSerializer.close();
WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER);
WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER);
outputStream.close();
}
}
}