diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ea7e586da4..125238d8ba 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1773,6 +1773,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3269. Fixed log4j properties to correctly set logging options for JobHistoryServer vis-a-vis JobSummary logs. (mahadev via acmurthy) + MAPREDUCE-2977. Fix ResourceManager to renew HDFS delegation tokens for + applications. (acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java index 64125fde2a..497edcb5b1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -105,7 +105,6 @@ protected boolean notifyURLOnce() { /** * Notify a server of the completion of a submitted job. The server must have * configured MRConfig.JOB_END_NOTIFICATION_URLS - * @param config JobConf to read parameters from * @param jobReport JobReport used to read JobId and JobStatus * @throws InterruptedException */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java index fd46caacef..51adf750d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java @@ -24,7 +24,6 @@ import java.lang.reflect.InvocationTargetException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; /** * A factory to allow applications to deal with inconsistencies between @@ -178,7 +177,7 @@ public static JobContext cloneContext(JobContext original, } /** - * Copy a custom {@link WrappedMapper.Context}, optionally replacing + * Copy a custom WrappedMapper.Context, optionally replacing * the input and output. 
* @param input key type * @param input value type diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index ab589e0b49..bfd4e65496 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -18,17 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.LinkedList; -import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -166,12 +167,17 @@ protected synchronized int getCompletedAppsListSize() { return this.completedApps.size(); } - protected synchronized void addCompletedApp(ApplicationId appId) { - if (appId == null) { + protected synchronized void finishApplication(ApplicationId applicationId) { + if (applicationId == null) { LOG.error("RMAppManager received completed appId of null, skipping"); } else { - completedApps.add(appId); - writeAuditLog(appId); + // Inform the DelegationTokenRenewer + if (UserGroupInformation.isSecurityEnabled()) { + rmContext.getDelegationTokenRenewer().removeApplication(applicationId); + } + + completedApps.add(applicationId); + writeAuditLog(applicationId); } } @@ -256,40 +262,65 @@ protected synchronized void submitApplication( appStore, this.scheduler, this.masterService, submitTime); + // Sanity check - duplicate? if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { String message = "Application with id " + applicationId + " is already present! 
Cannot add a duplicate!"; LOG.info(message); throw RPCUtil.getRemoteException(message); - } else { + } - this.applicationACLsManager.addApplication(applicationId, - submissionContext.getAMContainerSpec().getApplicationACLs()); + // Inform the ACLs Manager + this.applicationACLsManager.addApplication(applicationId, + submissionContext.getAMContainerSpec().getApplicationACLs()); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.START)); - } + // Setup tokens for renewal + if (UserGroupInformation.isSecurityEnabled()) { + this.rmContext.getDelegationTokenRenewer().addApplication( + applicationId,parseCredentials(submissionContext) + ); + } + + // All done, start the RMApp + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.START)); } catch (IOException ie) { LOG.info("RMAppManager submit application exception", ie); if (application != null) { - // TODO: Weird setup. + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we havne't yet informed the + // Scheduler about the existence of the application this.rmContext.getDispatcher().getEventHandler().handle( new RMAppRejectedEvent(applicationId, ie.getMessage())); } } } + + private Credentials parseCredentials(ApplicationSubmissionContext application) + throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + ByteBuffer tokens = application.getAMContainerSpec().getContainerTokens(); + if (tokens != null) { + dibb.reset(tokens); + credentials.readTokenStorageStream(dibb); + tokens.rewind(); + } + return credentials; + } @Override public void handle(RMAppManagerEvent event) { - ApplicationId appID = event.getApplicationId(); + ApplicationId applicationId = event.getApplicationId(); LOG.debug("RMAppManager processing event for " - + appID + " of type " + event.getType()); + + applicationId + " of type " + event.getType()); switch(event.getType()) { case APP_COMPLETED: { - addCompletedApp(appID); - ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appID)); + finishApplication(applicationId); + ApplicationSummary.logAppSummary( + rmContext.getRMApps().get(applicationId)); checkAppNumCompletedLimit(); } break; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index b1c8adce30..3d975818f2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -29,7 +29,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +/** + * Context of the ResourceManager. 
+ */ public interface RMContext { Dispatcher getDispatcher(); @@ -45,4 +49,6 @@ public interface RMContext { AMLivelinessMonitor getAMLivelinessMonitor(); ContainerAllocationExpirer getContainerAllocationExpirer(); + + DelegationTokenRenewer getDelegationTokenRenewer(); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 997906a62e..a177f1cc16 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; public class RMContextImpl implements RMContext { @@ -45,14 +46,17 @@ public class RMContextImpl implements RMContext { private AMLivelinessMonitor amLivelinessMonitor; private ContainerAllocationExpirer containerAllocationExpirer; + private final DelegationTokenRenewer tokenRenewer; public RMContextImpl(Store store, Dispatcher rmDispatcher, ContainerAllocationExpirer containerAllocationExpirer, - AMLivelinessMonitor amLivelinessMonitor) { + AMLivelinessMonitor amLivelinessMonitor, + DelegationTokenRenewer tokenRenewer) { this.store = store; this.rmDispatcher = rmDispatcher; this.containerAllocationExpirer = containerAllocationExpirer; this.amLivelinessMonitor = amLivelinessMonitor; + this.tokenRenewer = tokenRenewer; } @Override @@ -89,4 +93,9 @@ public ContainerAllocationExpirer getContainerAllocationExpirer() { public AMLivelinessMonitor getAMLivelinessMonitor() { return this.amLivelinessMonitor; } + + @Override + public DelegationTokenRenewer getDelegationTokenRenewer() { + return tokenRenewer; + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b51e019803..d3c725718d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; @@ -134,8 +135,11 @@ public synchronized void init(Configuration conf) { AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); addService(amLivelinessMonitor); + DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer(); + addService(tokenRenewer); + this.rmContext = new RMContextImpl(this.store, this.rmDispatcher, - this.containerAllocationExpirer, amLivelinessMonitor); + this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer); addService(nodesListManager); @@ -234,6 +238,10 @@ private NMLivelinessMonitor createNMLivelinessMonitor() { protected AMLivelinessMonitor createAMLivelinessMonitor() { return new AMLivelinessMonitor(this.rmDispatcher); } + + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer(); + } protected RMAppManager createRMAppManager() { return new RMAppManager(this.rmContext, this.clientToAMSecretManager, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 9b716f8146..29997fdab8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -23,24 +23,22 @@ import javax.crypto.SecretKey; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.Node; -import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.authorize.PolicyProvider; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 7818701919..5c24152500 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -198,8 +198,7 @@ private void setupTokensAndEnv( String.valueOf(rmContext.getRMApps() .get(application.getAppAttemptId().getApplicationId()) .getSubmitTime())); - - + if (UserGroupInformation.isSecurityEnabled()) { // TODO: Security enabled/disabled info should come from RM. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java new file mode 100644 index 0000000000..b3ab9a1c4e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.service.AbstractService; + +/** + * Service to renew application delegation tokens. 
+ */ +@Private +@Unstable +public class DelegationTokenRenewer extends AbstractService { + + private static final Log LOG = + LogFactory.getLog(DelegationTokenRenewer.class); + + public static final String SCHEME = "hdfs"; + + // global single timer (daemon) + private Timer renewalTimer; + + // delegation token canceler thread + private DelegationTokenCancelThread dtCancelThread = + new DelegationTokenCancelThread(); + + // managing the list of tokens using Map + // appId=>List + private Set delegationTokens = + Collections.synchronizedSet(new HashSet()); + + public DelegationTokenRenewer() { + super(DelegationTokenRenewer.class.getName()); + } + + @Override + public synchronized void init(Configuration conf) { + super.init(conf); + } + + @Override + public synchronized void start() { + super.start(); + + dtCancelThread.start(); + renewalTimer = new Timer(true); + } + + @Override + public synchronized void stop() { + renewalTimer.cancel(); + delegationTokens.clear(); + + dtCancelThread.interrupt(); + try { + dtCancelThread.join(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + super.stop(); + } + + /** + * class that is used for keeping tracks of DT to renew + * + */ + private static class DelegationTokenToRenew { + public final Token token; + public final ApplicationId applicationId; + public final Configuration conf; + public long expirationDate; + public TimerTask timerTask; + + public DelegationTokenToRenew( + ApplicationId jId, Token token, + Configuration conf, long expirationDate) { + this.token = token; + this.applicationId = jId; + this.conf = conf; + this.expirationDate = expirationDate; + this.timerTask = null; + if (this.token==null || this.applicationId==null || this.conf==null) { + throw new IllegalArgumentException("Invalid params to renew token" + + ";token=" + this.token + + ";appId=" + this.applicationId + + ";conf=" + this.conf); + } + } + + public void setTimerTask(TimerTask tTask) { + timerTask = tTask; + } + + @Override + public String toString() { + return token + ";exp=" + expirationDate; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof DelegationTokenToRenew && + token.equals(((DelegationTokenToRenew)obj).token); + } + + @Override + public int hashCode() { + return token.hashCode(); + } + } + + + private static class DelegationTokenCancelThread extends Thread { + private static class TokenWithConf { + Token token; + Configuration conf; + TokenWithConf(Token token, Configuration conf) { + this.token = token; + this.conf = conf; + } + } + private LinkedBlockingQueue queue = + new LinkedBlockingQueue(); + + public DelegationTokenCancelThread() { + super("Delegation Token Canceler"); + setDaemon(true); + } + public void cancelToken(Token token, + Configuration conf) { + TokenWithConf tokenWithConf = new TokenWithConf(token, conf); + while (!queue.offer(tokenWithConf)) { + LOG.warn("Unable to add token " + token + " for cancellation. 
" + + "Will retry.."); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + public void run() { + TokenWithConf tokenWithConf = null; + while (true) { + try { + tokenWithConf = queue.take(); + final TokenWithConf current = tokenWithConf; + if (LOG.isDebugEnabled()) { + LOG.debug("Canceling token " + tokenWithConf.token.getService()); + } + // need to use doAs so that http can find the kerberos tgt + UserGroupInformation.getLoginUser() + .doAs(new PrivilegedExceptionAction(){ + + @Override + public Void run() throws Exception { + current.token.cancel(current.conf); + return null; + } + }); + } catch (IOException e) { + LOG.warn("Failed to cancel token " + tokenWithConf.token + " " + + StringUtils.stringifyException(e)); + } catch (InterruptedException ie) { + return; + } catch (Throwable t) { + LOG.warn("Got exception " + StringUtils.stringifyException(t) + + ". Exiting.."); + System.exit(-1); + } + } + } + } + //adding token + private void addTokenToList(DelegationTokenToRenew t) { + delegationTokens.add(t); + } + + /** + * Add application tokens for renewal. + * @param applicationId added application + * @param ts tokens + * @throws IOException + */ + public synchronized void addApplication( + ApplicationId applicationId, Credentials ts) + throws IOException { + if (ts == null) { + return; //nothing to add + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Registering tokens for renewal for:" + + " appId = " + applicationId); + } + + Collection > tokens = ts.getAllTokens(); + long now = System.currentTimeMillis(); + + for(Token token : tokens) { + // first renew happens immediately + if (token.isManaged()) { + DelegationTokenToRenew dtr = + new DelegationTokenToRenew(applicationId, token, getConfig(), now); + + addTokenToList(dtr); + + setTimerForTokenRenewal(dtr, true); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering token for renewal for:" + + " service = " + token.getService() + + " for appId = " + applicationId); + } + } + } + } + + /** + * Task - to renew a token + * + */ + private class RenewalTimerTask extends TimerTask { + private DelegationTokenToRenew dttr; + + RenewalTimerTask(DelegationTokenToRenew t) { + dttr = t; + } + + @Override + public void run() { + Token token = dttr.token; + try { + // need to use doAs so that http can find the kerberos tgt + dttr.expirationDate = UserGroupInformation.getLoginUser() + .doAs(new PrivilegedExceptionAction(){ + + @Override + public Long run() throws Exception { + return dttr.token.renew(dttr.conf); + } + }); + + if (LOG.isDebugEnabled()) { + LOG.debug("Renewing delegation-token for:" + token.getService() + + "; new expiration;" + dttr.expirationDate); + } + + setTimerForTokenRenewal(dttr, false);// set the next one + } catch (Exception e) { + LOG.error("Exception renewing token" + token + ". 
Not rescheduled", e); + removeFailedDelegationToken(dttr); + } + } + } + + /** + * set task to renew the token + */ + private + void setTimerForTokenRenewal(DelegationTokenToRenew token, + boolean firstTime) throws IOException { + + // calculate timer time + long now = System.currentTimeMillis(); + long renewIn; + if(firstTime) { + renewIn = now; + } else { + long expiresIn = (token.expirationDate - now); + renewIn = now + expiresIn - expiresIn/10; // little bit before the expiration + } + + // need to create new task every time + TimerTask tTask = new RenewalTimerTask(token); + token.setTimerTask(tTask); // keep reference to the timer + + renewalTimer.schedule(token.timerTask, new Date(renewIn)); + } + + // cancel a token + private void cancelToken(DelegationTokenToRenew t) { + dtCancelThread.cancelToken(t.token, t.conf); + } + + /** + * removing failed DT + * @param applicationId + */ + private void removeFailedDelegationToken(DelegationTokenToRenew t) { + ApplicationId applicationId = t.applicationId; + if (LOG.isDebugEnabled()) + LOG.debug("removing failed delegation token for appid=" + applicationId + + ";t=" + t.token.getService()); + delegationTokens.remove(t); + // cancel the timer + if(t.timerTask!=null) + t.timerTask.cancel(); + } + + /** + * Removing delegation token for completed applications. + * @param applicationId completed application + */ + public void removeApplication(ApplicationId applicationId) { + synchronized (delegationTokens) { + Iterator it = delegationTokens.iterator(); + while(it.hasNext()) { + DelegationTokenToRenew dttr = it.next(); + if (dttr.applicationId.equals(applicationId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing delegation token for appId=" + applicationId + + "; token=" + dttr.token.getService()); + } + + // cancel the timer + if(dttr.timerTask!=null) + dttr.timerTask.cancel(); + + // cancel the token + cancelToken(dttr); + + it.remove(); + } + } + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 85cc15371f..882115b665 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -93,7 +93,7 @@ public static RMContext mockRMContext(int n, long time) { AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor( rmDispatcher); return new RMContextImpl(new MemStore(), rmDispatcher, - containerAllocationExpirer, amLivelinessMonitor) { + containerAllocationExpirer, amLivelinessMonitor, null) { @Override public ConcurrentMap getRMApps() { return map; @@ -150,8 +150,8 @@ public void checkAppNumCompletedLimit() { super.checkAppNumCompletedLimit(); } - public void addCompletedApp(ApplicationId appId) { - super.addCompletedApp(appId); + public void finishApplication(ApplicationId appId) { + super.finishApplication(appId); } public int getCompletedAppsListSize() { @@ -172,7 +172,7 @@ protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmConte if (app.getState() == RMAppState.FINISHED || app.getState() == 
RMAppState.KILLED || app.getState() == RMAppState.FAILED) { - appMonitor.addCompletedApp(app.getApplicationId()); + appMonitor.finishApplication(app.getApplicationId()); } } } @@ -288,7 +288,7 @@ public void testRMAppRetireNullApp() throws Exception { Assert.assertEquals("Number of apps incorrect before", 10, rmContext .getRMApps().size()); - appMonitor.addCompletedApp(null); + appMonitor.finishApplication(null); Assert.assertEquals("Number of completed apps incorrect after check", 0, appMonitor.getCompletedAppsListSize()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index 8bbfd105c5..7411748b06 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -96,7 +96,7 @@ public void setUp() { dispatcher.register(RMNodeEventType.class, new InlineDispatcher.EmptyEventHandler()); RMContext context = new RMContextImpl(new MemStore(), dispatcher, null, - null); + null, null); NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor( dispatcher); nmLivelinessMonitor.init(conf); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 5cdd3b3ab4..ca025eafa5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -65,8 +65,8 @@ public void handle(Event event) { ; // ignore } }); - RMContext context = new RMContextImpl(new MemStore(), dispatcher, null, - null); + RMContext context = + new RMContextImpl(new MemStore(), dispatcher, null, null, null); dispatcher.register(RMNodeEventType.class, new ResourceManager.NodeEventDispatcher(context)); NodesListManager nodesListManager = new NodesListManager(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2ad44bf39b..ceba12ad2e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -119,7 +119,7 @@ public void setUp() throws Exception { mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher, - containerAllocationExpirer, amLivelinessMonitor); + containerAllocationExpirer, amLivelinessMonitor, null); rmDispatcher.register(RMAppAttemptEventType.class, new TestApplicationAttemptEventDispatcher(this.rmContext)); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 7083197ad1..eded36a72f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -138,7 +138,7 @@ public void setUp() throws Exception { mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); rmContext = new RMContextImpl(new MemStore(), rmDispatcher, - containerAllocationExpirer, amLivelinessMonitor); + containerAllocationExpirer, amLivelinessMonitor, null); scheduler = mock(YarnScheduler.class); masterService = mock(ApplicationMasterService.class); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 8459e51d5c..52a67cf0c5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -75,7 +75,7 @@ public EventHandler getEventHandler() { new ContainerAllocationExpirer(nullDispatcher); RMContext rmContext = - new RMContextImpl(null, nullDispatcher, cae, null); + new RMContextImpl(null, nullDispatcher, cae, null, null); return rmContext; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java new file mode 100644 index 0000000000..61870a817d --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * unit test - + * tests addition/deletion/cancelation of renewals of delegation tokens + * + */ +public class TestDelegationTokenRenewer { + private static final Log LOG = + LogFactory.getLog(TestDelegationTokenRenewer.class); + private static final Text KIND = new Text("TestDelegationTokenRenewer.Token"); + + public static class Renewer extends TokenRenewer { + private static int counter = 0; + private static Token lastRenewed = null; + private static Token tokenToRenewIn2Sec = null; + + @Override + public boolean handleKind(Text kind) { + return KIND.equals(kind); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @Override + public long renew(Token t, Configuration conf) throws IOException { + MyToken token = (MyToken)t; + if(token.isCanceled()) { + throw new InvalidToken("token has been canceled"); + } + lastRenewed = token; + counter ++; + LOG.info("Called MYDFS.renewdelegationtoken " + token + + ";this dfs=" + this.hashCode() + ";c=" + counter); + if(tokenToRenewIn2Sec == token) { + // this token first renewal in 2 seconds + LOG.info("RENEW in 2 seconds"); + tokenToRenewIn2Sec=null; + return 2*1000 + 
System.currentTimeMillis(); + } else { + return 86400*1000 + System.currentTimeMillis(); + } + } + + @Override + public void cancel(Token t, Configuration conf) { + MyToken token = (MyToken)t; + LOG.info("Cancel token " + token); + token.cancelToken(); + } + + } + + private static Configuration conf; + DelegationTokenRenewer delegationTokenRenewer; + + @BeforeClass + public static void setUpClass() throws Exception { + conf = new Configuration(); + + // create a fake FileSystem (MyFS) and associate it + // with "hdfs" schema. + URI uri = new URI(DelegationTokenRenewer.SCHEME+"://localhost:0"); + System.out.println("scheme is : " + uri.getScheme()); + conf.setClass("fs." + uri.getScheme() + ".impl", MyFS.class, DistributedFileSystem.class); + FileSystem.setDefaultUri(conf, uri); + LOG.info("filesystem uri = " + FileSystem.getDefaultUri(conf).toString()); + } + + + @Before + public void setUp() throws Exception { + delegationTokenRenewer = new DelegationTokenRenewer(); + delegationTokenRenewer.init(conf); + delegationTokenRenewer.start(); + } + + @After + public void tearDown() { + delegationTokenRenewer.stop(); + } + + private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager { + + public MyDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, FSNamesystem namesystem) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval, + namesystem); + } + + @Override //DelegationTokenSecretManager + public void logUpdateMasterKey(DelegationKey key) throws IOException { + return; + } + } + + /** + * add some extra functionality for testing + * 1. toString(); + * 2. cancel() and isCanceled() + */ + private static class MyToken extends Token { + public String status = "GOOD"; + public static final String CANCELED = "CANCELED"; + + public MyToken(DelegationTokenIdentifier dtId1, + MyDelegationTokenSecretManager sm) { + super(dtId1, sm); + setKind(KIND); + status = "GOOD"; + } + + public boolean isCanceled() {return status.equals(CANCELED);} + + public void cancelToken() {this.status=CANCELED;} + + @Override + public long renew(Configuration conf) throws IOException, + InterruptedException { + return super.renew(conf); + } + + public String toString() { + StringBuilder sb = new StringBuilder(1024); + + sb.append("id="); + String id = StringUtils.byteToHexString(this.getIdentifier()); + int idLen = id.length(); + sb.append(id.substring(idLen-6)); + sb.append(";k="); + sb.append(this.getKind()); + sb.append(";s="); + sb.append(this.getService()); + return sb.toString(); + } + } + + /** + * fake FileSystem + * overrides three methods + * 1. getDelegationToken() - generates a token + * 2. renewDelegationToken - counts number of calls, and remembers + * most recently renewed token. + * 3.
cancelToken - cancels token (subsequent renew will cause InvalidToken + * exception) + */ + static class MyFS extends DistributedFileSystem { + + public MyFS() {} + public void close() {} + @Override + public void initialize(URI uri, Configuration conf) throws IOException {} + + @Override + public MyToken getDelegationToken(Text renewer) throws IOException { + MyToken result = createTokens(renewer); + LOG.info("Called MYDFS.getdelegationtoken " + result); + return result; + } + } + + /** + * Auxiliary - create token + * @param renewer + * @return + * @throws IOException + */ + static MyToken createTokens(Text renewer) + throws IOException { + Text user1= new Text("user1"); + + MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager( + DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, + DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, + DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, + 3600000, null); + sm.startThreads(); + + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(user1, renewer, user1); + + MyToken token1 = new MyToken(dtId1, sm); + + token1.setService(new Text("localhost:0")); + return token1; + } + + + /** + * Basic idea of the test: + * 1. create tokens. + * 2. Mark one of them to be renewed in 2 seconds (instead of + * 24 hours) + * 3. register them for renewal + * 4. sleep for 3 seconds + * 5. count number of renewals (should be 3 initial ones + one extra) + * 6. register another token for 2 seconds + * 7. cancel it immediately + * 8. Sleep and check that the 2-second renew didn't happen + * (5 renewals in total) + * 9. check cancellation + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testDTRenewal () throws Exception { + MyFS dfs = (MyFS)FileSystem.get(conf); + LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); + // Test 1.
- add three tokens - make sure exactly one get's renewed + + // get the delegation tokens + MyToken token1, token2, token3; + token1 = dfs.getDelegationToken(new Text("user1")); + token2 = dfs.getDelegationToken(new Text("user2")); + token3 = dfs.getDelegationToken(new Text("user3")); + + //to cause this one to be set for renew in 2 secs + Renewer.tokenToRenewIn2Sec = token1; + LOG.info("token="+token1+" should be renewed for 2 secs"); + + // three distinct Namenodes + String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0"; + String nn2 = DelegationTokenRenewer.SCHEME + "://host2:0"; + String nn3 = DelegationTokenRenewer.SCHEME + "://host3:0"; + + Credentials ts = new Credentials(); + + // register the token for renewal + ts.addToken(new Text(nn1), token1); + ts.addToken(new Text(nn2), token2); + ts.addToken(new Text(nn3), token3); + + // register the tokens for renewal + ApplicationId applicationId_0 = + BuilderUtils.newApplicationId(0, 0); + delegationTokenRenewer.addApplication(applicationId_0, ts); + + // first 3 initial renewals + 1 real + int numberOfExpectedRenewals = 3+1; + + int attempts = 10; + while(attempts-- > 0) { + try { + Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew + } catch (InterruptedException e) {} + + // since we cannot guarantee timely execution - let's give few chances + if(Renewer.counter==numberOfExpectedRenewals) + break; + } + + LOG.info("dfs=" + dfs.hashCode() + + ";Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed); + assertEquals("renew wasn't called as many times as expected(4):", + numberOfExpectedRenewals, Renewer.counter); + assertEquals("most recently renewed token mismatch", Renewer.lastRenewed, + token1); + + // Test 2. + // add another token ( that expires in 2 secs). Then remove it, before + // time is up. 
+ // Wait for 3 secs , and make sure no renews were called + ts = new Credentials(); + MyToken token4 = dfs.getDelegationToken(new Text("user4")); + + //to cause this one to be set for renew in 2 secs + Renewer.tokenToRenewIn2Sec = token4; + LOG.info("token="+token4+" should be renewed for 2 secs"); + + String nn4 = DelegationTokenRenewer.SCHEME + "://host4:0"; + ts.addToken(new Text(nn4), token4); + + + ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); + delegationTokenRenewer.addApplication(applicationId_1, ts); + delegationTokenRenewer.removeApplication(applicationId_1); + + numberOfExpectedRenewals = Renewer.counter; // number of renewals so far + try { + Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew + } catch (InterruptedException e) {} + LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed); + + // counter and the token should stil be the old ones + assertEquals("renew wasn't called as many times as expected", + numberOfExpectedRenewals, Renewer.counter); + + // also renewing of the cancelled token should fail + try { + token4.renew(conf); + assertTrue("Renewal of canceled token didn't fail", false); + } catch (InvalidToken ite) { + //expected + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index 961317db97..f567419545 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -120,7 +120,7 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes, for (RMNode node : nodes) { nodesMap.put(node.getNodeID(), node); } - return new RMContextImpl(new MemStore(), null, null, null) { + return new RMContextImpl(new MemStore(), null, null, null, null) { @Override public ConcurrentMap getRMApps() { return applicationsMaps; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 0000000000..24c0713a5d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1 @@ +org.apache.hadoop.yarn.server.resourcemanager.security.TestDelegationTokenRenewer$Renewer