diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt
index aee8b0a8ce..a95155a931 100644
--- a/mapreduce/CHANGES.txt
+++ b/mapreduce/CHANGES.txt
@@ -40,6 +40,9 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
+ acmurthy)
+
MAPREDUCE-2365. Add counters to track bytes (read,written) via
File(Input,Output)Format. (Siddharth Seth via acmurthy)
diff --git a/mapreduce/src/java/mapred-default.xml b/mapreduce/src/java/mapred-default.xml
index 0b74e9778c..db2d79a35d 100644
--- a/mapreduce/src/java/mapred-default.xml
+++ b/mapreduce/src/java/mapred-default.xml
@@ -1041,6 +1041,14 @@
+
+ mapreduce.task.combine.progress.records
+ 10000
+ The number of records to process during combine output collection
+ before sending a progress notification to the TaskTracker.
+
+
+
mapreduce.task.merge.progress.records
10000
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
index 44ba9a7e68..951b45ae70 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
@@ -946,7 +946,7 @@ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
- combineCollector= new CombineOutputCollector(combineOutputCounter);
+ combineCollector= new CombineOutputCollector(combineOutputCounter, reporter, conf);
} else {
combineCollector = null;
}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java b/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
index 0225982139..6256c66273 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -352,7 +352,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
- new CombineOutputCollector(reduceCombineOutputCounter) : null;
+ new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
Shuffle shuffle =
new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical,
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
index f5abb3022a..8ad56a7d05 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
@@ -79,6 +80,7 @@ abstract public class Task implements Writable, Configurable {
LogFactory.getLog(Task.class);
public static String MERGED_OUTPUT_PREFIX = ".merged";
+ public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
/**
* Counters to measure the usage of the different file systems.
@@ -1176,16 +1178,26 @@ public static class CombineOutputCollector
implements OutputCollector {
private Writer writer;
private Counters.Counter outCounter;
- public CombineOutputCollector(Counters.Counter outCounter) {
+ private Progressable progressable;
+ private long progressBar;
+
+ public CombineOutputCollector(Counters.Counter outCounter, Progressable progressable, Configuration conf) {
this.outCounter = outCounter;
+ this.progressable=progressable;
+ progressBar = conf.getLong(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS);
}
+
public synchronized void setWriter(Writer writer) {
this.writer = writer;
}
+
public synchronized void collect(K key, V value)
throws IOException {
outCounter.increment(1);
writer.append(key, value);
+ if ((outCounter.getValue() % progressBar) == 0) {
+ progressable.progress();
+ }
}
}
diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index bcaeaf147a..0054646caf 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -260,6 +260,8 @@ public interface MRJobConfig {
public static final String REDUCE_MEMTOMEM_ENABLED = "mapreduce.reduce.merge.memtomem.enabled";
+ public static final String COMBINE_RECORDS_BEFORE_PROGRESS = "mapreduce.task.combine.progress.records";
+
public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";