diff --git a/CHANGES.txt b/CHANGES.txt index 0af1a137ec..0fb89cd257 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,9 @@ Trunk (unreleased changes) HADOOP-6392. Run namenode and jobtracker on separate EC2 instances. (tomwhite) + HADOOP-6323. Add comparators to the serialization API. + (Aaron Kimball via cutting) + IMPROVEMENTS HADOOP-6283. Improve the exception messages thrown by diff --git a/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java b/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java index b44b4b1db7..fea57e7063 100644 --- a/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java @@ -24,6 +24,9 @@ import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.Map; + +import org.apache.hadoop.io.RawComparator; /** *

@@ -31,10 +34,10 @@ *

* @see JavaSerializationComparator */ -public class JavaSerialization implements Serialization { - +public class JavaSerialization extends SerializationBase { + static class JavaSerializationDeserializer - implements Deserializer { + extends DeserializerBase { private ObjectInputStream ois; @@ -61,11 +64,16 @@ public void close() throws IOException { } } - - static class JavaSerializationSerializer - implements Serializer { + + static class JavaSerializationSerializer + extends SerializerBase { private ObjectOutputStream oos; + private Map metadata; + + public JavaSerializationSerializer(Map metadata) { + this.metadata = metadata; + } public void open(OutputStream out) throws IOException { oos = new ObjectOutputStream(out) { @@ -75,7 +83,7 @@ public void open(OutputStream out) throws IOException { }; } - public void serialize(Serializable object) throws IOException { + public void serialize(T object) throws IOException { oos.reset(); // clear (class) back-references oos.writeObject(object); } @@ -84,18 +92,55 @@ public void close() throws IOException { oos.close(); } + @Override + public Map getMetadata() throws IOException { + return metadata; + } } - public boolean accept(Class c) { + public boolean accept(Map metadata) { + String intendedSerializer = metadata.get(SERIALIZATION_KEY); + if (intendedSerializer != null && + !getClass().getName().equals(intendedSerializer)) { + return false; + } + + Class c = getClassFromMetadata(metadata); return Serializable.class.isAssignableFrom(c); } - public Deserializer getDeserializer(Class c) { + public DeserializerBase getDeserializer( + Map metadata) { return new JavaSerializationDeserializer(); } - public Serializer getSerializer(Class c) { - return new JavaSerializationSerializer(); + public SerializerBase getSerializer( + Map metadata) { + return new JavaSerializationSerializer(metadata); } + @SuppressWarnings("unchecked") + @Override + public RawComparator getRawComparator( + Map metadata) { + Class klazz = getClassFromMetadata(metadata); + if (null == klazz) { + throw new IllegalArgumentException( + "Cannot get comparator without " + SerializationBase.CLASS_KEY + + " set in metadata"); + } + + if (Serializable.class.isAssignableFrom(klazz)) { + try { + return (RawComparator) new JavaSerializationComparator(); + } catch (IOException ioe) { + throw new IllegalArgumentException( + "Could not instantiate JavaSerializationComparator for type " + + klazz.getName(), ioe); + } + } else { + throw new IllegalArgumentException("Class " + klazz.getName() + + " is incompatible with JavaSerialization"); + } + } } diff --git a/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java b/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java index d5f0f9a836..e97a673a32 100644 --- a/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.RawComparator; /** *

@@ -82,4 +83,14 @@ public DeserializerBase getDeserializer(Map metadata) { return new LegacyDeserializer(getDeserializer(c)); } + @Override + public RawComparator getRawComparator(Map metadata) { + // Since this method is being added to an API meant to provide legacy + // compatability with deprecated serializers, leaving this as an incomplete + // stub. + + throw new UnsupportedOperationException( + "LegacySerialization does not provide raw comparators"); + } + } diff --git a/src/java/org/apache/hadoop/io/serializer/SerializationBase.java b/src/java/org/apache/hadoop/io/serializer/SerializationBase.java index 01df3ef856..7934b089fe 100644 --- a/src/java/org/apache/hadoop/io/serializer/SerializationBase.java +++ b/src/java/org/apache/hadoop/io/serializer/SerializationBase.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.RawComparator; /** *

@@ -88,4 +89,16 @@ protected Class getClassFromMetadata(Map metadata) { throw new IllegalArgumentException(e); } } + + /** Provide a raw comparator for the specified serializable class. + * Requires a serialization-specific metadata entry to name the class + * to compare (e.g., "Serialized-Class" for JavaSerialization and + * WritableSerialization). + * @param metadata a set of string mappings providing serialization-specific + * arguments that parameterize the data being serialized/compared. + * @return a {@link RawComparator} for the given metadata. + * @throws UnsupportedOperationException if it cannot instantiate a RawComparator + * for this given metadata. + */ + public abstract RawComparator getRawComparator(Map metadata); } diff --git a/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java b/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java index 04211a185c..7c0dec1af1 100644 --- a/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java @@ -26,8 +26,12 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; /** * A {@link SerializationBase} for {@link Writable}s that delegates to @@ -35,7 +39,6 @@ * {@link Writable#readFields(java.io.DataInput)}. */ public class WritableSerialization extends SerializationBase { - static class WritableDeserializer extends DeserializerBase { private Class writableClass; @@ -79,9 +82,25 @@ static class WritableSerializer extends SerializerBase { private Map metadata; private DataOutputStream dataOut; + private Class serializedClass; - public WritableSerializer(Map metadata) { + public WritableSerializer(Configuration conf, + Map metadata) { this.metadata = metadata; + + // If this metadata specifies a serialized class, memoize the + // class object for this. + String className = this.metadata.get(CLASS_KEY); + if (null != className) { + try { + this.serializedClass = conf.getClassByName(className); + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException(cnfe); + } + } else { + throw new UnsupportedOperationException("the " + + CLASS_KEY + " metadata is missing, but is required."); + } } @Override @@ -95,6 +114,10 @@ public void open(OutputStream out) { @Override public void serialize(Writable w) throws IOException { + if (serializedClass != w.getClass()) { + throw new IOException("Type mismatch in serialization: expected " + + serializedClass + "; received " + w.getClass()); + } w.write(dataOut); } @@ -112,8 +135,10 @@ public Map getMetadata() throws IOException { @Override public boolean accept(Map metadata) { - if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) { - return true; + String intendedSerializer = metadata.get(SERIALIZATION_KEY); + if (intendedSerializer != null && + !getClass().getName().equals(intendedSerializer)) { + return false; } Class c = getClassFromMetadata(metadata); return c == null ? false : Writable.class.isAssignableFrom(c); @@ -121,7 +146,7 @@ public boolean accept(Map metadata) { @Override public SerializerBase getSerializer(Map metadata) { - return new WritableSerializer(metadata); + return new WritableSerializer(getConf(), metadata); } @Override @@ -130,4 +155,17 @@ public DeserializerBase getDeserializer(Map metadata) return new WritableDeserializer(getConf(), c); } + @Override + @SuppressWarnings("unchecked") + public RawComparator getRawComparator(Map metadata) { + Class klazz = getClassFromMetadata(metadata); + if (null == klazz) { + throw new IllegalArgumentException( + "Cannot get comparator without " + SerializationBase.CLASS_KEY + + " set in metadata"); + } + + return (RawComparator) WritableComparator.get( + (Class)klazz); + } } diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroComparator.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroComparator.java new file mode 100644 index 0000000000..2f499d067f --- /dev/null +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroComparator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer.avro; + +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryData; +import org.apache.hadoop.io.RawComparator; + +/** + *

+ * A {@link RawComparator} that uses Avro to extract data from the + * source stream and compare their contents without explicit + * deserialization. + */ +public class AvroComparator> + implements RawComparator { + + private final Schema schema; + + public AvroComparator(final Schema s) { + this.schema = s; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return BinaryData.compare(b1, s1, b2, s2, schema); + } + + public int compare(T t1, T t2) { + return t1.compareTo(t2); + } + +} diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java index 52f25aec6a..464b5fd88d 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java @@ -50,9 +50,8 @@ protected DatumReader getReader(Map metadata) { } @Override - protected Schema getSchema(Object t, Map metadata) { - String jsonSchema = metadata.get(AVRO_SCHEMA_KEY); - return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.get().induce(t); + protected Schema getSchema(Map metadata) { + return Schema.parse(metadata.get(AVRO_SCHEMA_KEY)); } @Override diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java index 9666f05e76..28662f880b 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java @@ -85,8 +85,9 @@ protected DatumReader getReader(Map metadata) { } @Override - protected Schema getSchema(Object t, Map metadata) { - return ReflectData.get().getSchema(t.getClass()); + protected Schema getSchema(Map metadata) { + Class c = getClassFromMetadata(metadata); + return ReflectData.get().getSchema(c); } @Override diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java index fe8f45e57f..d1a649591f 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java @@ -28,6 +28,7 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.serializer.DeserializerBase; import org.apache.hadoop.io.serializer.SerializationBase; import org.apache.hadoop.io.serializer.SerializerBase; @@ -50,7 +51,7 @@ public SerializerBase getSerializer(Map metadata) { /** * Return an Avro Schema instance for the given class and metadata. */ - protected abstract Schema getSchema(T t, Map metadata); + protected abstract Schema getSchema(Map metadata); /** * Create and return Avro DatumWriter for the given metadata. @@ -68,10 +69,13 @@ class AvroSerializer extends SerializerBase { private DatumWriter writer; private BinaryEncoder encoder; private OutputStream outStream; + private Schema schema; AvroSerializer(Map metadata) { this.metadata = metadata; - writer = getWriter(metadata); + this.writer = getWriter(metadata); + this.schema = getSchema(this.metadata); + writer.setSchema(this.schema); } @Override @@ -88,7 +92,6 @@ public void open(OutputStream out) throws IOException { @Override public void serialize(T t) throws IOException { - writer.setSchema(getSchema(t, metadata)); writer.write(t, encoder); } @@ -127,4 +130,19 @@ public void open(InputStream in) throws IOException { } + @Override + @SuppressWarnings("unchecked") + /** + * Provides a raw comparator for Avro-encoded serialized data. + * Requires that {@link AvroSerialization#AVRO_SCHEMA_KEY} be provided + * in the metadata argument. + * @param metadata the Avro-serialization-specific parameters being + * provided that detail the schema for the data to deserialize and compare. + * @return a RawComparator parameterized for the specified Avro schema. + */ + public RawComparator getRawComparator(Map metadata) { + Schema schema = Schema.parse(metadata.get(AVRO_SCHEMA_KEY)); + return new AvroComparator(schema); + } + } diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java index 8496b8b45b..e60ee89b93 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecord; @@ -55,8 +56,9 @@ protected DatumReader getReader(Map metadata) { } @Override - protected Schema getSchema(SpecificRecord t, Map metadata) { - return t.getSchema(); + protected Schema getSchema(Map metadata) { + Class c = getClassFromMetadata(metadata); + return SpecificData.get().getSchema(c); } @Override diff --git a/src/test/core/org/apache/hadoop/io/serializer/TestRawComparators.java b/src/test/core/org/apache/hadoop/io/serializer/TestRawComparators.java new file mode 100644 index 0000000000..7656ce0f33 --- /dev/null +++ b/src/test/core/org/apache/hadoop/io/serializer/TestRawComparators.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.serializer; + +import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_KEY; +import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_VALUE; +import junit.framework.TestCase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.serializer.avro.AvroSerialization; +import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization; +import org.apache.hadoop.util.GenericsUtil; + +/** + * Test the getRawComparator API of the various serialization systems. + */ +public class TestRawComparators extends TestCase { + + private Configuration conf; + + public void setUp() { + conf = new Configuration(); + } + + /** A WritableComparable that is guaranteed to use the + * generic WritableComparator. + */ + public static class FooWritable implements WritableComparable { + + public long val; + + public FooWritable() { + this.val = 0; + } + + public FooWritable(long v) { + this.val = v; + } + + public void write(DataOutput out) throws IOException { + out.writeLong(val); + } + + public void readFields(DataInput in) throws IOException { + val = in.readLong(); + } + + public int compareTo(FooWritable other) { + return new Long(val).compareTo(other.val); + } + } + + @SuppressWarnings("unchecked") + private void runComparisonTest(Object low, Object high) throws Exception { + Map metadata = + SerializationBase.getMetadataFromClass(GenericsUtil.getClass(low)); + runComparisonTest(low, high, metadata); + } + + @SuppressWarnings("unchecked") + private void runComparisonTest(Object low, Object high, + Map metadata) throws Exception { + + DataOutputBuffer out1 = new DataOutputBuffer(); + DataOutputBuffer out2 = new DataOutputBuffer(); + DataInputBuffer in1 = new DataInputBuffer(); + DataInputBuffer in2 = new DataInputBuffer(); + + SerializationFactory factory = new SerializationFactory(conf); + + // Serialize some data to two byte streams. + SerializerBase serializer = factory.getSerializer(metadata); + assertNotNull("Serializer is null!", serializer); + + serializer.open(out1); + serializer.serialize(low); + serializer.close(); + + serializer.open(out2); + serializer.serialize(high); + serializer.close(); + + // Shift that data into an input buffer. + in1.reset(out1.getData(), out1.getLength()); + in2.reset(out2.getData(), out2.getLength()); + + // Get the serialization and then the RawComparator; + // use these to compare the data in the input streams and + // assert that the low stream (1) is less than the high stream (2). + + SerializationBase serializationBase = factory.getSerialization(metadata); + assertNotNull("Null SerializationBase!", serializationBase); + + RawComparator rawComparator = serializationBase.getRawComparator(metadata); + assertNotNull("Null raw comparator!", rawComparator); + int actual = rawComparator.compare(in1.getData(), 0, in1.getLength(), + in2.getData(), 0, in2.getLength()); + assertTrue("Did not compare FooWritable correctly", actual < 0); + } + + public void testBasicWritable() throws Exception { + // Test that a WritableComparable can be used with this API + // correctly. + + FooWritable low = new FooWritable(10); + FooWritable high = new FooWritable(42); + + runComparisonTest(low, high); + } + + public void testTextWritable() throws Exception { + // Test that a Text object (which uses Writable serialization, and + // has its own RawComparator implementation) can be used with this + // API correctly. + + Text low = new Text("aaa"); + Text high = new Text("zzz"); + + runComparisonTest(low, high); + } + + public void testAvroComparator() throws Exception { + // Test a record created via an Avro schema that doesn't have a fixed + // class associated with it. + + Schema s1 = Schema.create(Schema.Type.INT); + + // Create a metadata mapping containing an Avro schema and a request to use + // Avro generic serialization. + Map metadata = new HashMap(); + metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, s1.toString()); + metadata.put(SerializationBase.SERIALIZATION_KEY, + AvroGenericSerialization.class.getName()); + + runComparisonTest(new Integer(42), new Integer(123), metadata); + + // Now test it with a string record type. + Schema s2 = Schema.create(Schema.Type.STRING); + metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, s2.toString()); + runComparisonTest(new Utf8("baz"), new Utf8("meep"), metadata); + + } + +} diff --git a/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java b/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java index 0d7c50b42b..eb98a76108 100644 --- a/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java +++ b/src/test/core/org/apache/hadoop/io/serializer/TestWritableSerialization.java @@ -22,10 +22,17 @@ import static org.apache.hadoop.io.TestGenericWritable.CONF_TEST_VALUE; import junit.framework.TestCase; +import java.io.IOException; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.TestGenericWritable.Foo; +import org.apache.hadoop.io.TestGenericWritable.Bar; import org.apache.hadoop.io.TestGenericWritable.Baz; import org.apache.hadoop.io.TestGenericWritable.FooGenericWritable; +import org.apache.hadoop.util.GenericsUtil; public class TestWritableSerialization extends TestCase { @@ -53,4 +60,56 @@ public void testWritableConfigurable() throws Exception { assertEquals(baz, result); assertNotNull(result.getConf()); } + + @SuppressWarnings("unchecked") + public void testReuseSerializer() throws IOException { + // Test that we can write multiple objects of the same type + // through the same serializer. + + DataOutputBuffer out = new DataOutputBuffer(); + SerializationFactory factory = new SerializationFactory( + new Configuration()); + + // Create a few Foo objects and serialize them. + Foo foo = new Foo(); + Foo foo2 = new Foo(); + Map metadata = SerializationBase.getMetadataFromClass( + GenericsUtil.getClass(foo)); + + SerializerBase fooSerializer = factory.getSerializer(metadata); + fooSerializer.open(out); + fooSerializer.serialize(foo); + fooSerializer.serialize(foo2); + fooSerializer.close(); + + out.reset(); + + // Create a new serializer for Bar objects + Bar bar = new Bar(); + Baz baz = new Baz(); // Baz inherits from Bar. + metadata = SerializationBase.getMetadataFromClass( + GenericsUtil.getClass(bar)); + // Check that we can serialize Bar objects. + SerializerBase barSerializer = factory.getSerializer(metadata); + barSerializer.open(out); + barSerializer.serialize(bar); // this should work. + try { + // This should not work. We should not allow subtype serialization. + barSerializer.serialize(baz); + fail("Expected IOException serializing baz via bar serializer."); + } catch (IOException ioe) { + // Expected. + } + + try { + // This should not work. Disallow unrelated type serialization. + barSerializer.serialize(foo); + fail("Expected IOException serializing foo via bar serializer."); + } catch (IOException ioe) { + // Expected. + } + + barSerializer.close(); + out.reset(); + } }