YARN-674. Fixed ResourceManager to renew DelegationTokens on submission asynchronously to work around potential slowness in state-store. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543312 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cfa783141f
commit
512475e56f
@ -107,6 +107,10 @@ Release 2.3.0 - UNRELEASED
|
||||
ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
|
||||
vinodkv)
|
||||
|
||||
YARN-674. Fixed ResourceManager to renew DelegationTokens on submission
|
||||
asynchronously to work around potential slowness in state-store. (Omkar Vinit
|
||||
Joshi via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -504,6 +504,11 @@ public class YarnConfiguration extends Configuration {
|
||||
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
|
||||
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
|
||||
30000l;
|
||||
|
||||
/** Delegation Token renewer thread count */
|
||||
public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
|
||||
RM_PREFIX + "delegation-token-renewer.thread-count";
|
||||
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
|
||||
|
||||
/** Whether to enable log aggregation */
|
||||
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
|
||||
|
@ -318,7 +318,7 @@ public SubmitApplicationResponse submitApplication(
|
||||
try {
|
||||
// call RMAppManager to submit application directly
|
||||
rmAppManager.submitApplication(submissionContext,
|
||||
System.currentTimeMillis(), false, user);
|
||||
System.currentTimeMillis(), user, false, null);
|
||||
|
||||
LOG.info("Application with id " + applicationId.getId() +
|
||||
" submitted by user " + user);
|
||||
|
@ -47,6 +47,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
@ -236,35 +237,63 @@ protected synchronized void checkAppNumCompletedLimit() {
|
||||
this.applicationACLsManager.removeApplication(removeId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void submitApplication(
|
||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||
boolean isRecovered, String user) throws YarnException {
|
||||
String user, boolean isRecovered, RMState state) throws YarnException {
|
||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||
|
||||
// Validation of the ApplicationSubmissionContext needs to be completed
|
||||
// here. Only those fields that are dependent on RM's configuration are
|
||||
// checked here as they have to be validated whether they are part of new
|
||||
// submission or just being recovered.
|
||||
RMAppImpl application =
|
||||
createAndPopulateNewRMApp(submissionContext, submitTime, user);
|
||||
|
||||
// Check whether AM resource requirements are within required limits
|
||||
if (!submissionContext.getUnmanagedAM()) {
|
||||
ResourceRequest amReq = BuilderUtils.newResourceRequest(
|
||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||
submissionContext.getResource(), 1);
|
||||
try {
|
||||
SchedulerUtils.validateResourceRequest(amReq,
|
||||
scheduler.getMaximumResourceCapability());
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
LOG.warn("RM app submission failed in validating AM resource request"
|
||||
+ " for application " + applicationId, e);
|
||||
throw e;
|
||||
if (isRecovered) {
|
||||
recoverApplication(state, application);
|
||||
RMAppState rmAppState =
|
||||
state.getApplicationState().get(applicationId).getState();
|
||||
if (isApplicationInFinalState(rmAppState)) {
|
||||
// We are synchronously moving the application into final state so that
|
||||
// momentarily client will not see this application in NEW state. Also
|
||||
// for finished applications we will avoid renewing tokens.
|
||||
application
|
||||
.handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Credentials credentials = null;
|
||||
try {
|
||||
credentials = parseCredentials(submissionContext);
|
||||
} catch (Exception e) {
|
||||
LOG.warn(
|
||||
"Unable to parse credentials.", e);
|
||||
// Sending APP_REJECTED is fine, since we assume that the
|
||||
// RMApp is in NEW state and thus we haven't yet informed the
|
||||
// scheduler about the existence of the application
|
||||
assert application.getState() == RMAppState.NEW;
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppRejectedEvent(applicationId, e.getMessage()));
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
this.rmContext.getDelegationTokenRenewer().addApplication(
|
||||
applicationId, credentials,
|
||||
submissionContext.getCancelTokensWhenComplete(), isRecovered);
|
||||
} else {
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(applicationId,
|
||||
isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
|
||||
}
|
||||
}
|
||||
|
||||
private RMAppImpl createAndPopulateNewRMApp(
|
||||
ApplicationSubmissionContext submissionContext,
|
||||
long submitTime, String user)
|
||||
throws YarnException {
|
||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||
validateResourceRequest(submissionContext);
|
||||
// Create RMApp
|
||||
RMApp application =
|
||||
RMAppImpl application =
|
||||
new RMAppImpl(applicationId, rmContext, this.conf,
|
||||
submissionContext.getApplicationName(), user,
|
||||
submissionContext.getQueue(),
|
||||
@ -281,35 +310,52 @@ protected void submitApplication(
|
||||
LOG.warn(message);
|
||||
throw RPCUtil.getRemoteException(message);
|
||||
}
|
||||
|
||||
// Inform the ACLs Manager
|
||||
this.applicationACLsManager.addApplication(applicationId,
|
||||
submissionContext.getAMContainerSpec().getApplicationACLs());
|
||||
return application;
|
||||
}
|
||||
|
||||
try {
|
||||
// Setup tokens for renewal
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
this.rmContext.getDelegationTokenRenewer().addApplication(
|
||||
applicationId,parseCredentials(submissionContext),
|
||||
submissionContext.getCancelTokensWhenComplete()
|
||||
);
|
||||
private void validateResourceRequest(
|
||||
ApplicationSubmissionContext submissionContext)
|
||||
throws InvalidResourceRequestException {
|
||||
// Validation of the ApplicationSubmissionContext needs to be completed
|
||||
// here. Only those fields that are dependent on RM's configuration are
|
||||
// checked here as they have to be validated whether they are part of new
|
||||
// submission or just being recovered.
|
||||
|
||||
// Check whether AM resource requirements are within required limits
|
||||
if (!submissionContext.getUnmanagedAM()) {
|
||||
ResourceRequest amReq = BuilderUtils.newResourceRequest(
|
||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||
submissionContext.getResource(), 1);
|
||||
try {
|
||||
SchedulerUtils.validateResourceRequest(amReq,
|
||||
scheduler.getMaximumResourceCapability());
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
LOG.warn("RM app submission failed in validating AM resource request"
|
||||
+ " for application " + submissionContext.getApplicationId(), e);
|
||||
throw e;
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
LOG.warn(
|
||||
"Unable to add the application to the delegation token renewer.",
|
||||
ie);
|
||||
// Sending APP_REJECTED is fine, since we assume that the
|
||||
// RMApp is in NEW state and thus we havne't yet informed the
|
||||
// Scheduler about the existence of the application
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppRejectedEvent(applicationId, ie.getMessage()));
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isRecovered) {
|
||||
// All done, start the RMApp
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
|
||||
private void recoverApplication(RMState state, RMAppImpl application)
|
||||
throws YarnException {
|
||||
try {
|
||||
application.recover(state);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error recovering application", e);
|
||||
throw new YarnException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isApplicationInFinalState(RMAppState rmAppState) {
|
||||
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|
||||
|| rmAppState == RMAppState.KILLED) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -335,17 +381,9 @@ public void recover(RMState state) throws Exception {
|
||||
LOG.info("Recovering " + appStates.size() + " applications");
|
||||
for (ApplicationState appState : appStates.values()) {
|
||||
LOG.info("Recovering application " + appState.getAppId());
|
||||
|
||||
submitApplication(appState.getApplicationSubmissionContext(),
|
||||
appState.getSubmitTime(), true, appState.getUser());
|
||||
// re-populate attempt information in application
|
||||
RMAppImpl appImpl =
|
||||
(RMAppImpl) rmContext.getRMApps().get(appState.getAppId());
|
||||
appImpl.recover(state);
|
||||
// Recover the app synchronously, as otherwise client is possible to see
|
||||
// the application not recovered before it is actually recovered because
|
||||
// ClientRMService is already started at this point of time.
|
||||
appImpl.handle(new RMAppEvent(appImpl.getApplicationId(),
|
||||
RMAppEventType.RECOVER));
|
||||
appState.getSubmitTime(), appState.getUser(), true, state);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,10 @@
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -48,10 +52,15 @@
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* Service to renew application delegation tokens.
|
||||
@ -72,7 +81,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
// delegation token canceler thread
|
||||
private DelegationTokenCancelThread dtCancelThread =
|
||||
new DelegationTokenCancelThread();
|
||||
|
||||
private ThreadPoolExecutor renewerService;
|
||||
|
||||
// managing the list of tokens using Map
|
||||
// appId=>List<tokens>
|
||||
private Set<DelegationTokenToRenew> delegationTokens =
|
||||
@ -84,9 +94,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
private long tokenRemovalDelayMs;
|
||||
|
||||
private Thread delayedRemovalThread;
|
||||
private boolean isServiceStarted = false;
|
||||
private List<DelegationTokenToRenew> pendingTokenForRenewal =
|
||||
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
|
||||
private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
|
||||
private volatile boolean isServiceStarted;
|
||||
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
|
||||
|
||||
private boolean tokenKeepAliveEnabled;
|
||||
|
||||
@ -102,9 +112,27 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
this.tokenRemovalDelayMs =
|
||||
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||
renewerService = createNewThreadPoolService(conf);
|
||||
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
|
||||
int nThreads = conf.getInt(
|
||||
YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
|
||||
|
||||
ThreadFactory tf = new ThreadFactoryBuilder()
|
||||
.setNameFormat("DelegationTokenRenewer #%d")
|
||||
.build();
|
||||
ThreadPoolExecutor pool =
|
||||
new ThreadPoolExecutor((5 < nThreads ? 5 : nThreads), nThreads, 3L,
|
||||
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
|
||||
pool.setThreadFactory(tf);
|
||||
pool.allowCoreThreadTimeOut(true);
|
||||
return pool;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
dtCancelThread.start();
|
||||
@ -119,21 +147,36 @@ protected void serviceStart() throws Exception {
|
||||
RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
||||
rmContext.getRMDelegationTokenSecretManager(),
|
||||
rmContext.getClientRMService().getBindAddress());
|
||||
// Delegation token renewal is delayed until ClientRMService starts. As
|
||||
// it is required to short circuit the token renewal calls.
|
||||
serviceStateLock.writeLock().lock();
|
||||
isServiceStarted = true;
|
||||
renewIfServiceIsStarted(pendingTokenForRenewal);
|
||||
pendingTokenForRenewal.clear();
|
||||
serviceStateLock.writeLock().unlock();
|
||||
while(!pendingEventQueue.isEmpty()) {
|
||||
processDelegationTokenRewewerEvent(pendingEventQueue.take());
|
||||
}
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
private void processDelegationTokenRewewerEvent(
|
||||
DelegationTokenRenewerEvent evt) {
|
||||
serviceStateLock.readLock().lock();
|
||||
try {
|
||||
if (isServiceStarted) {
|
||||
renewerService.execute(new DelegationTokenRenewerRunnable(evt));
|
||||
} else {
|
||||
pendingEventQueue.add(evt);
|
||||
}
|
||||
} finally {
|
||||
serviceStateLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() {
|
||||
if (renewalTimer != null) {
|
||||
renewalTimer.cancel();
|
||||
}
|
||||
delegationTokens.clear();
|
||||
|
||||
this.renewerService.shutdown();
|
||||
dtCancelThread.interrupt();
|
||||
try {
|
||||
dtCancelThread.join(1000);
|
||||
@ -290,47 +333,50 @@ public Set<Token<?>> getDelegationTokens() {
|
||||
* @throws IOException
|
||||
*/
|
||||
public void addApplication(
|
||||
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
|
||||
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
|
||||
boolean isApplicationRecovered) {
|
||||
processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
|
||||
applicationId, ts,
|
||||
shouldCancelAtEnd, isApplicationRecovered));
|
||||
}
|
||||
|
||||
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
|
||||
throws IOException {
|
||||
ApplicationId applicationId = evt.getApplicationId();
|
||||
Credentials ts = evt.getCredentials();
|
||||
boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
|
||||
if (ts == null) {
|
||||
return; //nothing to add
|
||||
return; // nothing to add
|
||||
}
|
||||
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Registering tokens for renewal for:" +
|
||||
LOG.debug("Registering tokens for renewal for:" +
|
||||
" appId = " + applicationId);
|
||||
}
|
||||
|
||||
Collection <Token<?>> tokens = ts.getAllTokens();
|
||||
|
||||
Collection<Token<?>> tokens = ts.getAllTokens();
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
|
||||
// find tokens for renewal, but don't add timers until we know
|
||||
// all renewable tokens are valid
|
||||
// At RM restart it is safe to assume that all the previously added tokens
|
||||
// are valid
|
||||
List<DelegationTokenToRenew> tokenList =
|
||||
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
|
||||
for(Token<?> token : tokens) {
|
||||
for (Token<?> token : tokens) {
|
||||
if (token.isManaged()) {
|
||||
tokenList.add(new DelegationTokenToRenew(applicationId,
|
||||
token, getConfig(), now, shouldCancelAtEnd));
|
||||
}
|
||||
}
|
||||
if (!tokenList.isEmpty()){
|
||||
renewIfServiceIsStarted(tokenList);
|
||||
}
|
||||
}
|
||||
|
||||
protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
|
||||
throws IOException {
|
||||
if (isServiceStarted) {
|
||||
if (!tokenList.isEmpty()) {
|
||||
// Renewing token and adding it to timer calls are separated purposefully
|
||||
// If user provides incorrect token then it should not be added for
|
||||
// renewal.
|
||||
for (DelegationTokenToRenew dtr : dtrs) {
|
||||
for (DelegationTokenToRenew dtr : tokenList) {
|
||||
renewToken(dtr);
|
||||
}
|
||||
for (DelegationTokenToRenew dtr : dtrs) {
|
||||
for (DelegationTokenToRenew dtr : tokenList) {
|
||||
addTokenToList(dtr);
|
||||
setTimerForTokenRenewal(dtr);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -338,11 +384,9 @@ protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
|
||||
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pendingTokenForRenewal.addAll(dtrs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Task - to renew a token
|
||||
*
|
||||
@ -449,14 +493,20 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
|
||||
* @param applicationId completed application
|
||||
*/
|
||||
public void applicationFinished(ApplicationId applicationId) {
|
||||
processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
|
||||
applicationId,
|
||||
DelegationTokenRenewerEventType.FINISH_APPLICATION));
|
||||
}
|
||||
|
||||
private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) {
|
||||
if (!tokenKeepAliveEnabled) {
|
||||
removeApplicationFromRenewal(applicationId);
|
||||
removeApplicationFromRenewal(evt.getApplicationId());
|
||||
} else {
|
||||
delayedRemovalMap.put(applicationId, System.currentTimeMillis()
|
||||
delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis()
|
||||
+ tokenRemovalDelayMs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add a list of applications to the keep alive list. If an appId already
|
||||
* exists, update it's keep-alive time.
|
||||
@ -546,4 +596,111 @@ public void run() {
|
||||
public void setRMContext(RMContext rmContext) {
|
||||
this.rmContext = rmContext;
|
||||
}
|
||||
|
||||
/*
|
||||
* This will run as a separate thread and will process individual events. It
|
||||
* is done in this way to make sure that the token renewal as a part of
|
||||
* application submission and token removal as a part of application finish
|
||||
* is asynchronous in nature.
|
||||
*/
|
||||
private final class DelegationTokenRenewerRunnable
|
||||
implements Runnable {
|
||||
|
||||
private DelegationTokenRenewerEvent evt;
|
||||
|
||||
public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
|
||||
this.evt = evt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
|
||||
DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
|
||||
(DelegationTokenRenewerAppSubmitEvent) evt;
|
||||
handleDTRenewerAppSubmitEvent(appSubmitEvt);
|
||||
} else if (evt.getType().equals(
|
||||
DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
|
||||
DelegationTokenRenewer.this.handleAppFinishEvent(evt);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleDTRenewerAppSubmitEvent(
|
||||
DelegationTokenRenewerAppSubmitEvent event) {
|
||||
/*
|
||||
* For applications submitted with delegation tokens we are not submitting
|
||||
* the application to scheduler from RMAppManager. Instead we are doing
|
||||
* it from here. The primary goal is to make token renewal as a part of
|
||||
* application submission asynchronous so that client thread is not
|
||||
* blocked during app submission.
|
||||
*/
|
||||
try {
|
||||
// Setup tokens for renewal
|
||||
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
|
||||
rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(event.getApplicationId(),
|
||||
event.isApplicationRecovered() ? RMAppEventType.RECOVER
|
||||
: RMAppEventType.START));
|
||||
} catch (Throwable t) {
|
||||
LOG.warn(
|
||||
"Unable to add the application to the delegation token renewer.",
|
||||
t);
|
||||
// Sending APP_REJECTED is fine, since we assume that the
|
||||
// RMApp is in NEW state and thus we havne't yet informed the
|
||||
// Scheduler about the existence of the application
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DelegationTokenRenewerAppSubmitEvent extends
|
||||
DelegationTokenRenewerEvent {
|
||||
|
||||
private Credentials credentials;
|
||||
private boolean shouldCancelAtEnd;
|
||||
private boolean isAppRecovered;
|
||||
|
||||
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
|
||||
Credentials credentails, boolean shouldCancelAtEnd,
|
||||
boolean isApplicationRecovered) {
|
||||
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
|
||||
this.credentials = credentails;
|
||||
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
||||
this.isAppRecovered = isApplicationRecovered;
|
||||
}
|
||||
|
||||
public Credentials getCredentials() {
|
||||
return credentials;
|
||||
}
|
||||
|
||||
public boolean shouldCancelAtEnd() {
|
||||
return shouldCancelAtEnd;
|
||||
}
|
||||
|
||||
public boolean isApplicationRecovered() {
|
||||
return isAppRecovered;
|
||||
}
|
||||
}
|
||||
|
||||
enum DelegationTokenRenewerEventType {
|
||||
VERIFY_AND_START_APPLICATION,
|
||||
FINISH_APPLICATION
|
||||
}
|
||||
|
||||
class DelegationTokenRenewerEvent extends
|
||||
AbstractEvent<DelegationTokenRenewerEventType> {
|
||||
|
||||
private ApplicationId appId;
|
||||
|
||||
public DelegationTokenRenewerEvent(ApplicationId appId,
|
||||
DelegationTokenRenewerEventType type) {
|
||||
super(type);
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public ApplicationId getApplicationId() {
|
||||
return appId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -172,7 +172,7 @@ public void submitApplication(
|
||||
ApplicationSubmissionContext submissionContext, String user)
|
||||
throws YarnException {
|
||||
super.submitApplication(submissionContext, System.currentTimeMillis(),
|
||||
false, user);
|
||||
user, false, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1009,6 +1009,10 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
|
||||
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
|
||||
rm2.start();
|
||||
|
||||
// Need to wait for a while as now token renewal happens on another thread
|
||||
// and is asynchronous in nature.
|
||||
waitForTokensToBeRenewed(rm2);
|
||||
|
||||
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
|
||||
Assert.assertEquals(tokenSet, rm2.getRMContext()
|
||||
.getDelegationTokenRenewer().getDelegationTokens());
|
||||
@ -1018,6 +1022,21 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
|
||||
int waitCnt = 20;
|
||||
boolean atleastOneAppInNEWState = true;
|
||||
while (waitCnt-- > 0 && atleastOneAppInNEWState) {
|
||||
atleastOneAppInNEWState = false;
|
||||
for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
|
||||
if (rmApp.getState() == RMAppState.NEW) {
|
||||
Thread.sleep(1000);
|
||||
atleastOneAppInNEWState = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
|
@ -31,13 +31,24 @@
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
@ -46,16 +57,29 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -66,14 +90,18 @@
|
||||
|
||||
/**
|
||||
* unit test -
|
||||
* tests addition/deletion/cancelation of renewals of delegation tokens
|
||||
* tests addition/deletion/cancellation of renewals of delegation tokens
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class TestDelegationTokenRenewer {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestDelegationTokenRenewer.class);
|
||||
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
|
||||
|
||||
private static BlockingQueue<Event> eventQueue;
|
||||
private static volatile AtomicInteger counter;
|
||||
private static AsyncDispatcher dispatcher;
|
||||
public static class Renewer extends TokenRenewer {
|
||||
private static int counter = 0;
|
||||
private static Token<?> lastRenewed = null;
|
||||
@ -143,11 +171,20 @@ public static void setUpClass() throws Exception {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
counter = new AtomicInteger(0);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
eventQueue = new LinkedBlockingQueue<Event>();
|
||||
dispatcher = new AsyncDispatcher(eventQueue);
|
||||
Renewer.reset();
|
||||
delegationTokenRenewer = new DelegationTokenRenewer();
|
||||
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
|
||||
delegationTokenRenewer.init(conf);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||
delegationTokenRenewer);
|
||||
when(mockContext.getDispatcher()).thenReturn(dispatcher);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
@ -285,7 +322,7 @@ static MyToken createTokens(Text renewer)
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
@Test
|
||||
@Test(timeout=60000)
|
||||
public void testDTRenewal () throws Exception {
|
||||
MyFS dfs = (MyFS)FileSystem.get(conf);
|
||||
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
|
||||
@ -316,8 +353,9 @@ public void testDTRenewal () throws Exception {
|
||||
// register the tokens for renewal
|
||||
ApplicationId applicationId_0 =
|
||||
BuilderUtils.newApplicationId(0, 0);
|
||||
delegationTokenRenewer.addApplication(applicationId_0, ts, true);
|
||||
|
||||
delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
|
||||
// first 3 initial renewals + 1 real
|
||||
int numberOfExpectedRenewals = 3+1;
|
||||
|
||||
@ -355,9 +393,10 @@ public void testDTRenewal () throws Exception {
|
||||
|
||||
|
||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||
delegationTokenRenewer.addApplication(applicationId_1, ts, true);
|
||||
delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
delegationTokenRenewer.applicationFinished(applicationId_1);
|
||||
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
|
||||
try {
|
||||
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
|
||||
@ -377,8 +416,8 @@ public void testDTRenewal () throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidDTWithAddApplication() throws Exception {
|
||||
@Test(timeout=60000)
|
||||
public void testAppRejectionWithCancelledDelegationToken() throws Exception {
|
||||
MyFS dfs = (MyFS)FileSystem.get(conf);
|
||||
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
|
||||
|
||||
@ -390,12 +429,21 @@ public void testInvalidDTWithAddApplication() throws Exception {
|
||||
|
||||
// register the tokens for renewal
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
|
||||
try {
|
||||
delegationTokenRenewer.addApplication(appId, ts, true);
|
||||
fail("App submission with a cancelled token should have failed");
|
||||
} catch (InvalidToken e) {
|
||||
// expected
|
||||
delegationTokenRenewer.addApplication(appId, ts, true, false);
|
||||
int waitCnt = 20;
|
||||
while (waitCnt-- >0) {
|
||||
if (!eventQueue.isEmpty()) {
|
||||
Event evt = eventQueue.take();
|
||||
if (evt.getType() == RMAppEventType.APP_REJECTED) {
|
||||
Assert.assertTrue(
|
||||
((RMAppEvent) evt).getApplicationId().equals(appId));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
fail("App submission with a cancelled token should have failed");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -408,7 +456,7 @@ public void testInvalidDTWithAddApplication() throws Exception {
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
@Test
|
||||
@Test(timeout=60000)
|
||||
public void testDTRenewalWithNoCancel () throws Exception {
|
||||
MyFS dfs = (MyFS)FileSystem.get(conf);
|
||||
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
|
||||
@ -425,9 +473,10 @@ public void testDTRenewalWithNoCancel () throws Exception {
|
||||
|
||||
|
||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||
delegationTokenRenewer.addApplication(applicationId_1, ts, false);
|
||||
delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
delegationTokenRenewer.applicationFinished(applicationId_1);
|
||||
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
|
||||
try {
|
||||
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
|
||||
@ -454,9 +503,8 @@ public void testDTRenewalWithNoCancel () throws Exception {
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
@Test
|
||||
@Test(timeout=60000)
|
||||
public void testDTKeepAlive1 () throws Exception {
|
||||
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
|
||||
Configuration lconf = new Configuration(conf);
|
||||
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
//Keep tokens alive for 6 seconds.
|
||||
@ -465,10 +513,15 @@ public void testDTKeepAlive1 () throws Exception {
|
||||
lconf.setLong(
|
||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||
1000l);
|
||||
DelegationTokenRenewer localDtr =
|
||||
createNewDelegationTokenRenewer(lconf, counter);
|
||||
localDtr.init(lconf);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||
localDtr);
|
||||
when(mockContext.getDispatcher()).thenReturn(dispatcher);
|
||||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||
@ -487,16 +540,25 @@ public void testDTKeepAlive1 () throws Exception {
|
||||
|
||||
// register the tokens for renewal
|
||||
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
||||
localDtr.addApplication(applicationId_0, ts, true);
|
||||
localDtr.addApplication(applicationId_0, ts, true, false);
|
||||
waitForEventsToGetProcessed(localDtr);
|
||||
if (!eventQueue.isEmpty()){
|
||||
Event evt = eventQueue.take();
|
||||
if (evt instanceof RMAppEvent) {
|
||||
Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
|
||||
} else {
|
||||
fail("RMAppEvent.START was expected!!");
|
||||
}
|
||||
}
|
||||
|
||||
localDtr.applicationFinished(applicationId_0);
|
||||
|
||||
Thread.sleep(3000l);
|
||||
waitForEventsToGetProcessed(localDtr);
|
||||
|
||||
//Token should still be around. Renewal should not fail.
|
||||
token1.renew(lconf);
|
||||
|
||||
//Allow the keepalive time to run out
|
||||
Thread.sleep(6000l);
|
||||
Thread.sleep(10000l);
|
||||
|
||||
//The token should have been cancelled at this point. Renewal will fail.
|
||||
try {
|
||||
@ -518,9 +580,8 @@ public void testDTKeepAlive1 () throws Exception {
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
@Test
|
||||
@Test(timeout=60000)
|
||||
public void testDTKeepAlive2() throws Exception {
|
||||
DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
|
||||
Configuration lconf = new Configuration(conf);
|
||||
lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
//Keep tokens alive for 6 seconds.
|
||||
@ -529,10 +590,15 @@ public void testDTKeepAlive2() throws Exception {
|
||||
lconf.setLong(
|
||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||
1000l);
|
||||
DelegationTokenRenewer localDtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
localDtr.init(lconf);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||
localDtr);
|
||||
when(mockContext.getDispatcher()).thenReturn(dispatcher);
|
||||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||
@ -551,22 +617,18 @@ public void testDTKeepAlive2() throws Exception {
|
||||
|
||||
// register the tokens for renewal
|
||||
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
||||
localDtr.addApplication(applicationId_0, ts, true);
|
||||
localDtr.addApplication(applicationId_0, ts, true, false);
|
||||
localDtr.applicationFinished(applicationId_0);
|
||||
|
||||
Thread.sleep(4000l);
|
||||
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
//Send another keep alive.
|
||||
localDtr.updateKeepAliveApplications(Collections
|
||||
.singletonList(applicationId_0));
|
||||
//Renewal should not fail.
|
||||
token1.renew(lconf);
|
||||
|
||||
//Token should be around after this.
|
||||
Thread.sleep(4500l);
|
||||
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
|
||||
token1.renew(lconf);
|
||||
|
||||
//Allow the keepalive time to run out
|
||||
Thread.sleep(3000l);
|
||||
//The token should have been cancelled at this point. Renewal will fail.
|
||||
@ -575,61 +637,127 @@ public void testDTKeepAlive2() throws Exception {
|
||||
fail("Renewal of cancelled token should have failed");
|
||||
} catch (InvalidToken ite) {}
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testConncurrentAddApplication()
|
||||
throws IOException, InterruptedException, BrokenBarrierException {
|
||||
final CyclicBarrier startBarrier = new CyclicBarrier(2);
|
||||
final CyclicBarrier endBarrier = new CyclicBarrier(2);
|
||||
|
||||
// this token uses barriers to block during renew
|
||||
final Credentials creds1 = new Credentials();
|
||||
final Token<?> token1 = mock(Token.class);
|
||||
creds1.addToken(new Text("token"), token1);
|
||||
doReturn(true).when(token1).isManaged();
|
||||
doAnswer(new Answer<Long>() {
|
||||
public Long answer(InvocationOnMock invocation)
|
||||
throws InterruptedException, BrokenBarrierException {
|
||||
startBarrier.await();
|
||||
endBarrier.await();
|
||||
return Long.MAX_VALUE;
|
||||
}}).when(token1).renew(any(Configuration.class));
|
||||
private DelegationTokenRenewer createNewDelegationTokenRenewer(
|
||||
Configuration conf, final AtomicInteger counter) {
|
||||
return new DelegationTokenRenewer() {
|
||||
|
||||
// this dummy token fakes renewing
|
||||
final Credentials creds2 = new Credentials();
|
||||
final Token<?> token2 = mock(Token.class);
|
||||
creds2.addToken(new Text("token"), token2);
|
||||
doReturn(true).when(token2).isManaged();
|
||||
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
|
||||
|
||||
// fire up the renewer
|
||||
final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
|
||||
dtr.init(conf);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||
dtr.setRMContext(mockContext);
|
||||
dtr.start();
|
||||
|
||||
// submit a job that blocks during renewal
|
||||
Thread submitThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
dtr.addApplication(mock(ApplicationId.class), creds1, false);
|
||||
} catch (IOException e) {}
|
||||
protected ThreadPoolExecutor
|
||||
createNewThreadPoolService(Configuration conf) {
|
||||
ThreadPoolExecutor pool =
|
||||
new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<Runnable>()) {
|
||||
|
||||
@Override
|
||||
protected void afterExecute(Runnable r, Throwable t) {
|
||||
counter.decrementAndGet();
|
||||
super.afterExecute(r, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
counter.incrementAndGet();
|
||||
super.execute(command);
|
||||
}
|
||||
};
|
||||
return pool;
|
||||
}
|
||||
};
|
||||
submitThread.start();
|
||||
|
||||
}
|
||||
|
||||
private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
|
||||
throws InterruptedException {
|
||||
int wait = 40;
|
||||
while (wait-- > 0
|
||||
&& counter.get() > 0) {
|
||||
Thread.sleep(200);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testConcurrentAddApplication()
|
||||
throws IOException, InterruptedException, BrokenBarrierException {
|
||||
final CyclicBarrier startBarrier = new CyclicBarrier(2);
|
||||
final CyclicBarrier endBarrier = new CyclicBarrier(2);
|
||||
|
||||
// this token uses barriers to block during renew
|
||||
final Credentials creds1 = new Credentials();
|
||||
final Token<?> token1 = mock(Token.class);
|
||||
creds1.addToken(new Text("token"), token1);
|
||||
doReturn(true).when(token1).isManaged();
|
||||
doAnswer(new Answer<Long>() {
|
||||
public Long answer(InvocationOnMock invocation)
|
||||
throws InterruptedException, BrokenBarrierException {
|
||||
startBarrier.await();
|
||||
endBarrier.await();
|
||||
return Long.MAX_VALUE;
|
||||
}}).when(token1).renew(any(Configuration.class));
|
||||
|
||||
// this dummy token fakes renewing
|
||||
final Credentials creds2 = new Credentials();
|
||||
final Token<?> token2 = mock(Token.class);
|
||||
creds2.addToken(new Text("token"), token2);
|
||||
doReturn(true).when(token2).isManaged();
|
||||
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
|
||||
|
||||
// fire up the renewer
|
||||
final DelegationTokenRenewer dtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
dtr.init(conf);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
InetSocketAddress sockAddr =
|
||||
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||
dtr.setRMContext(mockContext);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
|
||||
dtr.start();
|
||||
// submit a job that blocks during renewal
|
||||
Thread submitThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
dtr.addApplication(mock(ApplicationId.class), creds1, false, false);
|
||||
}
|
||||
};
|
||||
submitThread.start();
|
||||
|
||||
// wait till 1st submit blocks, then submit another
|
||||
startBarrier.await();
|
||||
dtr.addApplication(mock(ApplicationId.class), creds2, false);
|
||||
// signal 1st to complete
|
||||
endBarrier.await();
|
||||
submitThread.join();
|
||||
startBarrier.await();
|
||||
dtr.addApplication(mock(ApplicationId.class), creds2, false, false);
|
||||
// signal 1st to complete
|
||||
endBarrier.await();
|
||||
submitThread.join();
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testAppSubmissionWithInvalidDelegationToken() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
MockRM rm = new MockRM(conf);
|
||||
ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
|
||||
ContainerLaunchContext amContainer =
|
||||
ContainerLaunchContext.newInstance(
|
||||
new HashMap<String, LocalResource>(), new HashMap<String, String>(),
|
||||
new ArrayList<String>(), new HashMap<String, ByteBuffer>(), tokens,
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
ApplicationSubmissionContext appSubContext =
|
||||
ApplicationSubmissionContext.newInstance(
|
||||
ApplicationId.newInstance(1234121, 0),
|
||||
"BOGUS", "default", Priority.UNDEFINED, amContainer, false,
|
||||
true, 1, Resource.newInstance(1024, 1), "BOGUS");
|
||||
SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(appSubContext);
|
||||
try {
|
||||
rm.getClientRMService().submitApplication(request);
|
||||
fail("Error was excepted.");
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Bad header found in token storage"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user