HADOOP-6120. Add support for Avro specific and reflect data. Contributed by sharad.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@797197 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Doug Cutting 2009-07-23 19:21:21 +00:00
parent 3200b2ec58
commit 8296413d49
16 changed files with 597 additions and 49 deletions

View File

@ -152,6 +152,9 @@ Trunk (unreleased changes)
HADOOP-5976. Add a new command, classpath, to the hadoop script. (Owen
O'Malley and Gary Murry via szetszwo)
HADOOP-6120. Add support for Avro specific and reflect data.
(sharad via cutting)
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information

View File

@ -417,10 +417,21 @@
</recordcc>
</target>
<target name="generate-avro-records" depends="init, ivy-retrieve-test">
<taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
<classpath refid="test.core.classpath"/>
</taskdef>
<schema destdir="${test.generated.dir}">
<fileset dir="${test.src.dir}">
<include name="**/*.avsc" />
</fileset>
</schema>
</target>
<!-- ================================================================== -->
<!-- Compile test code -->
<!-- ================================================================== -->
<target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records">
<target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records, generate-avro-records">
<mkdir dir="${test.core.build.classes}"/>
<javac
encoding="${build.encoding}"

12
ivy.xml
View File

@ -269,6 +269,18 @@
rev="${slf4j-log4j12.version}"
conf="common->master">
</dependency>
<dependency org="org.apache.hadoop"
name="avro"
rev="1.0.0"
conf="common->default"/>
<dependency org="org.codehaus.jackson"
name="jackson-mapper-asl"
rev="1.0.1"
conf="common->default"/>
<dependency org="com.thoughtworks.paranamer"
name="paranamer"
rev="1.5"
conf="common->default"/>
</dependencies>
</ivy-module>

View File

@ -74,7 +74,7 @@
rather than look for them online.
-->
<module organisation="org.apache.hadoop" name=".*" resolver="internal"/>
<module organisation="org.apache.hadoop" name="Hadoop.*" resolver="internal"/>
<!--until commons cli is external, we need to pull it in from the snapshot repository -if present -->
<module organisation="org.apache.commons" name=".*" resolver="external-and-snapshots"/>
</modules>

View File

@ -85,7 +85,7 @@
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization</value>
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
<description>A list of serialization classes that can be used for
obtaining serializers and deserializers.</description>
</property>

View File

@ -25,6 +25,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@ -50,7 +52,9 @@ public class SerializationFactory extends Configured {
public SerializationFactory(Configuration conf) {
super(conf);
for (String serializerName : conf.getStrings("io.serializations",
new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"})) {
new String[]{WritableSerialization.class.getName(),
AvroSpecificSerialization.class.getName(),
AvroReflectSerialization.class.getName()})) {
add(conf, serializerName);
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serializer.avro;
/**
* Tag interface for Avro 'reflect' serializable classes. Classes implementing
* this interface can be serialized/deserialized using
* {@link AvroReflectSerialization}.
*/
public interface AvroReflectSerializable {
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serializer.avro;
import java.util.HashSet;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
/**
* Serialization for Avro Reflect classes. For a class to be accepted by this
* serialization, it must either be in the package list configured via
* {@link AvroReflectSerialization#AVRO_REFLECT_PACKAGES} or implement
* {@link AvroReflectSerializable} interface.
*
*/
@SuppressWarnings("unchecked")
public class AvroReflectSerialization extends AvroSerialization<Object>{
/**
* Key to configure packages that contain classes to be serialized and
* deserialized using this class. Multiple packages can be specified using
* comma-separated list.
*/
public static final String AVRO_REFLECT_PACKAGES = "avro.reflect.pkgs";
private Set<String> packages;
public synchronized boolean accept(Class<?> c) {
if (packages == null) {
getPackages();
}
return AvroReflectSerializable.class.isAssignableFrom(c) ||
packages.contains(c.getPackage().getName());
}
private void getPackages() {
String[] pkgList = getConf().getStrings(AVRO_REFLECT_PACKAGES);
packages = new HashSet<String>();
if (pkgList != null) {
for (String pkg : pkgList) {
packages.add(pkg.trim());
}
}
}
protected DatumReader getReader(Class<Object> clazz) {
try {
String prefix =
((clazz.getEnclosingClass() == null
|| "null".equals(clazz.getEnclosingClass().getName())) ?
clazz.getPackage().getName() + "."
: (clazz.getEnclosingClass().getName() + "$"));
return new ReflectDatumReader(ReflectData.getSchema(clazz), prefix);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected Schema getSchema(Object t) {
return ReflectData.getSchema(t.getClass());
}
protected DatumWriter getWriter(Class<Object> clazz) {
return new ReflectDatumWriter();
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serializer.avro;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
/**
* Base class for providing serialization to Avro types.
*/
public abstract class AvroSerialization<T> extends Configured
implements Serialization<T>{
public Deserializer<T> getDeserializer(Class<T> c) {
return new AvroDeserializer(c);
}
public Serializer<T> getSerializer(Class<T> c) {
return new AvroSerializer(c);
}
/**
* Return an Avro Schema instance for the given class.
*/
protected abstract Schema getSchema(T t);
/**
* Create and return Avro DatumWriter for the given class.
*/
protected abstract DatumWriter<T> getWriter(Class<T> clazz);
/**
* Create and return Avro DatumReader for the given class.
*/
protected abstract DatumReader<T> getReader(Class<T> clazz);
class AvroSerializer implements Serializer<T> {
private DatumWriter<T> writer;
private BinaryEncoder encoder;
private OutputStream outStream;
protected Class<T> clazz;
AvroSerializer(Class<T> clazz) {
writer = getWriter(clazz);
}
public void close() throws IOException {
encoder.flush();
outStream.close();
}
public void open(OutputStream out) throws IOException {
outStream = out;
encoder = new BinaryEncoder(out);
}
public void serialize(T t) throws IOException {
writer.setSchema(getSchema(t));
writer.write(t, encoder);
}
}
class AvroDeserializer implements Deserializer<T> {
private DatumReader<T> reader;
private BinaryDecoder decoder;
private InputStream inStream;
AvroDeserializer(Class<T> clazz) {
this.reader = getReader(clazz);
}
public void close() throws IOException {
inStream.close();
}
public T deserialize(T t) throws IOException {
return reader.read(t, decoder);
}
public void open(InputStream in) throws IOException {
inStream = in;
decoder = new BinaryDecoder(in);
}
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serializer.avro;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
/**
* Serialization for Avro Specific classes. This serialization is to be used
* for classes generated by Avro's 'specific' compiler.
*/
@SuppressWarnings("unchecked")
public class AvroSpecificSerialization
extends AvroSerialization<SpecificRecord>{
public boolean accept(Class<?> c) {
return SpecificRecord.class.isAssignableFrom(c);
}
protected DatumReader getReader(Class<SpecificRecord> clazz) {
try {
return new SpecificDatumReader(clazz.newInstance().schema());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected Schema getSchema(SpecificRecord t) {
return t.schema();
}
protected DatumWriter getWriter(Class<SpecificRecord> clazz) {
return new SpecificDatumWriter();
}
}

View File

@ -0,0 +1,43 @@
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<body>
<p>
This package provides Avro serialization in Hadoop. This can be used to
serialize/deserialize Avro types in Hadoop.
</p>
<p>
Use {@link org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization} for
serialization of classes generated by Avro's 'specific' compiler.
</p>
<p>
Use {@link org.apache.hadoop.io.serializer.avro.AvroReflectSerialization} for
other classes.
{@link org.apache.hadoop.io.serializer.avro.AvroReflectSerialization} work for
any class which is either in the package list configured via
{@link org.apache.hadoop.io.serializer.avro.AvroReflectSerialization#AVRO_REFLECT_PACKAGES}
or implement {@link org.apache.hadoop.io.serializer.avro.AvroReflectSerializable}
interface.
</p>
</body>
</html>

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.util.GenericsUtil;
public class SerializationTestUtil {
/**
* A utility that tests serialization/deserialization.
* @param <K> the class of the item
* @param conf configuration to use, "io.serializations" is read to
* determine the serialization
* @param before item to (de)serialize
* @return deserialized item
*/
public static<K> K testSerialization(Configuration conf, K before)
throws Exception {
SerializationFactory factory = new SerializationFactory(conf);
Serializer<K> serializer
= factory.getSerializer(GenericsUtil.getClass(before));
Deserializer<K> deserializer
= factory.getDeserializer(GenericsUtil.getClass(before));
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(before);
serializer.close();
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
K after = deserializer.deserialize(null);
deserializer.close();
return after;
}
}

View File

@ -23,25 +23,18 @@
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.TestGenericWritable.Baz;
import org.apache.hadoop.io.TestGenericWritable.FooGenericWritable;
import org.apache.hadoop.util.GenericsUtil;
public class TestWritableSerialization extends TestCase {
private static final Configuration conf = new Configuration();
static {
conf.set("io.serializations"
, "org.apache.hadoop.io.serializer.WritableSerialization");
}
public void testWritableSerialization() throws Exception {
Text before = new Text("test writable");
testSerialization(conf, before);
Text after = SerializationTestUtil.testSerialization(conf, before);
assertEquals(before, after);
}
@ -56,40 +49,8 @@ public void testWritableConfigurable() throws Exception {
generic.setConf(conf);
Baz baz = new Baz();
generic.set(baz);
Baz result = testSerialization(conf, baz);
Baz result = SerializationTestUtil.testSerialization(conf, baz);
assertEquals(baz, result);
assertNotNull(result.getConf());
}
/**
* A utility that tests serialization/deserialization.
* @param <K> the class of the item
* @param conf configuration to use, "io.serializations" is read to
* determine the serialization
* @param before item to (de)serialize
* @return deserialized item
*/
public static<K> K testSerialization(Configuration conf, K before)
throws Exception {
SerializationFactory factory = new SerializationFactory(conf);
Serializer<K> serializer
= factory.getSerializer(GenericsUtil.getClass(before));
Deserializer<K> deserializer
= factory.getDeserializer(GenericsUtil.getClass(before));
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(before);
serializer.close();
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
K after = deserializer.deserialize(null);
deserializer.close();
assertEquals(before, after);
return after;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serializer.avro;
public class Record {
public int x = 7;
public int hashCode() {
return x;
}
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final Record other = (Record) obj;
if (x != other.x)
return false;
return true;
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serializer.avro;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serializer.SerializationTestUtil;
public class TestAvroSerialization extends TestCase {
private static final Configuration conf = new Configuration();
public void testSpecific() throws Exception {
AvroRecord before = new AvroRecord();
before.intField = 5;
AvroRecord after = SerializationTestUtil.testSerialization(conf, before);
assertEquals(before, after);
}
public void testReflectPkg() throws Exception {
Record before = new Record();
before.x = 10;
conf.set(AvroReflectSerialization.AVRO_REFLECT_PACKAGES,
before.getClass().getPackage().getName());
Record after = SerializationTestUtil.testSerialization(conf, before);
assertEquals(before, after);
}
public void testReflectInnerClass() throws Exception {
InnerRecord before = new InnerRecord();
before.x = 10;
conf.set(AvroReflectSerialization.AVRO_REFLECT_PACKAGES,
before.getClass().getPackage().getName());
InnerRecord after = SerializationTestUtil.testSerialization(conf, before);
assertEquals(before, after);
}
public void testReflect() throws Exception {
RefSerializable before = new RefSerializable();
before.x = 10;
RefSerializable after =
SerializationTestUtil.testSerialization(conf, before);
assertEquals(before, after);
}
public static class InnerRecord {
public int x = 7;
public int hashCode() {
return x;
}
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final InnerRecord other = (InnerRecord) obj;
if (x != other.x)
return false;
return true;
}
}
public static class RefSerializable implements AvroReflectSerializable {
public int x = 7;
public int hashCode() {
return x;
}
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final RefSerializable other = (RefSerializable) obj;
if (x != other.x)
return false;
return true;
}
}
}

View File

@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{"type": "record", "name":"AvroRecord",
"namespace": "org.apache.hadoop.io.serializer.avro",
"fields": [
{"name": "intField", "type": "int"}
]
}