YARN-709. Added tests to verify validity of delegation tokens and logging of appsummary after RM restart. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543269 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e68e3e429d
commit
4341562622
@ -100,6 +100,9 @@ Release 2.3.0 - UNRELEASED
|
|||||||
YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik
|
YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik
|
||||||
Kambatla via bikas)
|
Kambatla via bikas)
|
||||||
|
|
||||||
|
YARN-709. Added tests to verify validity of delegation tokens and logging of
|
||||||
|
appsummary after RM restart. (Jian He via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -54,6 +54,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class manages the list of applications for the resource manager.
|
* This class manages the list of applications for the resource manager.
|
||||||
*/
|
*/
|
||||||
@ -165,6 +167,11 @@ public static void logAppSummary(RMApp app) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void logApplicationSummary(ApplicationId appId) {
|
||||||
|
ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
|
||||||
|
}
|
||||||
|
|
||||||
protected synchronized void setCompletedAppsMax(int max) {
|
protected synchronized void setCompletedAppsMax(int max) {
|
||||||
this.completedAppsMax = max;
|
this.completedAppsMax = max;
|
||||||
}
|
}
|
||||||
@ -351,8 +358,7 @@ public void handle(RMAppManagerEvent event) {
|
|||||||
case APP_COMPLETED:
|
case APP_COMPLETED:
|
||||||
{
|
{
|
||||||
finishApplication(applicationId);
|
finishApplication(applicationId);
|
||||||
ApplicationSummary.logAppSummary(
|
logApplicationSummary(applicationId);
|
||||||
rmContext.getRMApps().get(applicationId));
|
|
||||||
checkAppNumCompletedLimit();
|
checkAppNumCompletedLimit();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -421,6 +421,10 @@ public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
|
|||||||
return this.clientToAMSecretManager;
|
return this.clientToAMSecretManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RMAppManager getRMAppManager() {
|
||||||
|
return this.rmAppManager;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void startWepApp() {
|
protected void startWepApp() {
|
||||||
// override to disable webapp
|
// override to disable webapp
|
||||||
|
@ -18,6 +18,11 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
@ -577,7 +582,14 @@ public void testRMRestartGetApplicationList() throws Exception {
|
|||||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
||||||
|
|
||||||
// restart rm
|
// restart rm
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMAppManager createRMAppManager() {
|
||||||
|
return spy(super.createRMAppManager());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
GetApplicationsRequest request1 =
|
GetApplicationsRequest request1 =
|
||||||
@ -620,6 +632,10 @@ public void testRMRestartGetApplicationList() throws Exception {
|
|||||||
rm2.getClientRMService().getApplications(request2);
|
rm2.getClientRMService().getApplications(request2);
|
||||||
List<ApplicationReport> appList2 = response2.getApplicationList();
|
List<ApplicationReport> appList2 = response2.getApplicationList();
|
||||||
Assert.assertTrue(3 == appList2.size());
|
Assert.assertTrue(3 == appList2.size());
|
||||||
|
|
||||||
|
// check application summary is logged for the completed apps after RM restart.
|
||||||
|
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
|
||||||
|
isA(ApplicationId.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
||||||
@ -920,7 +936,6 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
|
||||||
conf.set(
|
conf.set(
|
||||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
@ -1063,6 +1078,43 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
|||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is to test submit an application to the new RM with the old delegation
|
||||||
|
// token got from previous RM.
|
||||||
|
@Test
|
||||||
|
public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
|
||||||
|
throws Exception {
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
GetDelegationTokenRequest request1 =
|
||||||
|
GetDelegationTokenRequest.newInstance("renewer1");
|
||||||
|
UserGroupInformation.getCurrentUser().setAuthenticationMethod(
|
||||||
|
AuthMethod.KERBEROS);
|
||||||
|
GetDelegationTokenResponse response1 =
|
||||||
|
rm1.getClientRMService().getDelegationToken(request1);
|
||||||
|
Token<RMDelegationTokenIdentifier> token1 =
|
||||||
|
ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr);
|
||||||
|
|
||||||
|
// start new RM
|
||||||
|
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
// submit an app with the old delegation token got from previous RM.
|
||||||
|
Credentials ts = new Credentials();
|
||||||
|
ts.addToken(token1.getService(), token1);
|
||||||
|
RMApp app = rm2.submitApp(200, "name", "user",
|
||||||
|
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
|
||||||
|
rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
|
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||||
|
Loading…
Reference in New Issue
Block a user