From 98ecd4ffef333cb6703e922de3f1d8512cacefed Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 11 Mar 2014 01:29:56 +0000 Subject: [PATCH] MAPREDUCE-5028. Fixed a bug in MapTask that was causing mappers to fail when a large value of io.sort.mb is set. Contributed by Karthik Kambatla. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576170 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/io/DataInputBuffer.java | 5 +- hadoop-mapreduce-project/CHANGES.txt | 3 + .../org/apache/hadoop/mapred/MapTask.java | 10 +- .../mapreduce/task/ReduceContextImpl.java | 6 +- .../mapreduce/task/reduce/InMemoryReader.java | 10 +- .../apache/hadoop/mapreduce/LargeSorter.java | 269 ++++++++++++++++++ .../hadoop/mapreduce/TestLargeSort.java | 65 +++++ .../apache/hadoop/test/MapredTestDriver.java | 3 + 8 files changed, 359 insertions(+), 12 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/LargeSorter.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLargeSort.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/DataInputBuffer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/DataInputBuffer.java index cc5500f32f..f0b6726716 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/DataInputBuffer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/DataInputBuffer.java @@ -90,7 +90,10 @@ public class DataInputBuffer extends DataInputStream { /** Returns the current position in the input. */ public int getPosition() { return buffer.getPosition(); } - /** Returns the length of the input. */ + /** + * Returns the index one greater than the last valid character in the input + * stream buffer. + */ public int getLength() { return buffer.getLength(); } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5c3e0a0ac7..54f3325d50 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -211,6 +211,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5780. SliveTest should use the specified path to get the particular FileSystem instead of using the default FileSystem. (szetszwo) + MAPREDUCE-5028. Fixed a bug in MapTask that was causing mappers to fail + when a large value of io.sort.mb is set. (Karthik Kambatla via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 0af58ebe72..84fdd92cc5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -1176,8 +1176,9 @@ public class MapTask extends Task { equator = pos; // set index prior to first entry, aligned at meta boundary final int aligned = pos - (pos % METASIZE); - kvindex = - ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; + // Cast one of the operands to long to avoid integer overflow + kvindex = (int) + (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; LOG.info("(EQUATOR) " + pos + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); } @@ -1192,8 +1193,9 @@ public class MapTask extends Task { bufstart = bufend = e; final int aligned = e - (e % METASIZE); // set start/end to point to first meta record - kvstart = kvend = - ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; + // Cast one of the operands to long to avoid integer overflow + kvstart = kvend = (int) + (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" + (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java index 7ad08e956d..43ce837092 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java @@ -141,7 +141,8 @@ public class ReduceContextImpl buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); key = keyDeserializer.deserialize(key); DataInputBuffer nextVal = input.getValue(); - buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()); + buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength() + - nextVal.getPosition()); value = valueDeserializer.deserialize(value); currentKeyLength = nextKey.getLength() - nextKey.getPosition(); @@ -205,7 +206,8 @@ public class ReduceContextImpl if (backupStore.hasNext()) { backupStore.next(); DataInputBuffer next = backupStore.nextValue(); - buffer.reset(next.getData(), next.getPosition(), next.getLength()); + buffer.reset(next.getData(), next.getPosition(), next.getLength() + - next.getPosition()); value = valueDeserializer.deserialize(value); return value; } else { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java index 7b8f63a2f3..b246d24f2b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java @@ -37,9 +37,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; public class InMemoryReader extends Reader { private final TaskAttemptID taskAttemptId; private final MergeManagerImpl merger; - DataInputBuffer memDataIn = new DataInputBuffer(); - private int start; - private int length; + private final DataInputBuffer memDataIn = new DataInputBuffer(); + private final int start; + private final int length; public InMemoryReader(MergeManagerImpl merger, TaskAttemptID taskAttemptId, byte[] data, int start, int length, Configuration conf) @@ -50,14 +50,14 @@ public class InMemoryReader extends Reader { buffer = data; bufferSize = (int)fileLength; - memDataIn.reset(buffer, start, length); + memDataIn.reset(buffer, start, length - start); this.start = start; this.length = length; } @Override public void reset(int offset) { - memDataIn.reset(buffer, start + offset, length); + memDataIn.reset(buffer, start + offset, length - start - offset); bytesRead = offset; eof = false; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/LargeSorter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/LargeSorter.java new file mode 100644 index 0000000000..c5be44642e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/LargeSorter.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A sample MR job that helps with testing large sorts in the MapReduce + * framework. Mapper generates the specified number of bytes and pipes them + * to the reducers. + * + * mapreduce.large-sorter.mbs-per-map specifies the amount + * of data (in MBs) to generate per map. By default, this is twice the value + * of mapreduce.task.io.sort.mb or 1 GB if that is not specified + * either. + * mapreduce.large-sorter.map-tasks specifies the number of map + * tasks to run. + * mapreduce.large-sorter.reduce-tasks specifies the number of + * reduce tasks to run. + */ +public class LargeSorter extends Configured implements Tool { + private static final String LS_PREFIX = "mapreduce.large-sorter."; + + public static final String MBS_PER_MAP = LS_PREFIX + "mbs-per-map"; + public static final String NUM_MAP_TASKS = LS_PREFIX + "map-tasks"; + public static final String NUM_REDUCE_TASKS = LS_PREFIX + "reduce-tasks"; + + private static final String MAX_VALUE = LS_PREFIX + "max-value"; + private static final String MIN_VALUE = LS_PREFIX + "min-value"; + private static final String MIN_KEY = LS_PREFIX + "min-key"; + private static final String MAX_KEY = LS_PREFIX + "max-key"; + + /** + * User counters + */ + static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN } + + /** + * A custom input format that creates virtual inputs of a single string + * for each map. + */ + static class RandomInputFormat extends InputFormat { + + /** + * Generate the requested number of file splits, with the filename + * set to the filename of the output file. + */ + public List getSplits(JobContext job) throws IOException { + List result = new ArrayList(); + Path outDir = FileOutputFormat.getOutputPath(job); + int numSplits = + job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1); + for(int i=0; i < numSplits; ++i) { + result.add(new FileSplit( + new Path(outDir, "dummy-split-" + i), 0, 1, null)); + } + return result; + } + + /** + * Return a single record (filename, "") where the filename is taken from + * the file split. + */ + static class RandomRecordReader extends RecordReader { + Path name; + Text key = null; + Text value = new Text(); + public RandomRecordReader(Path p) { + name = p; + } + + public void initialize(InputSplit split, + TaskAttemptContext context) + throws IOException, InterruptedException { + + } + + public boolean nextKeyValue() { + if (name != null) { + key = new Text(); + key.set(name.getName()); + name = null; + return true; + } + return false; + } + + public Text getCurrentKey() { + return key; + } + + public Text getCurrentValue() { + return value; + } + + public void close() {} + + public float getProgress() { + return 0.0f; + } + } + + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new RandomRecordReader(((FileSplit) split).getPath()); + } + } + + static class RandomMapper extends Mapper { + + private long numBytesToWrite; + private int minKeySize; + private int keySizeRange; + private int minValueSize; + private int valueSizeRange; + private Random random = new Random(); + private BytesWritable randomKey = new BytesWritable(); + private BytesWritable randomValue = new BytesWritable(); + + private void randomizeBytes(byte[] data, int offset, int length) { + for(int i=offset + length - 1; i >= offset; --i) { + data[i] = (byte) random.nextInt(256); + } + } + + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + numBytesToWrite = 1024 * 1024 * conf.getLong(MBS_PER_MAP, + 2 * conf.getInt(MRJobConfig.IO_SORT_MB, 512)); + minKeySize = conf.getInt(MIN_KEY, 10); + keySizeRange = + conf.getInt(MAX_KEY, 1000) - minKeySize; + minValueSize = conf.getInt(MIN_VALUE, 0); + valueSizeRange = + conf.getInt(MAX_VALUE, 20000) - minValueSize; + } + + /** + * Given an output filename, write a bunch of random records to it. + */ + public void map(WritableComparable key, + Writable value, + Context context) throws IOException,InterruptedException { + int itemCount = 0; + while (numBytesToWrite > 0) { + int keyLength = minKeySize + + (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0); + randomKey.setSize(keyLength); + randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength()); + int valueLength = minValueSize + + (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0); + randomValue.setSize(valueLength); + randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength()); + context.write(randomKey, randomValue); + numBytesToWrite -= keyLength + valueLength; + context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength); + context.getCounter(Counters.RECORDS_WRITTEN).increment(1); + if (++itemCount % 200 == 0) { + context.setStatus("wrote record " + itemCount + ". " + + numBytesToWrite + " bytes left."); + } + } + context.setStatus("done with " + itemCount + " records."); + } + } + + static class Discarder extends Reducer { + @Override + public void reduce(BytesWritable key, Iterable values, + Context context) throws IOException, InterruptedException { + // Do nothing + } + } + + private void verifyNotZero(Configuration conf, String config) { + if (conf.getInt(config, 1) <= 0) { + throw new IllegalArgumentException(config + "should be > 0"); + } + } + + public int run(String[] args) throws Exception { + Path outDir = new Path( + LargeSorter.class.getName() + System.currentTimeMillis()); + + Configuration conf = getConf(); + verifyNotZero(conf, MBS_PER_MAP); + verifyNotZero(conf, NUM_MAP_TASKS); + + conf.setInt(MRJobConfig.NUM_MAPS, conf.getInt(NUM_MAP_TASKS, 2)); + + int ioSortMb = conf.getInt(MRJobConfig.IO_SORT_MB, 512); + int mapMb = Math.max(2 * ioSortMb, conf.getInt(MRJobConfig.MAP_MEMORY_MB, + MRJobConfig.DEFAULT_MAP_MEMORY_MB)); + conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb); + conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m"); + + @SuppressWarnings("deprecation") + Job job = new Job(conf); + job.setJarByClass(LargeSorter.class); + job.setJobName("large-sorter"); + FileOutputFormat.setOutputPath(job, outDir); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(BytesWritable.class); + job.setInputFormatClass(RandomInputFormat.class); + job.setMapperClass(RandomMapper.class); + job.setReducerClass(Discarder.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS, 1)); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = 1; + try { + ret = job.waitForCompletion(true) ? 0 : 1; + } finally { + FileSystem.get(conf).delete(outDir, true); + } + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) /1000 + + " seconds."); + + return ret; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new LargeSorter(), args); + System.exit(res); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLargeSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLargeSort.java new file mode 100644 index 0000000000..ab99a2fe0c --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLargeSort.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.MiniMRClientCluster; +import org.apache.hadoop.mapred.MiniMRClientClusterFactory; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class TestLargeSort { + MiniMRClientCluster cluster; + + @Before + public void setup() throws IOException { + Configuration conf = new YarnConfiguration(); + cluster = MiniMRClientClusterFactory.create(this.getClass(), 2, conf); + cluster.start(); + } + + @After + public void cleanup() throws IOException { + if (cluster != null) { + cluster.stop(); + cluster = null; + } + } + + @Test + public void testLargeSort() throws Exception { + String[] args = new String[0]; + int[] ioSortMbs = {128, 256, 1536}; + for (int ioSortMb : ioSortMbs) { + Configuration conf = new Configuration(cluster.getConfig()); + conf.setInt(MRJobConfig.IO_SORT_MB, ioSortMb); + conf.setInt(LargeSorter.NUM_MAP_TASKS, 1); + conf.setInt(LargeSorter.MBS_PER_MAP, ioSortMb); + assertEquals("Large sort failed for " + ioSortMb, 0, + ToolRunner.run(conf, new LargeSorter(), args)); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java index cc7e63fbc5..f2cd53cdfa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java @@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.TestSequenceFileInputFormat; import org.apache.hadoop.mapred.TestTextInputFormat; import org.apache.hadoop.mapred.ThreadedMapBenchmark; import org.apache.hadoop.mapreduce.FailJob; +import org.apache.hadoop.mapreduce.LargeSorter; import org.apache.hadoop.mapreduce.MiniHadoopClusterManager; import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ProgramDriver; @@ -104,6 +105,8 @@ public class MapredTestDriver { "HDFS Stress Test and Live Data Verification."); pgd.addClass("minicluster", MiniHadoopClusterManager.class, "Single process HDFS and MR cluster."); + pgd.addClass("largesorter", LargeSorter.class, + "Large-Sort tester"); } catch(Throwable e) { e.printStackTrace(); }