diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 18270f68ca..0a9ee8d02d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -288,6 +288,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6045. need close the DataInputStream after open it in TestMapReduce.java (zxu via rkanter) + MAPREDUCE-6199. AbstractCounters are not reset completely on + deserialization (adhoot via rkanter) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java index 401bbb2502..dd81ebb55f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java @@ -307,6 +307,10 @@ public synchronized void readFields(DataInput in) throws IOException { fgroups.put(group.getName(), group); } int numGroups = WritableUtils.readVInt(in); + if (!groups.isEmpty()) { + groups.clear(); + limits.reset(); + } while (numGroups-- > 0) { limits.checkGroups(groups.size() + 1); G group = groupFactory.newGenericGroup( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java index 3821694b2f..9546c8d763 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java @@ -124,8 +124,15 @@ public synchronized LimitExceededException violation() { return firstViolation; } + // This allows initialization of global settings and not for an instance public static synchronized void reset(Configuration conf) { isInited = false; init(conf); } + + // This allows resetting of an instance to allow reuse + public synchronized void reset() { + totalCounters = 0; + firstViolation = null; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestCounters.java index 83d689c1e9..0215568c47 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestCounters.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.mapreduce; +import java.io.IOException; import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.junit.Test; import static org.junit.Assert.*; @@ -70,7 +74,40 @@ public void testCounterValue() { testMaxGroups(new Counters()); } } - + + @Test public void testResetOnDeserialize() throws IOException { + // Allow only one counterGroup + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.COUNTER_GROUPS_MAX_KEY, 1); + Limits.init(conf); + + Counters countersWithOneGroup = new Counters(); + countersWithOneGroup.findCounter("firstOf1Allowed", "First group"); + boolean caughtExpectedException = false; + try { + countersWithOneGroup.findCounter("secondIsTooMany", "Second group"); + } + catch (LimitExceededException _) { + caughtExpectedException = true; + } + + assertTrue("Did not throw expected exception", + caughtExpectedException); + + Counters countersWithZeroGroups = new Counters(); + DataOutputBuffer out = new DataOutputBuffer(); + countersWithZeroGroups.write(out); + + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), out.getLength()); + + countersWithOneGroup.readFields(in); + + // After reset one should be able to add a group + countersWithOneGroup.findCounter("firstGroupAfterReset", "After reset " + + "limit should be set back to zero"); + } + @Test public void testCountersIncrement() { Counters fCounters = new Counters();