HADOOP-18215. Enhance WritableName to be able to return aliases for classes that use serializers (#4215)
This commit is contained in:
parent
8c9c68c19e
commit
aa6c51364a
@ -92,7 +92,7 @@ public static synchronized Class<?> getClass(String name, Configuration conf
|
|||||||
) throws IOException {
|
) throws IOException {
|
||||||
Class<?> writableClass = NAME_TO_CLASS.get(name);
|
Class<?> writableClass = NAME_TO_CLASS.get(name);
|
||||||
if (writableClass != null)
|
if (writableClass != null)
|
||||||
return writableClass.asSubclass(Writable.class);
|
return writableClass;
|
||||||
try {
|
try {
|
||||||
return conf.getClassByName(name);
|
return conf.getClassByName(name);
|
||||||
} catch (ClassNotFoundException e) {
|
} catch (ClassNotFoundException e) {
|
||||||
|
@ -26,6 +26,9 @@
|
|||||||
import org.apache.hadoop.io.SequenceFile.Metadata;
|
import org.apache.hadoop.io.SequenceFile.Metadata;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||||
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
|
import org.apache.hadoop.io.serializer.Serialization;
|
||||||
|
import org.apache.hadoop.io.serializer.Serializer;
|
||||||
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
|
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
@ -750,6 +753,122 @@ public void testSequenceFileWriter() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerializationUsingWritableNameAlias() throws IOException {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, SimpleSerializer.class.getName());
|
||||||
|
Path path = new Path(System.getProperty("test.build.data", "."),
|
||||||
|
"SerializationUsingWritableNameAlias");
|
||||||
|
|
||||||
|
// write with the original serializable class
|
||||||
|
SequenceFile.Writer writer = SequenceFile.createWriter(
|
||||||
|
config,
|
||||||
|
SequenceFile.Writer.file(path),
|
||||||
|
SequenceFile.Writer.keyClass(SimpleSerializable.class),
|
||||||
|
SequenceFile.Writer.valueClass(SimpleSerializable.class));
|
||||||
|
|
||||||
|
int max = 10;
|
||||||
|
try {
|
||||||
|
SimpleSerializable val = new SimpleSerializable();
|
||||||
|
val.setId(-1);
|
||||||
|
for (int i = 0; i < max; i++) {
|
||||||
|
SimpleSerializable key = new SimpleSerializable();
|
||||||
|
key.setId(i);
|
||||||
|
writer.append(key, val);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// override name so it gets forced to the new serializable
|
||||||
|
WritableName.setName(AnotherSimpleSerializable.class, SimpleSerializable.class.getName());
|
||||||
|
|
||||||
|
// read and expect our new serializable, and all the correct values read
|
||||||
|
SequenceFile.Reader reader = new SequenceFile.Reader(
|
||||||
|
config,
|
||||||
|
SequenceFile.Reader.file(path));
|
||||||
|
|
||||||
|
AnotherSimpleSerializable key = new AnotherSimpleSerializable();
|
||||||
|
int count = 0;
|
||||||
|
while (true) {
|
||||||
|
key = (AnotherSimpleSerializable) reader.next(key);
|
||||||
|
if (key == null) {
|
||||||
|
// make sure we exhausted all the ints we wrote
|
||||||
|
assertEquals(count, max);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
assertEquals(count++, key.getId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class SimpleSerializable implements Serializable {
|
||||||
|
|
||||||
|
private int id;
|
||||||
|
|
||||||
|
public int getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setId(int id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class AnotherSimpleSerializable extends SimpleSerializable {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class SimpleSerializer implements Serialization<SimpleSerializable> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean accept(Class<?> c) {
|
||||||
|
return SimpleSerializable.class.isAssignableFrom(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Serializer<SimpleSerializable> getSerializer(Class<SimpleSerializable> c) {
|
||||||
|
return new Serializer<SimpleSerializable>() {
|
||||||
|
private DataOutputStream out;
|
||||||
|
@Override
|
||||||
|
public void open(OutputStream out) throws IOException {
|
||||||
|
this.out = new DataOutputStream(out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serialize(SimpleSerializable simpleSerializable) throws IOException {
|
||||||
|
out.writeInt(simpleSerializable.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Deserializer<SimpleSerializable> getDeserializer(Class<SimpleSerializable> c) {
|
||||||
|
return new Deserializer<SimpleSerializable>() {
|
||||||
|
private DataInputStream dis;
|
||||||
|
@Override
|
||||||
|
public void open(InputStream in) throws IOException {
|
||||||
|
dis = new DataInputStream(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SimpleSerializable deserialize(SimpleSerializable simpleSerializable)
|
||||||
|
throws IOException {
|
||||||
|
simpleSerializable.setId(dis.readInt());
|
||||||
|
return simpleSerializable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
dis.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** For debugging and testing. */
|
/** For debugging and testing. */
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
int count = 1024 * 1024;
|
int count = 1024 * 1024;
|
||||||
|
@ -24,8 +24,14 @@
|
|||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
|
import org.apache.hadoop.io.serializer.Serialization;
|
||||||
|
import org.apache.hadoop.io.serializer.SerializationFactory;
|
||||||
|
import org.apache.hadoop.io.serializer.Serializer;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/** Unit tests for WritableName. */
|
/** Unit tests for WritableName. */
|
||||||
@ -63,6 +69,28 @@ public boolean equals(Object o) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class SimpleSerializable {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class SimpleSerializer implements Serialization<SimpleSerializable> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean accept(Class<?> c) {
|
||||||
|
return c.equals(SimpleSerializable.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Serializer<SimpleSerializable> getSerializer(Class<SimpleSerializable> c) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Deserializer<SimpleSerializable> getDeserializer(Class<SimpleSerializable> c) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static final String testName = "mystring";
|
private static final String testName = "mystring";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -95,7 +123,27 @@ public void testAddName() throws Exception {
|
|||||||
// check original name still works
|
// check original name still works
|
||||||
test = WritableName.getClass(testName, conf);
|
test = WritableName.getClass(testName, conf);
|
||||||
assertTrue(test.equals(SimpleWritable.class));
|
assertTrue(test.equals(SimpleWritable.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddNameSerializable() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, SimpleSerializer.class.getName());
|
||||||
|
SerializationFactory serializationFactory =
|
||||||
|
new SerializationFactory(conf);
|
||||||
|
|
||||||
|
String altName = testName + ".alt";
|
||||||
|
|
||||||
|
WritableName.addName(SimpleSerializable.class, altName);
|
||||||
|
|
||||||
|
Class<?> test = WritableName.getClass(altName, conf);
|
||||||
|
assertEquals(test, SimpleSerializable.class);
|
||||||
|
assertNotNull(serializationFactory.getSerialization(test));
|
||||||
|
|
||||||
|
// check original name still works
|
||||||
|
test = WritableName.getClass(SimpleSerializable.class.getName(), conf);
|
||||||
|
assertEquals(test, SimpleSerializable.class);
|
||||||
|
assertNotNull(serializationFactory.getSerialization(test));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user