MAPREDUCE-2632. Avoid calling the partitioner when the numReduceTasks is 1. (Ravi Teja Ch N V and Sunil G via kasha)
This commit is contained in:
parent
a0fae8dcae
commit
bdbd10fde1
@@ -109,6 +109,9 @@ Trunk (Unreleased)
|
|||||||
MAPREDUCE-6057. Remove obsolete entries from mapred-default.xml
|
MAPREDUCE-6057. Remove obsolete entries from mapred-default.xml
|
||||||
(Ray Chiang via aw)
|
(Ray Chiang via aw)
|
||||||
|
|
||||||
|
MAPREDUCE-2632. Avoid calling the partitioner when the numReduceTasks is 1.
|
||||||
|
(Ravi Teja Ch N V and Sunil G via kasha)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
MAPREDUCE-6191. Improve clearing stale state of Java serialization
|
MAPREDUCE-6191. Improve clearing stale state of Java serialization
|
||||||
|
@@ -95,7 +95,13 @@ Writable getEntry(MapFile.Reader[] readers,
|
|||||||
Partitioner<K, V> partitioner,
|
Partitioner<K, V> partitioner,
|
||||||
K key,
|
K key,
|
||||||
V value) throws IOException {
|
V value) throws IOException {
|
||||||
int part = partitioner.getPartition(key, value, readers.length);
|
int readerLength = readers.length;
|
||||||
|
int part;
|
||||||
|
if (readerLength <= 1) {
|
||||||
|
part = 0;
|
||||||
|
} else {
|
||||||
|
part = partitioner.getPartition(key, value, readers.length);
|
||||||
|
}
|
||||||
return readers[part].get(key, value);
|
return readers[part].get(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -30,6 +30,9 @@
|
|||||||
* is the same as the number of reduce tasks for the job. Hence this controls
|
* is the same as the number of reduce tasks for the job. Hence this controls
|
||||||
* which of the <code>m</code> reduce tasks the intermediate key (and hence the
|
* which of the <code>m</code> reduce tasks the intermediate key (and hence the
|
||||||
* record) is sent for reduction.</p>
|
* record) is sent for reduction.</p>
|
||||||
|
*
|
||||||
|
* <p>Note: A <code>Partitioner</code> is created only when there are multiple
|
||||||
|
* reducers.</p>
|
||||||
*
|
*
|
||||||
* @see Reducer
|
* @see Reducer
|
||||||
*/
|
*/
|
||||||
|
@@ -31,9 +31,12 @@
|
|||||||
* is the same as the number of reduce tasks for the job. Hence this controls
|
* is the same as the number of reduce tasks for the job. Hence this controls
|
||||||
* which of the <code>m</code> reduce tasks the intermediate key (and hence the
|
* which of the <code>m</code> reduce tasks the intermediate key (and hence the
|
||||||
* record) is sent for reduction.</p>
|
* record) is sent for reduction.</p>
|
||||||
*
|
*
|
||||||
* Note: If you require your Partitioner class to obtain the Job's configuration
|
* <p>Note: A <code>Partitioner</code> is created only when there are multiple
|
||||||
* object, implement the {@link Configurable} interface.
|
* reducers.</p>
|
||||||
|
*
|
||||||
|
* <p>Note: If you require your Partitioner class to obtain the Job's
|
||||||
|
* configuration object, implement the {@link Configurable} interface.</p>
|
||||||
*
|
*
|
||||||
* @see Reducer
|
* @see Reducer
|
||||||
*/
|
*/
|
||||||
|
@@ -114,7 +114,13 @@ public boolean accept(Path path) {
|
|||||||
public static <K extends WritableComparable<?>, V extends Writable>
|
public static <K extends WritableComparable<?>, V extends Writable>
|
||||||
Writable getEntry(MapFile.Reader[] readers,
|
Writable getEntry(MapFile.Reader[] readers,
|
||||||
Partitioner<K, V> partitioner, K key, V value) throws IOException {
|
Partitioner<K, V> partitioner, K key, V value) throws IOException {
|
||||||
int part = partitioner.getPartition(key, value, readers.length);
|
int readerLength = readers.length;
|
||||||
|
int part;
|
||||||
|
if (readerLength <= 1) {
|
||||||
|
part = 0;
|
||||||
|
} else {
|
||||||
|
part = partitioner.getPartition(key, value, readers.length);
|
||||||
|
}
|
||||||
return readers[part].get(key, value);
|
return readers[part].get(key, value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -0,0 +1,70 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
import org.apache.hadoop.io.MapFile.Reader;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestMapFileOutputFormat {
|
||||||
|
|
||||||
|
@SuppressWarnings("static-access")
|
||||||
|
@Test
|
||||||
|
public void testPartitionerShouldNotBeCalledWhenOneReducerIsPresent()
|
||||||
|
throws Exception {
|
||||||
|
MapFileOutputFormat outputFormat = new MapFileOutputFormat();
|
||||||
|
Reader reader = Mockito.mock(Reader.class);
|
||||||
|
Reader[] readers = new Reader[]{reader};
|
||||||
|
outputFormat.getEntry(readers, new MyPartitioner(), new Text(), new Text());
|
||||||
|
assertTrue(!MyPartitioner.isGetPartitionCalled());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
MyPartitioner.setGetPartitionCalled(false);
|
||||||
|
};
|
||||||
|
private static class MyPartitioner
|
||||||
|
implements
|
||||||
|
Partitioner<WritableComparable, Writable> {
|
||||||
|
private static boolean getPartitionCalled = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPartition(WritableComparable key, Writable value,
|
||||||
|
int numPartitions) {
|
||||||
|
setGetPartitionCalled(true);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isGetPartitionCalled() {
|
||||||
|
return getPartitionCalled;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(JobConf job) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setGetPartitionCalled(boolean getPartitionCalled) {
|
||||||
|
MyPartitioner.getPartitionCalled = getPartitionCalled;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,65 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.lib.output;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
import org.apache.hadoop.io.MapFile.Reader;
|
||||||
|
import org.apache.hadoop.mapreduce.Partitioner;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestMapFileOutputFormat {
|
||||||
|
|
||||||
|
@SuppressWarnings("static-access")
|
||||||
|
@Test
|
||||||
|
public void testPartitionerShouldNotBeCalledWhenOneReducerIsPresent()
|
||||||
|
throws Exception {
|
||||||
|
MapFileOutputFormat outputFormat = new MapFileOutputFormat();
|
||||||
|
Reader reader = Mockito.mock(Reader.class);
|
||||||
|
Reader[] readers = new Reader[]{reader};
|
||||||
|
outputFormat.getEntry(readers, new MyPartitioner(), new Text(), new Text());
|
||||||
|
assertTrue(!MyPartitioner.isGetPartitionCalled());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
MyPartitioner.setGetPartitionCalled(false);
|
||||||
|
}
|
||||||
|
private static class MyPartitioner
|
||||||
|
extends
|
||||||
|
Partitioner<WritableComparable, Writable> {
|
||||||
|
private static boolean getPartitionCalled = false;
|
||||||
|
|
||||||
|
public static boolean isGetPartitionCalled() {
|
||||||
|
return getPartitionCalled;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public int getPartition(WritableComparable key, Writable value,
|
||||||
|
int numPartitions) {
|
||||||
|
setGetPartitionCalled(true);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
public static void setGetPartitionCalled(boolean getPartitionCalled) {
|
||||||
|
MyPartitioner.getPartitionCalled = getPartitionCalled;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user