From 7e6f384dd742de21f29e96ee76df5316529c9019 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Mon, 23 Mar 2015 22:51:20 +0530 Subject: [PATCH] MAPREDUCE-6242. Progress report log is incredibly excessive in application master. Contributed by Varun Saxena. --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../java/org/apache/hadoop/mapred/Task.java | 13 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 5 + .../mapred/TestTaskProgressReporter.java | 160 ++++++++++++++++++ 4 files changed, 177 insertions(+), 4 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 20505b63d7..b8a2a1cdd2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -300,6 +300,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6281. Fix javadoc in Terasort. (Albert Chu via ozawa) + MAPREDUCE-6242. Progress report log is incredibly excessive in + application master. (Varun Saxena via devaraj) + Release 2.7.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 7fa5d02682..bf5ca22f6d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -229,6 +229,11 @@ public Task(String jobFile, TaskAttemptID taskId, int partition, gcUpdater = new GcTimeUpdater(); } + @VisibleForTesting + void setTaskDone() { + taskDone.set(true); + } + //////////////////////////////////////////// // Accessors //////////////////////////////////////////// @@ -536,9 +541,6 @@ public void localizeConfiguration(JobConf conf) throws IOException { public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException; - /** The number of milliseconds between progress reports. */ - public static final int PROGRESS_INTERVAL = 3000; - private transient Progress taskProgress = new Progress(); // Current counters @@ -714,6 +716,9 @@ public void run() { int remainingRetries = MAX_RETRIES; // get current flag value and reset it as well boolean sendProgress = resetProgressFlag(); + long taskProgressInterval = + conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, + MRJobConfig.DEFAULT_TASK_PROGRESS_REPORT_INTERVAL); while (!taskDone.get()) { synchronized (lock) { done = false; @@ -726,7 +731,7 @@ public void run() { if (taskDone.get()) { break; } - lock.wait(PROGRESS_INTERVAL); + lock.wait(taskProgressInterval); } if (taskDone.get()) { break; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index f0a6ddf7a8..947c814261 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -49,6 +49,11 @@ public interface MRJobConfig { public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed"; + public static final String TASK_PROGRESS_REPORT_INTERVAL = + "mapreduce.task.progress-report.interval"; + /** The number of milliseconds between progress reports. */ + public static final int DEFAULT_TASK_PROGRESS_REPORT_INTERVAL = 3000; + public static final String JAR = "mapreduce.job.jar"; public static final String ID = "mapreduce.job.id"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java new file mode 100644 index 0000000000..0bceb87e9a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java @@ -0,0 +1,160 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.mapred.SortedRanges.Range; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.junit.Assert; +import org.junit.Test; + +public class TestTaskProgressReporter { + private static int statusUpdateTimes = 0; + private FakeUmbilical fakeUmbilical = new FakeUmbilical(); + + private static class DummyTask extends Task { + @Override + public void run(JobConf job, TaskUmbilicalProtocol umbilical) + throws IOException, ClassNotFoundException, InterruptedException { + } + + @Override + public boolean isMapTask() { + return true; + } + } + + private static class FakeUmbilical implements TaskUmbilicalProtocol { + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return 0; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return null; + } + + @Override + public JvmTask getTask(JvmContext context) throws IOException { + return null; + } + + @Override + public AMFeedback statusUpdate(TaskAttemptID taskId, + TaskStatus taskStatus) throws IOException, InterruptedException { + statusUpdateTimes++; + AMFeedback feedback = new AMFeedback(); + feedback.setTaskFound(true); + feedback.setPreemption(true); + return feedback; + } + + @Override + public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) + throws IOException { + } + + @Override + public void reportNextRecordRange(TaskAttemptID taskid, Range range) + throws IOException { + } + + @Override + public void done(TaskAttemptID taskid) throws IOException { + } + + @Override + public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) + throws IOException, InterruptedException { + } + + @Override + public boolean canCommit(TaskAttemptID taskid) throws IOException { + return false; + } + + @Override + public void shuffleError(TaskAttemptID taskId, String message) + throws IOException { + } + + @Override + public void fsError(TaskAttemptID taskId, String message) + throws IOException { + } + + @Override + public void fatalError(TaskAttemptID taskId, String message) + throws IOException { + } + + @Override + public MapTaskCompletionEventsUpdate getMapCompletionEvents( + JobID jobId, int fromIndex, int maxLocs, TaskAttemptID id) + throws IOException { + return null; + } + + @Override + public void preempted(TaskAttemptID taskId, TaskStatus taskStatus) + throws IOException, InterruptedException { + } + + @Override + public TaskCheckpointID getCheckpointID(TaskID taskID) { + return null; + } + + @Override + public void setCheckpointID(TaskID tid, TaskCheckpointID cid) { + } + } + + private class DummyTaskReporter extends Task.TaskReporter { + public DummyTaskReporter(Task task) { + task.super(task.getProgress(), fakeUmbilical); + } + @Override + public void setProgress(float progress) { + super.setProgress(progress); + } + } + + @Test (timeout=10000) + public void testTaskProgress() throws Exception { + JobConf job = new JobConf(); + job.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 1000); + Task task = new DummyTask(); + task.setConf(job); + DummyTaskReporter reporter = new DummyTaskReporter(task); + Thread t = new Thread(reporter); + t.start(); + Thread.sleep(2100); + task.setTaskDone(); + reporter.resetDoneFlag(); + t.join(); + Assert.assertEquals(statusUpdateTimes, 2); + } +} \ No newline at end of file