MAPREDUCE-5228. Bring back FileInputFormat.Counter and FileOuputFormat.Counter for binary compatibility with 1.x mapred APIs. Contributed by Mayank Bansal.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1488060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
85623a2d75
commit
4394e5edb0
@ -245,6 +245,10 @@ Release 2.0.5-beta - UNRELEASED
|
||||
filecache.DistributedCache for binary compatibility with mapred in 1.x.
|
||||
(Zhijie Shen via vinodkv)
|
||||
|
||||
MAPREDUCE-5228. Bring back FileInputFormat.Counter and
|
||||
FileOuputFormat.Counter for binary compatibility with 1.x mapred APIs.
|
||||
(Mayank Bansal via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
||||
|
@ -26,6 +26,7 @@
|
||||
import java.io.IOException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.collections.IteratorUtils;
|
||||
@ -40,10 +41,9 @@
|
||||
import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
|
||||
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
|
||||
import org.apache.hadoop.mapreduce.counters.GenericCounter;
|
||||
import org.apache.hadoop.mapreduce.counters.LimitExceededException;
|
||||
import org.apache.hadoop.mapreduce.counters.Limits;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
||||
import org.apache.hadoop.mapreduce.util.CountersStrings;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
|
||||
@ -64,6 +64,12 @@ public class Counters
|
||||
|
||||
public static int MAX_COUNTER_LIMIT = Limits.COUNTERS_MAX;
|
||||
public static int MAX_GROUP_LIMIT = Limits.GROUPS_MAX;
|
||||
private static HashMap<String, String> depricatedCounterMap =
|
||||
new HashMap<String, String>();
|
||||
|
||||
static {
|
||||
initDepricatedMap();
|
||||
}
|
||||
|
||||
public Counters() {
|
||||
super(groupFactory);
|
||||
@ -73,6 +79,27 @@ public Counters(org.apache.hadoop.mapreduce.Counters newCounters) {
|
||||
super(newCounters, groupFactory);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "deprecation" })
|
||||
private static void initDepricatedMap() {
|
||||
depricatedCounterMap.put(FileInputFormat.Counter.class.getName(),
|
||||
FileInputFormatCounter.class.getName());
|
||||
depricatedCounterMap.put(FileOutputFormat.Counter.class.getName(),
|
||||
FileOutputFormatCounter.class.getName());
|
||||
depricatedCounterMap.put(
|
||||
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.Counter.class
|
||||
.getName(), FileInputFormatCounter.class.getName());
|
||||
depricatedCounterMap.put(
|
||||
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.class
|
||||
.getName(), FileOutputFormatCounter.class.getName());
|
||||
}
|
||||
|
||||
private static String getNewGroupKey(String oldGroup) {
|
||||
if (depricatedCounterMap.containsKey(oldGroup)) {
|
||||
return depricatedCounterMap.get(oldGroup);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Downgrade new {@link org.apache.hadoop.mapreduce.Counters} to old Counters
|
||||
* @param newCounters new Counters
|
||||
@ -445,6 +472,10 @@ public synchronized Counter findCounter(String group, String name) {
|
||||
" BYTES_READ as counter name instead");
|
||||
return findCounter(FileInputFormatCounter.BYTES_READ);
|
||||
}
|
||||
String newGroupKey = getNewGroupKey(group);
|
||||
if (newGroupKey != null) {
|
||||
group = newGroupKey;
|
||||
}
|
||||
return getGroup(group).getCounterForName(name);
|
||||
}
|
||||
|
||||
|
@ -62,6 +62,11 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
||||
public static final Log LOG =
|
||||
LogFactory.getLog(FileInputFormat.class);
|
||||
|
||||
@Deprecated
|
||||
public static enum Counter {
|
||||
BYTES_READ
|
||||
}
|
||||
|
||||
public static final String NUM_INPUT_FILES =
|
||||
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
|
||||
|
||||
|
@ -0,0 +1,18 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# ResourceBundle properties file for file-input-format counters
|
||||
|
||||
CounterGroupName= File Input Format Counters
|
||||
|
||||
BYTES_READ.name= Bytes Read
|
@ -36,6 +36,11 @@
|
||||
@InterfaceStability.Stable
|
||||
public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
|
||||
|
||||
@Deprecated
|
||||
public static enum Counter {
|
||||
BYTES_WRITTEN
|
||||
}
|
||||
|
||||
/**
|
||||
* Set whether the output of the job is compressed.
|
||||
* @param conf the {@link JobConf} to modify
|
||||
|
@ -0,0 +1,18 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# ResourceBundle properties file for file-output-format counters
|
||||
|
||||
CounterGroupName= File Output Format Counters
|
||||
|
||||
BYTES_WRITTEN.name= Bytes Written
|
@ -69,6 +69,11 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
||||
|
||||
private static final double SPLIT_SLOP = 1.1; // 10% slop
|
||||
|
||||
@Deprecated
|
||||
public static enum Counter {
|
||||
BYTES_READ
|
||||
}
|
||||
|
||||
private static final PathFilter hiddenFileFilter = new PathFilter(){
|
||||
public boolean accept(Path p){
|
||||
String name = p.getName();
|
||||
|
@ -0,0 +1,18 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# ResourceBundle properties file for file-input-format counters
|
||||
|
||||
CounterGroupName= File Input Format Counters
|
||||
|
||||
BYTES_READ.name= Bytes Read
|
@ -0,0 +1,18 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# ResourceBundle properties file for file-output-format counters
|
||||
|
||||
CounterGroupName= File Output Format Counters
|
||||
|
||||
BYTES_WRITTEN.name= Bytes Written
|
@ -60,6 +60,11 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
|
||||
public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
|
||||
public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";
|
||||
|
||||
@Deprecated
|
||||
public static enum Counter {
|
||||
BYTES_WRITTEN
|
||||
}
|
||||
|
||||
/**
|
||||
* Set whether the output of the job is compressed.
|
||||
* @param job the job to modify
|
||||
|
@ -80,6 +80,40 @@ private void validateFileCounters(Counters counter, long fileBytesRead,
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void validateOldFileCounters(Counters counter, long fileBytesRead,
|
||||
long fileBytesWritten, long mapOutputBytes,
|
||||
long mapOutputMaterializedBytes) {
|
||||
|
||||
assertEquals(fileBytesRead,
|
||||
counter.findCounter(FileInputFormat.Counter.BYTES_READ).getValue());
|
||||
|
||||
assertEquals(
|
||||
fileBytesRead,
|
||||
counter
|
||||
.findCounter(
|
||||
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.Counter.BYTES_READ)
|
||||
.getValue());
|
||||
|
||||
assertEquals(fileBytesWritten,
|
||||
counter.findCounter(FileOutputFormat.Counter.BYTES_WRITTEN).getValue());
|
||||
|
||||
assertEquals(
|
||||
fileBytesWritten,
|
||||
counter
|
||||
.findCounter(
|
||||
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN)
|
||||
.getValue());
|
||||
|
||||
if (mapOutputBytes >= 0) {
|
||||
assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue() != 0);
|
||||
}
|
||||
if (mapOutputMaterializedBytes >= 0) {
|
||||
assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES)
|
||||
.getValue() != 0);
|
||||
}
|
||||
}
|
||||
|
||||
private void validateCounters(Counters counter, long spillRecCnt,
|
||||
long mapInputRecords, long mapOutputRecords) {
|
||||
// Check if the numer of Spilled Records is same as expected
|
||||
@ -255,7 +289,7 @@ public void testOldCounterA() throws Exception {
|
||||
// 4 records/line = 61440 output records
|
||||
validateCounters(c1, 90112, 15360, 61440);
|
||||
validateFileCounters(c1, inputSize, 0, 0, 0);
|
||||
|
||||
validateOldFileCounters(c1, inputSize, 61928, 0, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -431,6 +465,18 @@ public void testNewCounterD() throws Exception {
|
||||
validateFileCounters(c1, inputSize, 0, -1, -1);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testOldCounters() throws Exception {
|
||||
Counters c1 = new Counters();
|
||||
c1.incrCounter(FileInputFormat.Counter.BYTES_READ, 100);
|
||||
c1.incrCounter(FileOutputFormat.Counter.BYTES_WRITTEN, 200);
|
||||
c1.incrCounter(TaskCounter.MAP_OUTPUT_BYTES, 100);
|
||||
c1.incrCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES, 100);
|
||||
validateFileCounters(c1, 100, 200, 100, 100);
|
||||
validateOldFileCounters(c1, 100, 200, 100, 100);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increases the JVM's heap usage to the specified target value.
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user