From b304062f1ffee078ea9575dcee5583d43e33508c Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Fri, 28 Oct 2011 01:40:24 +0000 Subject: [PATCH] MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down and comes up while job is getting executed. (Eric Payne via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190122 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../v2/app/local/LocalContainerAllocator.java | 50 +++++++++++++++++-- .../mapreduce/v2/app/rm/RMCommunicator.java | 3 ++ .../v2/app/rm/RMContainerAllocator.java | 42 +++++++++++++++- .../apache/hadoop/mapreduce/MRJobConfig.java | 9 ++++ 5 files changed, 100 insertions(+), 7 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ea550b874b..e225adcd55 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1824,6 +1824,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3282. bin/mapred job -list throws exception. (acmurthy via mahadev) + MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down + and comes up while job is getting executed. (Eric Payne via mahadev) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index fca3cacee0..f0ce272bb8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -23,19 +23,23 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -57,8 +61,10 @@ public class LocalContainerAllocator extends RMCommunicator LogFactory.getLog(LocalContainerAllocator.class); private final EventHandler eventHandler; - private final ApplicationId appID; +// private final ApplicationId appID; private AtomicInteger containerCount = new AtomicInteger(); + private long retryInterval; + private long retrystartTime; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -67,7 +73,19 @@ public LocalContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); this.eventHandler = context.getEventHandler(); - this.appID = context.getApplicationID(); +// this.appID = context.getApplicationID(); + + } + + @Override + public void init(Configuration conf) { + super.init(conf); + retryInterval = + getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, + MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); + // Init startTime to current time. If all goes well, it will be reset after + // first attempt to contact RM. + retrystartTime = System.currentTimeMillis(); } @Override @@ -77,10 +95,32 @@ protected synchronized void heartbeat() throws Exception { .getApplicationProgress(), new ArrayList(), new ArrayList()); AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - AMResponse response = allocateResponse.getAMResponse(); + AMResponse response; + try { + response = allocateResponse.getAMResponse(); + // Reset retry count if no exception occurred. + retrystartTime = System.currentTimeMillis(); + } catch (Exception e) { + // This can happen when the connection to the RM has gone down. Keep + // re-trying until the retryInterval has expired. + if (System.currentTimeMillis() - retrystartTime >= retryInterval) { + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.INTERNAL_ERROR)); + throw new YarnException("Could not contact RM after " + + retryInterval + " milliseconds."); + } + // Throw this up to the caller, which may decide to ignore it and + // continue to attempt to contact the RM. + throw e; + } if (response.getReboot()) { - // TODO LOG.info("Event from RM: shutting down Application Master"); + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.INTERNAL_ERROR)); + throw new YarnException("Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID()); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index d4b849035a..68d9c2462b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -233,6 +233,9 @@ public void run() { Thread.sleep(rmPollInterval); try { heartbeat(); + } catch (YarnException e) { + LOG.error("Error communicating with RM: " + e.getMessage() , e); + return; } catch (Exception e) { LOG.error("ERROR IN CONTACTING RM. ", e); // TODO: for other exceptions diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 9fa08e5b64..e8588e5cd0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -128,6 +129,8 @@ added to the pending and are ramped up (added to scheduled) based private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; private float reduceSlowStart = 0; + private long retryInterval; + private long retrystartTime; public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); @@ -146,6 +149,11 @@ public void init(Configuration conf) { MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); RackResolver.init(conf); + retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, + MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); + // Init startTime to current time. If all goes well, it will be reset after + // first attempt to contact RM. + retrystartTime = System.currentTimeMillis(); } @Override @@ -429,11 +437,41 @@ private synchronized String getStat() { " rackLocalAssigned:" + rackLocalAssigned + " availableResources(headroom):" + getAvailableResources(); } - + @SuppressWarnings("unchecked") private List getResources() throws Exception { int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null - AMResponse response = makeRemoteRequest(); + AMResponse response; + /* + * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS + * milliseconds before aborting. During this interval, AM will still try + * to contact the RM. + */ + try { + response = makeRemoteRequest(); + // Reset retry count if no exception occurred. + retrystartTime = System.currentTimeMillis(); + } catch (Exception e) { + // This can happen when the connection to the RM has gone down. Keep + // re-trying until the retryInterval has expired. + if (System.currentTimeMillis() - retrystartTime >= retryInterval) { + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.INTERNAL_ERROR)); + throw new YarnException("Could not contact RM after " + + retryInterval + " milliseconds."); + } + // Throw this up to the caller, which may decide to ignore it and + // continue to attempt to contact the RM. + throw e; + } + if (response.getReboot()) { + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.INTERNAL_ERROR)); + throw new YarnException("Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID()); + } int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; List newContainers = response.getAllocatedContainers(); List finishedContainers = response.getCompletedContainersStatuses(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index ea2c63f578..769d842c60 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -403,6 +403,15 @@ public interface MRJobConfig { MR_AM_PREFIX + "scheduler.heartbeat.interval-ms"; public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000; + /** + * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS + * milliseconds before aborting. During this interval, AM will still try + * to contact the RM. + */ + public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS = + MR_AM_PREFIX + "scheduler.connection.wait.interval-ms"; + public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000; + /** * Boolean. Create the base dirs in the JobHistoryEventHandler * Set to false for multi-user clusters. This is an internal config that