From 0777474cc2c79079be1aabfbafdee8def3553c22 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Wed, 24 Apr 2013 14:11:50 +0000 Subject: [PATCH] MAPREDUCE-5069. add concrete common implementations of CombineFileInputFormat (Sangjin Lee via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1471424 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../lib/CombineFileRecordReaderWrapper.java | 86 ++++++ .../lib/CombineSequenceFileInputFormat.java | 66 +++++ .../mapred/lib/CombineTextInputFormat.java | 68 +++++ .../input/CombineFileRecordReaderWrapper.java | 108 +++++++ .../input/CombineSequenceFileInputFormat.java | 64 +++++ .../lib/input/CombineTextInputFormat.java | 65 +++++ .../TestCombineSequenceFileInputFormat.java | 170 +++++++++++ .../mapred/TestCombineTextInputFormat.java | 250 ++++++++++++++++ .../TestCombineSequenceFileInputFormat.java | 186 ++++++++++++ .../lib/input/TestCombineTextInputFormat.java | 267 ++++++++++++++++++ 11 files changed, 1333 insertions(+) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 51c1113675..4842ae94c9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -209,6 +209,9 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs anyways after YARN-561. (Xuan Gong via vinodkv) + MAPREDUCE-5069. add concrete common implementations of + CombineFileInputFormat (Sangjin Lee via bobby) + OPTIMIZATIONS MAPREDUCE-4974. Optimising the LineRecordReader initialize() method diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java new file mode 100644 index 0000000000..08e7b47517 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.lib; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * A wrapper class for a record reader that handles a single file split. It + * delegates most of the methods to the wrapped instance. A concrete subclass + * needs to provide a constructor that calls this parent constructor with the + * appropriate input format. The subclass constructor must satisfy the specific + * constructor signature that is required by + * CombineFileRecordReader. + * + * Subclassing is needed to get a concrete record reader wrapper because of the + * constructor requirement. + * + * @see CombineFileRecordReader + * @see CombineFileInputFormat + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class CombineFileRecordReaderWrapper + implements RecordReader { + private final RecordReader delegate; + + protected CombineFileRecordReaderWrapper(FileInputFormat inputFormat, + CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) + throws IOException { + FileSplit fileSplit = new FileSplit(split.getPath(idx), + split.getOffset(idx), + split.getLength(idx), + split.getLocations()); + + delegate = inputFormat.getRecordReader(fileSplit, (JobConf)conf, reporter); + } + + public boolean next(K key, V value) throws IOException { + return delegate.next(key, value); + } + + public K createKey() { + return delegate.createKey(); + } + + public V createValue() { + return delegate.createValue(); + } + + public long getPos() throws IOException { + return delegate.getPos(); + } + + public void close() throws IOException { + delegate.close(); + } + + public float getProgress() throws IOException { + return delegate.getProgress(); + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java new file mode 100644 index 0000000000..3a5157e15e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.lib; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; + +/** + * Input format that is a CombineFileInputFormat-equivalent for + * SequenceFileInputFormat. + * + * @see CombineFileInputFormat + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class CombineSequenceFileInputFormat + extends CombineFileInputFormat { + @SuppressWarnings({ "rawtypes", "unchecked" }) + public RecordReader getRecordReader(InputSplit split, JobConf conf, + Reporter reporter) throws IOException { + return new CombineFileRecordReader(conf, (CombineFileSplit)split, reporter, + SequenceFileRecordReaderWrapper.class); + } + + /** + * A record reader that may be passed to CombineFileRecordReader + * so that it can be used in a CombineFileInputFormat-equivalent + * for SequenceFileInputFormat. + * + * @see CombineFileRecordReader + * @see CombineFileInputFormat + * @see SequenceFileInputFormat + */ + private static class SequenceFileRecordReaderWrapper + extends CombineFileRecordReaderWrapper { + // this constructor signature is required by CombineFileRecordReader + public SequenceFileRecordReaderWrapper(CombineFileSplit split, + Configuration conf, Reporter reporter, Integer idx) throws IOException { + super(new SequenceFileInputFormat(), split, conf, reporter, idx); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java new file mode 100644 index 0000000000..988e6e6160 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.lib; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; + +/** + * Input format that is a CombineFileInputFormat-equivalent for + * TextInputFormat. + * + * @see CombineFileInputFormat + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class CombineTextInputFormat + extends CombineFileInputFormat { + @SuppressWarnings({ "rawtypes", "unchecked" }) + public RecordReader getRecordReader(InputSplit split, + JobConf conf, Reporter reporter) throws IOException { + return new CombineFileRecordReader(conf, (CombineFileSplit)split, reporter, + TextRecordReaderWrapper.class); + } + + /** + * A record reader that may be passed to CombineFileRecordReader + * so that it can be used in a CombineFileInputFormat-equivalent + * for TextInputFormat. + * + * @see CombineFileRecordReader + * @see CombineFileInputFormat + * @see TextInputFormat + */ + private static class TextRecordReaderWrapper + extends CombineFileRecordReaderWrapper { + // this constructor signature is required by CombineFileRecordReader + public TextRecordReaderWrapper(CombineFileSplit split, Configuration conf, + Reporter reporter, Integer idx) throws IOException { + super(new TextInputFormat(), split, conf, reporter, idx); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java new file mode 100644 index 0000000000..bf847367b6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * A wrapper class for a record reader that handles a single file split. It + * delegates most of the methods to the wrapped instance. A concrete subclass + * needs to provide a constructor that calls this parent constructor with the + * appropriate input format. The subclass constructor must satisfy the specific + * constructor signature that is required by + * CombineFileRecordReader. + * + * Subclassing is needed to get a concrete record reader wrapper because of the + * constructor requirement. + * + * @see CombineFileRecordReader + * @see CombineFileInputFormat + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class CombineFileRecordReaderWrapper + extends RecordReader { + private final FileSplit fileSplit; + private final RecordReader delegate; + + protected CombineFileRecordReaderWrapper(FileInputFormat inputFormat, + CombineFileSplit split, TaskAttemptContext context, Integer idx) + throws IOException, InterruptedException { + fileSplit = new FileSplit(split.getPath(idx), + split.getOffset(idx), + split.getLength(idx), + split.getLocations()); + + delegate = inputFormat.createRecordReader(fileSplit, context); + } + + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + // it really should be the same file split at the time the wrapper instance + // was created + assert fileSplitIsValid(context); + + delegate.initialize(fileSplit, context); + } + + private boolean fileSplitIsValid(TaskAttemptContext context) { + Configuration conf = context.getConfiguration(); + long offset = conf.getLong(MRJobConfig.MAP_INPUT_START, 0L); + if (fileSplit.getStart() != offset) { + return false; + } + long length = conf.getLong(MRJobConfig.MAP_INPUT_PATH, 0L); + if (fileSplit.getLength() != length) { + return false; + } + String path = conf.get(MRJobConfig.MAP_INPUT_FILE); + if (!fileSplit.getPath().toString().equals(path)) { + return false; + } + return true; + } + + public boolean nextKeyValue() throws IOException, InterruptedException { + return delegate.nextKeyValue(); + } + + public K getCurrentKey() throws IOException, InterruptedException { + return delegate.getCurrentKey(); + } + + public V getCurrentValue() throws IOException, InterruptedException { + return delegate.getCurrentValue(); + } + + public float getProgress() throws IOException, InterruptedException { + return delegate.getProgress(); + } + + public void close() throws IOException { + delegate.close(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java new file mode 100644 index 0000000000..368254813b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Input format that is a CombineFileInputFormat-equivalent for + * SequenceFileInputFormat. + * + * @see CombineFileInputFormat + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class CombineSequenceFileInputFormat + extends CombineFileInputFormat { + @SuppressWarnings({ "rawtypes", "unchecked" }) + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return new CombineFileRecordReader((CombineFileSplit)split, context, + SequenceFileRecordReaderWrapper.class); + } + + /** + * A record reader that may be passed to CombineFileRecordReader + * so that it can be used in a CombineFileInputFormat-equivalent + * for SequenceFileInputFormat. + * + * @see CombineFileRecordReader + * @see CombineFileInputFormat + * @see SequenceFileInputFormat + */ + private static class SequenceFileRecordReaderWrapper + extends CombineFileRecordReaderWrapper { + // this constructor signature is required by CombineFileRecordReader + public SequenceFileRecordReaderWrapper(CombineFileSplit split, + TaskAttemptContext context, Integer idx) + throws IOException, InterruptedException { + super(new SequenceFileInputFormat(), split, context, idx); + } + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java new file mode 100644 index 0000000000..8087611222 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Input format that is a CombineFileInputFormat-equivalent for + * TextInputFormat. + * + * @see CombineFileInputFormat + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class CombineTextInputFormat + extends CombineFileInputFormat { + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return new CombineFileRecordReader( + (CombineFileSplit)split, context, TextRecordReaderWrapper.class); + } + + /** + * A record reader that may be passed to CombineFileRecordReader + * so that it can be used in a CombineFileInputFormat-equivalent + * for TextInputFormat. + * + * @see CombineFileRecordReader + * @see CombineFileInputFormat + * @see TextInputFormat + */ + private static class TextRecordReaderWrapper + extends CombineFileRecordReaderWrapper { + // this constructor signature is required by CombineFileRecordReader + public TextRecordReaderWrapper(CombineFileSplit split, + TaskAttemptContext context, Integer idx) + throws IOException, InterruptedException { + super(new TextInputFormat(), split, context, idx); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java new file mode 100644 index 0000000000..f76fb22660 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; + +import java.io.IOException; +import java.util.BitSet; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapred.lib.CombineFileSplit; +import org.apache.hadoop.mapred.lib.CombineSequenceFileInputFormat; +import org.junit.Test; + +public class TestCombineSequenceFileInputFormat { + private static final Log LOG = + LogFactory.getLog(TestCombineSequenceFileInputFormat.class); + + private static Configuration conf = new Configuration(); + private static FileSystem localFs = null; + + static { + try { + conf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(conf); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + + @SuppressWarnings("deprecation") + private static Path workDir = + new Path(new Path(System.getProperty("test.build.data", "/tmp")), + "TestCombineSequenceFileInputFormat").makeQualified(localFs); + + @Test(timeout=10000) + public void testFormat() throws Exception { + JobConf job = new JobConf(conf); + + Reporter reporter = Reporter.NULL; + + Random random = new Random(); + long seed = random.nextLong(); + LOG.info("seed = "+seed); + random.setSeed(seed); + + localFs.delete(workDir, true); + + FileInputFormat.setInputPaths(job, workDir); + + final int length = 10000; + final int numFiles = 10; + + // create a file with various lengths + createFiles(length, numFiles, random); + + // create a combine split for the files + InputFormat format = + new CombineSequenceFileInputFormat(); + IntWritable key = new IntWritable(); + BytesWritable value = new BytesWritable(); + for (int i = 0; i < 3; i++) { + int numSplits = + random.nextInt(length/(SequenceFile.SYNC_INTERVAL/20))+1; + LOG.info("splitting: requesting = " + numSplits); + InputSplit[] splits = format.getSplits(job, numSplits); + LOG.info("splitting: got = " + splits.length); + + // we should have a single split as the length is comfortably smaller than + // the block size + assertEquals("We got more than one splits!", 1, splits.length); + InputSplit split = splits[0]; + assertEquals("It should be CombineFileSplit", + CombineFileSplit.class, split.getClass()); + + // check each split + BitSet bits = new BitSet(length); + RecordReader reader = + format.getRecordReader(split, job, reporter); + try { + while (reader.next(key, value)) { + assertFalse("Key in multiple partitions.", bits.get(key.get())); + bits.set(key.get()); + } + } finally { + reader.close(); + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + } + } + + private static class Range { + private final int start; + private final int end; + + Range(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public String toString() { + return "(" + start + ", " + end + ")"; + } + } + + private static Range[] createRanges(int length, int numFiles, Random random) { + // generate a number of files with various lengths + Range[] ranges = new Range[numFiles]; + for (int i = 0; i < numFiles; i++) { + int start = i == 0 ? 0 : ranges[i-1].end; + int end = i == numFiles - 1 ? + length : + (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1; + ranges[i] = new Range(start, end); + } + return ranges; + } + + private static void createFiles(int length, int numFiles, Random random) + throws IOException { + Range[] ranges = createRanges(length, numFiles, random); + + for (int i = 0; i < numFiles; i++) { + Path file = new Path(workDir, "test_" + i + ".seq"); + // create a file with length entries + @SuppressWarnings("deprecation") + SequenceFile.Writer writer = + SequenceFile.createWriter(localFs, conf, file, + IntWritable.class, BytesWritable.class); + Range range = ranges[i]; + try { + for (int j = range.start; j < range.end; j++) { + IntWritable key = new IntWritable(j); + byte[] data = new byte[random.nextInt(10)]; + random.nextBytes(data); + BytesWritable value = new BytesWritable(data); + writer.append(key, value); + } + } finally { + writer.close(); + } + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java new file mode 100644 index 0000000000..384bd39c75 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.fail; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.lib.CombineFileSplit; +import org.apache.hadoop.mapred.lib.CombineTextInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Test; + +public class TestCombineTextInputFormat { + private static final Log LOG = + LogFactory.getLog(TestCombineTextInputFormat.class); + + private static JobConf defaultConf = new JobConf(); + private static FileSystem localFs = null; + + static { + try { + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + + @SuppressWarnings("deprecation") + private static Path workDir = + new Path(new Path(System.getProperty("test.build.data", "/tmp")), + "TestCombineTextInputFormat").makeQualified(localFs); + + // A reporter that does nothing + private static final Reporter voidReporter = Reporter.NULL; + + @Test(timeout=10000) + public void testFormat() throws Exception { + JobConf job = new JobConf(defaultConf); + + Random random = new Random(); + long seed = random.nextLong(); + LOG.info("seed = "+seed); + random.setSeed(seed); + + localFs.delete(workDir, true); + FileInputFormat.setInputPaths(job, workDir); + + final int length = 10000; + final int numFiles = 10; + + createFiles(length, numFiles, random); + + // create a combined split for the files + CombineTextInputFormat format = new CombineTextInputFormat(); + LongWritable key = new LongWritable(); + Text value = new Text(); + for (int i = 0; i < 3; i++) { + int numSplits = random.nextInt(length/20)+1; + LOG.info("splitting: requesting = " + numSplits); + InputSplit[] splits = format.getSplits(job, numSplits); + LOG.info("splitting: got = " + splits.length); + + // we should have a single split as the length is comfortably smaller than + // the block size + assertEquals("We got more than one splits!", 1, splits.length); + InputSplit split = splits[0]; + assertEquals("It should be CombineFileSplit", + CombineFileSplit.class, split.getClass()); + + // check the split + BitSet bits = new BitSet(length); + LOG.debug("split= " + split); + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + try { + int count = 0; + while (reader.next(key, value)) { + int v = Integer.parseInt(value.toString()); + LOG.debug("read " + v); + if (bits.get(v)) { + LOG.warn("conflict with " + v + + " at position "+reader.getPos()); + } + assertFalse("Key in multiple partitions.", bits.get(v)); + bits.set(v); + count++; + } + LOG.info("splits="+split+" count=" + count); + } finally { + reader.close(); + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + } + } + + private static class Range { + private final int start; + private final int end; + + Range(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public String toString() { + return "(" + start + ", " + end + ")"; + } + } + + private static Range[] createRanges(int length, int numFiles, Random random) { + // generate a number of files with various lengths + Range[] ranges = new Range[numFiles]; + for (int i = 0; i < numFiles; i++) { + int start = i == 0 ? 0 : ranges[i-1].end; + int end = i == numFiles - 1 ? + length : + (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1; + ranges[i] = new Range(start, end); + } + return ranges; + } + + private static void createFiles(int length, int numFiles, Random random) + throws IOException { + Range[] ranges = createRanges(length, numFiles, random); + + for (int i = 0; i < numFiles; i++) { + Path file = new Path(workDir, "test_" + i + ".txt"); + Writer writer = new OutputStreamWriter(localFs.create(file)); + Range range = ranges[i]; + try { + for (int j = range.start; j < range.end; j++) { + writer.write(Integer.toString(j)); + writer.write("\n"); + } + } finally { + writer.close(); + } + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(InputFormat format, + InputSplit split, + JobConf job) throws IOException { + List result = new ArrayList(); + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + LongWritable key = reader.createKey(); + Text value = reader.createValue(); + while (reader.next(key, value)) { + result.add(value); + value = reader.createValue(); + } + reader.close(); + return result; + } + + /** + * Test using the gzip codec for reading + */ + @Test(timeout=10000) + public void testGzip() throws IOException { + JobConf job = new JobConf(defaultConf); + CompressionCodec gzip = new GzipCodec(); + ReflectionUtils.setConf(gzip, job); + localFs.delete(workDir, true); + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n"); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "this is a test\nof gzip\n"); + FileInputFormat.setInputPaths(job, workDir); + CombineTextInputFormat format = new CombineTextInputFormat(); + InputSplit[] splits = format.getSplits(job, 100); + assertEquals("compressed splits == 1", 1, splits.length); + List results = readSplit(format, splits[0], job); + assertEquals("splits[0] length", 8, results.size()); + + final String[] firstList = + {"the quick", "brown", "fox jumped", "over", " the lazy", " dog"}; + final String[] secondList = {"this is a test", "of gzip"}; + String first = results.get(0).toString(); + if (first.equals(firstList[0])) { + testResults(results, firstList, secondList); + } else if (first.equals(secondList[0])) { + testResults(results, secondList, firstList); + } else { + fail("unexpected first token!"); + } + } + + private static void testResults(List results, String[] first, + String[] second) { + for (int i = 0; i < first.length; i++) { + assertEquals("splits[0]["+i+"]", first[i], results.get(i).toString()); + } + for (int i = 0; i < second.length; i++) { + int j = i + first.length; + assertEquals("splits[0]["+j+"]", second[i], results.get(j).toString()); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java new file mode 100644 index 0000000000..047f94909f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.input; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNotNull; + +import java.io.IOException; +import java.util.BitSet; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.junit.Test; + +public class TestCombineSequenceFileInputFormat { + private static final Log LOG = + LogFactory.getLog(TestCombineSequenceFileInputFormat.class); + private static Configuration conf = new Configuration(); + private static FileSystem localFs = null; + + static { + try { + conf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(conf); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + + private static Path workDir = + new Path(new Path(System.getProperty("test.build.data", "."), "data"), + "TestCombineSequenceFileInputFormat"); + + @Test(timeout=10000) + public void testFormat() throws IOException, InterruptedException { + Job job = Job.getInstance(conf); + + Random random = new Random(); + long seed = random.nextLong(); + random.setSeed(seed); + + localFs.delete(workDir, true); + FileInputFormat.setInputPaths(job, workDir); + + final int length = 10000; + final int numFiles = 10; + + // create files with a variety of lengths + createFiles(length, numFiles, random, job); + + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + // create a combine split for the files + InputFormat format = + new CombineSequenceFileInputFormat(); + for (int i = 0; i < 3; i++) { + int numSplits = + random.nextInt(length/(SequenceFile.SYNC_INTERVAL/20)) + 1; + LOG.info("splitting: requesting = " + numSplits); + List splits = format.getSplits(job); + LOG.info("splitting: got = " + splits.size()); + + // we should have a single split as the length is comfortably smaller than + // the block size + assertEquals("We got more than one splits!", 1, splits.size()); + InputSplit split = splits.get(0); + assertEquals("It should be CombineFileSplit", + CombineFileSplit.class, split.getClass()); + + // check the split + BitSet bits = new BitSet(length); + RecordReader reader = + format.createRecordReader(split, context); + MapContext mcontext = + new MapContextImpl(job.getConfiguration(), + context.getTaskAttemptID(), reader, null, null, + MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + assertEquals("reader class is CombineFileRecordReader.", + CombineFileRecordReader.class, reader.getClass()); + + try { + while (reader.nextKeyValue()) { + IntWritable key = reader.getCurrentKey(); + BytesWritable value = reader.getCurrentValue(); + assertNotNull("Value should not be null.", value); + final int k = key.get(); + LOG.debug("read " + k); + assertFalse("Key in multiple partitions.", bits.get(k)); + bits.set(k); + } + } finally { + reader.close(); + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + } + } + + + private static class Range { + private final int start; + private final int end; + + Range(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public String toString() { + return "(" + start + ", " + end + ")"; + } + } + + private static Range[] createRanges(int length, int numFiles, Random random) { + // generate a number of files with various lengths + Range[] ranges = new Range[numFiles]; + for (int i = 0; i < numFiles; i++) { + int start = i == 0 ? 0 : ranges[i-1].end; + int end = i == numFiles - 1 ? + length : + (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1; + ranges[i] = new Range(start, end); + } + return ranges; + } + + private static void createFiles(int length, int numFiles, Random random, + Job job) throws IOException { + Range[] ranges = createRanges(length, numFiles, random); + + for (int i = 0; i < numFiles; i++) { + Path file = new Path(workDir, "test_" + i + ".seq"); + // create a file with length entries + @SuppressWarnings("deprecation") + SequenceFile.Writer writer = + SequenceFile.createWriter(localFs, job.getConfiguration(), file, + IntWritable.class, BytesWritable.class); + Range range = ranges[i]; + try { + for (int j = range.start; j < range.end; j++) { + IntWritable key = new IntWritable(j); + byte[] data = new byte[random.nextInt(10)]; + random.nextBytes(data); + BytesWritable value = new BytesWritable(data); + writer.append(key, value); + } + } finally { + writer.close(); + } + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java new file mode 100644 index 0000000000..0b44ca610a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.input; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.fail; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Test; + +public class TestCombineTextInputFormat { + private static final Log LOG = + LogFactory.getLog(TestCombineTextInputFormat.class); + + private static Configuration defaultConf = new Configuration(); + private static FileSystem localFs = null; + + static { + try { + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + + private static Path workDir = + new Path(new Path(System.getProperty("test.build.data", "."), "data"), + "TestCombineTextInputFormat"); + + @Test(timeout=10000) + public void testFormat() throws Exception { + Job job = Job.getInstance(new Configuration(defaultConf)); + + Random random = new Random(); + long seed = random.nextLong(); + LOG.info("seed = " + seed); + random.setSeed(seed); + + localFs.delete(workDir, true); + FileInputFormat.setInputPaths(job, workDir); + + final int length = 10000; + final int numFiles = 10; + + // create files with various lengths + createFiles(length, numFiles, random); + + // create a combined split for the files + CombineTextInputFormat format = new CombineTextInputFormat(); + for (int i = 0; i < 3; i++) { + int numSplits = random.nextInt(length/20) + 1; + LOG.info("splitting: requesting = " + numSplits); + List splits = format.getSplits(job); + LOG.info("splitting: got = " + splits.size()); + + // we should have a single split as the length is comfortably smaller than + // the block size + assertEquals("We got more than one splits!", 1, splits.size()); + InputSplit split = splits.get(0); + assertEquals("It should be CombineFileSplit", + CombineFileSplit.class, split.getClass()); + + // check the split + BitSet bits = new BitSet(length); + LOG.debug("split= " + split); + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + assertEquals("reader class is CombineFileRecordReader.", + CombineFileRecordReader.class, reader.getClass()); + MapContext mcontext = + new MapContextImpl(job.getConfiguration(), + context.getTaskAttemptID(), reader, null, null, + MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + + try { + int count = 0; + while (reader.nextKeyValue()) { + LongWritable key = reader.getCurrentKey(); + assertNotNull("Key should not be null.", key); + Text value = reader.getCurrentValue(); + final int v = Integer.parseInt(value.toString()); + LOG.debug("read " + v); + assertFalse("Key in multiple partitions.", bits.get(v)); + bits.set(v); + count++; + } + LOG.debug("split=" + split + " count=" + count); + } finally { + reader.close(); + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + } + } + + private static class Range { + private final int start; + private final int end; + + Range(int start, int end) { + this.start = start; + this.end = end; + } + + @Override + public String toString() { + return "(" + start + ", " + end + ")"; + } + } + + private static Range[] createRanges(int length, int numFiles, Random random) { + // generate a number of files with various lengths + Range[] ranges = new Range[numFiles]; + for (int i = 0; i < numFiles; i++) { + int start = i == 0 ? 0 : ranges[i-1].end; + int end = i == numFiles - 1 ? + length : + (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1; + ranges[i] = new Range(start, end); + } + return ranges; + } + + private static void createFiles(int length, int numFiles, Random random) + throws IOException { + Range[] ranges = createRanges(length, numFiles, random); + + for (int i = 0; i < numFiles; i++) { + Path file = new Path(workDir, "test_" + i + ".txt"); + Writer writer = new OutputStreamWriter(localFs.create(file)); + Range range = ranges[i]; + try { + for (int j = range.start; j < range.end; j++) { + writer.write(Integer.toString(j)); + writer.write("\n"); + } + } finally { + writer.close(); + } + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(InputFormat format, + InputSplit split, Job job) throws IOException, InterruptedException { + List result = new ArrayList(); + Configuration conf = job.getConfiguration(); + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(conf); + RecordReader reader = format.createRecordReader(split, + MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); + MapContext mcontext = + new MapContextImpl(conf, + context.getTaskAttemptID(), reader, null, null, + MapReduceTestUtil.createDummyReporter(), + split); + reader.initialize(split, mcontext); + while (reader.nextKeyValue()) { + result.add(new Text(reader.getCurrentValue())); + } + return result; + } + + /** + * Test using the gzip codec for reading + */ + @Test(timeout=10000) + public void testGzip() throws IOException, InterruptedException { + Configuration conf = new Configuration(defaultConf); + CompressionCodec gzip = new GzipCodec(); + ReflectionUtils.setConf(gzip, conf); + localFs.delete(workDir, true); + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n"); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "this is a test\nof gzip\n"); + Job job = Job.getInstance(conf); + FileInputFormat.setInputPaths(job, workDir); + CombineTextInputFormat format = new CombineTextInputFormat(); + List splits = format.getSplits(job); + assertEquals("compressed splits == 1", 1, splits.size()); + List results = readSplit(format, splits.get(0), job); + assertEquals("splits[0] length", 8, results.size()); + + final String[] firstList = + {"the quick", "brown", "fox jumped", "over", " the lazy", " dog"}; + final String[] secondList = {"this is a test", "of gzip"}; + String first = results.get(0).toString(); + if (first.equals(firstList[0])) { + testResults(results, firstList, secondList); + } else if (first.equals(secondList[0])) { + testResults(results, secondList, firstList); + } else { + fail("unexpected first token!"); + } + } + + private static void testResults(List results, String[] first, + String[] second) { + for (int i = 0; i < first.length; i++) { + assertEquals("splits[0]["+i+"]", first[i], results.get(i).toString()); + } + for (int i = 0; i < second.length; i++) { + int j = i + first.length; + assertEquals("splits[0]["+j+"]", second[i], results.get(j).toString()); + } + } +}