diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 515ded4f7a..cf36ee4341 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -138,6 +138,9 @@ Trunk (Unreleased) MAPREDUCE-4695. Fix LocalRunner on trunk after MAPREDUCE-3223 broke it (harsh) + MAPREDUCE-4574. Fix TotalOrderParitioner to work with + non-WritableComparable key types. (harsh) + Release 2.0.3-alpha - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java index 14e0962da2..f393876e42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java @@ -31,7 +31,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TotalOrderPartitioner,V> +public class TotalOrderPartitioner extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner implements Partitioner { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java index fa12976c26..632abdfc06 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java @@ -47,7 +47,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public class TotalOrderPartitioner,V> +public class TotalOrderPartitioner extends Partitioner implements Configurable { private Node partitions; @@ -298,12 +298,13 @@ public int findPartition(BinaryComparable key) { @SuppressWarnings("unchecked") // map output key class private K[] readPartitions(FileSystem fs, Path p, Class keyClass, Configuration conf) throws IOException { - SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf); + SequenceFile.Reader reader = new SequenceFile.Reader( + conf, + SequenceFile.Reader.file(p)); ArrayList parts = new ArrayList(); K key = ReflectionUtils.newInstance(keyClass, conf); - NullWritable value = NullWritable.get(); try { - while (reader.next(key, value)) { + while ((key = (K) reader.next(key)) != null) { parts.add(key); key = ReflectionUtils.newInstance(keyClass, conf); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java index a3cd18c4ba..a844737e09 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java @@ -21,19 +21,25 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.JavaSerializationComparator; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.WritableSerialization; import org.apache.hadoop.mapreduce.MRJobConfig; public class TestTotalOrderPartitioner extends TestCase { @@ -51,6 +57,19 @@ public class TestTotalOrderPartitioner extends TestCase { new Text("yak"), // 9 }; + private static final String[] splitJavaStrings = new String[] { + // -inf // 0 + new String("aabbb"), // 1 + new String("babbb"), // 2 + new String("daddd"), // 3 + new String("dddee"), // 4 + new String("ddhee"), // 5 + new String("dingo"), // 6 + new String("hijjj"), // 7 + new String("n"), // 8 + new String("yak"), // 9 + }; + static class Check { T data; int part; @@ -76,19 +95,41 @@ static class Check { testStrings.add(new Check(new Text("hi"), 6)); }; - private static > Path writePartitionFile( + private static final ArrayList> testJavaStrings = + new ArrayList>(); + static { + testJavaStrings.add(new Check(new String("aaaaa"), 0)); + testJavaStrings.add(new Check(new String("aaabb"), 0)); + testJavaStrings.add(new Check(new String("aabbb"), 1)); + testJavaStrings.add(new Check(new String("aaaaa"), 0)); + testJavaStrings.add(new Check(new String("babbb"), 2)); + testJavaStrings.add(new Check(new String("baabb"), 1)); + testJavaStrings.add(new Check(new String("yai"), 8)); + testJavaStrings.add(new Check(new String("yak"), 9)); + testJavaStrings.add(new Check(new String("z"), 9)); + testJavaStrings.add(new Check(new String("ddngo"), 5)); + testJavaStrings.add(new Check(new String("hi"), 6)); + }; + + + private static Path writePartitionFile( String testname, Configuration conf, T[] splits) throws IOException { final FileSystem fs = FileSystem.getLocal(conf); final Path testdir = new Path(System.getProperty("test.build.data", "/tmp") - ).makeQualified(fs); + ).makeQualified( + fs.getUri(), + fs.getWorkingDirectory()); Path p = new Path(testdir, testname + "/_partition.lst"); TotalOrderPartitioner.setPartitionFile(conf, p); conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1); SequenceFile.Writer w = null; try { - w = SequenceFile.createWriter(fs, conf, p, - splits[0].getClass(), NullWritable.class, - SequenceFile.CompressionType.NONE); + w = SequenceFile.createWriter( + conf, + SequenceFile.Writer.file(p), + SequenceFile.Writer.keyClass(splits[0].getClass()), + SequenceFile.Writer.valueClass(NullWritable.class), + SequenceFile.Writer.compression(CompressionType.NONE)); for (int i = 0; i < splits.length; ++i) { w.append(splits[i], NullWritable.get()); } @@ -99,6 +140,31 @@ private static > Path writePartitionFile( return p; } + public void testTotalOrderWithCustomSerialization() throws Exception { + TotalOrderPartitioner partitioner = + new TotalOrderPartitioner(); + Configuration conf = new Configuration(); + conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName(), + WritableSerialization.class.getName()); + conf.setClass(MRJobConfig.KEY_COMPARATOR, + JavaSerializationComparator.class, + Comparator.class); + Path p = TestTotalOrderPartitioner.writePartitionFile( + "totalordercustomserialization", conf, splitJavaStrings); + conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class); + try { + partitioner.setConf(conf); + NullWritable nw = NullWritable.get(); + for (Check chk : testJavaStrings) { + assertEquals(chk.data.toString(), chk.part, + partitioner.getPartition(chk.data, nw, splitJavaStrings.length + 1)); + } + } finally { + p.getFileSystem(conf).delete(p, true); + } + } + public void testTotalOrderMemCmp() throws Exception { TotalOrderPartitioner partitioner = new TotalOrderPartitioner();