diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 3fb2dd6156..668b4a893f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -196,6 +196,8 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
+ MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
+
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
index 5bae686ab2..53159fbe59 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
@@ -949,6 +949,23 @@ public String getKeyFieldPartitionerOption() {
return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
}
+ /**
+ * Get the user defined {@link WritableComparable} comparator for
+ * grouping keys of inputs to the combiner.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see #setCombinerKeyGroupingComparator(Class) for details.
+ */
+ public RawComparator getCombinerKeyGroupingComparator() {
+ Class<? extends RawComparator> theClass = getClass(
+ JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
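+ // fall back to the map-output key (sort) comparator when no combiner
+ // grouping comparator has been configured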
+ if (theClass == null) {
+ return getOutputKeyComparator();
+ }
+
+ return ReflectionUtils.newInstance(theClass, this);
+ }
+
/**
* Get the user defined {@link WritableComparable} comparator for
* grouping keys of inputs to the reduce.
@@ -966,6 +983,37 @@ public RawComparator getOutputValueGroupingComparator() {
return ReflectionUtils.newInstance(theClass, this);
}
+ /**
+ * Set the user defined {@link RawComparator} comparator for
+ * grouping keys in the input to the combiner.
+ *
+ * This comparator should be provided if the equivalence rules for keys
+ * for sorting the intermediates are different from those for grouping keys
+ * before each call to
+ * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.
+ *
+ * For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+ * in a single call to the reduce function if K1 and K2 compare as equal.
+ *
+ * Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
+ * how keys are sorted, this can be used in conjunction to simulate
+ * secondary sort on values.
+ *
+ * Note: This is not a guarantee of the combiner sort being
+ * stable in any sense. (In any case, with the order of available
+ * map-outputs to the combiner being non-deterministic, it wouldn't make
+ * that much sense.)
+ *
+ * @param theClass the comparator class to be used for grouping keys for the
+ * combiner. It should implement <code>RawComparator</code>.
+ * @see #setOutputKeyComparatorClass(Class)
+ */
+ public void setCombinerKeyGroupingComparator(
+ Class<? extends RawComparator> theClass) {
+ setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
+ theClass, RawComparator.class);
+ }
+
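+ // Minimal usage sketch for the old API (the combiner and comparator class
+ // names below are hypothetical, for illustration only):
+ //
+ //   JobConf job = new JobConf();
+ //   job.setMapOutputKeyClass(Text.class);
+ //   job.setMapOutputValueClass(LongWritable.class);
+ //   job.setCombinerClass(MaxCombiner.class);
+ //   // group map-output keys of the form "primary|secondary" on the
+ //   // "primary" part only when feeding the combiner
+ //   job.setCombinerKeyGroupingComparator(PrimaryGroupComparator.class);
+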
/**
* Set the user defined {@link RawComparator} comparator for
* grouping keys in the input to the reduce.
@@ -989,7 +1037,9 @@ public RawComparator getOutputValueGroupingComparator() {
*
* @param theClass the comparator class to be used for grouping keys.
* It should implement <code>RawComparator</code>.
- * @see #setOutputKeyComparatorClass(Class)
+ * @see #setOutputKeyComparatorClass(Class)
+ * @see {@link #setCombinerKeyGroupingComparator(Class)} for setting a
+ * comparator for the combiner.
*/
public void setOutputValueGroupingComparator(
Class<? extends RawComparator> theClass) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 660ffc65ad..72cd41c9ea 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -1575,7 +1575,8 @@ protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
combinerClass = cls;
keyClass = (Class<K>) job.getMapOutputKeyClass();
valueClass = (Class<V>) job.getMapOutputValueClass();
- comparator = (RawComparator<K>) job.getOutputKeyComparator();
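+ // group the combiner's input with the combiner grouping comparator
+ // rather than the raw sort comparator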
+ comparator = (RawComparator<K>)
+ job.getCombinerKeyGroupingComparator();
}
@SuppressWarnings("unchecked")
@@ -1624,7 +1625,7 @@ protected static class NewCombinerRunner<K,V> extends CombinerRunner<K,V> {
this.taskId = taskId;
keyClass = (Class<K>) context.getMapOutputKeyClass();
valueClass = (Class<V>) context.getMapOutputValueClass();
- comparator = (RawComparator<K>) context.getSortComparator();
+ comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
this.committer = committer;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 78c6b4b1a9..4bb97e84d6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -948,11 +948,27 @@ public void setOutputValueClass(Class<?> theClass
conf.setOutputValueClass(theClass);
}
+ /**
+ * Define the comparator that controls which keys are grouped together
+ * for a single call to combiner,
+ * {@link Reducer#reduce(Object, Iterable,
+ * org.apache.hadoop.mapreduce.Reducer.Context)}
+ *
+ * @param cls the raw comparator to use
+ * @throws IllegalStateException if the job is submitted
+ */
+ public void setCombinerKeyGroupingComparatorClass(
+ Class<? extends RawComparator> cls) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ conf.setCombinerKeyGroupingComparator(cls);
+ }
+
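+ // Minimal usage sketch for the new API (combiner and comparator class
+ // names are hypothetical, for illustration only):
+ //
+ //   Job job = Job.getInstance(conf);
+ //   job.setCombinerClass(MaxCombiner.class);
+ //   job.setCombinerKeyGroupingComparatorClass(PrimaryGroupComparator.class);
+ //   // sorting is unchanged; only the grouping of keys fed to the combiner
+ //   // is affected
+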
/**
* Define the comparator that controls how the keys are sorted before they
* are passed to the {@link Reducer}.
* @param cls the raw comparator
* @throws IllegalStateException if the job is submitted
+ * @see {@link #setCombinerKeyGroupingComparatorClass(Class)}
*/
public void setSortComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
@@ -967,6 +983,8 @@ public void setSortComparatorClass(Class<? extends RawComparator> cls
* org.apache.hadoop.mapreduce.Reducer.Context)}
* @param cls the raw comparator to use
* @throws IllegalStateException if the job is submitted
+ * @see {@link #setCombinerKeyGroupingComparatorClass(Class)} for setting a
+ * comparator for the combiner.
*/
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
index 4842e20b9c..fa73a5f066 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
@@ -167,13 +167,24 @@ public Class<? extends Partitioner<?,?>> getPartitionerClass()
*/
public String getJar();
- /**
- * Get the user defined {@link RawComparator} comparator for
- * grouping keys of inputs to the reduce.
- *
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the combiner.
+ *
* @return comparator set by the user for grouping values.
- * @see Job#setGroupingComparatorClass(Class) for details.
+ * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
*/
+ public RawComparator<?> getCombinerKeyGroupingComparator();
+
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the reduce.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setGroupingComparatorClass(Class) for details.
+ * @see {@link #getCombinerKeyGroupingComparator()} for the comparator used
+ * to group keys for the combiner.
+ */
public RawComparator<?> getGroupingComparator();
/**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index e696b86553..1be7ba3e3b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -93,6 +93,8 @@ public interface MRJobConfig {
public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
+ public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class";
+
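+ // Illustrative note: this key backs JobConf#setCombinerKeyGroupingComparator
+ // and Job#setCombinerKeyGroupingComparatorClass; it can also be set directly,
+ // e.g. conf.setClass(COMBINER_GROUP_COMPARATOR_CLASS, MyGroupComparator.class,
+ // RawComparator.class), where MyGroupComparator is a hypothetical comparator.
+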
public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
public static final String WORKING_DIR = "mapreduce.job.working.dir";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
index 598bb93606..ea2c77ace9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
@@ -166,6 +166,11 @@ public String[] getFileTimestamps() {
return base.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return base.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return base.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
index 8d6648468e..5e9a1add87 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
@@ -159,6 +159,11 @@ public String[] getFileTimestamps() {
return base.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return base.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return base.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
index 95c4b90c0f..8865a36c31 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
@@ -168,6 +168,11 @@ public String[] getFileTimestamps() {
return mapContext.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return mapContext.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return mapContext.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
index 39178642f2..185c135c2e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
@@ -161,6 +161,11 @@ public String[] getFileTimestamps() {
return reduceContext.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return reduceContext.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return reduceContext.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
index b4c6dca554..247c2f2029 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
@@ -252,6 +252,17 @@ public String getJar() {
return conf.getJar();
}
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the combiner.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
+ */
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return conf.getCombinerKeyGroupingComparator();
+ }
+
/**
* Get the user defined {@link RawComparator} comparator for
* grouping keys of inputs to the reduce.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index ca3bed9399..a821e4d1b8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -582,7 +582,7 @@ private void combineAndSpill(
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
RawComparator<K> comparator =
- (RawComparator<K>)job.getOutputKeyComparator();
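+ // group values for the spill-time combine with the combiner grouping
+ // comparator instead of the sort comparator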
+ (RawComparator<K>)job.getCombinerKeyGroupingComparator();
try {
CombineValuesIterator<K,V> values = new CombineValuesIterator<K,V>(
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
new file mode 100644
index 0000000000..96919bef68
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestOldCombinerGrouping {
+ private static String TEST_ROOT_DIR =
+ new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+ public static class Map implements
+ Mapper<LongWritable, Text, Text, LongWritable> {
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+ String v = value.toString();
+ String k = v.substring(0, v.indexOf(","));
+ v = v.substring(v.indexOf(",") + 1);
+ output.collect(new Text(k), new LongWritable(Long.parseLong(v)));
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ }
+ }
+
+ public static class Reduce implements
+ Reducer<Text, LongWritable, Text, LongWritable> {
+
+ @Override
+ public void reduce(Text key, Iterator<LongWritable> values,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+ LongWritable maxValue = null;
+ while (values.hasNext()) {
+ LongWritable value = values.next();
+ if (maxValue == null) {
+ maxValue = value;
+ } else if (value.compareTo(maxValue) > 0) {
+ maxValue = value;
+ }
+ }
+ output.collect(key, maxValue);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ }
+ }
+
+ public static class Combiner extends Reduce {
+ }
+
+ public static class GroupComparator implements RawComparator<Text> {
+ @Override
+ public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+ int i4) {
+ byte[] b1 = new byte[i2];
+ System.arraycopy(bytes, i, b1, 0, i2);
+
+ byte[] b2 = new byte[i4];
+ System.arraycopy(bytes2, i3, b2, 0, i4);
+
+ return compare(new Text(new String(b1)), new Text(new String(b2)));
+ }
+
+ @Override
+ public int compare(Text o1, Text o2) {
+ String s1 = o1.toString();
+ String s2 = o2.toString();
+ s1 = s1.substring(0, s1.indexOf("|"));
+ s2 = s2.substring(0, s2.indexOf("|"));
+ return s1.compareTo(s2);
+ }
+
+ }
+
+ @Test
+ public void testCombiner() throws Exception {
+ if (!new File(TEST_ROOT_DIR).mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+ }
+ File in = new File(TEST_ROOT_DIR, "input");
+ if (!in.mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + in);
+ }
+ File out = new File(TEST_ROOT_DIR, "output");
+ PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+ pw.println("A|a,1");
+ pw.println("A|b,2");
+ pw.println("B|a,3");
+ pw.println("B|b,4");
+ pw.println("B|c,5");
+ pw.close();
+ JobConf job = new JobConf();
+ job.set("mapreduce.framework.name", "local");
+ TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+ TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+ job.setMapperClass(Map.class);
+ job.setReducerClass(Reduce.class);
+ job.setInputFormat(TextInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputValueGroupingComparator(GroupComparator.class);
+
+ job.setCombinerClass(Combiner.class);
+ job.setCombinerKeyGroupingComparator(GroupComparator.class);
+ job.setInt("min.num.spills.for.combine", 0);
+
+ JobClient client = new JobClient(job);
+ RunningJob runningJob = client.submitJob(job);
+ runningJob.waitForCompletion();
+ if (runningJob.isSuccessful()) {
+ Counters counters = runningJob.getCounters();
+
+ long combinerInputRecords = counters.getGroup(
+ "org.apache.hadoop.mapreduce.TaskCounter").
+ getCounter("COMBINE_INPUT_RECORDS");
+ long combinerOutputRecords = counters.getGroup(
+ "org.apache.hadoop.mapreduce.TaskCounter").
+ getCounter("COMBINE_OUTPUT_RECORDS");
+ Assert.assertTrue(combinerInputRecords > 0);
+ Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+ BufferedReader br = new BufferedReader(new FileReader(
+ new File(out, "part-00000")));
+ Set<String> output = new HashSet<String>();
+ String line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNull(line);
+ br.close();
+
+ Set<String> expected = new HashSet<String>();
+ expected.add("A2");
+ expected.add("B5");
+
+ Assert.assertEquals(expected, output);
+
+ } else {
+ Assert.fail("Job failed");
+ }
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
new file mode 100644
index 0000000000..c4b734bdc5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestNewCombinerGrouping {
+ private static String TEST_ROOT_DIR =
+ new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+ public static class Map extends
+ Mapper<LongWritable, Text, Text, LongWritable> {
+
+ @Override
+ protected void map(LongWritable key, Text value,
+ Context context)
+ throws IOException, InterruptedException {
+ String v = value.toString();
+ String k = v.substring(0, v.indexOf(","));
+ v = v.substring(v.indexOf(",") + 1);
+ context.write(new Text(k), new LongWritable(Long.parseLong(v)));
+ }
+ }
+
+ public static class Reduce extends
+ Reducer<Text, LongWritable, Text, LongWritable> {
+
+ @Override
+ protected void reduce(Text key, Iterable<LongWritable> values,
+ Context context)
+ throws IOException, InterruptedException {
+ LongWritable maxValue = null;
+ for (LongWritable value : values) {
+ if (maxValue == null) {
+ maxValue = value;
+ } else if (value.compareTo(maxValue) > 0) {
+ maxValue = value;
+ }
+ }
+ context.write(key, maxValue);
+ }
+ }
+
+ public static class Combiner extends Reduce {
+ }
+
+ public static class GroupComparator implements RawComparator<Text> {
+ @Override
+ public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+ int i4) {
+ byte[] b1 = new byte[i2];
+ System.arraycopy(bytes, i, b1, 0, i2);
+
+ byte[] b2 = new byte[i4];
+ System.arraycopy(bytes2, i3, b2, 0, i4);
+
+ return compare(new Text(new String(b1)), new Text(new String(b2)));
+ }
+
+ @Override
+ public int compare(Text o1, Text o2) {
+ String s1 = o1.toString();
+ String s2 = o2.toString();
+ s1 = s1.substring(0, s1.indexOf("|"));
+ s2 = s2.substring(0, s2.indexOf("|"));
+ return s1.compareTo(s2);
+ }
+
+ }
+
+ @Test
+ public void testCombiner() throws Exception {
+ if (!new File(TEST_ROOT_DIR).mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+ }
+ File in = new File(TEST_ROOT_DIR, "input");
+ if (!in.mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + in);
+ }
+ File out = new File(TEST_ROOT_DIR, "output");
+ PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+ pw.println("A|a,1");
+ pw.println("A|b,2");
+ pw.println("B|a,3");
+ pw.println("B|b,4");
+ pw.println("B|c,5");
+ pw.close();
+ JobConf conf = new JobConf();
+ conf.set("mapreduce.framework.name", "local");
+ Job job = new Job(conf);
+ TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+ TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+
+ job.setMapperClass(Map.class);
+ job.setReducerClass(Reduce.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setGroupingComparatorClass(GroupComparator.class);
+
+ job.setCombinerKeyGroupingComparatorClass(GroupComparator.class);
+ job.setCombinerClass(Combiner.class);
+ job.getConfiguration().setInt("min.num.spills.for.combine", 0);
+
+ job.submit();
+ job.waitForCompletion(false);
+ if (job.isSuccessful()) {
+ Counters counters = job.getCounters();
+
+ long combinerInputRecords = counters.findCounter(
+ "org.apache.hadoop.mapreduce.TaskCounter",
+ "COMBINE_INPUT_RECORDS").getValue();
+ long combinerOutputRecords = counters.findCounter(
+ "org.apache.hadoop.mapreduce.TaskCounter",
+ "COMBINE_OUTPUT_RECORDS").getValue();
+ Assert.assertTrue(combinerInputRecords > 0);
+ Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+ BufferedReader br = new BufferedReader(new FileReader(
+ new File(out, "part-r-00000")));
+ Set<String> output = new HashSet<String>();
+ String line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNull(line);
+ br.close();
+
+ Set<String> expected = new HashSet<String>();
+ expected.add("A2");
+ expected.add("B5");
+
+ Assert.assertEquals(expected, output);
+
+ } else {
+ Assert.fail("Job failed");
+ }
+ }
+
+}