From 76238b9722539b5fd4773129ecc31b11bd8255ef Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Mon, 6 Jan 2014 18:35:26 +0000 Subject: [PATCH] MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1555968 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 2 + .../org/apache/hadoop/mapred/JobConf.java | 52 ++++- .../java/org/apache/hadoop/mapred/Task.java | 5 +- .../java/org/apache/hadoop/mapreduce/Job.java | 18 ++ .../apache/hadoop/mapreduce/JobContext.java | 21 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 2 + .../lib/chain/ChainMapContextImpl.java | 5 + .../lib/chain/ChainReduceContextImpl.java | 5 + .../mapreduce/lib/map/WrappedMapper.java | 5 + .../mapreduce/lib/reduce/WrappedReducer.java | 5 + .../hadoop/mapreduce/task/JobContextImpl.java | 11 + .../task/reduce/MergeManagerImpl.java | 2 +- .../mapred/TestOldCombinerGrouping.java | 191 ++++++++++++++++++ .../mapreduce/TestNewCombinerGrouping.java | 178 ++++++++++++++++ 14 files changed, 493 insertions(+), 9 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3fb2dd6156..668b4a893f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -196,6 +196,8 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI with Hadoop 2.0 (Gera Shegalov via Sandy Ryza) + MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu) + OPTIMIZATIONS MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java index 5bae686ab2..53159fbe59 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java @@ -949,6 +949,23 @@ public String getKeyFieldPartitionerOption() { return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); } + /** + * Get the user defined {@link WritableComparable} comparator for + * grouping keys of inputs to the combiner. + * + * @return comparator set by the user for grouping values. + * @see #setCombinerKeyGroupingComparator(Class) for details. + */ + public RawComparator getCombinerKeyGroupingComparator() { + Class theClass = getClass( + JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class); + if (theClass == null) { + return getOutputKeyComparator(); + } + + return ReflectionUtils.newInstance(theClass, this); + } + /** * Get the user defined {@link WritableComparable} comparator for * grouping keys of inputs to the reduce. 
@@ -966,6 +983,37 @@ public RawComparator getOutputValueGroupingComparator() { return ReflectionUtils.newInstance(theClass, this); } + /** + * Set the user defined {@link RawComparator} comparator for + * grouping keys in the input to the combiner. + * <p> + * This comparator should be provided if the equivalence rules for keys + * for sorting the intermediates are different from those for grouping keys + * before each call to + * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}. + * <p> + * For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + * in a single call to the reduce function if K1 and K2 compare as equal. + * <p> + * Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + * how keys are sorted, this can be used in conjunction to simulate + * secondary sort on values. + * <p> + * Note: This is not a guarantee of the combiner sort being stable in any + * sense. (In any case, with the order of available map-outputs to the + * combiner being non-deterministic, it wouldn't make that much sense.)
+ * + * @param theClass the comparator class to be used for grouping keys for the + * combiner. It should implement RawComparator. + * @see #setOutputKeyComparatorClass(Class) + */ + public void setCombinerKeyGroupingComparator( + Class theClass) { + setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS, + theClass, RawComparator.class); + } + /** * Set the user defined {@link RawComparator} comparator for * grouping keys in the input to the reduce. @@ -989,7 +1037,9 @@ public RawComparator getOutputValueGroupingComparator() { * * @param theClass the comparator class to be used for grouping keys. * It should implement RawComparator. - * @see #setOutputKeyComparatorClass(Class) + * @see #setOutputKeyComparatorClass(Class) + * @see {@link #setCombinerKeyGroupingComparator(Class)} for setting a + * comparator for the combiner. */ public void setOutputValueGroupingComparator( Class theClass) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 660ffc65ad..72cd41c9ea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -1575,7 +1575,8 @@ protected OldCombinerRunner(Class> cls, combinerClass = cls; keyClass = (Class) job.getMapOutputKeyClass(); valueClass = (Class) job.getMapOutputValueClass(); - comparator = (RawComparator) job.getOutputKeyComparator(); + comparator = (RawComparator) + job.getCombinerKeyGroupingComparator(); } @SuppressWarnings("unchecked") @@ -1624,7 +1625,7 @@ protected static class NewCombinerRunner extends CombinerRunner { this.taskId = taskId; keyClass = (Class) context.getMapOutputKeyClass(); valueClass = (Class) context.getMapOutputValueClass(); - comparator = (RawComparator) context.getSortComparator(); + comparator = (RawComparator) context.getCombinerKeyGroupingComparator(); this.committer = committer; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 78c6b4b1a9..4bb97e84d6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -948,11 +948,27 @@ public void setOutputValueClass(Class theClass conf.setOutputValueClass(theClass); } + /** + * Define the comparator that controls which keys are grouped together + * for a single call to combiner, + * {@link Reducer#reduce(Object, Iterable, + * org.apache.hadoop.mapreduce.Reducer.Context)} + * + * @param cls the raw comparator to use + * @throws IllegalStateException if the job is submitted + */ + public void setCombinerKeyGroupingComparatorClass( + Class cls) throws IllegalStateException { + ensureState(JobState.DEFINE); + conf.setCombinerKeyGroupingComparator(cls); + } + /** * Define the comparator that controls how the keys are sorted before they * are passed to the {@link Reducer}. 
* @param cls the raw comparator * @throws IllegalStateException if the job is submitted + * @see {@link #setCombinerKeyGroupingComparatorClass(Class)} */ public void setSortComparatorClass(Class cls ) throws IllegalStateException { @@ -967,6 +983,8 @@ public void setSortComparatorClass(Class cls * org.apache.hadoop.mapreduce.Reducer.Context)} * @param cls the raw comparator to use * @throws IllegalStateException if the job is submitted + * @see {@link #setCombinerKeyGroupingComparatorClass(Class)} for setting a + * comparator for the combiner. */ public void setGroupingComparatorClass(Class cls ) throws IllegalStateException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java index 4842e20b9c..fa73a5f066 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java @@ -167,13 +167,24 @@ public Class> getPartitionerClass() */ public String getJar(); - /** - * Get the user defined {@link RawComparator} comparator for - * grouping keys of inputs to the reduce. - * + /** + * Get the user defined {@link RawComparator} comparator for + * grouping keys of inputs to the combiner. + * * @return comparator set by the user for grouping values. - * @see Job#setGroupingComparatorClass(Class) for details. + * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details. */ + public RawComparator getCombinerKeyGroupingComparator(); + + /** + * Get the user defined {@link RawComparator} comparator for + * grouping keys of inputs to the reduce. + * + * @return comparator set by the user for grouping values. + * @see Job#setGroupingComparatorClass(Class) for details. + * @see {@link #getCombinerKeyGroupingComparator()} for setting a + * comparator for the combiner. 
+ */ public RawComparator getGroupingComparator(); /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index e696b86553..1be7ba3e3b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -93,6 +93,8 @@ public interface MRJobConfig { public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class"; + public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class"; + public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class"; public static final String WORKING_DIR = "mapreduce.job.working.dir"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java index 598bb93606..ea2c77ace9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java @@ -166,6 +166,11 @@ public String[] getFileTimestamps() { return base.getFileTimestamps(); } + @Override + public RawComparator getCombinerKeyGroupingComparator() { + return base.getCombinerKeyGroupingComparator(); + } + @Override public RawComparator getGroupingComparator() { return base.getGroupingComparator(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java index 8d6648468e..5e9a1add87 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java @@ -159,6 +159,11 @@ public String[] getFileTimestamps() { return base.getFileTimestamps(); } + @Override + public RawComparator getCombinerKeyGroupingComparator() { + return base.getCombinerKeyGroupingComparator(); + } + @Override public RawComparator getGroupingComparator() { return base.getGroupingComparator(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java index 95c4b90c0f..8865a36c31 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java @@ -168,6 +168,11 @@ public String[] getFileTimestamps() { return mapContext.getFileTimestamps(); } + @Override + public RawComparator getCombinerKeyGroupingComparator() { + return mapContext.getCombinerKeyGroupingComparator(); + } + @Override public RawComparator getGroupingComparator() { return mapContext.getGroupingComparator(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java index 39178642f2..185c135c2e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java @@ -161,6 +161,11 @@ public String[] getFileTimestamps() { return reduceContext.getFileTimestamps(); } + @Override + public RawComparator getCombinerKeyGroupingComparator() { + return reduceContext.getCombinerKeyGroupingComparator(); + } + @Override public RawComparator getGroupingComparator() { return reduceContext.getGroupingComparator(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java index b4c6dca554..247c2f2029 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java @@ -252,6 +252,17 @@ public String getJar() { return conf.getJar(); } + /** + * Get the user defined {@link RawComparator} comparator for + * grouping keys of inputs to the combiner. + * + * @return comparator set by the user for grouping values. + * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details. + */ + public RawComparator getCombinerKeyGroupingComparator() { + return conf.getCombinerKeyGroupingComparator(); + } + /** * Get the user defined {@link RawComparator} comparator for * grouping keys of inputs to the reduce. 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index ca3bed9399..a821e4d1b8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -582,7 +582,7 @@ private void combineAndSpill( Class keyClass = (Class) job.getMapOutputKeyClass(); Class valClass = (Class) job.getMapOutputValueClass(); RawComparator comparator = - (RawComparator)job.getOutputKeyComparator(); + (RawComparator)job.getCombinerKeyGroupingComparator(); try { CombineValuesIterator values = new CombineValuesIterator( kvIter, comparator, keyClass, valClass, job, Reporter.NULL, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java new file mode 100644 index 0000000000..96919bef68 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; + +public class TestOldCombinerGrouping { + private static String TEST_ROOT_DIR = + new File("build", UUID.randomUUID().toString()).getAbsolutePath(); + + public static class Map implements + Mapper { + @Override + public void map(LongWritable key, Text value, + OutputCollector output, Reporter reporter) + throws IOException { + String v = value.toString(); + String k = v.substring(0, v.indexOf(",")); + v = v.substring(v.indexOf(",") + 1); + output.collect(new Text(k), new LongWritable(Long.parseLong(v))); + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf job) { + } + } + + public static class Reduce implements + Reducer { + + @Override + public void reduce(Text key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + LongWritable maxValue = null; + while (values.hasNext()) { + LongWritable value = values.next(); + if (maxValue == null) { + maxValue = value; + } else if (value.compareTo(maxValue) > 0) { + maxValue = value; + } + } + output.collect(key, maxValue); + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf job) { + } + } + + public static class Combiner extends Reduce { + } + + public static class GroupComparator implements RawComparator { + @Override + public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3, + int i4) { + byte[] b1 = new byte[i2]; + System.arraycopy(bytes, i, b1, 0, i2); + + byte[] b2 = new byte[i4]; + System.arraycopy(bytes2, i3, b2, 0, i4); + + return compare(new Text(new String(b1)), new Text(new String(b2))); + } + + @Override + public int compare(Text o1, Text o2) { + String s1 = o1.toString(); + String s2 = o2.toString(); + s1 = s1.substring(0, s1.indexOf("|")); + s2 = s2.substring(0, s2.indexOf("|")); + return s1.compareTo(s2); + } + + } + + @Test + public void testCombiner() throws Exception { + if (!new File(TEST_ROOT_DIR).mkdirs()) { + throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR); + } + File in = new File(TEST_ROOT_DIR, "input"); + if (!in.mkdirs()) { + throw new RuntimeException("Could not create test dir: " + in); + } + File out = new File(TEST_ROOT_DIR, "output"); + PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt"))); + pw.println("A|a,1"); + pw.println("A|b,2"); + pw.println("B|a,3"); + pw.println("B|b,4"); + pw.println("B|c,5"); + pw.close(); + JobConf job = new JobConf(); + job.set("mapreduce.framework.name", "local"); + TextInputFormat.setInputPaths(job, new Path(in.getPath())); + TextOutputFormat.setOutputPath(job, new Path(out.getPath())); + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + job.setInputFormat(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputFormat(TextOutputFormat.class); + job.setOutputValueGroupingComparator(GroupComparator.class); + + job.setCombinerClass(Combiner.class); + 
job.setCombinerKeyGroupingComparator(GroupComparator.class); + job.setInt("min.num.spills.for.combine", 0); + + JobClient client = new JobClient(job); + RunningJob runningJob = client.submitJob(job); + runningJob.waitForCompletion(); + if (runningJob.isSuccessful()) { + Counters counters = runningJob.getCounters(); + + long combinerInputRecords = counters.getGroup( + "org.apache.hadoop.mapreduce.TaskCounter"). + getCounter("COMBINE_INPUT_RECORDS"); + long combinerOutputRecords = counters.getGroup( + "org.apache.hadoop.mapreduce.TaskCounter"). + getCounter("COMBINE_OUTPUT_RECORDS"); + Assert.assertTrue(combinerInputRecords > 0); + Assert.assertTrue(combinerInputRecords > combinerOutputRecords); + + BufferedReader br = new BufferedReader(new FileReader( + new File(out, "part-00000"))); + Set output = new HashSet(); + String line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNull(line); + br.close(); + + Set expected = new HashSet(); + expected.add("A2"); + expected.add("B5"); + + Assert.assertEquals(expected, output); + + } else { + Assert.fail("Job failed"); + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java new file mode 100644 index 0000000000..c4b734bdc5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +public class TestNewCombinerGrouping { + private static String TEST_ROOT_DIR = + new File("build", UUID.randomUUID().toString()).getAbsolutePath(); + + public static class Map extends + Mapper { + + @Override + protected void map(LongWritable key, Text value, + Context context) + throws IOException, InterruptedException { + String v = value.toString(); + String k = v.substring(0, v.indexOf(",")); + v = v.substring(v.indexOf(",") + 1); + context.write(new Text(k), new LongWritable(Long.parseLong(v))); + } + } + + public static class Reduce extends + Reducer { + + @Override + protected void reduce(Text key, Iterable values, + Context context) + throws IOException, InterruptedException { + LongWritable maxValue = null; + for (LongWritable value : values) { + if (maxValue == null) { + maxValue = value; + } else if (value.compareTo(maxValue) > 0) { + maxValue = value; + } + } + context.write(key, maxValue); + } + } + + public static class Combiner extends Reduce { + } + + public static class GroupComparator implements RawComparator { + @Override + public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3, + int i4) { + byte[] b1 = new byte[i2]; + System.arraycopy(bytes, i, b1, 0, i2); + + byte[] b2 = new byte[i4]; + System.arraycopy(bytes2, i3, b2, 0, i4); + + return compare(new Text(new String(b1)), new Text(new String(b2))); + } + + @Override + public int compare(Text o1, Text o2) { + String s1 = o1.toString(); + String s2 = o2.toString(); + s1 = s1.substring(0, s1.indexOf("|")); + s2 = s2.substring(0, s2.indexOf("|")); + return s1.compareTo(s2); + } + + } + + @Test + public void testCombiner() throws Exception { + if (!new File(TEST_ROOT_DIR).mkdirs()) { + throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR); + } + File in = new File(TEST_ROOT_DIR, "input"); + if (!in.mkdirs()) { + throw new RuntimeException("Could not create test dir: " + in); + } + File out = new File(TEST_ROOT_DIR, "output"); + PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt"))); + pw.println("A|a,1"); + pw.println("A|b,2"); + pw.println("B|a,3"); + pw.println("B|b,4"); + pw.println("B|c,5"); + pw.close(); + JobConf conf = new JobConf(); + conf.set("mapreduce.framework.name", "local"); + Job job = new Job(conf); + TextInputFormat.setInputPaths(job, new Path(in.getPath())); + TextOutputFormat.setOutputPath(job, new Path(out.getPath())); + + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + job.setInputFormatClass(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputFormatClass(TextOutputFormat.class); + job.setGroupingComparatorClass(GroupComparator.class); + + job.setCombinerKeyGroupingComparatorClass(GroupComparator.class); + job.setCombinerClass(Combiner.class); + 
job.getConfiguration().setInt("min.num.spills.for.combine", 0); + + job.submit(); + job.waitForCompletion(false); + if (job.isSuccessful()) { + Counters counters = job.getCounters(); + + long combinerInputRecords = counters.findCounter( + "org.apache.hadoop.mapreduce.TaskCounter", + "COMBINE_INPUT_RECORDS").getValue(); + long combinerOutputRecords = counters.findCounter( + "org.apache.hadoop.mapreduce.TaskCounter", + "COMBINE_OUTPUT_RECORDS").getValue(); + Assert.assertTrue(combinerInputRecords > 0); + Assert.assertTrue(combinerInputRecords > combinerOutputRecords); + + BufferedReader br = new BufferedReader(new FileReader( + new File(out, "part-r-00000"))); + Set output = new HashSet(); + String line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNull(line); + br.close(); + + Set expected = new HashSet(); + expected.add("A2"); + expected.add("B5"); + + Assert.assertEquals(expected, output); + + } else { + Assert.fail("Job failed"); + } + } + +}
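
For reference, below is a minimal driver sketch showing how a job might wire the new combiner key grouping comparator together with the existing reduce-side grouping comparator. It is not part of the patch: the class names (CombinerGroupingExample, ParseMapper, MaxReducer, NaturalKeyGroupComparator) and the assumed "natural|qualifier,value" input lines are illustrative, modeled on the tests above; only setCombinerClass, setGroupingComparatorClass, and the setCombinerKeyGroupingComparatorClass method added by this patch come from the MapReduce API.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Illustrative driver (hypothetical, not part of the patch): computes the
 * maximum value per natural key for lines of the form "natural|qualifier,value".
 */
public class CombinerGroupingExample {

  /** Splits "key,value" lines into (Text key, LongWritable value) pairs. */
  public static class ParseMapper
      extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String record = line.toString();
      int comma = record.indexOf(',');
      context.write(new Text(record.substring(0, comma)),
          new LongWritable(Long.parseLong(record.substring(comma + 1))));
    }
  }

  /** Emits the maximum value of each key group; reused as the combiner. */
  public static class MaxReducer
      extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      long max = Long.MIN_VALUE;
      for (LongWritable value : values) {
        max = Math.max(max, value.get());
      }
      context.write(key, new LongWritable(max));
    }
  }

  /** Groups keys of the form "natural|qualifier" by the part before '|'. */
  public static class NaturalKeyGroupComparator extends WritableComparator {
    public NaturalKeyGroupComparator() {
      super(Text.class, true); // create Text instances so the raw compare delegates here
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      String s1 = a.toString();
      String s2 = b.toString();
      return s1.substring(0, s1.indexOf('|'))
          .compareTo(s2.substring(0, s2.indexOf('|')));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "combiner-grouping-example");
    job.setJarByClass(CombinerGroupingExample.class);

    TextInputFormat.setInputPaths(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapperClass(ParseMapper.class);
    job.setReducerClass(MaxReducer.class);
    job.setCombinerClass(MaxReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    // Keys still sort on the full "natural|qualifier" form, but both the
    // reduce phase and, with this patch, the combine phase group on the
    // natural key only.
    job.setGroupingComparatorClass(NaturalKeyGroupComparator.class);
    job.setCombinerKeyGroupingComparatorClass(NaturalKeyGroupComparator.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

With input like the test data above (A|a,1 through B|c,5), such a job emits the maximum value per natural key; because the combiner now groups on the natural key rather than the full sort key, it can pre-aggregate values across qualifiers before the shuffle, which is exactly the behavior the COMBINE_INPUT_RECORDS / COMBINE_OUTPUT_RECORDS assertions in the tests verify.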