MAPREDUCE-2187. Reporter sends progress during sort/merge. Contributed by Anupam Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1152964 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-08-01 22:53:08 +00:00
parent 22f232bce2
commit 9bac807ced
6 changed files with 28 additions and 3 deletions

View File

@@ -40,6 +40,9 @@ Trunk (unreleased changes)
   IMPROVEMENTS
+    MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
+    acmurthy)
     MAPREDUCE-2365. Add counters to track bytes (read,written) via
     File(Input,Output)Format. (Siddharth Seth via acmurthy)

View File

@@ -1041,6 +1041,14 @@
 </property>
 <!-- End of TaskTracker DistributedCache configuration -->
+<property>
+  <name>mapreduce.task.combine.progress.records</name>
+  <value>10000</value>
+  <description> The number of records to process during combine output collection
+   before sending a progress notification to the TaskTracker.
+  </description>
+</property>
 <property>
   <name>mapreduce.task.merge.progress.records</name>
   <value>10000</value>

View File

@@ -946,7 +946,7 @@ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
       if (combinerRunner != null) {
         final Counters.Counter combineOutputCounter =
           reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
+        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
       } else {
         combineCollector = null;
       }

View File

@@ -352,7 +352,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     Class combinerClass = conf.getCombinerClass();
     CombineOutputCollector combineCollector =
       (null != combinerClass) ?
-        new CombineOutputCollector(reduceCombineOutputCounter) : null;
+        new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
     Shuffle shuffle =
       new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical,

View File

@@ -58,6 +58,7 @@
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
@@ -79,6 +80,7 @@ abstract public class Task implements Writable, Configurable {
     LogFactory.getLog(Task.class);
   public static String MERGED_OUTPUT_PREFIX = ".merged";
+  public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;

   /**
    * Counters to measure the usage of the different file systems.
@@ -1176,16 +1178,26 @@ public static class CombineOutputCollector<K extends Object, V extends Object>
   implements OutputCollector<K, V> {
     private Writer<K, V> writer;
     private Counters.Counter outCounter;
-    public CombineOutputCollector(Counters.Counter outCounter) {
+    private Progressable progressable;
+    private long progressBar;
+    public CombineOutputCollector(Counters.Counter outCounter, Progressable progressable, Configuration conf) {
       this.outCounter = outCounter;
+      this.progressable=progressable;
+      progressBar = conf.getLong(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS);
     }
     public synchronized void setWriter(Writer<K, V> writer) {
       this.writer = writer;
     }
     public synchronized void collect(K key, V value)
         throws IOException {
       outCounter.increment(1);
       writer.append(key, value);
+      if ((outCounter.getValue() % progressBar) == 0) {
+        progressable.progress();
+      }
     }
   }

View File

@@ -260,6 +260,8 @@ public interface MRJobConfig {
   public static final String REDUCE_MEMTOMEM_ENABLED = "mapreduce.reduce.merge.memtomem.enabled";
+  public static final String COMBINE_RECORDS_BEFORE_PROGRESS = "mapreduce.task.combine.progress.records";
+
   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
   public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";