MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein)

Eric E Payne 2020-04-13 18:20:07 +00:00
parent 8d49229c37
commit 11d17417ce
7 changed files with 183 additions and 7 deletions

View File

@@ -38,6 +38,7 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -58,6 +59,7 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,6 +96,12 @@ public class TaskAttemptListenerImpl extends CompositeService
AtomicReference<TaskAttemptStatus>> attemptIdToStatus
= new ConcurrentHashMap<>();
/**
* A map that keeps track of the logging history of each task attempt.
*/
private ConcurrentHashMap<TaskAttemptID, TaskProgressLogPair>
taskAttemptLogProgressStamps = new ConcurrentHashMap<>();
private Set<WrappedJvmID> launchedJVMs = Collections
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
@@ -123,10 +131,12 @@ public TaskAttemptListenerImpl(AppContext context,
@Override
protected void serviceInit(Configuration conf) throws Exception {
registerHeartbeatHandler(conf);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
super.serviceInit(conf);
registerHeartbeatHandler(conf);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
// initialize the delta threshold for logging the task progress.
MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf);
super.serviceInit(conf);
}
@Override
@@ -410,8 +420,10 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
taskAttemptStatus.id = yarnAttemptID;
// Task sends the updated progress to the TT.
taskAttemptStatus.progress = taskStatus.getProgress();
LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+ taskStatus.getProgress());
// log the new progress
taskAttemptLogProgressStamps.computeIfAbsent(taskAttemptID,
k -> new TaskProgressLogPair(taskAttemptID))
.update(taskStatus.getProgress());
// Task sends the updated state-string to the TT.
taskAttemptStatus.stateString = taskStatus.getStateString();
// Task sends the updated phase to the TT.
@@ -637,4 +649,68 @@ private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
return attemptIdToStatus;
}
/**
* Entity to keep track of a task attempt, the last time its progress was
* logged, and the progress value that was logged.
*/
class TaskProgressLogPair {
/**
* The taskAttemptId of that history record.
*/
private final TaskAttemptID taskAttemptID;
/**
* Timestamp of the last time the progress was logged.
*/
private volatile long logTimeStamp;
/**
* Snapshot of the last logged progress.
*/
private volatile double prevProgress;
TaskProgressLogPair(final TaskAttemptID attemptID) {
taskAttemptID = attemptID;
prevProgress = 0.0;
logTimeStamp = 0;
}
private void resetLog(final boolean doLog,
final float progress, final double processedProgress,
final long timestamp) {
if (doLog) {
prevProgress = processedProgress;
logTimeStamp = timestamp;
LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+ progress);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : "
+ progress);
}
}
}
public void update(final float progress) {
final double processedProgress =
MRJobConfUtil.convertTaskProgressToFactor(progress);
final double diffProgress = processedProgress - prevProgress;
final long currentTime = Time.monotonicNow();
boolean result =
(Double.compare(diffProgress,
MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0);
if (!result) {
// check if time has expired.
result = ((currentTime - logTimeStamp)
>= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold());
}
// It is helpful to log the progress when it reaches 1.0F.
if (Float.compare(progress, 1.0f) == 0) {
result = true;
taskAttemptLogProgressStamps.remove(taskAttemptID);
}
resetLog(result, progress, processedProgress, currentTime);
}
}
}
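
Taken together, the changes above replace the unconditional INFO line in statusUpdate with a throttle: a progress update is logged at INFO only when the attempt has advanced by at least the configured delta, when the configured wait interval has elapsed since the last INFO line, or when the attempt reaches 1.0 (at which point its tracking entry is removed); otherwise the message drops to DEBUG. A minimal sketch of that decision rule, using a hypothetical ProgressLogThrottle class with the defaults (5% delta, 60 second wait) hard-coded in place of the configurable thresholds:

// Self-contained sketch of the throttling rule; ProgressLogThrottle and its
// hard-coded constants are illustrative stand-ins for TaskProgressLogPair and
// the MRJobConfig defaults.
class ProgressLogThrottle {
  private static final double MIN_DELTA = 5.0;      // 0.05 progress * factor 100.0
  private static final long MAX_WAIT_MS = 60_000L;  // 60 seconds

  private double prevFactor = 0.0;  // last progress factor that was logged
  private long lastLogMs = 0L;      // timestamp of the last INFO log

  /** Returns true when the new progress value should be logged at INFO. */
  boolean shouldLog(float progress, long nowMs) {
    double factor = Math.floor(progress * 100.0);        // same scaling as convertTaskProgressToFactor
    boolean log = (factor - prevFactor) >= MIN_DELTA     // enough new progress
        || (nowMs - lastLogMs) >= MAX_WAIT_MS            // or waited long enough
        || Float.compare(progress, 1.0f) == 0;           // always log completion
    if (log) {
      prevFactor = factor;
      lastLogMs = nowMs;
    }
    return log;
  }
}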

View File

@@ -51,7 +51,7 @@ public void testFinshingAttemptTimeout()
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =

View File

@@ -501,6 +501,8 @@ protected void stopRpcServer() {
Configuration conf = new Configuration();
conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
conf.setLong(MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, 1);
tal.init(conf);
tal.start();

View File

@@ -271,6 +271,7 @@ public InetSocketAddress getAddress() {
protected void serviceInit(Configuration conf) throws Exception {
conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
super.serviceInit(conf);
}
};

View File

@@ -63,6 +63,7 @@ public void testTaskTimeout() throws InterruptedException {
// so that TASK_TIMEOUT is not overridden
conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5);
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
hb.init(conf);
hb.start();
@@ -205,6 +206,7 @@ public void testTaskUnregistered() throws Exception {
new TaskHeartbeatHandler(mockHandler, clock, 1);
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
hb.init(conf);
hb.start();
try {

View File

@@ -372,6 +372,29 @@ public interface MRJobConfig {
public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
/**
* TaskAttemptListenerImpl will log the task progress when the delta progress
* is larger than or equal to the defined value.
* The value has to be between 0 and 1, with at most two decimal places.
*/
String TASK_LOG_PROGRESS_DELTA_THRESHOLD =
"mapreduce.task.log.progress.delta.threshold";
/**
* Default delta progress is set to 5%.
*/
double TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT = 0.05;
/**
* TaskAttemptListenerImpl will log the task progress when the defined interval
* in seconds has elapsed since the last log message.
* This helps to debug task attempts that are making very slow progress.
*/
String TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS =
"mapreduce.task.log.progress.wait.interval-seconds";
/**
* Default period to log the task attempt progress is 60 seconds.
*/
long TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT = 60L;
public static final String TASK_ID = "mapreduce.task.id";
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
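
The two keys above are the knobs that the updated tests exercise with tighter values. For illustration only, a job that wants chattier progress logging could set them like this (ProgressLogConfExample and the chosen values are made up for this example, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class ProgressLogConfExample {
  public static Configuration chattyConf() {
    Configuration conf = new Configuration();
    // Log an INFO progress line for every 1% of progress...
    conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
    // ...or at the latest every 30 seconds, whichever comes first.
    conf.setLong(MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, 30L);
    return conf;
  }
}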

View File

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapreduce.util;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -58,4 +59,75 @@ public static long getTaskProgressReportInterval(final Configuration conf) {
}
public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f;
/**
* Configuration values controlling how frequently the task attempt progress is logged.
*/
public static final double PROGRESS_MIN_DELTA_FACTOR = 100.0;
private static volatile Double progressMinDeltaThreshold = null;
private static volatile Long progressMaxWaitDeltaTimeThreshold = null;
/**
* Loads the values defined in the configuration, including the minimum
* progress delta and the maximum time between log messages.
* @param conf the configuration to read the thresholds from.
*/
public static void setTaskLogProgressDeltaThresholds(
final Configuration conf) {
if (progressMinDeltaThreshold == null) {
progressMinDeltaThreshold =
new Double(PROGRESS_MIN_DELTA_FACTOR
* conf.getDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD,
MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT));
}
if (progressMaxWaitDeltaTimeThreshold == null) {
progressMaxWaitDeltaTimeThreshold =
TimeUnit.SECONDS.toMillis(conf
.getLong(
MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS,
MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT));
}
}
/**
* Retrieves the minimum progress delta required to log the task attempt's
* current progress.
* @return the threshold defined in the configuration, or the default value
* if {@link #setTaskLogProgressDeltaThresholds} has not been called.
*/
public static double getTaskProgressMinDeltaThreshold() {
if (progressMinDeltaThreshold == null) {
return PROGRESS_MIN_DELTA_FACTOR
* MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT;
}
return progressMinDeltaThreshold.doubleValue();
}
/**
* Retrieves the wait time after which the task attempt's current progress is
* logged even if the progress delta is below the threshold.
* @return the threshold defined in the configuration, or the default value
* if {@link #setTaskLogProgressDeltaThresholds} has not been called.
*/
public static long getTaskProgressWaitDeltaTimeThreshold() {
if (progressMaxWaitDeltaTimeThreshold == null) {
return TimeUnit.SECONDS.toMillis(
MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT);
}
return progressMaxWaitDeltaTimeThreshold.longValue();
}
/**
* Converts a progress value between 0.0 and 1.0 into the double format used
* to log the task attempt.
* @param progress of the task which is a value between 0.0 and 1.0.
* @return the largest double value that is less than or equal to the argument
* multiplied by {@link #PROGRESS_MIN_DELTA_FACTOR}.
*/
public static double convertTaskProgressToFactor(final float progress) {
return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR);
}
}
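
For reference, here is how the conversion helper and the default thresholds combine into the delta check performed by TaskProgressLogPair. This walk-through is illustrative; ProgressFactorExample is not part of the patch, and the values assume setTaskLogProgressDeltaThresholds has not been called, so the defaults apply (5% delta, i.e. a factor threshold of 5.0):

import org.apache.hadoop.mapreduce.util.MRJobConfUtil;

public class ProgressFactorExample {
  public static void main(String[] args) {
    double prev = MRJobConfUtil.convertTaskProgressToFactor(0.50f);  // floor(50.0) = 50.0
    double curr = MRJobConfUtil.convertTaskProgressToFactor(0.537f); // floor(53.7) = 53.0
    // 53.0 - 50.0 = 3.0, which is below the default threshold of 5.0, so this
    // update would be logged at DEBUG unless the wait interval had expired.
    boolean logNow =
        (curr - prev) >= MRJobConfUtil.getTaskProgressMinDeltaThreshold();
    System.out.println("log at INFO? " + logNow); // false
  }
}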