YARN-1152. Fixed a bug in ResourceManager that was causing clients to get invalid client token key errors when an appliation is about to finish. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521292 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f2e0a125f4
commit
1a649aa51a
@ -168,6 +168,10 @@ Release 2.1.1-beta - UNRELEASED
|
||||
YARN-1144. Unmanaged AMs registering a tracking URI should not be
|
||||
proxy-fied. (tucu)
|
||||
|
||||
YARN-1152. Fixed a bug in ResourceManager that was causing clients to get
|
||||
invalid client token key errors when an appliation is about to finish.
|
||||
(Jason Lowe via vinodkv)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-22
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -432,19 +432,19 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
|
||||
trackingUrl = this.currentAttempt.getTrackingUrl();
|
||||
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
|
||||
if (UserGroupInformation.isSecurityEnabled()
|
||||
&& clientUserName != null) {
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
// get a token so the client can communicate with the app attempt
|
||||
// NOTE: token may be unavailable if the attempt is not running
|
||||
Token<ClientToAMTokenIdentifier> attemptClientToAMToken =
|
||||
new Token<ClientToAMTokenIdentifier>(
|
||||
new ClientToAMTokenIdentifier(
|
||||
currentApplicationAttemptId, clientUserName),
|
||||
rmContext.getClientToAMTokenSecretManager());
|
||||
this.currentAttempt.createClientToken(clientUserName);
|
||||
if (attemptClientToAMToken != null) {
|
||||
clientToAMToken = BuilderUtils.newClientToAMToken(
|
||||
attemptClientToAMToken.getIdentifier(),
|
||||
attemptClientToAMToken.getKind().toString(),
|
||||
attemptClientToAMToken.getPassword(),
|
||||
attemptClientToAMToken.getService().toString());
|
||||
}
|
||||
}
|
||||
host = this.currentAttempt.getHost();
|
||||
rpcPort = this.currentAttempt.getRpcPort();
|
||||
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
|
||||
/**
|
||||
@ -155,6 +156,13 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
||||
*/
|
||||
SecretKey getClientTokenMasterKey();
|
||||
|
||||
/**
|
||||
* Create a token for authenticating a client connection to the app attempt
|
||||
* @param clientName the name of the client requesting the token
|
||||
* @return the token or null if the attempt is not running
|
||||
*/
|
||||
Token<ClientToAMTokenIdentifier> createClientToken(String clientName);
|
||||
|
||||
/**
|
||||
* Get application container and resource usage information.
|
||||
* @return an ApplicationResourceUsageReport object.
|
||||
|
@ -61,6 +61,7 @@
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
@ -89,6 +90,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||
@ -508,6 +510,26 @@ public Token<AMRMTokenIdentifier> getAMRMToken() {
|
||||
return this.amrmToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<ClientToAMTokenIdentifier> createClientToken(String client) {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
Token<ClientToAMTokenIdentifier> token = null;
|
||||
ClientToAMTokenSecretManagerInRM secretMgr =
|
||||
this.rmContext.getClientToAMTokenSecretManager();
|
||||
if (client != null &&
|
||||
secretMgr.getMasterKey(this.applicationAttemptId) != null) {
|
||||
token = new Token<ClientToAMTokenIdentifier>(
|
||||
new ClientToAMTokenIdentifier(this.applicationAttemptId, client),
|
||||
secretMgr);
|
||||
}
|
||||
return token;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDiagnostics() {
|
||||
this.readLock.lock();
|
||||
|
@ -19,14 +19,20 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
@ -57,11 +63,16 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class TestRMAppTransitions {
|
||||
static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
|
||||
|
||||
private boolean isSecurityEnabled;
|
||||
private Configuration conf;
|
||||
private RMContext rmContext;
|
||||
private static int maxAppAttempts =
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
|
||||
@ -133,9 +144,28 @@ public void handle(SchedulerEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> getTestParameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ Boolean.FALSE },
|
||||
{ Boolean.TRUE }
|
||||
});
|
||||
}
|
||||
|
||||
public TestRMAppTransitions(boolean isSecurityEnabled) {
|
||||
this.isSecurityEnabled = isSecurityEnabled;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf = new YarnConfiguration();
|
||||
AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE;
|
||||
if (isSecurityEnabled) {
|
||||
authMethod = AuthenticationMethod.KERBEROS;
|
||||
}
|
||||
SecurityUtil.setAuthenticationMethod(authMethod, conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
rmDispatcher = new DrainDispatcher();
|
||||
ContainerAllocationExpirer containerAllocationExpirer =
|
||||
mock(ContainerAllocationExpirer.class);
|
||||
@ -171,7 +201,6 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
|
||||
String user = MockApps.newUserName();
|
||||
String name = MockApps.newAppName();
|
||||
String queue = MockApps.newQueue();
|
||||
Configuration conf = new YarnConfiguration();
|
||||
// ensure max application attempts set to known value
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts);
|
||||
YarnScheduler scheduler = mock(YarnScheduler.class);
|
||||
@ -191,6 +220,8 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
|
||||
System.currentTimeMillis(), "YARN");
|
||||
|
||||
testAppStartState(applicationId, user, name, queue, application);
|
||||
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
|
||||
application);
|
||||
return application;
|
||||
}
|
||||
|
||||
@ -488,8 +519,6 @@ public void testAppSubmittedKill() throws IOException, InterruptedException {
|
||||
// SUBMITTED => KILLED event RMAppEventType.KILL
|
||||
RMAppEvent event = new RMAppEvent(application.getApplicationId(),
|
||||
RMAppEventType.KILL);
|
||||
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
|
||||
application);
|
||||
application.handle(event);
|
||||
rmDispatcher.await();
|
||||
assertKilled(application);
|
||||
@ -535,8 +564,6 @@ public void testAppAcceptedKill() throws IOException, InterruptedException {
|
||||
// ACCEPTED => KILLED event RMAppEventType.KILL
|
||||
RMAppEvent event = new RMAppEvent(application.getApplicationId(),
|
||||
RMAppEventType.KILL);
|
||||
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
|
||||
application);
|
||||
application.handle(event);
|
||||
rmDispatcher.await();
|
||||
assertKilled(application);
|
||||
@ -731,4 +758,33 @@ public void testGetAppReport() {
|
||||
report = app.createAndGetApplicationReport("clientuser", true);
|
||||
Assert.assertNotNull(report.getApplicationResourceUsageReport());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientTokens() throws Exception {
|
||||
assumeTrue(isSecurityEnabled);
|
||||
|
||||
RMApp app = createNewTestApp(null);
|
||||
assertAppState(RMAppState.NEW, app);
|
||||
ApplicationReport report = app.createAndGetApplicationReport(null, true);
|
||||
Assert.assertNull(report.getClientToAMToken());
|
||||
report = app.createAndGetApplicationReport("clientuser", true);
|
||||
Assert.assertNull(report.getClientToAMToken());
|
||||
|
||||
app = testCreateAppRunning(null);
|
||||
rmDispatcher.await();
|
||||
assertAppState(RMAppState.RUNNING, app);
|
||||
report = app.createAndGetApplicationReport(null, true);
|
||||
Assert.assertNull(report.getClientToAMToken());
|
||||
report = app.createAndGetApplicationReport("clientuser", true);
|
||||
Assert.assertNotNull(report.getClientToAMToken());
|
||||
|
||||
// kill the app attempt and verify client token is unavailable
|
||||
app.handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
|
||||
rmDispatcher.await();
|
||||
assertAppAndAttemptKilled(app);
|
||||
report = app.createAndGetApplicationReport(null, true);
|
||||
Assert.assertNull(report.getClientToAMToken());
|
||||
report = app.createAndGetApplicationReport("clientuser", true);
|
||||
Assert.assertNull(report.getClientToAMToken());
|
||||
}
|
||||
}
|
||||
|
@ -30,13 +30,17 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -85,7 +89,10 @@
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class TestRMAppAttemptTransitions {
|
||||
|
||||
private static final Log LOG =
|
||||
@ -95,6 +102,7 @@ public class TestRMAppAttemptTransitions {
|
||||
private static final String RM_WEBAPP_ADDR =
|
||||
YarnConfiguration.getRMWebAppHostAndPort(new Configuration());
|
||||
|
||||
private boolean isSecurityEnabled;
|
||||
private RMContext rmContext;
|
||||
private YarnScheduler scheduler;
|
||||
private ApplicationMasterService masterService;
|
||||
@ -162,8 +170,26 @@ public void handle(AMLauncherEvent event) {
|
||||
private ApplicationSubmissionContext submissionContext = null;
|
||||
private boolean unmanagedAM;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> getTestParameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ Boolean.FALSE },
|
||||
{ Boolean.TRUE }
|
||||
});
|
||||
}
|
||||
|
||||
public TestRMAppAttemptTransitions(Boolean isSecurityEnabled) {
|
||||
this.isSecurityEnabled = isSecurityEnabled;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE;
|
||||
if (isSecurityEnabled) {
|
||||
authMethod = AuthenticationMethod.KERBEROS;
|
||||
}
|
||||
SecurityUtil.setAuthenticationMethod(authMethod, conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
InlineDispatcher rmDispatcher = new InlineDispatcher();
|
||||
|
||||
ContainerAllocationExpirer containerAllocationExpirer =
|
||||
@ -270,7 +296,9 @@ private void testAppAttemptSubmittedState() {
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
verify(clientToAMTokenManager).registerApplication(
|
||||
applicationAttempt.getAppAttemptId());
|
||||
assertNotNull(applicationAttempt.createClientToken("some client"));
|
||||
}
|
||||
assertNull(applicationAttempt.createClientToken(null));
|
||||
assertNotNull(applicationAttempt.getAMRMToken());
|
||||
// Check events
|
||||
verify(masterService).
|
||||
@ -883,6 +911,9 @@ private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
|
||||
verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
verify(clientToAMTokenManager, times(count)).unRegisterApplication(appAttemptId);
|
||||
if (count > 0) {
|
||||
assertNull(applicationAttempt.createClientToken("client"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user