MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1555968 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
410f3a9f60
commit
76238b9722
@ -196,6 +196,8 @@ Release 2.4.0 - UNRELEASED
|
|||||||
MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
|
MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
|
||||||
with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
|
with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
|
||||||
|
|
||||||
|
MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
|
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
|
||||||
|
@ -949,6 +949,23 @@ public String getKeyFieldPartitionerOption() {
|
|||||||
return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
|
return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the user defined {@link WritableComparable} comparator for
|
||||||
|
* grouping keys of inputs to the combiner.
|
||||||
|
*
|
||||||
|
* @return comparator set by the user for grouping values.
|
||||||
|
* @see #setCombinerKeyGroupingComparator(Class) for details.
|
||||||
|
*/
|
||||||
|
public RawComparator getCombinerKeyGroupingComparator() {
|
||||||
|
Class<? extends RawComparator> theClass = getClass(
|
||||||
|
JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
|
||||||
|
if (theClass == null) {
|
||||||
|
return getOutputKeyComparator();
|
||||||
|
}
|
||||||
|
|
||||||
|
return ReflectionUtils.newInstance(theClass, this);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the user defined {@link WritableComparable} comparator for
|
* Get the user defined {@link WritableComparable} comparator for
|
||||||
* grouping keys of inputs to the reduce.
|
* grouping keys of inputs to the reduce.
|
||||||
@ -966,6 +983,37 @@ public RawComparator getOutputValueGroupingComparator() {
|
|||||||
return ReflectionUtils.newInstance(theClass, this);
|
return ReflectionUtils.newInstance(theClass, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the user defined {@link RawComparator} comparator for
|
||||||
|
* grouping keys in the input to the combiner.
|
||||||
|
* <p/>
|
||||||
|
* <p>This comparator should be provided if the equivalence rules for keys
|
||||||
|
* for sorting the intermediates are different from those for grouping keys
|
||||||
|
* before each call to
|
||||||
|
* {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
|
||||||
|
* <p/>
|
||||||
|
* <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
|
||||||
|
* in a single call to the reduce function if K1 and K2 compare as equal.</p>
|
||||||
|
* <p/>
|
||||||
|
* <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
|
||||||
|
* how keys are sorted, this can be used in conjunction to simulate
|
||||||
|
* <i>secondary sort on values</i>.</p>
|
||||||
|
* <p/>
|
||||||
|
* <p><i>Note</i>: This is not a guarantee of the combiner sort being
|
||||||
|
* <i>stable</i> in any sense. (In any case, with the order of available
|
||||||
|
* map-outputs to the combiner being non-deterministic, it wouldn't make
|
||||||
|
* that much sense.)</p>
|
||||||
|
*
|
||||||
|
* @param theClass the comparator class to be used for grouping keys for the
|
||||||
|
* combiner. It should implement <code>RawComparator</code>.
|
||||||
|
* @see #setOutputKeyComparatorClass(Class)
|
||||||
|
*/
|
||||||
|
public void setCombinerKeyGroupingComparator(
|
||||||
|
Class<? extends RawComparator> theClass) {
|
||||||
|
setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
|
||||||
|
theClass, RawComparator.class);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the user defined {@link RawComparator} comparator for
|
* Set the user defined {@link RawComparator} comparator for
|
||||||
* grouping keys in the input to the reduce.
|
* grouping keys in the input to the reduce.
|
||||||
@ -990,6 +1038,8 @@ public RawComparator getOutputValueGroupingComparator() {
|
|||||||
* @param theClass the comparator class to be used for grouping keys.
|
* @param theClass the comparator class to be used for grouping keys.
|
||||||
* It should implement <code>RawComparator</code>.
|
* It should implement <code>RawComparator</code>.
|
||||||
* @see #setOutputKeyComparatorClass(Class)
|
* @see #setOutputKeyComparatorClass(Class)
|
||||||
|
* @see {@link #setCombinerKeyGroupingComparator(Class)} for setting a
|
||||||
|
* comparator for the combiner.
|
||||||
*/
|
*/
|
||||||
public void setOutputValueGroupingComparator(
|
public void setOutputValueGroupingComparator(
|
||||||
Class<? extends RawComparator> theClass) {
|
Class<? extends RawComparator> theClass) {
|
||||||
|
@ -1575,7 +1575,8 @@ protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
|
|||||||
combinerClass = cls;
|
combinerClass = cls;
|
||||||
keyClass = (Class<K>) job.getMapOutputKeyClass();
|
keyClass = (Class<K>) job.getMapOutputKeyClass();
|
||||||
valueClass = (Class<V>) job.getMapOutputValueClass();
|
valueClass = (Class<V>) job.getMapOutputValueClass();
|
||||||
comparator = (RawComparator<K>) job.getOutputKeyComparator();
|
comparator = (RawComparator<K>)
|
||||||
|
job.getCombinerKeyGroupingComparator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -1624,7 +1625,7 @@ protected static class NewCombinerRunner<K, V> extends CombinerRunner<K,V> {
|
|||||||
this.taskId = taskId;
|
this.taskId = taskId;
|
||||||
keyClass = (Class<K>) context.getMapOutputKeyClass();
|
keyClass = (Class<K>) context.getMapOutputKeyClass();
|
||||||
valueClass = (Class<V>) context.getMapOutputValueClass();
|
valueClass = (Class<V>) context.getMapOutputValueClass();
|
||||||
comparator = (RawComparator<K>) context.getSortComparator();
|
comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
|
||||||
this.committer = committer;
|
this.committer = committer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -948,11 +948,27 @@ public void setOutputValueClass(Class<?> theClass
|
|||||||
conf.setOutputValueClass(theClass);
|
conf.setOutputValueClass(theClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Define the comparator that controls which keys are grouped together
|
||||||
|
* for a single call to combiner,
|
||||||
|
* {@link Reducer#reduce(Object, Iterable,
|
||||||
|
* org.apache.hadoop.mapreduce.Reducer.Context)}
|
||||||
|
*
|
||||||
|
* @param cls the raw comparator to use
|
||||||
|
* @throws IllegalStateException if the job is submitted
|
||||||
|
*/
|
||||||
|
public void setCombinerKeyGroupingComparatorClass(
|
||||||
|
Class<? extends RawComparator> cls) throws IllegalStateException {
|
||||||
|
ensureState(JobState.DEFINE);
|
||||||
|
conf.setCombinerKeyGroupingComparator(cls);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Define the comparator that controls how the keys are sorted before they
|
* Define the comparator that controls how the keys are sorted before they
|
||||||
* are passed to the {@link Reducer}.
|
* are passed to the {@link Reducer}.
|
||||||
* @param cls the raw comparator
|
* @param cls the raw comparator
|
||||||
* @throws IllegalStateException if the job is submitted
|
* @throws IllegalStateException if the job is submitted
|
||||||
|
* @see {@link #setCombinerKeyGroupingComparatorClass(Class)}
|
||||||
*/
|
*/
|
||||||
public void setSortComparatorClass(Class<? extends RawComparator> cls
|
public void setSortComparatorClass(Class<? extends RawComparator> cls
|
||||||
) throws IllegalStateException {
|
) throws IllegalStateException {
|
||||||
@ -967,6 +983,8 @@ public void setSortComparatorClass(Class<? extends RawComparator> cls
|
|||||||
* org.apache.hadoop.mapreduce.Reducer.Context)}
|
* org.apache.hadoop.mapreduce.Reducer.Context)}
|
||||||
* @param cls the raw comparator to use
|
* @param cls the raw comparator to use
|
||||||
* @throws IllegalStateException if the job is submitted
|
* @throws IllegalStateException if the job is submitted
|
||||||
|
* @see {@link #setCombinerKeyGroupingComparatorClass(Class)} for setting a
|
||||||
|
* comparator for the combiner.
|
||||||
*/
|
*/
|
||||||
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
|
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
|
||||||
) throws IllegalStateException {
|
) throws IllegalStateException {
|
||||||
|
@ -167,12 +167,23 @@ public Class<? extends Partitioner<?,?>> getPartitionerClass()
|
|||||||
*/
|
*/
|
||||||
public String getJar();
|
public String getJar();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the user defined {@link RawComparator} comparator for
|
||||||
|
* grouping keys of inputs to the combiner.
|
||||||
|
*
|
||||||
|
* @return comparator set by the user for grouping values.
|
||||||
|
* @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
|
||||||
|
*/
|
||||||
|
public RawComparator<?> getCombinerKeyGroupingComparator();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the user defined {@link RawComparator} comparator for
|
* Get the user defined {@link RawComparator} comparator for
|
||||||
* grouping keys of inputs to the reduce.
|
* grouping keys of inputs to the reduce.
|
||||||
*
|
*
|
||||||
* @return comparator set by the user for grouping values.
|
* @return comparator set by the user for grouping values.
|
||||||
* @see Job#setGroupingComparatorClass(Class) for details.
|
* @see Job#setGroupingComparatorClass(Class) for details.
|
||||||
|
* @see {@link #getCombinerKeyGroupingComparator()} for setting a
|
||||||
|
* comparator for the combiner.
|
||||||
*/
|
*/
|
||||||
public RawComparator<?> getGroupingComparator();
|
public RawComparator<?> getGroupingComparator();
|
||||||
|
|
||||||
|
@ -93,6 +93,8 @@ public interface MRJobConfig {
|
|||||||
|
|
||||||
public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
|
public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
|
||||||
|
|
||||||
|
public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class";
|
||||||
|
|
||||||
public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
|
public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
|
||||||
|
|
||||||
public static final String WORKING_DIR = "mapreduce.job.working.dir";
|
public static final String WORKING_DIR = "mapreduce.job.working.dir";
|
||||||
|
@ -166,6 +166,11 @@ public String[] getFileTimestamps() {
|
|||||||
return base.getFileTimestamps();
|
return base.getFileTimestamps();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RawComparator<?> getCombinerKeyGroupingComparator() {
|
||||||
|
return base.getCombinerKeyGroupingComparator();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RawComparator<?> getGroupingComparator() {
|
public RawComparator<?> getGroupingComparator() {
|
||||||
return base.getGroupingComparator();
|
return base.getGroupingComparator();
|
||||||
|
@ -159,6 +159,11 @@ public String[] getFileTimestamps() {
|
|||||||
return base.getFileTimestamps();
|
return base.getFileTimestamps();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RawComparator<?> getCombinerKeyGroupingComparator() {
|
||||||
|
return base.getCombinerKeyGroupingComparator();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RawComparator<?> getGroupingComparator() {
|
public RawComparator<?> getGroupingComparator() {
|
||||||
return base.getGroupingComparator();
|
return base.getGroupingComparator();
|
||||||
|
@ -168,6 +168,11 @@ public String[] getFileTimestamps() {
|
|||||||
return mapContext.getFileTimestamps();
|
return mapContext.getFileTimestamps();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RawComparator<?> getCombinerKeyGroupingComparator() {
|
||||||
|
return mapContext.getCombinerKeyGroupingComparator();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RawComparator<?> getGroupingComparator() {
|
public RawComparator<?> getGroupingComparator() {
|
||||||
return mapContext.getGroupingComparator();
|
return mapContext.getGroupingComparator();
|
||||||
|
@ -161,6 +161,11 @@ public String[] getFileTimestamps() {
|
|||||||
return reduceContext.getFileTimestamps();
|
return reduceContext.getFileTimestamps();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RawComparator<?> getCombinerKeyGroupingComparator() {
|
||||||
|
return reduceContext.getCombinerKeyGroupingComparator();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RawComparator<?> getGroupingComparator() {
|
public RawComparator<?> getGroupingComparator() {
|
||||||
return reduceContext.getGroupingComparator();
|
return reduceContext.getGroupingComparator();
|
||||||
|
@ -252,6 +252,17 @@ public String getJar() {
|
|||||||
return conf.getJar();
|
return conf.getJar();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the user defined {@link RawComparator} comparator for
|
||||||
|
* grouping keys of inputs to the combiner.
|
||||||
|
*
|
||||||
|
* @return comparator set by the user for grouping values.
|
||||||
|
* @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
|
||||||
|
*/
|
||||||
|
public RawComparator<?> getCombinerKeyGroupingComparator() {
|
||||||
|
return conf.getCombinerKeyGroupingComparator();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the user defined {@link RawComparator} comparator for
|
* Get the user defined {@link RawComparator} comparator for
|
||||||
* grouping keys of inputs to the reduce.
|
* grouping keys of inputs to the reduce.
|
||||||
|
@ -582,7 +582,7 @@ private void combineAndSpill(
|
|||||||
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
|
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
|
||||||
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
|
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
|
||||||
RawComparator<K> comparator =
|
RawComparator<K> comparator =
|
||||||
(RawComparator<K>)job.getOutputKeyComparator();
|
(RawComparator<K>)job.getCombinerKeyGroupingComparator();
|
||||||
try {
|
try {
|
||||||
CombineValuesIterator values = new CombineValuesIterator(
|
CombineValuesIterator values = new CombineValuesIterator(
|
||||||
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
|
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
|
||||||
|
@ -0,0 +1,191 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.RawComparator;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class TestOldCombinerGrouping {
|
||||||
|
private static String TEST_ROOT_DIR =
|
||||||
|
new File("build", UUID.randomUUID().toString()).getAbsolutePath();
|
||||||
|
|
||||||
|
public static class Map implements
|
||||||
|
Mapper<LongWritable, Text, Text, LongWritable> {
|
||||||
|
@Override
|
||||||
|
public void map(LongWritable key, Text value,
|
||||||
|
OutputCollector<Text, LongWritable> output, Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
String v = value.toString();
|
||||||
|
String k = v.substring(0, v.indexOf(","));
|
||||||
|
v = v.substring(v.indexOf(",") + 1);
|
||||||
|
output.collect(new Text(k), new LongWritable(Long.parseLong(v)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(JobConf job) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Reduce implements
|
||||||
|
Reducer<Text, LongWritable, Text, LongWritable> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reduce(Text key, Iterator<LongWritable> values,
|
||||||
|
OutputCollector<Text, LongWritable> output, Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
LongWritable maxValue = null;
|
||||||
|
while (values.hasNext()) {
|
||||||
|
LongWritable value = values.next();
|
||||||
|
if (maxValue == null) {
|
||||||
|
maxValue = value;
|
||||||
|
} else if (value.compareTo(maxValue) > 0) {
|
||||||
|
maxValue = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
output.collect(key, maxValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(JobConf job) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Combiner extends Reduce {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class GroupComparator implements RawComparator<Text> {
|
||||||
|
@Override
|
||||||
|
public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
|
||||||
|
int i4) {
|
||||||
|
byte[] b1 = new byte[i2];
|
||||||
|
System.arraycopy(bytes, i, b1, 0, i2);
|
||||||
|
|
||||||
|
byte[] b2 = new byte[i4];
|
||||||
|
System.arraycopy(bytes2, i3, b2, 0, i4);
|
||||||
|
|
||||||
|
return compare(new Text(new String(b1)), new Text(new String(b2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(Text o1, Text o2) {
|
||||||
|
String s1 = o1.toString();
|
||||||
|
String s2 = o2.toString();
|
||||||
|
s1 = s1.substring(0, s1.indexOf("|"));
|
||||||
|
s2 = s2.substring(0, s2.indexOf("|"));
|
||||||
|
return s1.compareTo(s2);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombiner() throws Exception {
|
||||||
|
if (!new File(TEST_ROOT_DIR).mkdirs()) {
|
||||||
|
throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
|
||||||
|
}
|
||||||
|
File in = new File(TEST_ROOT_DIR, "input");
|
||||||
|
if (!in.mkdirs()) {
|
||||||
|
throw new RuntimeException("Could not create test dir: " + in);
|
||||||
|
}
|
||||||
|
File out = new File(TEST_ROOT_DIR, "output");
|
||||||
|
PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
|
||||||
|
pw.println("A|a,1");
|
||||||
|
pw.println("A|b,2");
|
||||||
|
pw.println("B|a,3");
|
||||||
|
pw.println("B|b,4");
|
||||||
|
pw.println("B|c,5");
|
||||||
|
pw.close();
|
||||||
|
JobConf job = new JobConf();
|
||||||
|
job.set("mapreduce.framework.name", "local");
|
||||||
|
TextInputFormat.setInputPaths(job, new Path(in.getPath()));
|
||||||
|
TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
|
||||||
|
job.setMapperClass(Map.class);
|
||||||
|
job.setReducerClass(Reduce.class);
|
||||||
|
job.setInputFormat(TextInputFormat.class);
|
||||||
|
job.setMapOutputKeyClass(Text.class);
|
||||||
|
job.setMapOutputValueClass(LongWritable.class);
|
||||||
|
job.setOutputFormat(TextOutputFormat.class);
|
||||||
|
job.setOutputValueGroupingComparator(GroupComparator.class);
|
||||||
|
|
||||||
|
job.setCombinerClass(Combiner.class);
|
||||||
|
job.setCombinerKeyGroupingComparator(GroupComparator.class);
|
||||||
|
job.setInt("min.num.spills.for.combine", 0);
|
||||||
|
|
||||||
|
JobClient client = new JobClient(job);
|
||||||
|
RunningJob runningJob = client.submitJob(job);
|
||||||
|
runningJob.waitForCompletion();
|
||||||
|
if (runningJob.isSuccessful()) {
|
||||||
|
Counters counters = runningJob.getCounters();
|
||||||
|
|
||||||
|
long combinerInputRecords = counters.getGroup(
|
||||||
|
"org.apache.hadoop.mapreduce.TaskCounter").
|
||||||
|
getCounter("COMBINE_INPUT_RECORDS");
|
||||||
|
long combinerOutputRecords = counters.getGroup(
|
||||||
|
"org.apache.hadoop.mapreduce.TaskCounter").
|
||||||
|
getCounter("COMBINE_OUTPUT_RECORDS");
|
||||||
|
Assert.assertTrue(combinerInputRecords > 0);
|
||||||
|
Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
|
||||||
|
|
||||||
|
BufferedReader br = new BufferedReader(new FileReader(
|
||||||
|
new File(out, "part-00000")));
|
||||||
|
Set<String> output = new HashSet<String>();
|
||||||
|
String line = br.readLine();
|
||||||
|
Assert.assertNotNull(line);
|
||||||
|
output.add(line.substring(0, 1) + line.substring(4, 5));
|
||||||
|
line = br.readLine();
|
||||||
|
Assert.assertNotNull(line);
|
||||||
|
output.add(line.substring(0, 1) + line.substring(4, 5));
|
||||||
|
line = br.readLine();
|
||||||
|
Assert.assertNull(line);
|
||||||
|
br.close();
|
||||||
|
|
||||||
|
Set<String> expected = new HashSet<String>();
|
||||||
|
expected.add("A2");
|
||||||
|
expected.add("B5");
|
||||||
|
|
||||||
|
Assert.assertEquals(expected, output);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
Assert.fail("Job failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,178 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.RawComparator;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class TestNewCombinerGrouping {
|
||||||
|
private static String TEST_ROOT_DIR =
|
||||||
|
new File("build", UUID.randomUUID().toString()).getAbsolutePath();
|
||||||
|
|
||||||
|
public static class Map extends
|
||||||
|
Mapper<LongWritable, Text, Text, LongWritable> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void map(LongWritable key, Text value,
|
||||||
|
Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
String v = value.toString();
|
||||||
|
String k = v.substring(0, v.indexOf(","));
|
||||||
|
v = v.substring(v.indexOf(",") + 1);
|
||||||
|
context.write(new Text(k), new LongWritable(Long.parseLong(v)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Reduce extends
|
||||||
|
Reducer<Text, LongWritable, Text, LongWritable> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void reduce(Text key, Iterable<LongWritable> values,
|
||||||
|
Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
LongWritable maxValue = null;
|
||||||
|
for (LongWritable value : values) {
|
||||||
|
if (maxValue == null) {
|
||||||
|
maxValue = value;
|
||||||
|
} else if (value.compareTo(maxValue) > 0) {
|
||||||
|
maxValue = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
context.write(key, maxValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Combiner extends Reduce {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class GroupComparator implements RawComparator<Text> {
|
||||||
|
@Override
|
||||||
|
public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
|
||||||
|
int i4) {
|
||||||
|
byte[] b1 = new byte[i2];
|
||||||
|
System.arraycopy(bytes, i, b1, 0, i2);
|
||||||
|
|
||||||
|
byte[] b2 = new byte[i4];
|
||||||
|
System.arraycopy(bytes2, i3, b2, 0, i4);
|
||||||
|
|
||||||
|
return compare(new Text(new String(b1)), new Text(new String(b2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(Text o1, Text o2) {
|
||||||
|
String s1 = o1.toString();
|
||||||
|
String s2 = o2.toString();
|
||||||
|
s1 = s1.substring(0, s1.indexOf("|"));
|
||||||
|
s2 = s2.substring(0, s2.indexOf("|"));
|
||||||
|
return s1.compareTo(s2);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombiner() throws Exception {
|
||||||
|
if (!new File(TEST_ROOT_DIR).mkdirs()) {
|
||||||
|
throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
|
||||||
|
}
|
||||||
|
File in = new File(TEST_ROOT_DIR, "input");
|
||||||
|
if (!in.mkdirs()) {
|
||||||
|
throw new RuntimeException("Could not create test dir: " + in);
|
||||||
|
}
|
||||||
|
File out = new File(TEST_ROOT_DIR, "output");
|
||||||
|
PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
|
||||||
|
pw.println("A|a,1");
|
||||||
|
pw.println("A|b,2");
|
||||||
|
pw.println("B|a,3");
|
||||||
|
pw.println("B|b,4");
|
||||||
|
pw.println("B|c,5");
|
||||||
|
pw.close();
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
conf.set("mapreduce.framework.name", "local");
|
||||||
|
Job job = new Job(conf);
|
||||||
|
TextInputFormat.setInputPaths(job, new Path(in.getPath()));
|
||||||
|
TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
|
||||||
|
|
||||||
|
job.setMapperClass(Map.class);
|
||||||
|
job.setReducerClass(Reduce.class);
|
||||||
|
job.setInputFormatClass(TextInputFormat.class);
|
||||||
|
job.setMapOutputKeyClass(Text.class);
|
||||||
|
job.setMapOutputValueClass(LongWritable.class);
|
||||||
|
job.setOutputFormatClass(TextOutputFormat.class);
|
||||||
|
job.setGroupingComparatorClass(GroupComparator.class);
|
||||||
|
|
||||||
|
job.setCombinerKeyGroupingComparatorClass(GroupComparator.class);
|
||||||
|
job.setCombinerClass(Combiner.class);
|
||||||
|
job.getConfiguration().setInt("min.num.spills.for.combine", 0);
|
||||||
|
|
||||||
|
job.submit();
|
||||||
|
job.waitForCompletion(false);
|
||||||
|
if (job.isSuccessful()) {
|
||||||
|
Counters counters = job.getCounters();
|
||||||
|
|
||||||
|
long combinerInputRecords = counters.findCounter(
|
||||||
|
"org.apache.hadoop.mapreduce.TaskCounter",
|
||||||
|
"COMBINE_INPUT_RECORDS").getValue();
|
||||||
|
long combinerOutputRecords = counters.findCounter(
|
||||||
|
"org.apache.hadoop.mapreduce.TaskCounter",
|
||||||
|
"COMBINE_OUTPUT_RECORDS").getValue();
|
||||||
|
Assert.assertTrue(combinerInputRecords > 0);
|
||||||
|
Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
|
||||||
|
|
||||||
|
BufferedReader br = new BufferedReader(new FileReader(
|
||||||
|
new File(out, "part-r-00000")));
|
||||||
|
Set<String> output = new HashSet<String>();
|
||||||
|
String line = br.readLine();
|
||||||
|
Assert.assertNotNull(line);
|
||||||
|
output.add(line.substring(0, 1) + line.substring(4, 5));
|
||||||
|
line = br.readLine();
|
||||||
|
Assert.assertNotNull(line);
|
||||||
|
output.add(line.substring(0, 1) + line.substring(4, 5));
|
||||||
|
line = br.readLine();
|
||||||
|
Assert.assertNull(line);
|
||||||
|
br.close();
|
||||||
|
|
||||||
|
Set<String> expected = new HashSet<String>();
|
||||||
|
expected.add("A2");
|
||||||
|
expected.add("B5");
|
||||||
|
|
||||||
|
Assert.assertEquals(expected, output);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
Assert.fail("Job failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user