YARN-2138. Cleaned up notifyDone* APIs in RMStateStore. Contributed by Varun Saxena
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617341 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e60673697d
commit
c4dc685343
@ -110,6 +110,9 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2337. ResourceManager sets ClientRMService in RMContext multiple times.
|
||||
(Zhihai Xu via kasha)
|
||||
|
||||
YARN-2138. Cleaned up notifyDone* APIs in RMStateStore. (Varun Saxena via
|
||||
jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -52,13 +52,13 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
@ -132,7 +132,8 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
LOG.info("Storing info for app: " + appId);
|
||||
try {
|
||||
store.storeApplicationStateInternal(appId, appStateData);
|
||||
store.notifyDoneStoringApplication(appId, null);
|
||||
store.notifyApplication(new RMAppEvent(appId,
|
||||
RMAppEventType.APP_NEW_SAVED));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error storing app: " + appId, e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
@ -156,7 +157,8 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
LOG.info("Updating info for app: " + appId);
|
||||
try {
|
||||
store.updateApplicationStateInternal(appId, appStateData);
|
||||
store.notifyDoneUpdatingApplication(appId, null);
|
||||
store.notifyApplication(new RMAppEvent(appId,
|
||||
RMAppEventType.APP_UPDATE_SAVED));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error updating app: " + appId, e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
@ -205,8 +207,9 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
}
|
||||
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
||||
attemptStateData);
|
||||
store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
|
||||
null);
|
||||
store.notifyApplicationAttempt(new RMAppAttemptEvent
|
||||
(attemptState.getAttemptId(),
|
||||
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
@ -233,8 +236,9 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
}
|
||||
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
||||
attemptStateData);
|
||||
store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
|
||||
null);
|
||||
store.notifyApplicationAttempt(new RMAppAttemptEvent
|
||||
(attemptState.getAttemptId(),
|
||||
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
@ -801,47 +805,28 @@ protected void notifyStoreOperationFailed(Exception failureCause) {
|
||||
}
|
||||
rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
/**
|
||||
* In (@link handleStoreEvent}, this method is called to notify the
|
||||
* application that new application is stored in state store
|
||||
* @param appId id of the application that has been saved
|
||||
* @param storedException the exception that is thrown when storing the
|
||||
* application
|
||||
* This method is called to notify the application that
|
||||
* new application is stored or updated in state store
|
||||
* @param event App event containing the app id and event type
|
||||
*/
|
||||
private void notifyDoneStoringApplication(ApplicationId appId,
|
||||
Exception storedException) {
|
||||
rmDispatcher.getEventHandler().handle(
|
||||
new RMAppNewSavedEvent(appId, storedException));
|
||||
private void notifyApplication(RMAppEvent event) {
|
||||
rmDispatcher.getEventHandler().handle(event);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void notifyDoneUpdatingApplication(ApplicationId appId,
|
||||
Exception storedException) {
|
||||
rmDispatcher.getEventHandler().handle(
|
||||
new RMAppUpdateSavedEvent(appId, storedException));
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
/**
|
||||
* In (@link handleStoreEvent}, this method is called to notify the
|
||||
* application attempt that new attempt is stored in state store
|
||||
* @param appAttempt attempt that has been saved
|
||||
* This method is called to notify the application attempt
|
||||
* that new attempt is stored or updated in state store
|
||||
* @param event App attempt event containing the app attempt
|
||||
* id and event type
|
||||
*/
|
||||
private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
|
||||
Exception storedException) {
|
||||
rmDispatcher.getEventHandler().handle(
|
||||
new RMAppAttemptNewSavedEvent(attemptId, storedException));
|
||||
private void notifyApplicationAttempt(RMAppAttemptEvent event) {
|
||||
rmDispatcher.getEventHandler().handle(event);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
|
||||
Exception updatedException) {
|
||||
rmDispatcher.getEventHandler().handle(
|
||||
new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* EventHandler implementation which forward events to the FSRMStateStore
|
||||
* This hides the EventHandle methods of the store from its public interface
|
||||
|
@ -820,17 +820,6 @@ private static final class AddApplicationToSchedulerTransition extends
|
||||
RMAppTransition {
|
||||
@Override
|
||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
if (event instanceof RMAppNewSavedEvent) {
|
||||
RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
|
||||
// For HA this exception needs to be handled by giving up
|
||||
// master status if we got fenced
|
||||
if (((RMAppNewSavedEvent) event).getStoredException() != null) {
|
||||
LOG.error(
|
||||
"Failed to store application: " + storeEvent.getApplicationId(),
|
||||
storeEvent.getStoredException());
|
||||
ExitUtil.terminate(1, storeEvent.getStoredException());
|
||||
}
|
||||
}
|
||||
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
|
||||
app.submissionContext.getQueue(), app.user));
|
||||
}
|
||||
@ -848,13 +837,6 @@ private static final class FinalStateSavedTransition implements
|
||||
|
||||
@Override
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
|
||||
if (storeEvent.getUpdatedException() != null) {
|
||||
LOG.error("Failed to update the final state of application"
|
||||
+ storeEvent.getApplicationId(), storeEvent.getUpdatedException());
|
||||
ExitUtil.terminate(1, storeEvent.getUpdatedException());
|
||||
}
|
||||
|
||||
if (app.transitionTodo instanceof SingleArcTransition) {
|
||||
((SingleArcTransition) app.transitionTodo).transition(app,
|
||||
app.eventCausingFinalSaving);
|
||||
|
@ -1,36 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
public class RMAppNewSavedEvent extends RMAppEvent {
|
||||
|
||||
private final Exception storedException;
|
||||
|
||||
public RMAppNewSavedEvent(ApplicationId appId, Exception storedException) {
|
||||
super(appId, RMAppEventType.APP_NEW_SAVED);
|
||||
this.storedException = storedException;
|
||||
}
|
||||
|
||||
public Exception getStoredException() {
|
||||
return storedException;
|
||||
}
|
||||
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
public class RMAppUpdateSavedEvent extends RMAppEvent {
|
||||
|
||||
private final Exception updatedException;
|
||||
|
||||
public RMAppUpdateSavedEvent(ApplicationId appId, Exception updatedException) {
|
||||
super(appId, RMAppEventType.APP_UPDATE_SAVED);
|
||||
this.updatedException = updatedException;
|
||||
}
|
||||
|
||||
public Exception getUpdatedException() {
|
||||
return updatedException;
|
||||
}
|
||||
|
||||
}
|
@ -80,11 +80,9 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
@ -905,8 +903,6 @@ private static final class AttemptStoredTransition extends BaseTransition {
|
||||
@Override
|
||||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
appAttempt.checkAttemptStoreError(event);
|
||||
|
||||
appAttempt.launchAttempt();
|
||||
}
|
||||
}
|
||||
@ -1058,14 +1054,6 @@ private static class FinalStateSavedTransition implements
|
||||
@Override
|
||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
|
||||
if (storeEvent.getUpdatedException() != null) {
|
||||
LOG.error("Failed to update the final state of application attempt: "
|
||||
+ storeEvent.getApplicationAttemptId(),
|
||||
storeEvent.getUpdatedException());
|
||||
ExitUtil.terminate(1, storeEvent.getUpdatedException());
|
||||
}
|
||||
|
||||
RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
|
||||
|
||||
if (appAttempt.transitionTodo instanceof SingleArcTransition) {
|
||||
@ -1195,8 +1183,6 @@ private static final class UnmanagedAMAttemptSavedTransition
|
||||
@Override
|
||||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
appAttempt.checkAttemptStoreError(event);
|
||||
|
||||
// create AMRMToken
|
||||
appAttempt.amrmToken =
|
||||
appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
||||
@ -1689,18 +1675,6 @@ private void attemptLaunched() {
|
||||
rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
|
||||
}
|
||||
|
||||
private void checkAttemptStoreError(RMAppAttemptEvent event) {
|
||||
RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
|
||||
if(storeEvent.getStoredException() != null)
|
||||
{
|
||||
// This needs to be handled for HA and give up master status if we got
|
||||
// fenced
|
||||
LOG.error("Failed to store attempt: " + getAppAttemptId(),
|
||||
storeEvent.getStoredException());
|
||||
ExitUtil.terminate(1, storeEvent.getStoredException());
|
||||
}
|
||||
}
|
||||
|
||||
private void storeAttempt() {
|
||||
// store attempt data in a non-blocking manner to prevent dispatcher
|
||||
// thread starvation and wait for state to be saved
|
||||
|
@ -1,39 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
|
||||
public class RMAppAttemptNewSavedEvent extends RMAppAttemptEvent {
|
||||
|
||||
final Exception storedException;
|
||||
|
||||
public RMAppAttemptNewSavedEvent(ApplicationAttemptId appAttemptId,
|
||||
Exception storedException) {
|
||||
super(appAttemptId, RMAppAttemptEventType.ATTEMPT_NEW_SAVED);
|
||||
this.storedException = storedException;
|
||||
}
|
||||
|
||||
public Exception getStoredException() {
|
||||
return storedException;
|
||||
}
|
||||
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
|
||||
public class RMAppAttemptUpdateSavedEvent extends RMAppAttemptEvent {
|
||||
|
||||
final Exception updatedException;
|
||||
|
||||
public RMAppAttemptUpdateSavedEvent(ApplicationAttemptId appAttemptId,
|
||||
Exception updatedException) {
|
||||
super(appAttemptId, RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED);
|
||||
this.updatedException = updatedException;
|
||||
}
|
||||
|
||||
public Exception getUpdatedException() {
|
||||
return updatedException;
|
||||
}
|
||||
}
|
@ -65,8 +65,8 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
@ -77,10 +77,9 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
||||
|
||||
static class TestDispatcher implements
|
||||
Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> {
|
||||
Dispatcher, EventHandler<RMAppAttemptEvent> {
|
||||
|
||||
ApplicationAttemptId attemptId;
|
||||
Exception storedException;
|
||||
|
||||
boolean notified = false;
|
||||
|
||||
@ -91,9 +90,8 @@ public void register(Class<? extends Enum> eventType,
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(RMAppAttemptNewSavedEvent event) {
|
||||
public void handle(RMAppAttemptEvent event) {
|
||||
assertEquals(attemptId, event.getApplicationAttemptId());
|
||||
assertEquals(storedException, event.getStoredException());
|
||||
notified = true;
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
@ -163,7 +161,6 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
|
||||
when(mockAttempt.getClientTokenMasterKey())
|
||||
.thenReturn(clientTokenMasterKey);
|
||||
dispatcher.attemptId = attemptId;
|
||||
dispatcher.storedException = null;
|
||||
store.storeNewApplicationAttempt(mockAttempt);
|
||||
waitNotify(dispatcher);
|
||||
return container.getId();
|
||||
|
@ -60,7 +60,6 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
@ -328,15 +327,15 @@ private void assertFailed(RMApp application, String regex) {
|
||||
|
||||
private void sendAppUpdateSavedEvent(RMApp application) {
|
||||
RMAppEvent event =
|
||||
new RMAppUpdateSavedEvent(application.getApplicationId(), null);
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
|
||||
application.handle(event);
|
||||
rmDispatcher.await();
|
||||
}
|
||||
|
||||
private void sendAttemptUpdateSavedEvent(RMApp application) {
|
||||
application.getCurrentAppAttempt().handle(
|
||||
new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
|
||||
.getAppAttemptId(), null));
|
||||
new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(),
|
||||
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
||||
}
|
||||
|
||||
protected RMApp testCreateAppNewSaving(
|
||||
@ -357,7 +356,7 @@ protected RMApp testCreateAppSubmittedNoRecovery(
|
||||
RMApp application = testCreateAppNewSaving(submissionContext);
|
||||
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
|
||||
RMAppEvent event =
|
||||
new RMAppNewSavedEvent(application.getApplicationId(), null);
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
|
||||
application.handle(event);
|
||||
assertStartTimeSet(application);
|
||||
assertAppState(RMAppState.SUBMITTED, application);
|
||||
@ -422,7 +421,7 @@ protected RMApp testCreateAppFinishing(
|
||||
RMApp application = testCreateAppFinalSaving(submissionContext);
|
||||
// FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
|
||||
RMAppEvent appUpdated =
|
||||
new RMAppUpdateSavedEvent(application.getApplicationId(), null);
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
|
||||
application.handle(appUpdated);
|
||||
assertAppState(RMAppState.FINISHING, application);
|
||||
assertTimesAtFinish(application);
|
||||
@ -763,7 +762,7 @@ public void testAppFinalSavingToFinished() throws IOException {
|
||||
application.handle(event);
|
||||
assertAppState(RMAppState.FINAL_SAVING, application);
|
||||
RMAppEvent appUpdated =
|
||||
new RMAppUpdateSavedEvent(application.getApplicationId(), null);
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
|
||||
application.handle(appUpdated);
|
||||
assertAppState(RMAppState.FINISHED, application);
|
||||
|
||||
|
@ -81,10 +81,8 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
@ -570,15 +568,15 @@ private void scheduleApplicationAttempt() {
|
||||
submitApplicationAttempt();
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||
|
||||
if(unmanagedAM){
|
||||
assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptNewSavedEvent(
|
||||
applicationAttempt.getAppAttemptId(), null));
|
||||
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
||||
}
|
||||
|
||||
testAppAttemptScheduledState();
|
||||
@ -616,8 +614,8 @@ private Container allocateApplicationAttempt() {
|
||||
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptNewSavedEvent(
|
||||
applicationAttempt.getAppAttemptId(), null));
|
||||
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
||||
|
||||
testAppAttemptAllocatedState(container);
|
||||
|
||||
@ -696,8 +694,8 @@ private void sendAttemptUpdateSavedEvent(RMAppAttempt applicationAttempt) {
|
||||
assertEquals(RMAppAttemptState.FINAL_SAVING,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptUpdateSavedEvent(
|
||||
applicationAttempt.getAppAttemptId(), null));
|
||||
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user