MAPREDUCE-5235. Bring back old fields and exceptions in Counters for binary compatibility with mapred in 1.x. Contributed by Mayank Bansal
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1484992 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
98a692fd63
commit
6377bfc505
@ -243,6 +243,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||||||
MAPREDUCE-5222. Bring back some methods and constants in Jobclient for
|
MAPREDUCE-5222. Bring back some methods and constants in Jobclient for
|
||||||
binary compatibility with mapred in 1.x. (Karthik Kambatla via vinodkv)
|
binary compatibility with mapred in 1.x. (Karthik Kambatla via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-5235. Bring back old fields and exceptions in Counters for
|
||||||
|
binary compatibility with mapred in 1.x. (Mayank Bansal via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
|
import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
|
||||||
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
|
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
|
||||||
import org.apache.hadoop.mapreduce.counters.GenericCounter;
|
import org.apache.hadoop.mapreduce.counters.GenericCounter;
|
||||||
|
import org.apache.hadoop.mapreduce.counters.LimitExceededException;
|
||||||
import org.apache.hadoop.mapreduce.counters.Limits;
|
import org.apache.hadoop.mapreduce.counters.Limits;
|
||||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
||||||
import org.apache.hadoop.mapreduce.util.CountersStrings;
|
import org.apache.hadoop.mapreduce.util.CountersStrings;
|
||||||
@ -62,6 +63,7 @@ public class Counters
|
|||||||
extends AbstractCounters<Counters.Counter, Counters.Group> {
|
extends AbstractCounters<Counters.Counter, Counters.Group> {
|
||||||
|
|
||||||
public static int MAX_COUNTER_LIMIT = Limits.COUNTERS_MAX;
|
public static int MAX_COUNTER_LIMIT = Limits.COUNTERS_MAX;
|
||||||
|
public static int MAX_GROUP_LIMIT = Limits.GROUPS_MAX;
|
||||||
|
|
||||||
public Counters() {
|
public Counters() {
|
||||||
super(groupFactory);
|
super(groupFactory);
|
||||||
@ -595,4 +597,21 @@ public static Counters fromEscapedCompactString(String compactString)
|
|||||||
throws ParseException {
|
throws ParseException {
|
||||||
return parseEscapedCompactString(compactString, new Counters());
|
return parseEscapedCompactString(compactString, new Counters());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Counter exception thrown when the number of counters exceed the limit
|
||||||
|
*/
|
||||||
|
public static class CountersExceededException extends RuntimeException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public CountersExceededException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only allows chaining of related exceptions
|
||||||
|
public CountersExceededException(CountersExceededException cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,9 +19,10 @@
|
|||||||
package org.apache.hadoop.mapreduce.counters;
|
package org.apache.hadoop.mapreduce.counters;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.mapred.Counters.CountersExceededException;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class LimitExceededException extends RuntimeException {
|
public class LimitExceededException extends CountersExceededException {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
@ -26,9 +26,12 @@
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.mapred.Counters.Counter;
|
import org.apache.hadoop.mapred.Counters.Counter;
|
||||||
|
import org.apache.hadoop.mapred.Counters.CountersExceededException;
|
||||||
import org.apache.hadoop.mapred.Counters.Group;
|
import org.apache.hadoop.mapred.Counters.Group;
|
||||||
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
||||||
import org.apache.hadoop.mapreduce.JobCounter;
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
@ -43,6 +46,12 @@ enum myCounters {TEST1, TEST2};
|
|||||||
private static final long MAX_VALUE = 10;
|
private static final long MAX_VALUE = 10;
|
||||||
private static final Log LOG = LogFactory.getLog(TestCounters.class);
|
private static final Log LOG = LogFactory.getLog(TestCounters.class);
|
||||||
|
|
||||||
|
static final Enum<?> FRAMEWORK_COUNTER = TaskCounter.CPU_MILLISECONDS;
|
||||||
|
static final long FRAMEWORK_COUNTER_VALUE = 8;
|
||||||
|
static final String FS_SCHEME = "HDFS";
|
||||||
|
static final FileSystemCounter FS_COUNTER = FileSystemCounter.BYTES_READ;
|
||||||
|
static final long FS_COUNTER_VALUE = 10;
|
||||||
|
|
||||||
// Generates enum based counters
|
// Generates enum based counters
|
||||||
private Counters getEnumCounters(Enum[] keys) {
|
private Counters getEnumCounters(Enum[] keys) {
|
||||||
Counters counters = new Counters();
|
Counters counters = new Counters();
|
||||||
@ -255,6 +264,60 @@ public void testMakeCompactString() {
|
|||||||
cs.equals(GC1 + ',' + GC2) || cs.equals(GC2 + ',' + GC1));
|
cs.equals(GC1 + ',' + GC2) || cs.equals(GC2 + ',' + GC1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCounterLimits() {
|
||||||
|
testMaxCountersLimits(new Counters());
|
||||||
|
testMaxGroupsLimits(new Counters());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testMaxCountersLimits(final Counters counters) {
|
||||||
|
for (int i = 0; i < org.apache.hadoop.mapred.Counters.MAX_COUNTER_LIMIT; ++i) {
|
||||||
|
counters.findCounter("test", "test" + i);
|
||||||
|
}
|
||||||
|
setExpected(counters);
|
||||||
|
shouldThrow(CountersExceededException.class, new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
counters.findCounter("test", "bad");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
checkExpected(counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testMaxGroupsLimits(final Counters counters) {
|
||||||
|
for (int i = 0; i < org.apache.hadoop.mapred.Counters.MAX_GROUP_LIMIT; ++i) {
|
||||||
|
// assuming COUNTERS_MAX > GROUPS_MAX
|
||||||
|
counters.findCounter("test" + i, "test");
|
||||||
|
}
|
||||||
|
setExpected(counters);
|
||||||
|
shouldThrow(CountersExceededException.class, new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
counters.findCounter("bad", "test");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
checkExpected(counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setExpected(Counters counters) {
|
||||||
|
counters.findCounter(FRAMEWORK_COUNTER).setValue(FRAMEWORK_COUNTER_VALUE);
|
||||||
|
counters.findCounter(FS_SCHEME, FS_COUNTER).setValue(FS_COUNTER_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkExpected(Counters counters) {
|
||||||
|
assertEquals(FRAMEWORK_COUNTER_VALUE,
|
||||||
|
counters.findCounter(FRAMEWORK_COUNTER).getValue());
|
||||||
|
assertEquals(FS_COUNTER_VALUE, counters.findCounter(FS_SCHEME, FS_COUNTER)
|
||||||
|
.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shouldThrow(Class<? extends Exception> ecls, Runnable runnable) {
|
||||||
|
try {
|
||||||
|
runnable.run();
|
||||||
|
} catch (CountersExceededException e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Assert.fail("Should've thrown " + ecls.getSimpleName());
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
new TestCounters().testCounters();
|
new TestCounters().testCounters();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user