MAPREDUCE-5028. Fixed a bug in MapTask that was causing mappers to fail when a large value of io.sort.mb is set. Contributed by Karthik Kambatla.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576170 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0edda25373
commit
98ecd4ffef
@ -90,7 +90,10 @@ public class DataInputBuffer extends DataInputStream {
|
||||
/** Returns the current position in the input. */
|
||||
public int getPosition() { return buffer.getPosition(); }
|
||||
|
||||
/** Returns the length of the input. */
|
||||
/**
|
||||
* Returns the index one greater than the last valid character in the input
|
||||
* stream buffer.
|
||||
*/
|
||||
public int getLength() { return buffer.getLength(); }
|
||||
|
||||
}
|
||||
|
@ -211,6 +211,9 @@ Release 2.4.0 - UNRELEASED
|
||||
MAPREDUCE-5780. SliveTest should use the specified path to get the
|
||||
particular FileSystem instead of using the default FileSystem. (szetszwo)
|
||||
|
||||
MAPREDUCE-5028. Fixed a bug in MapTask that was causing mappers to fail
|
||||
when a large value of io.sort.mb is set. (Karthik Kambatla via vinodkv)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -1176,8 +1176,9 @@ public class MapTask extends Task {
|
||||
equator = pos;
|
||||
// set index prior to first entry, aligned at meta boundary
|
||||
final int aligned = pos - (pos % METASIZE);
|
||||
kvindex =
|
||||
((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
|
||||
// Cast one of the operands to long to avoid integer overflow
|
||||
kvindex = (int)
|
||||
(((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
|
||||
LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
|
||||
"(" + (kvindex * 4) + ")");
|
||||
}
|
||||
@ -1192,8 +1193,9 @@ public class MapTask extends Task {
|
||||
bufstart = bufend = e;
|
||||
final int aligned = e - (e % METASIZE);
|
||||
// set start/end to point to first meta record
|
||||
kvstart = kvend =
|
||||
((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
|
||||
// Cast one of the operands to long to avoid integer overflow
|
||||
kvstart = kvend = (int)
|
||||
(((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
|
||||
LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
|
||||
(kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
|
||||
}
|
||||
|
@ -141,7 +141,8 @@ public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
|
||||
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
|
||||
key = keyDeserializer.deserialize(key);
|
||||
DataInputBuffer nextVal = input.getValue();
|
||||
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
|
||||
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
|
||||
- nextVal.getPosition());
|
||||
value = valueDeserializer.deserialize(value);
|
||||
|
||||
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
|
||||
@ -205,7 +206,8 @@ public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
|
||||
if (backupStore.hasNext()) {
|
||||
backupStore.next();
|
||||
DataInputBuffer next = backupStore.nextValue();
|
||||
buffer.reset(next.getData(), next.getPosition(), next.getLength());
|
||||
buffer.reset(next.getData(), next.getPosition(), next.getLength()
|
||||
- next.getPosition());
|
||||
value = valueDeserializer.deserialize(value);
|
||||
return value;
|
||||
} else {
|
||||
|
@ -37,9 +37,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
public class InMemoryReader<K, V> extends Reader<K, V> {
|
||||
private final TaskAttemptID taskAttemptId;
|
||||
private final MergeManagerImpl<K,V> merger;
|
||||
DataInputBuffer memDataIn = new DataInputBuffer();
|
||||
private int start;
|
||||
private int length;
|
||||
private final DataInputBuffer memDataIn = new DataInputBuffer();
|
||||
private final int start;
|
||||
private final int length;
|
||||
|
||||
public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
|
||||
byte[] data, int start, int length, Configuration conf)
|
||||
@ -50,14 +50,14 @@ public class InMemoryReader<K, V> extends Reader<K, V> {
|
||||
|
||||
buffer = data;
|
||||
bufferSize = (int)fileLength;
|
||||
memDataIn.reset(buffer, start, length);
|
||||
memDataIn.reset(buffer, start, length - start);
|
||||
this.start = start;
|
||||
this.length = length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset(int offset) {
|
||||
memDataIn.reset(buffer, start + offset, length);
|
||||
memDataIn.reset(buffer, start + offset, length - start - offset);
|
||||
bytesRead = offset;
|
||||
eof = false;
|
||||
}
|
||||
|
@ -0,0 +1,269 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
/**
|
||||
* A sample MR job that helps with testing large sorts in the MapReduce
|
||||
* framework. Mapper generates the specified number of bytes and pipes them
|
||||
* to the reducers.
|
||||
*
|
||||
* <code>mapreduce.large-sorter.mbs-per-map</code> specifies the amount
|
||||
* of data (in MBs) to generate per map. By default, this is twice the value
|
||||
* of <code>mapreduce.task.io.sort.mb</code> or 1 GB if that is not specified
|
||||
* either.
|
||||
* <code>mapreduce.large-sorter.map-tasks</code> specifies the number of map
|
||||
* tasks to run.
|
||||
* <code>mapreduce.large-sorter.reduce-tasks</code> specifies the number of
|
||||
* reduce tasks to run.
|
||||
*/
|
||||
public class LargeSorter extends Configured implements Tool {
|
||||
private static final String LS_PREFIX = "mapreduce.large-sorter.";
|
||||
|
||||
public static final String MBS_PER_MAP = LS_PREFIX + "mbs-per-map";
|
||||
public static final String NUM_MAP_TASKS = LS_PREFIX + "map-tasks";
|
||||
public static final String NUM_REDUCE_TASKS = LS_PREFIX + "reduce-tasks";
|
||||
|
||||
private static final String MAX_VALUE = LS_PREFIX + "max-value";
|
||||
private static final String MIN_VALUE = LS_PREFIX + "min-value";
|
||||
private static final String MIN_KEY = LS_PREFIX + "min-key";
|
||||
private static final String MAX_KEY = LS_PREFIX + "max-key";
|
||||
|
||||
/**
|
||||
* User counters
|
||||
*/
|
||||
static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
|
||||
|
||||
/**
|
||||
* A custom input format that creates virtual inputs of a single string
|
||||
* for each map.
|
||||
*/
|
||||
static class RandomInputFormat extends InputFormat<Text, Text> {
|
||||
|
||||
/**
|
||||
* Generate the requested number of file splits, with the filename
|
||||
* set to the filename of the output file.
|
||||
*/
|
||||
public List<InputSplit> getSplits(JobContext job) throws IOException {
|
||||
List<InputSplit> result = new ArrayList<InputSplit>();
|
||||
Path outDir = FileOutputFormat.getOutputPath(job);
|
||||
int numSplits =
|
||||
job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
|
||||
for(int i=0; i < numSplits; ++i) {
|
||||
result.add(new FileSplit(
|
||||
new Path(outDir, "dummy-split-" + i), 0, 1, null));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a single record (filename, "") where the filename is taken from
|
||||
* the file split.
|
||||
*/
|
||||
static class RandomRecordReader extends RecordReader<Text, Text> {
|
||||
Path name;
|
||||
Text key = null;
|
||||
Text value = new Text();
|
||||
public RandomRecordReader(Path p) {
|
||||
name = p;
|
||||
}
|
||||
|
||||
public void initialize(InputSplit split,
|
||||
TaskAttemptContext context)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
}
|
||||
|
||||
public boolean nextKeyValue() {
|
||||
if (name != null) {
|
||||
key = new Text();
|
||||
key.set(name.getName());
|
||||
name = null;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public Text getCurrentKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public Text getCurrentValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public void close() {}
|
||||
|
||||
public float getProgress() {
|
||||
return 0.0f;
|
||||
}
|
||||
}
|
||||
|
||||
public RecordReader<Text, Text> createRecordReader(InputSplit split,
|
||||
TaskAttemptContext context) throws IOException, InterruptedException {
|
||||
return new RandomRecordReader(((FileSplit) split).getPath());
|
||||
}
|
||||
}
|
||||
|
||||
static class RandomMapper extends Mapper<WritableComparable, Writable,
|
||||
BytesWritable, BytesWritable> {
|
||||
|
||||
private long numBytesToWrite;
|
||||
private int minKeySize;
|
||||
private int keySizeRange;
|
||||
private int minValueSize;
|
||||
private int valueSizeRange;
|
||||
private Random random = new Random();
|
||||
private BytesWritable randomKey = new BytesWritable();
|
||||
private BytesWritable randomValue = new BytesWritable();
|
||||
|
||||
private void randomizeBytes(byte[] data, int offset, int length) {
|
||||
for(int i=offset + length - 1; i >= offset; --i) {
|
||||
data[i] = (byte) random.nextInt(256);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup(Context context) {
|
||||
Configuration conf = context.getConfiguration();
|
||||
numBytesToWrite = 1024 * 1024 * conf.getLong(MBS_PER_MAP,
|
||||
2 * conf.getInt(MRJobConfig.IO_SORT_MB, 512));
|
||||
minKeySize = conf.getInt(MIN_KEY, 10);
|
||||
keySizeRange =
|
||||
conf.getInt(MAX_KEY, 1000) - minKeySize;
|
||||
minValueSize = conf.getInt(MIN_VALUE, 0);
|
||||
valueSizeRange =
|
||||
conf.getInt(MAX_VALUE, 20000) - minValueSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an output filename, write a bunch of random records to it.
|
||||
*/
|
||||
public void map(WritableComparable key,
|
||||
Writable value,
|
||||
Context context) throws IOException,InterruptedException {
|
||||
int itemCount = 0;
|
||||
while (numBytesToWrite > 0) {
|
||||
int keyLength = minKeySize +
|
||||
(keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
|
||||
randomKey.setSize(keyLength);
|
||||
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
|
||||
int valueLength = minValueSize +
|
||||
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
|
||||
randomValue.setSize(valueLength);
|
||||
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
|
||||
context.write(randomKey, randomValue);
|
||||
numBytesToWrite -= keyLength + valueLength;
|
||||
context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
|
||||
context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
|
||||
if (++itemCount % 200 == 0) {
|
||||
context.setStatus("wrote record " + itemCount + ". " +
|
||||
numBytesToWrite + " bytes left.");
|
||||
}
|
||||
}
|
||||
context.setStatus("done with " + itemCount + " records.");
|
||||
}
|
||||
}
|
||||
|
||||
static class Discarder extends Reducer<BytesWritable, BytesWritable,
|
||||
WritableComparable, Writable> {
|
||||
@Override
|
||||
public void reduce(BytesWritable key, Iterable<BytesWritable> values,
|
||||
Context context) throws IOException, InterruptedException {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyNotZero(Configuration conf, String config) {
|
||||
if (conf.getInt(config, 1) <= 0) {
|
||||
throw new IllegalArgumentException(config + "should be > 0");
|
||||
}
|
||||
}
|
||||
|
||||
public int run(String[] args) throws Exception {
|
||||
Path outDir = new Path(
|
||||
LargeSorter.class.getName() + System.currentTimeMillis());
|
||||
|
||||
Configuration conf = getConf();
|
||||
verifyNotZero(conf, MBS_PER_MAP);
|
||||
verifyNotZero(conf, NUM_MAP_TASKS);
|
||||
|
||||
conf.setInt(MRJobConfig.NUM_MAPS, conf.getInt(NUM_MAP_TASKS, 2));
|
||||
|
||||
int ioSortMb = conf.getInt(MRJobConfig.IO_SORT_MB, 512);
|
||||
int mapMb = Math.max(2 * ioSortMb, conf.getInt(MRJobConfig.MAP_MEMORY_MB,
|
||||
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
|
||||
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb);
|
||||
conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
Job job = new Job(conf);
|
||||
job.setJarByClass(LargeSorter.class);
|
||||
job.setJobName("large-sorter");
|
||||
FileOutputFormat.setOutputPath(job, outDir);
|
||||
job.setOutputKeyClass(BytesWritable.class);
|
||||
job.setOutputValueClass(BytesWritable.class);
|
||||
job.setInputFormatClass(RandomInputFormat.class);
|
||||
job.setMapperClass(RandomMapper.class);
|
||||
job.setReducerClass(Discarder.class);
|
||||
job.setOutputFormatClass(SequenceFileOutputFormat.class);
|
||||
job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS, 1));
|
||||
|
||||
Date startTime = new Date();
|
||||
System.out.println("Job started: " + startTime);
|
||||
int ret = 1;
|
||||
try {
|
||||
ret = job.waitForCompletion(true) ? 0 : 1;
|
||||
} finally {
|
||||
FileSystem.get(conf).delete(outDir, true);
|
||||
}
|
||||
Date endTime = new Date();
|
||||
System.out.println("Job ended: " + endTime);
|
||||
System.out.println("The job took " +
|
||||
(endTime.getTime() - startTime.getTime()) /1000 +
|
||||
" seconds.");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int res = ToolRunner.run(new Configuration(), new LargeSorter(), args);
|
||||
System.exit(res);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapred.MiniMRClientCluster;
|
||||
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestLargeSort {
|
||||
MiniMRClientCluster cluster;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
cluster = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
|
||||
cluster.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws IOException {
|
||||
if (cluster != null) {
|
||||
cluster.stop();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeSort() throws Exception {
|
||||
String[] args = new String[0];
|
||||
int[] ioSortMbs = {128, 256, 1536};
|
||||
for (int ioSortMb : ioSortMbs) {
|
||||
Configuration conf = new Configuration(cluster.getConfig());
|
||||
conf.setInt(MRJobConfig.IO_SORT_MB, ioSortMb);
|
||||
conf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
|
||||
conf.setInt(LargeSorter.MBS_PER_MAP, ioSortMb);
|
||||
assertEquals("Large sort failed for " + ioSortMb, 0,
|
||||
ToolRunner.run(conf, new LargeSorter(), args));
|
||||
}
|
||||
}
|
||||
}
|
@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
|
||||
import org.apache.hadoop.mapred.TestTextInputFormat;
|
||||
import org.apache.hadoop.mapred.ThreadedMapBenchmark;
|
||||
import org.apache.hadoop.mapreduce.FailJob;
|
||||
import org.apache.hadoop.mapreduce.LargeSorter;
|
||||
import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
|
||||
import org.apache.hadoop.mapreduce.SleepJob;
|
||||
import org.apache.hadoop.util.ProgramDriver;
|
||||
@ -104,6 +105,8 @@ public class MapredTestDriver {
|
||||
"HDFS Stress Test and Live Data Verification.");
|
||||
pgd.addClass("minicluster", MiniHadoopClusterManager.class,
|
||||
"Single process HDFS and MR cluster.");
|
||||
pgd.addClass("largesorter", LargeSorter.class,
|
||||
"Large-Sort tester");
|
||||
} catch(Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user