HADOOP-10686. Writables are not always configured. (Abraham Elmahrek via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1602079 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-06-12 07:23:20 +00:00
parent deb858a835
commit fc7c8f9bf2
11 changed files with 137 additions and 22 deletions

View File

@ -537,6 +537,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-10622. Shell.runCommand can deadlock (Gera Shegalov via jlowe)
HADOOP-10686. Writables are not always configured.
(Abraham Elmahrek via kasha)
BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS
HADOOP-10520. Extended attributes definition and FileSystem APIs for

View File

@ -256,7 +256,7 @@ public Writer(Configuration conf,
} else {
keyClass=
(Class<? extends WritableComparable>) keyClassOption.getValue();
this.comparator = WritableComparator.get(keyClass);
this.comparator = WritableComparator.get(keyClass, conf);
}
this.lastKey = comparator.newKey();
FileSystem fs = dirName.getFileSystem(conf);
@ -428,12 +428,13 @@ protected synchronized void open(Path dir,
this.data = createDataFileReader(dataFile, conf, options);
this.firstPosition = data.getPosition();
if (comparator == null)
this.comparator =
WritableComparator.get(data.getKeyClass().
asSubclass(WritableComparable.class));
else
if (comparator == null) {
Class<? extends WritableComparable> cls;
cls = data.getKeyClass().asSubclass(WritableComparable.class);
this.comparator = WritableComparator.get(cls, conf);
} else {
this.comparator = comparator;
}
// open the index
SequenceFile.Reader.Option[] indexOptions =

View File

@ -2676,7 +2676,7 @@ public static class Sorter {
/** Sort and merge files containing the named classes. */
public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
Class valClass, Configuration conf) {
this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
}
/** Sort and merge using an arbitrary {@link RawComparator}. */

View File

@ -52,7 +52,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
SequenceFile.CompressionType compress)
throws IOException {
this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
this(conf, fs, dirName, WritableComparator.get(keyClass, conf), compress);
}
/** Create a set naming the element comparator and compression type. */

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
/** A Comparator for {@link WritableComparable}s.
@ -37,13 +39,21 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class WritableComparator implements RawComparator {
public class WritableComparator implements RawComparator, Configurable {
private static final ConcurrentHashMap<Class, WritableComparator> comparators
= new ConcurrentHashMap<Class, WritableComparator>(); // registry
/** Get a comparator for a {@link WritableComparable} implementation. */
private Configuration conf;
/** For backwards compatibility. **/
public static WritableComparator get(Class<? extends WritableComparable> c) {
return get(c, null);
}
/** Get a comparator for a {@link WritableComparable} implementation. */
public static WritableComparator get(
Class<? extends WritableComparable> c, Configuration conf) {
WritableComparator comparator = comparators.get(c);
if (comparator == null) {
// force the static initializers to run
@ -52,12 +62,24 @@ public static WritableComparator get(Class<? extends WritableComparable> c) {
comparator = comparators.get(c);
// if not, use the generic one
if (comparator == null) {
comparator = new WritableComparator(c, true);
comparator = new WritableComparator(c, conf, true);
}
}
// Newly passed Configuration objects should be used.
ReflectionUtils.setConf(comparator, conf);
return comparator;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
/**
* Force initialization of the static members.
* As of Java 5, referencing a class doesn't force it to initialize. Since
@ -91,12 +113,19 @@ protected WritableComparator() {
/** Construct for a {@link WritableComparable} implementation. */
protected WritableComparator(Class<? extends WritableComparable> keyClass) {
this(keyClass, false);
this(keyClass, null, false);
}
protected WritableComparator(Class<? extends WritableComparable> keyClass,
boolean createInstances) {
this(keyClass, null, createInstances);
}
protected WritableComparator(Class<? extends WritableComparable> keyClass,
Configuration conf,
boolean createInstances) {
this.keyClass = keyClass;
this.conf = (conf != null) ? conf : new Configuration();
if (createInstances) {
key1 = newKey();
key2 = newKey();
@ -112,7 +141,7 @@ protected WritableComparator(Class<? extends WritableComparable> keyClass,
/** Construct a new {@link WritableComparable} instance. */
public WritableComparable newKey() {
return ReflectionUtils.newInstance(keyClass, null);
return ReflectionUtils.newInstance(keyClass, conf);
}
/** Optimization hook. Override this to make SequenceFile.Sorter's scream.

View File

@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
@ -30,6 +31,11 @@
/** Unit tests for Writable. */
public class TestWritable extends TestCase {
private static final String TEST_CONFIG_PARAM = "frob.test";
private static final String TEST_CONFIG_VALUE = "test";
private static final String TEST_WRITABLE_CONFIG_PARAM = "test.writable";
private static final String TEST_WRITABLE_CONFIG_VALUE = TEST_CONFIG_VALUE;
public TestWritable(String name) { super(name); }
/** Example class used in test cases below. */
@ -64,6 +70,25 @@ public boolean equals(Object o) {
}
}
public static class SimpleWritableComparable extends SimpleWritable
implements WritableComparable<SimpleWritableComparable>, Configurable {
private Configuration conf;
public SimpleWritableComparable() {}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Configuration getConf() {
return this.conf;
}
public int compareTo(SimpleWritableComparable o) {
return this.state - o.state;
}
}
/** Test 1: Check that SimpleWritable. */
public void testSimpleWritable() throws Exception {
testWritable(new SimpleWritable());
@ -121,9 +146,34 @@ private static class Frob implements WritableComparable<Frob> {
@Override public int compareTo(Frob o) { return 0; }
}
/** Test that comparator is defined. */
/** Test that comparator is defined and configured. */
public static void testGetComparator() throws Exception {
assert(WritableComparator.get(Frob.class) instanceof FrobComparator);
Configuration conf = new Configuration();
// Without conf.
WritableComparator frobComparator = WritableComparator.get(Frob.class);
assert(frobComparator instanceof FrobComparator);
assertNotNull(frobComparator.getConf());
assertNull(frobComparator.getConf().get(TEST_CONFIG_PARAM));
// With conf.
conf.set(TEST_CONFIG_PARAM, TEST_CONFIG_VALUE);
frobComparator = WritableComparator.get(Frob.class, conf);
assert(frobComparator instanceof FrobComparator);
assertNotNull(frobComparator.getConf());
assertEquals(conf.get(TEST_CONFIG_PARAM), TEST_CONFIG_VALUE);
// Without conf. should reuse configuration.
frobComparator = WritableComparator.get(Frob.class);
assert(frobComparator instanceof FrobComparator);
assertNotNull(frobComparator.getConf());
assertEquals(conf.get(TEST_CONFIG_PARAM), TEST_CONFIG_VALUE);
// New conf. should use new configuration.
frobComparator = WritableComparator.get(Frob.class, new Configuration());
assert(frobComparator instanceof FrobComparator);
assertNotNull(frobComparator.getConf());
assertNull(frobComparator.getConf().get(TEST_CONFIG_PARAM));
}
/**
@ -153,4 +203,17 @@ public void testShortWritableComparator() throws Exception {
.compare(writable1, writable3) == 0);
}
/**
* Test that Writable's are configured by Comparator.
*/
public void testConfigurableWritableComparator() throws Exception {
Configuration conf = new Configuration();
conf.set(TEST_WRITABLE_CONFIG_PARAM, TEST_WRITABLE_CONFIG_VALUE);
WritableComparator wc = WritableComparator.get(SimpleWritableComparable.class, conf);
SimpleWritableComparable key = ((SimpleWritableComparable)wc.newKey());
assertNotNull(wc.getConf());
assertNotNull(key.getConf());
assertEquals(key.getConf().get(TEST_WRITABLE_CONFIG_PARAM), TEST_WRITABLE_CONFIG_VALUE);
}
}

View File

@ -882,7 +882,7 @@ public RawComparator getOutputKeyComparator() {
JobContext.KEY_COMPARATOR, null, RawComparator.class);
if (theClass != null)
return ReflectionUtils.newInstance(theClass, this);
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
/**

View File

@ -131,7 +131,7 @@ protected WritableComparator getComparator() {
public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
kids[rr.id()] = rr;
if (null == q) {
cmp = WritableComparator.get(rr.createKey().getClass());
cmp = WritableComparator.get(rr.createKey().getClass(), conf);
q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
new Comparator<ComposableRecordReader<K,?>>() {
public int compare(ComposableRecordReader<K,?> o1,

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
@ -38,7 +40,7 @@
@InterfaceStability.Stable
public class WrappedRecordReader<K extends WritableComparable,
U extends Writable>
implements ComposableRecordReader<K,U> {
implements ComposableRecordReader<K,U>, Configurable {
private boolean empty = false;
private RecordReader<K,U> rr;
@ -47,6 +49,7 @@ public class WrappedRecordReader<K extends WritableComparable,
private K khead; // key at the top of this RR
private U vhead; // value assoc with khead
private WritableComparator cmp;
private Configuration conf;
private ResetableIterator<U> vjoin;
@ -55,13 +58,20 @@ public class WrappedRecordReader<K extends WritableComparable,
*/
WrappedRecordReader(int id, RecordReader<K,U> rr,
Class<? extends WritableComparator> cmpcl) throws IOException {
this(id, rr, cmpcl, null);
}
WrappedRecordReader(int id, RecordReader<K,U> rr,
Class<? extends WritableComparator> cmpcl,
Configuration conf) throws IOException {
this.id = id;
this.rr = rr;
this.conf = (conf == null) ? new Configuration() : conf;
khead = rr.createKey();
vhead = rr.createValue();
try {
cmp = (null == cmpcl)
? WritableComparator.get(khead.getClass())
? WritableComparator.get(khead.getClass(), this.conf)
: cmpcl.newInstance();
} catch (InstantiationException e) {
throw (IOException)new IOException().initCause(e);
@ -207,4 +217,13 @@ public int hashCode() {
return 42;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
}

View File

@ -109,7 +109,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)
}
// create priority queue
if (null == q) {
cmp = WritableComparator.get(keyclass);
cmp = WritableComparator.get(keyclass, conf);
q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
new Comparator<ComposableRecordReader<K,?>>() {
public int compare(ComposableRecordReader<K,?> o1,

View File

@ -92,7 +92,7 @@ public void initialize(InputSplit split,
keyclass = key.getClass().asSubclass(WritableComparable.class);
valueclass = value.getClass();
if (cmp == null) {
cmp = WritableComparator.get(keyclass);
cmp = WritableComparator.get(keyclass, conf);
}
}
}