From 7d637a3a9932bfe06c59107dc0914d9383b0f93c Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Fri, 18 Oct 2013 20:43:53 +0000 Subject: [PATCH] MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write out text files without separators (Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1533624 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../streaming/io/IdentifierResolver.java | 7 ++ .../streaming/io/KeyOnlyTextInputWriter.java | 35 ++++++++ .../streaming/io/KeyOnlyTextOutputReader.java | 90 +++++++++++++++++++ .../hadoop/streaming/io/TextInputWriter.java | 4 +- .../TestStreamingOutputOnlyKeys.java | 51 +++++++++++ .../io/TestKeyOnlyTextOutputReader.java | 68 ++++++++++++++ 7 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextInputWriter.java create mode 100644 hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextOutputReader.java create mode 100644 hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingOutputOnlyKeys.java create mode 100644 hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/io/TestKeyOnlyTextOutputReader.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1b5503b16f..da543dddc5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -209,6 +209,9 @@ Release 2.2.1 - UNRELEASED MAPREDUCE-5463. Deprecate SLOTS_MILLIS counters (Tzuyoshi Ozawa via Sandy Ryza) + MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write + out text files without separators (Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/IdentifierResolver.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/IdentifierResolver.java index 6c331bdf87..b0cd5b4fdb 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/IdentifierResolver.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/IdentifierResolver.java @@ -19,6 +19,7 @@ package org.apache.hadoop.streaming.io; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.typedbytes.TypedBytesWritable; @@ -34,6 +35,7 @@ public class IdentifierResolver { public static final String TEXT_ID = "text"; public static final String RAW_BYTES_ID = "rawbytes"; public static final String TYPED_BYTES_ID = "typedbytes"; + public static final String KEY_ONLY_TEXT_ID = "keyonlytext"; private Class inputWriterClass = null; private Class outputReaderClass = null; @@ -55,6 +57,11 @@ public void resolve(String identifier) { setOutputReaderClass(TypedBytesOutputReader.class); setOutputKeyClass(TypedBytesWritable.class); setOutputValueClass(TypedBytesWritable.class); + } else if (identifier.equalsIgnoreCase(KEY_ONLY_TEXT_ID)) { + setInputWriterClass(KeyOnlyTextInputWriter.class); + setOutputReaderClass(KeyOnlyTextOutputReader.class); + setOutputKeyClass(Text.class); + setOutputValueClass(NullWritable.class); } else { // assume TEXT_ID setInputWriterClass(TextInputWriter.class); setOutputReaderClass(TextOutputReader.class); diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextInputWriter.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextInputWriter.java new file mode 100644 index 0000000000..366eff72f3 --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextInputWriter.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming.io; + +import java.io.IOException; + + +public class KeyOnlyTextInputWriter extends TextInputWriter { + + @Override + public void writeKey(Object key) throws IOException { + writeUTF8(key); + clientOut.write('\n'); + } + + @Override + public void writeValue(Object value) throws IOException {} + +} diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextOutputReader.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextOutputReader.java new file mode 100644 index 0000000000..32bba397ce --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextOutputReader.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming.io; + +import java.io.DataInput; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.streaming.PipeMapRed; +import org.apache.hadoop.util.LineReader; + +/** + * OutputReader that reads the client's output as text, interpreting each line + * as a key and outputting NullWritables for values. + */ +public class KeyOnlyTextOutputReader extends OutputReader { + + private LineReader lineReader; + private byte[] bytes; + private DataInput clientIn; + private Configuration conf; + private Text key; + private Text line; + + @Override + public void initialize(PipeMapRed pipeMapRed) throws IOException { + super.initialize(pipeMapRed); + clientIn = pipeMapRed.getClientInput(); + conf = pipeMapRed.getConfiguration(); + lineReader = new LineReader((InputStream)clientIn, conf); + key = new Text(); + line = new Text(); + } + + @Override + public boolean readKeyValue() throws IOException { + if (lineReader.readLine(line) <= 0) { + return false; + } + bytes = line.getBytes(); + key.set(bytes, 0, line.getLength()); + + line.clear(); + return true; + } + + @Override + public Text getCurrentKey() throws IOException { + return key; + } + + @Override + public NullWritable getCurrentValue() throws IOException { + return NullWritable.get(); + } + + @Override + public String getLastOutput() { + if (bytes != null) { + try { + return new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + return ""; + } + } else { + return null; + } + } + +} diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/TextInputWriter.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/TextInputWriter.java index 614c0cb0b0..6f0fd8bfa5 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/TextInputWriter.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/TextInputWriter.java @@ -30,7 +30,7 @@ */ public class TextInputWriter extends InputWriter { - private DataOutput clientOut; + protected DataOutput clientOut; private byte[] inputSeparator; @Override @@ -53,7 +53,7 @@ public void writeValue(Object value) throws IOException { } // Write an object to the output stream using UTF-8 encoding - private void writeUTF8(Object object) throws IOException { + protected void writeUTF8(Object object) throws IOException { byte[] bval; int valSize; if (object instanceof BytesWritable) { diff --git a/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingOutputOnlyKeys.java b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingOutputOnlyKeys.java new file mode 100644 index 0000000000..ee2647ea3e --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingOutputOnlyKeys.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming; + +import java.io.IOException; + +import org.junit.Test; + +public class TestStreamingOutputOnlyKeys extends TestStreaming { + + public TestStreamingOutputOnlyKeys() throws IOException { + super(); + } + + @Test + public void testOutputOnlyKeys() throws Exception { + args.add("-jobconf"); args.add("stream.reduce.input" + + "=keyonlytext"); + args.add("-jobconf"); args.add("stream.reduce.output" + + "=keyonlytext"); + super.testCommandLine(); + } + + @Override + public String getExpectedOutput() { + return outputExpect.replaceAll("\t", ""); + } + + @Override + @Test + public void testCommandLine() { + // Do nothing + } + +} diff --git a/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/io/TestKeyOnlyTextOutputReader.java b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/io/TestKeyOnlyTextOutputReader.java new file mode 100644 index 0000000000..09f5777cc1 --- /dev/null +++ b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/io/TestKeyOnlyTextOutputReader.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming.io; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.streaming.PipeMapRed; +import org.apache.hadoop.streaming.PipeMapper; +import org.junit.Test; + +public class TestKeyOnlyTextOutputReader { + @Test + public void testKeyOnlyTextOutputReader() throws IOException { + String text = "key,value\nkey2,value2\nnocomma\n"; + PipeMapRed pipeMapRed = new MyPipeMapRed(text); + KeyOnlyTextOutputReader outputReader = new KeyOnlyTextOutputReader(); + outputReader.initialize(pipeMapRed); + outputReader.readKeyValue(); + Assert.assertEquals(new Text("key,value"), outputReader.getCurrentKey()); + outputReader.readKeyValue(); + Assert.assertEquals(new Text("key2,value2"), outputReader.getCurrentKey()); + outputReader.readKeyValue(); + Assert.assertEquals(new Text("nocomma"), outputReader.getCurrentKey()); + Assert.assertEquals(false, outputReader.readKeyValue()); + } + + private class MyPipeMapRed extends PipeMapper { + private DataInput clientIn; + private Configuration conf = new Configuration(); + + public MyPipeMapRed(String text) { + clientIn = new DataInputStream(new ByteArrayInputStream(text.getBytes())); + } + + @Override + public DataInput getClientInput() { + return clientIn; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + } +}