YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie). Contributed by Jian He
This commit is contained in:
parent
5df7ecb33a
commit
0402bada19
@ -244,6 +244,9 @@ Release 2.7.0 - UNRELEASED
|
||||
YARN-2944. InMemorySCMStore can not be instantiated with ReflectionUtils#newInstance.
|
||||
(Chris Trezzo via kasha)
|
||||
|
||||
YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie)
|
||||
(Jian He via jlowe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* Service to renew application delegation tokens.
|
||||
*/
|
||||
@ -94,6 +93,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
|
||||
new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
|
||||
|
||||
private ConcurrentMap<Token<?>, DelegationTokenToRenew> allTokens =
|
||||
new ConcurrentHashMap<Token<?>, DelegationTokenToRenew>();
|
||||
|
||||
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
|
||||
new ConcurrentHashMap<ApplicationId, Long>();
|
||||
|
||||
@ -202,6 +204,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
renewalTimer.cancel();
|
||||
}
|
||||
appTokens.clear();
|
||||
allTokens.clear();
|
||||
this.renewerService.shutdown();
|
||||
dtCancelThread.interrupt();
|
||||
try {
|
||||
@ -230,7 +233,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
public final Configuration conf;
|
||||
public long expirationDate;
|
||||
public TimerTask timerTask;
|
||||
public final boolean shouldCancelAtEnd;
|
||||
public volatile boolean shouldCancelAtEnd;
|
||||
public long maxDate;
|
||||
public String user;
|
||||
|
||||
@ -407,12 +410,25 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
boolean hasHdfsToken = false;
|
||||
for (Token<?> token : tokens) {
|
||||
if (token.isManaged()) {
|
||||
tokenList.add(new DelegationTokenToRenew(applicationId,
|
||||
token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
|
||||
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
||||
LOG.info(applicationId + " found existing hdfs token " + token);
|
||||
hasHdfsToken = true;
|
||||
}
|
||||
|
||||
DelegationTokenToRenew dttr = allTokens.get(token);
|
||||
if (dttr != null) {
|
||||
// If any of the jobs sharing the same token doesn't want to cancel
|
||||
// the token, we should not cancel the token.
|
||||
if (!evt.shouldCancelAtEnd) {
|
||||
dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
|
||||
LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
|
||||
+ " for token " + dttr.token);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
tokenList.add(new DelegationTokenToRenew(applicationId, token,
|
||||
getConfig(), now, shouldCancelAtEnd, evt.getUser()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -429,6 +445,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
}
|
||||
for (DelegationTokenToRenew dtr : tokenList) {
|
||||
appTokens.get(applicationId).add(dtr);
|
||||
allTokens.put(dtr.token, dtr);
|
||||
setTimerForTokenRenewal(dtr);
|
||||
}
|
||||
}
|
||||
@ -496,7 +513,6 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
token.setTimerTask(tTask); // keep reference to the timer
|
||||
|
||||
renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
||||
|
||||
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
|
||||
+ token.applicationId);
|
||||
}
|
||||
@ -559,6 +575,10 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
private void requestNewHdfsDelegationToken(ApplicationId applicationId,
|
||||
String user, boolean shouldCancelAtEnd) throws IOException,
|
||||
InterruptedException {
|
||||
if (!hasProxyUserPrivileges) {
|
||||
LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens.");
|
||||
return;
|
||||
}
|
||||
// Get new hdfs tokens for this user
|
||||
Credentials credentials = new Credentials();
|
||||
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
|
||||
@ -621,6 +641,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
LOG.error("removing failed delegation token for appid=" + applicationId
|
||||
+ ";t=" + t.token.getService());
|
||||
appTokens.get(applicationId).remove(t);
|
||||
allTokens.remove(t.token);
|
||||
|
||||
// cancel the timer
|
||||
if (t.timerTask != null) {
|
||||
t.timerTask.cancel();
|
||||
@ -685,9 +707,14 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
cancelToken(dttr);
|
||||
|
||||
it.remove();
|
||||
allTokens.remove(dttr.token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(tokens != null && tokens.isEmpty()) {
|
||||
appTokens.remove(applicationId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -842,4 +869,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||
return appId;
|
||||
}
|
||||
}
|
||||
|
||||
// only for testing
|
||||
protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
|
||||
return allTokens;
|
||||
}
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ public class MockRM extends ResourceManager {
|
||||
boolean waitForAccepted, boolean keepContainers) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||
false, null, 0, null);
|
||||
false, null, 0, null, true);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
|
||||
@ -322,7 +322,7 @@ public class MockRM extends ResourceManager {
|
||||
.getShortUserName(), null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
||||
false, null, attemptFailuresValidityInterval, null);
|
||||
false, null, attemptFailuresValidityInterval, null, true);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
@ -332,26 +332,24 @@ public class MockRM extends ResourceManager {
|
||||
ApplicationId applicationId) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||
isAppIdProvided, applicationId, 0, null);
|
||||
isAppIdProvided, applicationId, 0, null, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public RMApp submitApp(int masterMemory,
|
||||
LogAggregationContext logAggregationContext) throws Exception {
|
||||
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
||||
.getShortUserName(), null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
||||
false, null, 0, logAggregationContext);
|
||||
false, null, 0, logAggregationContext, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
int maxAppAttempts, Credentials ts, String appType,
|
||||
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
||||
ApplicationId applicationId, long attemptFailuresValidityInterval,
|
||||
LogAggregationContext logAggregationContext)
|
||||
LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete)
|
||||
throws Exception {
|
||||
ApplicationId appId = isAppIdProvided ? applicationId : null;
|
||||
ApplicationClientProtocol client = getClientRMService();
|
||||
@ -392,6 +390,7 @@ public class MockRM extends ResourceManager {
|
||||
if (logAggregationContext != null) {
|
||||
sub.setLogAggregationContext(logAggregationContext);
|
||||
}
|
||||
sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
|
||||
req.setApplicationSubmissionContext(sub);
|
||||
UserGroupInformation fakeUser =
|
||||
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
|
||||
|
@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
@ -116,11 +118,12 @@ public class TestDelegationTokenRenewer {
|
||||
private static int counter = 0;
|
||||
private static Token<?> lastRenewed = null;
|
||||
private static Token<?> tokenToRenewIn2Sec = null;
|
||||
|
||||
private static boolean cancelled = false;
|
||||
private static void reset() {
|
||||
counter = 0;
|
||||
lastRenewed = null;
|
||||
tokenToRenewIn2Sec = null;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -136,7 +139,8 @@ public class TestDelegationTokenRenewer {
|
||||
@Override
|
||||
public long renew(Token<?> t, Configuration conf) throws IOException {
|
||||
if ( !(t instanceof MyToken)) {
|
||||
return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
|
||||
// renew in 3 seconds
|
||||
return System.currentTimeMillis() + 3000;
|
||||
}
|
||||
MyToken token = (MyToken)t;
|
||||
if(token.isCanceled()) {
|
||||
@ -158,9 +162,12 @@ public class TestDelegationTokenRenewer {
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> t, Configuration conf) {
|
||||
MyToken token = (MyToken)t;
|
||||
LOG.info("Cancel token " + token);
|
||||
token.cancelToken();
|
||||
cancelled = true;
|
||||
if (t instanceof MyToken) {
|
||||
MyToken token = (MyToken) t;
|
||||
LOG.info("Cancel token " + token);
|
||||
token.cancelToken();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -921,6 +928,7 @@ public class TestDelegationTokenRenewer {
|
||||
// YARN will get the token for the app submitted without the delegation token.
|
||||
@Test
|
||||
public void testAppSubmissionWithoutDelegationToken() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
|
||||
// create token2
|
||||
Text userText2 = new Text("user2");
|
||||
DelegationTokenIdentifier dtId2 =
|
||||
@ -970,4 +978,48 @@ public class TestDelegationTokenRenewer {
|
||||
appCredentials.readTokenStorageStream(buf);
|
||||
Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
|
||||
}
|
||||
|
||||
// Test submitting an application with the token obtained by a previously
|
||||
// submitted application.
|
||||
@Test (timeout = 30000)
|
||||
public void testAppSubmissionWithPreviousToken() throws Exception{
|
||||
MockRM rm = new TestSecurityMockRM(conf, null);
|
||||
rm.start();
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// create Token1:
|
||||
Text userText1 = new Text("user");
|
||||
DelegationTokenIdentifier dtId1 =
|
||||
new DelegationTokenIdentifier(userText1, new Text("renewer1"),
|
||||
userText1);
|
||||
final Token<DelegationTokenIdentifier> token1 =
|
||||
new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
|
||||
"password1".getBytes(), dtId1.getKind(), new Text("service1"));
|
||||
|
||||
Credentials credentials = new Credentials();
|
||||
credentials.addToken(userText1, token1);
|
||||
|
||||
// submit app1 with a token, set cancelTokenWhenComplete to false;
|
||||
RMApp app1 =
|
||||
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
|
||||
null, true, false, false, null, 0, null, false);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
||||
|
||||
// submit app2 with the same token, set cancelTokenWhenComplete to true;
|
||||
RMApp app2 =
|
||||
rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
|
||||
null, true, false, false, null, 0, null, true);
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
|
||||
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
|
||||
MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
|
||||
Assert.assertTrue(rm.getRMContext().getDelegationTokenRenewer()
|
||||
.getAllTokens().containsKey(token1));
|
||||
|
||||
MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
|
||||
// app2 completes, app1 is still running, check the token is not cancelled
|
||||
Assert.assertFalse(Renewer.cancelled);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user