YARN-3136. Fixed a synchronization problem of AbstractYarnScheduler#getTransferredContainers. Contributed by Sunil G

This commit is contained in:
Jian He 2015-04-18 12:45:38 -07:00
parent f47a5763ac
commit 497c86b485
4 changed files with 51 additions and 24 deletions

View File

@ -227,6 +227,9 @@ Release 2.8.0 - UNRELEASED
YARN-3493. RM fails to come up with error "Failed to load/recover state" YARN-3493. RM fails to come up with error "Failed to load/recover state"
when mem settings are changed. (Jian He via wangda) when mem settings are changed. (Jian He via wangda)
YARN-3136. Fixed a synchronization problem of
AbstractYarnScheduler#getTransferredContainers. (Sunil G via jianhe)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -469,6 +469,14 @@
<Method name="recoverContainersOnNode" /> <Method name="recoverContainersOnNode" />
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" /> <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler" />
<Or>
<Field name="rmContext" />
<Field name="applications" />
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs --> <!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs -->
<Match> <Match>

View File

@ -298,32 +298,35 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
// For work-preserving AM restart, retrieve previous attempts' containers // For work-preserving AM restart, retrieve previous attempts' containers
// and corresponding NM tokens. // and corresponding NM tokens.
List<Container> transferredContainers = if (app.getApplicationSubmissionContext()
((AbstractYarnScheduler) rScheduler) .getKeepContainersAcrossApplicationAttempts()) {
List<Container> transferredContainers = ((AbstractYarnScheduler) rScheduler)
.getTransferredContainers(applicationAttemptId); .getTransferredContainers(applicationAttemptId);
if (!transferredContainers.isEmpty()) { if (!transferredContainers.isEmpty()) {
response.setContainersFromPreviousAttempts(transferredContainers); response.setContainersFromPreviousAttempts(transferredContainers);
List<NMToken> nmTokens = new ArrayList<NMToken>(); List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) { for (Container container : transferredContainers) {
try { try {
NMToken token = rmContext.getNMTokenSecretManager() NMToken token = rmContext.getNMTokenSecretManager()
.createAndGetNMToken(app.getUser(), applicationAttemptId, .createAndGetNMToken(app.getUser(), applicationAttemptId,
container); container);
if (null != token) { if (null != token) {
nmTokens.add(token); nmTokens.add(token);
} }
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// if it's a DNS issue, throw UnknownHostException directly and that // if it's a DNS issue, throw UnknownHostException directly and
// will be automatically retried by RMProxy in RPC layer. // that
if (e.getCause() instanceof UnknownHostException) { // will be automatically retried by RMProxy in RPC layer.
throw (UnknownHostException) e.getCause(); if (e.getCause() instanceof UnknownHostException) {
throw (UnknownHostException) e.getCause();
}
} }
} }
response.setNMTokensFromPreviousAttempts(nmTokens);
LOG.info("Application " + appID + " retrieved "
+ transferredContainers.size() + " containers from previous"
+ " attempts and " + nmTokens.size() + " NM tokens.");
} }
response.setNMTokensFromPreviousAttempts(nmTokens);
LOG.info("Application " + appID + " retrieved "
+ transferredContainers.size() + " containers from previous"
+ " attempts and " + nmTokens.size() + " NM tokens.");
} }
response.setSchedulerResourceTypes(rScheduler response.setSchedulerResourceTypes(rScheduler

View File

@ -21,12 +21,15 @@
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -67,6 +70,8 @@
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Private
@Unstable
public abstract class AbstractYarnScheduler public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode> <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
extends AbstractService implements ResourceScheduler { extends AbstractService implements ResourceScheduler {
@ -91,7 +96,12 @@ public abstract class AbstractYarnScheduler
private long configuredMaximumAllocationWaitTime; private long configuredMaximumAllocationWaitTime;
protected RMContext rmContext; protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication<T>> applications;
/*
* All schedulers that inherit from AbstractYarnScheduler should use a
* concurrent implementation of the 'applications' map.
*/
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval; protected int nmExpireInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST = protected final static List<Container> EMPTY_CONTAINER_LIST =
@ -123,7 +133,7 @@ public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf); super.serviceInit(conf);
} }
public synchronized List<Container> getTransferredContainers( public List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) { ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId(); ApplicationId appId = currentAttempt.getApplicationId();
SchedulerApplication<T> app = applications.get(appId); SchedulerApplication<T> app = applications.get(appId);
@ -132,6 +142,9 @@ public synchronized List<Container> getTransferredContainers(
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
return containerList; return containerList;
} }
if (app == null) {
return containerList;
}
Collection<RMContainer> liveContainers = Collection<RMContainer> liveContainers =
app.getCurrentAppAttempt().getLiveContainers(); app.getCurrentAppAttempt().getLiveContainers();
ContainerId amContainerId = ContainerId amContainerId =