YARN-1812. Fixed ResourceManager to synchrously renew tokens after recovery and thus recover app itself synchronously and avoid races with resyncing NodeManagers. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576843 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4ce0e4bf2e
commit
4de17c6052
@ -453,6 +453,10 @@ Release 2.4.0 - UNRELEASED
|
|||||||
specify host/rack requests without off-switch request. (Wangda Tan via
|
specify host/rack requests without off-switch request. (Wangda Tan via
|
||||||
acmurthy)
|
acmurthy)
|
||||||
|
|
||||||
|
YARN-1812. Fixed ResourceManager to synchrously renew tokens after recovery
|
||||||
|
and thus recover app itself synchronously and avoid races with resyncing
|
||||||
|
NodeManagers. (Jian He via vinodkv)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -338,7 +338,7 @@ public SubmitApplicationResponse submitApplication(
|
|||||||
try {
|
try {
|
||||||
// call RMAppManager to submit application directly
|
// call RMAppManager to submit application directly
|
||||||
rmAppManager.submitApplication(submissionContext,
|
rmAppManager.submitApplication(submissionContext,
|
||||||
System.currentTimeMillis(), user, false, null);
|
System.currentTimeMillis(), user);
|
||||||
|
|
||||||
LOG.info("Application with id " + applicationId.getId() +
|
LOG.info("Application with id " + applicationId.getId() +
|
||||||
" submitted by user " + user);
|
" submitted by user " + user);
|
||||||
|
@ -263,48 +263,75 @@ protected synchronized void checkAppNumCompletedLimit() {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void submitApplication(
|
protected void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user, boolean isRecovered, RMState state) throws YarnException {
|
String user) throws YarnException {
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
|
|
||||||
RMAppImpl application =
|
RMAppImpl application =
|
||||||
createAndPopulateNewRMApp(submissionContext, submitTime, user);
|
createAndPopulateNewRMApp(submissionContext, submitTime, user);
|
||||||
|
ApplicationId appId = submissionContext.getApplicationId();
|
||||||
if (isRecovered) {
|
|
||||||
recoverApplication(state, application);
|
|
||||||
RMAppState rmAppState =
|
|
||||||
state.getApplicationState().get(applicationId).getState();
|
|
||||||
if (isApplicationInFinalState(rmAppState)) {
|
|
||||||
// We are synchronously moving the application into final state so that
|
|
||||||
// momentarily client will not see this application in NEW state. Also
|
|
||||||
// for finished applications we will avoid renewing tokens.
|
|
||||||
application
|
|
||||||
.handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
Credentials credentials = null;
|
Credentials credentials = null;
|
||||||
try {
|
try {
|
||||||
credentials = parseCredentials(submissionContext);
|
credentials = parseCredentials(submissionContext);
|
||||||
|
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
|
||||||
|
credentials, submissionContext.getCancelTokensWhenComplete());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn(
|
LOG.warn("Unable to parse credentials.", e);
|
||||||
"Unable to parse credentials.", e);
|
|
||||||
// Sending APP_REJECTED is fine, since we assume that the
|
// Sending APP_REJECTED is fine, since we assume that the
|
||||||
// RMApp is in NEW state and thus we haven't yet informed the
|
// RMApp is in NEW state and thus we haven't yet informed the
|
||||||
// scheduler about the existence of the application
|
// scheduler about the existence of the application
|
||||||
assert application.getState() == RMAppState.NEW;
|
assert application.getState() == RMAppState.NEW;
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
new RMAppRejectedEvent(applicationId, e.getMessage()));
|
.handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
|
||||||
throw RPCUtil.getRemoteException(e);
|
throw RPCUtil.getRemoteException(e);
|
||||||
}
|
}
|
||||||
this.rmContext.getDelegationTokenRenewer().addApplication(
|
|
||||||
applicationId, credentials,
|
|
||||||
submissionContext.getCancelTokensWhenComplete(), isRecovered);
|
|
||||||
} else {
|
} else {
|
||||||
|
// Dispatcher is not yet started at this time, so these START events
|
||||||
|
// enqueued should be guaranteed to be first processed when dispatcher
|
||||||
|
// gets started.
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMAppEvent(applicationId,
|
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
|
||||||
isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
protected void
|
||||||
|
recoverApplication(ApplicationState appState, RMState rmState)
|
||||||
|
throws Exception {
|
||||||
|
ApplicationSubmissionContext appContext =
|
||||||
|
appState.getApplicationSubmissionContext();
|
||||||
|
ApplicationId appId = appState.getAppId();
|
||||||
|
|
||||||
|
// create and recover app.
|
||||||
|
RMAppImpl application =
|
||||||
|
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
||||||
|
appState.getUser());
|
||||||
|
application.recover(rmState);
|
||||||
|
if (isApplicationInFinalState(appState.getState())) {
|
||||||
|
// We are synchronously moving the application into final state so that
|
||||||
|
// momentarily client will not see this application in NEW state. Also
|
||||||
|
// for finished applications we will avoid renewing tokens.
|
||||||
|
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
Credentials credentials = null;
|
||||||
|
try {
|
||||||
|
credentials = parseCredentials(appContext);
|
||||||
|
// synchronously renew delegation token on recovery.
|
||||||
|
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
|
||||||
|
credentials, appContext.getCancelTokensWhenComplete());
|
||||||
|
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Unable to parse and renew delegation tokens.", e);
|
||||||
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
|
.handle(new RMAppRejectedEvent(appId, e.getMessage()));
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -363,16 +390,6 @@ private void validateResourceRequest(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recoverApplication(RMState state, RMAppImpl application)
|
|
||||||
throws YarnException {
|
|
||||||
try {
|
|
||||||
application.recover(state);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error recovering application", e);
|
|
||||||
throw new YarnException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isApplicationInFinalState(RMAppState rmAppState) {
|
private boolean isApplicationInFinalState(RMAppState rmAppState) {
|
||||||
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|
||||||
|| rmAppState == RMAppState.KILLED) {
|
|| rmAppState == RMAppState.KILLED) {
|
||||||
@ -403,8 +420,7 @@ public void recover(RMState state) throws Exception {
|
|||||||
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
|
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
|
||||||
LOG.info("Recovering " + appStates.size() + " applications");
|
LOG.info("Recovering " + appStates.size() + " applications");
|
||||||
for (ApplicationState appState : appStates.values()) {
|
for (ApplicationState appState : appStates.values()) {
|
||||||
submitApplication(appState.getApplicationSubmissionContext(),
|
recoverApplication(appState, state);
|
||||||
appState.getSubmitTime(), appState.getUser(), true, state);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -731,7 +731,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
* Therefore we should wait for it to finish.
|
* Therefore we should wait for it to finish.
|
||||||
*/
|
*/
|
||||||
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
|
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
|
||||||
app.dispatcher.getEventHandler().handle(
|
// synchronously recover attempt to ensure any incoming external events
|
||||||
|
// to be processed after the attempt processes the recover event.
|
||||||
|
attempt.handle(
|
||||||
new RMAppAttemptEvent(attempt.getAppAttemptId(),
|
new RMAppAttemptEvent(attempt.getAppAttemptId(),
|
||||||
RMAppAttemptEventType.RECOVER));
|
RMAppAttemptEventType.RECOVER));
|
||||||
}
|
}
|
||||||
|
@ -114,6 +114,7 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
|
|||||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||||
renewerService = createNewThreadPoolService(conf);
|
renewerService = createNewThreadPoolService(conf);
|
||||||
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
|
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
|
||||||
|
renewalTimer = new Timer(true);
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,7 +137,6 @@ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
|
|||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
dtCancelThread.start();
|
dtCancelThread.start();
|
||||||
renewalTimer = new Timer(true);
|
|
||||||
if (tokenKeepAliveEnabled) {
|
if (tokenKeepAliveEnabled) {
|
||||||
delayedRemovalThread =
|
delayedRemovalThread =
|
||||||
new Thread(new DelayedTokenRemovalRunnable(getConfig()),
|
new Thread(new DelayedTokenRemovalRunnable(getConfig()),
|
||||||
@ -151,12 +151,12 @@ protected void serviceStart() throws Exception {
|
|||||||
isServiceStarted = true;
|
isServiceStarted = true;
|
||||||
serviceStateLock.writeLock().unlock();
|
serviceStateLock.writeLock().unlock();
|
||||||
while(!pendingEventQueue.isEmpty()) {
|
while(!pendingEventQueue.isEmpty()) {
|
||||||
processDelegationTokenRewewerEvent(pendingEventQueue.take());
|
processDelegationTokenRenewerEvent(pendingEventQueue.take());
|
||||||
}
|
}
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processDelegationTokenRewewerEvent(
|
private void processDelegationTokenRenewerEvent(
|
||||||
DelegationTokenRenewerEvent evt) {
|
DelegationTokenRenewerEvent evt) {
|
||||||
serviceStateLock.readLock().lock();
|
serviceStateLock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
@ -325,19 +325,26 @@ public Set<Token<?>> getDelegationTokens() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add application tokens for renewal.
|
* Asynchronously add application tokens for renewal.
|
||||||
* @param applicationId added application
|
* @param applicationId added application
|
||||||
* @param ts tokens
|
* @param ts tokens
|
||||||
* @param shouldCancelAtEnd true if tokens should be canceled when the app is
|
* @param shouldCancelAtEnd true if tokens should be canceled when the app is
|
||||||
* done else false.
|
* done else false.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void addApplication(
|
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
|
||||||
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
|
boolean shouldCancelAtEnd) {
|
||||||
boolean isApplicationRecovered) {
|
processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
|
||||||
processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
|
applicationId, ts, shouldCancelAtEnd));
|
||||||
applicationId, ts,
|
}
|
||||||
shouldCancelAtEnd, isApplicationRecovered));
|
|
||||||
|
/**
|
||||||
|
* Synchronously renew delegation tokens.
|
||||||
|
*/
|
||||||
|
public void addApplicationSync(ApplicationId applicationId, Credentials ts,
|
||||||
|
boolean shouldCancelAtEnd) throws IOException{
|
||||||
|
handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
|
||||||
|
applicationId, ts, shouldCancelAtEnd));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
|
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
|
||||||
@ -493,7 +500,7 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
|
|||||||
* @param applicationId completed application
|
* @param applicationId completed application
|
||||||
*/
|
*/
|
||||||
public void applicationFinished(ApplicationId applicationId) {
|
public void applicationFinished(ApplicationId applicationId) {
|
||||||
processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
|
processDelegationTokenRenewerEvent(new DelegationTokenRenewerEvent(
|
||||||
applicationId,
|
applicationId,
|
||||||
DelegationTokenRenewerEventType.FINISH_APPLICATION));
|
DelegationTokenRenewerEventType.FINISH_APPLICATION));
|
||||||
}
|
}
|
||||||
@ -638,9 +645,7 @@ private void handleDTRenewerAppSubmitEvent(
|
|||||||
// Setup tokens for renewal
|
// Setup tokens for renewal
|
||||||
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
|
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
|
||||||
rmContext.getDispatcher().getEventHandler()
|
rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMAppEvent(event.getApplicationId(),
|
.handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START));
|
||||||
event.isApplicationRecovered() ? RMAppEventType.RECOVER
|
|
||||||
: RMAppEventType.START));
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Unable to add the application to the delegation token renewer.",
|
"Unable to add the application to the delegation token renewer.",
|
||||||
@ -654,20 +659,17 @@ private void handleDTRenewerAppSubmitEvent(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class DelegationTokenRenewerAppSubmitEvent extends
|
private static class DelegationTokenRenewerAppSubmitEvent extends
|
||||||
DelegationTokenRenewerEvent {
|
DelegationTokenRenewerEvent {
|
||||||
|
|
||||||
private Credentials credentials;
|
private Credentials credentials;
|
||||||
private boolean shouldCancelAtEnd;
|
private boolean shouldCancelAtEnd;
|
||||||
private boolean isAppRecovered;
|
|
||||||
|
|
||||||
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
|
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
|
||||||
Credentials credentails, boolean shouldCancelAtEnd,
|
Credentials credentails, boolean shouldCancelAtEnd) {
|
||||||
boolean isApplicationRecovered) {
|
|
||||||
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
|
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
|
||||||
this.credentials = credentails;
|
this.credentials = credentails;
|
||||||
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
||||||
this.isAppRecovered = isApplicationRecovered;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Credentials getCredentials() {
|
public Credentials getCredentials() {
|
||||||
@ -677,10 +679,6 @@ public Credentials getCredentials() {
|
|||||||
public boolean shouldCancelAtEnd() {
|
public boolean shouldCancelAtEnd() {
|
||||||
return shouldCancelAtEnd;
|
return shouldCancelAtEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isApplicationRecovered() {
|
|
||||||
return isAppRecovered;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enum DelegationTokenRenewerEventType {
|
enum DelegationTokenRenewerEventType {
|
||||||
@ -688,7 +686,7 @@ enum DelegationTokenRenewerEventType {
|
|||||||
FINISH_APPLICATION
|
FINISH_APPLICATION
|
||||||
}
|
}
|
||||||
|
|
||||||
class DelegationTokenRenewerEvent extends
|
private static class DelegationTokenRenewerEvent extends
|
||||||
AbstractEvent<DelegationTokenRenewerEventType> {
|
AbstractEvent<DelegationTokenRenewerEventType> {
|
||||||
|
|
||||||
private ApplicationId appId;
|
private ApplicationId appId;
|
||||||
|
@ -497,7 +497,7 @@ protected void startWepApp() {
|
|||||||
// override to disable webapp
|
// override to disable webapp
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
|
public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm,
|
||||||
MockAM am) throws Exception {
|
MockAM am) throws Exception {
|
||||||
FinishApplicationMasterRequest req =
|
FinishApplicationMasterRequest req =
|
||||||
FinishApplicationMasterRequest.newInstance(
|
FinishApplicationMasterRequest.newInstance(
|
||||||
|
@ -30,7 +30,6 @@
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
@ -142,7 +141,7 @@ public MyRMAppManager(RMContext context, YarnScheduler scheduler,
|
|||||||
@Override
|
@Override
|
||||||
protected void submitApplication(
|
protected void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user, boolean isRecovered, RMState state) throws YarnException {
|
String user) throws YarnException {
|
||||||
//Do nothing, just add the application to RMContext
|
//Do nothing, just add the application to RMContext
|
||||||
RMAppImpl application =
|
RMAppImpl application =
|
||||||
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
|
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
|
||||||
|
@ -178,7 +178,7 @@ public void submitApplication(
|
|||||||
ApplicationSubmissionContext submissionContext, String user)
|
ApplicationSubmissionContext submissionContext, String user)
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
super.submitApplication(submissionContext, System.currentTimeMillis(),
|
super.submitApplication(submissionContext, System.currentTimeMillis(),
|
||||||
user, false, null);
|
user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,7 +414,7 @@ public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
|
|||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
|
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
|
||||||
|
|
||||||
// a failed app
|
// a failed app
|
||||||
RMApp app2 = rm1.submitApp(200);
|
RMApp app2 = rm1.submitApp(200);
|
||||||
|
@ -1709,6 +1709,63 @@ public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
|
|||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test Delegation token is renewed synchronously so that recover events
|
||||||
|
// can be processed before any other external incoming events, specifically
|
||||||
|
// the ContainerFinished event on NM re-registraton.
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testSynchronouslyRenewDTOnRecovery() throws Exception {
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
// start RM
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
final MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
final MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
||||||
|
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected ResourceTrackerService createResourceTrackerService() {
|
||||||
|
return new ResourceTrackerService(this.rmContext,
|
||||||
|
this.nodesListManager, this.nmLivelinessMonitor,
|
||||||
|
this.rmContext.getContainerTokenSecretManager(),
|
||||||
|
this.rmContext.getNMTokenSecretManager()) {
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
// send the container_finished event as soon as the
|
||||||
|
// ResourceTrackerService is started.
|
||||||
|
super.serviceStart();
|
||||||
|
nm1.setResourceTrackerService(getResourceTrackerService());
|
||||||
|
List<ContainerStatus> status = new ArrayList<ContainerStatus>();
|
||||||
|
ContainerId amContainer =
|
||||||
|
ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
|
||||||
|
status.add(ContainerStatus.newInstance(amContainer,
|
||||||
|
ContainerState.COMPLETE, "AM container exit", 143));
|
||||||
|
nm1.registerNode(status);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Re-start RM
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
// wait for the 2nd attempt to be started.
|
||||||
|
RMApp loadedApp0 =
|
||||||
|
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
|
int timeoutSecs = 0;
|
||||||
|
while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
}
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
|
||||||
|
MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
|
||||||
|
}
|
||||||
|
|
||||||
private void writeToHostsFile(String... hosts) throws IOException {
|
private void writeToHostsFile(String... hosts) throws IOException {
|
||||||
if (!hostFile.exists()) {
|
if (!hostFile.exists()) {
|
||||||
TEMP_DIR.mkdirs();
|
TEMP_DIR.mkdirs();
|
||||||
|
@ -223,7 +223,7 @@ public void testAMRestartWithExistingContainers() throws Exception {
|
|||||||
((CapacityScheduler) rm1.getResourceScheduler())
|
((CapacityScheduler) rm1.getResourceScheduler())
|
||||||
.getCurrentAttemptForContainer(containerId2);
|
.getCurrentAttemptForContainer(containerId2);
|
||||||
// finish this application
|
// finish this application
|
||||||
MockRM.finishApplicationMaster(app1, rm1, nm1, am2);
|
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am2);
|
||||||
|
|
||||||
// the 2nd attempt released the 1st attempt's running container, when the
|
// the 2nd attempt released the 1st attempt's running container, when the
|
||||||
// 2nd attempt finishes.
|
// 2nd attempt finishes.
|
||||||
|
@ -353,7 +353,7 @@ public void testDTRenewal () throws Exception {
|
|||||||
// register the tokens for renewal
|
// register the tokens for renewal
|
||||||
ApplicationId applicationId_0 =
|
ApplicationId applicationId_0 =
|
||||||
BuilderUtils.newApplicationId(0, 0);
|
BuilderUtils.newApplicationId(0, 0);
|
||||||
delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
|
delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
|
||||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||||
|
|
||||||
// first 3 initial renewals + 1 real
|
// first 3 initial renewals + 1 real
|
||||||
@ -393,7 +393,7 @@ public void testDTRenewal () throws Exception {
|
|||||||
|
|
||||||
|
|
||||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||||
delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
|
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
|
||||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||||
delegationTokenRenewer.applicationFinished(applicationId_1);
|
delegationTokenRenewer.applicationFinished(applicationId_1);
|
||||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||||
@ -429,7 +429,7 @@ public void testAppRejectionWithCancelledDelegationToken() throws Exception {
|
|||||||
|
|
||||||
// register the tokens for renewal
|
// register the tokens for renewal
|
||||||
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
|
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
|
||||||
delegationTokenRenewer.addApplication(appId, ts, true, false);
|
delegationTokenRenewer.addApplicationAsync(appId, ts, true);
|
||||||
int waitCnt = 20;
|
int waitCnt = 20;
|
||||||
while (waitCnt-- >0) {
|
while (waitCnt-- >0) {
|
||||||
if (!eventQueue.isEmpty()) {
|
if (!eventQueue.isEmpty()) {
|
||||||
@ -473,7 +473,7 @@ public void testDTRenewalWithNoCancel () throws Exception {
|
|||||||
|
|
||||||
|
|
||||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||||
delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
|
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
|
||||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||||
delegationTokenRenewer.applicationFinished(applicationId_1);
|
delegationTokenRenewer.applicationFinished(applicationId_1);
|
||||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||||
@ -540,7 +540,7 @@ public void testDTKeepAlive1 () throws Exception {
|
|||||||
|
|
||||||
// register the tokens for renewal
|
// register the tokens for renewal
|
||||||
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
||||||
localDtr.addApplication(applicationId_0, ts, true, false);
|
localDtr.addApplicationAsync(applicationId_0, ts, true);
|
||||||
waitForEventsToGetProcessed(localDtr);
|
waitForEventsToGetProcessed(localDtr);
|
||||||
if (!eventQueue.isEmpty()){
|
if (!eventQueue.isEmpty()){
|
||||||
Event evt = eventQueue.take();
|
Event evt = eventQueue.take();
|
||||||
@ -617,7 +617,7 @@ public void testDTKeepAlive2() throws Exception {
|
|||||||
|
|
||||||
// register the tokens for renewal
|
// register the tokens for renewal
|
||||||
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
||||||
localDtr.addApplication(applicationId_0, ts, true, false);
|
localDtr.addApplicationAsync(applicationId_0, ts, true);
|
||||||
localDtr.applicationFinished(applicationId_0);
|
localDtr.applicationFinished(applicationId_0);
|
||||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||||
//Send another keep alive.
|
//Send another keep alive.
|
||||||
@ -718,14 +718,14 @@ public Long answer(InvocationOnMock invocation)
|
|||||||
Thread submitThread = new Thread() {
|
Thread submitThread = new Thread() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
dtr.addApplication(mock(ApplicationId.class), creds1, false, false);
|
dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
submitThread.start();
|
submitThread.start();
|
||||||
|
|
||||||
// wait till 1st submit blocks, then submit another
|
// wait till 1st submit blocks, then submit another
|
||||||
startBarrier.await();
|
startBarrier.await();
|
||||||
dtr.addApplication(mock(ApplicationId.class), creds2, false, false);
|
dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
|
||||||
// signal 1st to complete
|
// signal 1st to complete
|
||||||
endBarrier.await();
|
endBarrier.await();
|
||||||
submitThread.join();
|
submitThread.join();
|
||||||
|
Loading…
Reference in New Issue
Block a user