MAPREDUCE-4574. Fix TotalOrderPartitioner to work with non-WritableComparable key types. Contributed by Harsh J. (harsh)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1395936 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Harsh J 2012-10-09 09:58:21 +00:00
parent 8ac3910ae0
commit 5c3a331040
4 changed files with 81 additions and 11 deletions

View File

@ -138,6 +138,9 @@ Trunk (Unreleased)
MAPREDUCE-4695. Fix LocalRunner on trunk after MAPREDUCE-3223 broke it MAPREDUCE-4695. Fix LocalRunner on trunk after MAPREDUCE-3223 broke it
(harsh) (harsh)
MAPREDUCE-4574. Fix TotalOrderPartitioner to work with
non-WritableComparable key types. (harsh)
Release 2.0.3-alpha - Unreleased Release 2.0.3-alpha - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -31,7 +31,7 @@
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V> public class TotalOrderPartitioner<K ,V>
extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V> extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V>
implements Partitioner<K,V> { implements Partitioner<K,V> {

View File

@ -47,7 +47,7 @@
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V> public class TotalOrderPartitioner<K,V>
extends Partitioner<K,V> implements Configurable { extends Partitioner<K,V> implements Configurable {
private Node partitions; private Node partitions;
@ -298,12 +298,13 @@ public int findPartition(BinaryComparable key) {
@SuppressWarnings("unchecked") // map output key class @SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass, private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
Configuration conf) throws IOException { Configuration conf) throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf); SequenceFile.Reader reader = new SequenceFile.Reader(
conf,
SequenceFile.Reader.file(p));
ArrayList<K> parts = new ArrayList<K>(); ArrayList<K> parts = new ArrayList<K>();
K key = ReflectionUtils.newInstance(keyClass, conf); K key = ReflectionUtils.newInstance(keyClass, conf);
NullWritable value = NullWritable.get();
try { try {
while (reader.next(key, value)) { while ((key = (K) reader.next(key)) != null) {
parts.add(key); parts.add(key);
key = ReflectionUtils.newInstance(keyClass, conf); key = ReflectionUtils.newInstance(keyClass, conf);
} }

View File

@ -21,19 +21,25 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.JavaSerializationComparator;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
public class TestTotalOrderPartitioner extends TestCase { public class TestTotalOrderPartitioner extends TestCase {
@ -51,6 +57,19 @@ public class TestTotalOrderPartitioner extends TestCase {
new Text("yak"), // 9 new Text("yak"), // 9
}; };
private static final String[] splitJavaStrings = new String[] {
// -inf // 0
new String("aabbb"), // 1
new String("babbb"), // 2
new String("daddd"), // 3
new String("dddee"), // 4
new String("ddhee"), // 5
new String("dingo"), // 6
new String("hijjj"), // 7
new String("n"), // 8
new String("yak"), // 9
};
static class Check<T> { static class Check<T> {
T data; T data;
int part; int part;
@ -76,19 +95,41 @@ static class Check<T> {
testStrings.add(new Check<Text>(new Text("hi"), 6)); testStrings.add(new Check<Text>(new Text("hi"), 6));
}; };
private static <T extends WritableComparable<?>> Path writePartitionFile( private static final ArrayList<Check<String>> testJavaStrings =
new ArrayList<Check<String>>();
static {
testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
testJavaStrings.add(new Check<String>(new String("aaabb"), 0));
testJavaStrings.add(new Check<String>(new String("aabbb"), 1));
testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
testJavaStrings.add(new Check<String>(new String("babbb"), 2));
testJavaStrings.add(new Check<String>(new String("baabb"), 1));
testJavaStrings.add(new Check<String>(new String("yai"), 8));
testJavaStrings.add(new Check<String>(new String("yak"), 9));
testJavaStrings.add(new Check<String>(new String("z"), 9));
testJavaStrings.add(new Check<String>(new String("ddngo"), 5));
testJavaStrings.add(new Check<String>(new String("hi"), 6));
};
private static <T> Path writePartitionFile(
String testname, Configuration conf, T[] splits) throws IOException { String testname, Configuration conf, T[] splits) throws IOException {
final FileSystem fs = FileSystem.getLocal(conf); final FileSystem fs = FileSystem.getLocal(conf);
final Path testdir = new Path(System.getProperty("test.build.data", "/tmp") final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
).makeQualified(fs); ).makeQualified(
fs.getUri(),
fs.getWorkingDirectory());
Path p = new Path(testdir, testname + "/_partition.lst"); Path p = new Path(testdir, testname + "/_partition.lst");
TotalOrderPartitioner.setPartitionFile(conf, p); TotalOrderPartitioner.setPartitionFile(conf, p);
conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1); conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
SequenceFile.Writer w = null; SequenceFile.Writer w = null;
try { try {
w = SequenceFile.createWriter(fs, conf, p, w = SequenceFile.createWriter(
splits[0].getClass(), NullWritable.class, conf,
SequenceFile.CompressionType.NONE); SequenceFile.Writer.file(p),
SequenceFile.Writer.keyClass(splits[0].getClass()),
SequenceFile.Writer.valueClass(NullWritable.class),
SequenceFile.Writer.compression(CompressionType.NONE));
for (int i = 0; i < splits.length; ++i) { for (int i = 0; i < splits.length; ++i) {
w.append(splits[i], NullWritable.get()); w.append(splits[i], NullWritable.get());
} }
@ -99,6 +140,31 @@ private static <T extends WritableComparable<?>> Path writePartitionFile(
return p; return p;
} }
/**
 * Verifies that {@code TotalOrderPartitioner} works for keys that are not
 * WritableComparable: plain {@code java.lang.String} keys partitioned via
 * JavaSerialization with an explicit JavaSerializationComparator.
 */
public void testTotalOrderWithCustomSerialization() throws Exception {
  Configuration conf = new Configuration();
  // Register Java serialization ahead of Writable serialization so String
  // keys can be written to and read back from the partition file.
  conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
      JavaSerialization.class.getName(),
      WritableSerialization.class.getName());
  // String is not WritableComparable, so a key comparator must be supplied.
  conf.setClass(MRJobConfig.KEY_COMPARATOR,
      JavaSerializationComparator.class,
      Comparator.class);
  Path partitionFile = TestTotalOrderPartitioner.<String>writePartitionFile(
      "totalordercustomserialization", conf, splitJavaStrings);
  conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class);
  TotalOrderPartitioner<String, NullWritable> partitioner =
      new TotalOrderPartitioner<String, NullWritable>();
  try {
    partitioner.setConf(conf);
    NullWritable nullValue = NullWritable.get();
    // Each check pairs a probe key with the partition index it must land in.
    for (Check<String> expected : testJavaStrings) {
      assertEquals(expected.data.toString(), expected.part,
          partitioner.getPartition(
              expected.data, nullValue, splitJavaStrings.length + 1));
    }
  } finally {
    partitionFile.getFileSystem(conf).delete(partitionFile, true);
  }
}
public void testTotalOrderMemCmp() throws Exception { public void testTotalOrderMemCmp() throws Exception {
TotalOrderPartitioner<Text,NullWritable> partitioner = TotalOrderPartitioner<Text,NullWritable> partitioner =
new TotalOrderPartitioner<Text,NullWritable>(); new TotalOrderPartitioner<Text,NullWritable>();