MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved (Jason Lowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1363067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6e56376023
commit
1efd2e1d45
@ -134,6 +134,9 @@ Branch-2 ( Unreleased changes )
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
|
||||
(Jason Lowe via bobby)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
|
||||
|
@ -51,6 +51,8 @@ public interface RMContext {
|
||||
|
||||
AMLivelinessMonitor getAMLivelinessMonitor();
|
||||
|
||||
AMLivelinessMonitor getAMFinishingMonitor();
|
||||
|
||||
ContainerAllocationExpirer getContainerAllocationExpirer();
|
||||
|
||||
DelegationTokenRenewer getDelegationTokenRenewer();
|
||||
|
@ -49,6 +49,7 @@ public class RMContextImpl implements RMContext {
|
||||
= new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private AMLivelinessMonitor amFinishingMonitor;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
private final DelegationTokenRenewer tokenRenewer;
|
||||
private final ApplicationTokenSecretManager appTokenSecretManager;
|
||||
@ -56,12 +57,14 @@ public class RMContextImpl implements RMContext {
|
||||
public RMContextImpl(Store store, Dispatcher rmDispatcher,
|
||||
ContainerAllocationExpirer containerAllocationExpirer,
|
||||
AMLivelinessMonitor amLivelinessMonitor,
|
||||
AMLivelinessMonitor amFinishingMonitor,
|
||||
DelegationTokenRenewer tokenRenewer,
|
||||
ApplicationTokenSecretManager appTokenSecretManager) {
|
||||
this.store = store;
|
||||
this.rmDispatcher = rmDispatcher;
|
||||
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||
this.amLivelinessMonitor = amLivelinessMonitor;
|
||||
this.amFinishingMonitor = amFinishingMonitor;
|
||||
this.tokenRenewer = tokenRenewer;
|
||||
this.appTokenSecretManager = appTokenSecretManager;
|
||||
}
|
||||
@ -106,6 +109,11 @@ public AMLivelinessMonitor getAMLivelinessMonitor() {
|
||||
return this.amLivelinessMonitor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AMLivelinessMonitor getAMFinishingMonitor() {
|
||||
return this.amFinishingMonitor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DelegationTokenRenewer getDelegationTokenRenewer() {
|
||||
return tokenRenewer;
|
||||
|
@ -155,13 +155,16 @@ public synchronized void init(Configuration conf) {
|
||||
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
|
||||
addService(amLivelinessMonitor);
|
||||
|
||||
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
|
||||
addService(amFinishingMonitor);
|
||||
|
||||
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
||||
addService(tokenRenewer);
|
||||
|
||||
this.rmContext =
|
||||
new RMContextImpl(this.store, this.rmDispatcher,
|
||||
this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer,
|
||||
this.appTokenSecretManager);
|
||||
this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
|
||||
this.containerAllocationExpirer,
|
||||
amLivelinessMonitor, amFinishingMonitor,
|
||||
tokenRenewer, this.appTokenSecretManager);
|
||||
|
||||
// Register event handler for NodesListManager
|
||||
this.nodesListManager = new NodesListManager(this.rmContext);
|
||||
|
@ -27,6 +27,7 @@ public enum RMAppEventType {
|
||||
APP_REJECTED,
|
||||
APP_ACCEPTED,
|
||||
ATTEMPT_REGISTERED,
|
||||
ATTEMPT_FINISHING,
|
||||
ATTEMPT_FINISHED, // Will send the final state
|
||||
ATTEMPT_FAILED,
|
||||
ATTEMPT_KILLED,
|
||||
|
@ -147,6 +147,8 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||
// Transitions from RUNNING state
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
|
||||
RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
||||
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
|
||||
.addTransition(RMAppState.RUNNING,
|
||||
@ -156,12 +158,24 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
|
||||
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||
|
||||
// Transitions from FINISHING state
|
||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
||||
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
|
||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
||||
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||
// ignorable transitions
|
||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||
RMAppEventType.NODE_UPDATE)
|
||||
|
||||
// Transitions from FINISHED state
|
||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||
RMAppEventType.KILL)
|
||||
// ignorable transitions
|
||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||
RMAppEventType.NODE_UPDATE)
|
||||
EnumSet.of(
|
||||
RMAppEventType.NODE_UPDATE,
|
||||
RMAppEventType.ATTEMPT_FINISHING,
|
||||
RMAppEventType.ATTEMPT_FINISHED))
|
||||
|
||||
// Transitions from FAILED state
|
||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||
@ -339,6 +353,7 @@ private YarnApplicationState createApplicationState(RMAppState rmAppState) {
|
||||
return YarnApplicationState.ACCEPTED;
|
||||
case RUNNING:
|
||||
return YarnApplicationState.RUNNING;
|
||||
case FINISHING:
|
||||
case FINISHED:
|
||||
return YarnApplicationState.FINISHED;
|
||||
case KILLED:
|
||||
@ -357,6 +372,7 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
|
||||
case RUNNING:
|
||||
return FinalApplicationStatus.UNDEFINED;
|
||||
// finished without a proper final state is the same as failed
|
||||
case FINISHING:
|
||||
case FINISHED:
|
||||
case FAILED:
|
||||
return FinalApplicationStatus.FAILED;
|
||||
@ -548,6 +564,14 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
};
|
||||
}
|
||||
|
||||
private static final class RMAppFinishingTransition extends
|
||||
RMAppTransition {
|
||||
@Override
|
||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
app.finishTime = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
|
||||
private static class AppKilledTransition extends FinalTransition {
|
||||
@Override
|
||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
@ -591,7 +615,9 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
app.handler.handle(
|
||||
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
||||
}
|
||||
app.finishTime = System.currentTimeMillis();
|
||||
if (app.getState() != RMAppState.FINISHING) {
|
||||
app.finishTime = System.currentTimeMillis();
|
||||
}
|
||||
app.handler.handle(
|
||||
new RMAppManagerEvent(app.applicationId,
|
||||
RMAppManagerEventType.APP_COMPLETED));
|
||||
|
@ -19,5 +19,5 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
public enum RMAppState {
|
||||
NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
|
||||
NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHING, FINISHED, FAILED, KILLED
|
||||
}
|
||||
|
@ -197,7 +197,8 @@ RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
|
||||
new FinalTransition(RMAppAttemptState.KILLED))
|
||||
|
||||
// Transitions from RUNNING State
|
||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINISHED,
|
||||
.addTransition(RMAppAttemptState.RUNNING,
|
||||
EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
|
||||
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
|
||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
||||
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
||||
@ -233,6 +234,21 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
||||
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
||||
RMAppAttemptEventType.CONTAINER_FINISHED))
|
||||
|
||||
// Transitions from FINISHING State
|
||||
.addTransition(RMAppAttemptState.FINISHING,
|
||||
EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
|
||||
RMAppAttemptEventType.CONTAINER_FINISHED,
|
||||
new AMFinishingContainerFinishedTransition())
|
||||
.addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED,
|
||||
RMAppAttemptEventType.EXPIRE,
|
||||
new FinalTransition(RMAppAttemptState.FINISHED))
|
||||
.addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHING,
|
||||
EnumSet.of(
|
||||
RMAppAttemptEventType.UNREGISTERED,
|
||||
RMAppAttemptEventType.STATUS_UPDATE,
|
||||
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
||||
RMAppAttemptEventType.KILL))
|
||||
|
||||
// Transitions from FINISHED State
|
||||
.addTransition(
|
||||
RMAppAttemptState.FINISHED,
|
||||
@ -830,6 +846,8 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
// UnRegister from AMLivelinessMonitor
|
||||
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
|
||||
appAttempt.getAppAttemptId());
|
||||
appAttempt.rmContext.getAMFinishingMonitor().unregister(
|
||||
appAttempt.getAppAttemptId());
|
||||
|
||||
if(!appAttempt.submissionContext.getUnmanagedAM()) {
|
||||
// Tell the launcher to cleanup.
|
||||
@ -874,15 +892,21 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
}
|
||||
}
|
||||
|
||||
private static final class AMUnregisteredTransition extends FinalTransition {
|
||||
|
||||
public AMUnregisteredTransition() {
|
||||
super(RMAppAttemptState.FINISHED);
|
||||
}
|
||||
private static final class AMUnregisteredTransition implements
|
||||
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
||||
|
||||
@Override
|
||||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
|
||||
|
||||
appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
|
||||
|
||||
// Remove the AppAttempt from the ApplicationTokenSecretManager
|
||||
appAttempt.rmContext.getApplicationTokenSecretManager()
|
||||
.applicationMasterFinished(appAttemptId);
|
||||
|
||||
appAttempt.progress = 1.0f;
|
||||
|
||||
RMAppAttemptUnregistrationEvent unregisterEvent
|
||||
= (RMAppAttemptUnregistrationEvent) event;
|
||||
@ -892,8 +916,20 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
|
||||
appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
|
||||
|
||||
// Tell the app and the scheduler
|
||||
super.transition(appAttempt, event);
|
||||
// Tell the app
|
||||
if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
|
||||
// Unmanaged AMs have no container to wait for, so they skip
|
||||
// the FINISHING state and go straight to FINISHED.
|
||||
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
||||
appAttempt, event);
|
||||
return RMAppAttemptState.FINISHED;
|
||||
}
|
||||
appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId);
|
||||
ApplicationId applicationId =
|
||||
appAttempt.getAppAttemptId().getApplicationId();
|
||||
appAttempt.eventHandler.handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
|
||||
return RMAppAttemptState.FINISHING;
|
||||
}
|
||||
}
|
||||
|
||||
@ -958,6 +994,33 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||
}
|
||||
}
|
||||
|
||||
private static final class AMFinishingContainerFinishedTransition
|
||||
implements
|
||||
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
||||
|
||||
@Override
|
||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
|
||||
RMAppAttemptContainerFinishedEvent containerFinishedEvent
|
||||
= (RMAppAttemptContainerFinishedEvent) event;
|
||||
ContainerStatus containerStatus =
|
||||
containerFinishedEvent.getContainerStatus();
|
||||
|
||||
// Is this container the ApplicationMaster container?
|
||||
if (appAttempt.masterContainer.getId().equals(
|
||||
containerStatus.getContainerId())) {
|
||||
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
||||
appAttempt, containerFinishedEvent);
|
||||
return RMAppAttemptState.FINISHED;
|
||||
}
|
||||
|
||||
// Normal container.
|
||||
appAttempt.justFinishedContainers.add(containerStatus);
|
||||
return RMAppAttemptState.FINISHING;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getStartTime() {
|
||||
this.readLock.lock();
|
||||
|
@ -19,6 +19,6 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||
|
||||
public enum RMAppAttemptState {
|
||||
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHED,
|
||||
KILLED,
|
||||
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
|
||||
FINISHING, FINISHED, KILLED,
|
||||
}
|
||||
|
@ -65,7 +65,8 @@ public void scheduler() {
|
||||
RMAppState.NEW.toString(),
|
||||
RMAppState.SUBMITTED.toString(),
|
||||
RMAppState.ACCEPTED.toString(),
|
||||
RMAppState.RUNNING.toString()));
|
||||
RMAppState.RUNNING.toString(),
|
||||
RMAppState.FINISHING.toString()));
|
||||
|
||||
ResourceManager rm = getInstance(ResourceManager.class);
|
||||
ResourceScheduler rs = rm.getResourceScheduler();
|
||||
|
@ -18,13 +18,16 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
@ -33,6 +36,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
public class MockNM {
|
||||
@ -85,6 +89,20 @@ public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
|
||||
b, ++responseId);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
|
||||
int containerId, ContainerState containerState) throws Exception {
|
||||
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
|
||||
new HashMap<ApplicationId, List<ContainerStatus>>(1);
|
||||
ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
|
||||
BuilderUtils.newContainerId(attemptId, 1),
|
||||
ContainerState.COMPLETE, "Success", 0);
|
||||
ArrayList<ContainerStatus> containerStatusList =
|
||||
new ArrayList<ContainerStatus>(1);
|
||||
containerStatusList.add(amContainerStatus);
|
||||
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
|
||||
return nodeHeartbeat(nodeUpdate, true);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
|
||||
return nodeHeartbeat(conts, isHealthy, ++responseId);
|
||||
|
@ -91,8 +91,11 @@ public static RMContext mockRMContext(int n, long time) {
|
||||
rmDispatcher);
|
||||
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
|
||||
rmDispatcher);
|
||||
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
|
||||
rmDispatcher);
|
||||
return new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null, null) {
|
||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
null, null) {
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return map;
|
||||
|
@ -92,6 +92,8 @@ public void testAppCleanup() throws Exception {
|
||||
Assert.assertEquals(request, conts.size());
|
||||
|
||||
am.unregisterAppAttempt();
|
||||
HeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
|
||||
int cleanedConts = 0;
|
||||
@ -102,8 +104,7 @@ public void testAppCleanup() throws Exception {
|
||||
//currently only containers are cleaned via this
|
||||
//AM container is cleaned via container launcher
|
||||
waitCount = 0;
|
||||
while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) {
|
||||
HeartbeatResponse resp = nm1.nodeHeartbeat(true);
|
||||
while ((cleanedConts < 2 || cleanedApps < 1) && waitCount++ < 20) {
|
||||
contsToClean = resp.getContainersToCleanupList();
|
||||
apps = resp.getApplicationsToCleanupList();
|
||||
LOG.info("Waiting to get cleanup events.. cleanedConts: "
|
||||
@ -111,12 +112,13 @@ public void testAppCleanup() throws Exception {
|
||||
cleanedConts += contsToClean.size();
|
||||
cleanedApps += apps.size();
|
||||
Thread.sleep(1000);
|
||||
resp = nm1.nodeHeartbeat(true);
|
||||
}
|
||||
|
||||
Assert.assertEquals(1, apps.size());
|
||||
Assert.assertEquals(app.getApplicationId(), apps.get(0));
|
||||
Assert.assertEquals(1, cleanedApps);
|
||||
Assert.assertEquals(3, cleanedConts);
|
||||
Assert.assertEquals(2, cleanedConts);
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||
@ -182,6 +183,10 @@ public void testAMLaunchAndCleanup() throws Exception {
|
||||
am.registerAppAttempt();
|
||||
am.unregisterAppAttempt();
|
||||
|
||||
//complete the AM container to finish the app normally
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
|
||||
waitCount = 0;
|
||||
while (containerManager.cleanedup == false && waitCount++ < 20) {
|
||||
LOG.info("Waiting for AM Cleanup to happen..");
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
@ -72,6 +73,7 @@ public void testAppWithNoContainers() throws Exception {
|
||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||
am.registerAppAttempt();
|
||||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.stop();
|
||||
}
|
||||
@ -127,6 +129,7 @@ public void testAppOnMultiNode() throws Exception {
|
||||
Assert.assertEquals(10, conts.size());
|
||||
|
||||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
|
||||
rm.stop();
|
||||
|
@ -77,7 +77,7 @@ public void setUp() throws Exception {
|
||||
InlineDispatcher rmDispatcher = new InlineDispatcher();
|
||||
|
||||
rmContext =
|
||||
new RMContextImpl(new MemStore(), rmDispatcher, null, null,
|
||||
new RMContextImpl(new MemStore(), rmDispatcher, null, null, null,
|
||||
mock(DelegationTokenRenewer.class), null);
|
||||
scheduler = mock(YarnScheduler.class);
|
||||
doAnswer(
|
||||
|
@ -71,7 +71,7 @@ public void setUp() {
|
||||
// Dispatcher that processes events inline
|
||||
Dispatcher dispatcher = new InlineDispatcher();
|
||||
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
||||
null, null, null);
|
||||
null, null, null, null);
|
||||
dispatcher.register(SchedulerEventType.class,
|
||||
new InlineDispatcher.EmptyEventHandler());
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
|
@ -65,7 +65,8 @@ public void handle(Event event) {
|
||||
}
|
||||
});
|
||||
RMContext context =
|
||||
new RMContextImpl(new MemStore(), dispatcher, null, null, null, null);
|
||||
new RMContextImpl(new MemStore(), dispatcher, null, null, null,
|
||||
null, null);
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
new ResourceManager.NodeEventDispatcher(context));
|
||||
NodesListManager nodesListManager = new NodesListManager(context);
|
||||
|
@ -118,10 +118,10 @@ public void setUp() throws Exception {
|
||||
ContainerAllocationExpirer containerAllocationExpirer =
|
||||
mock(ContainerAllocationExpirer.class);
|
||||
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||
this.rmContext =
|
||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null,
|
||||
new ApplicationTokenSecretManager(conf));
|
||||
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
|
||||
this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
null, new ApplicationTokenSecretManager(conf));
|
||||
|
||||
rmDispatcher.register(RMAppAttemptEventType.class,
|
||||
new TestApplicationAttemptEventDispatcher(this.rmContext));
|
||||
@ -278,14 +278,35 @@ protected RMApp testCreateAppRunning(
|
||||
return application;
|
||||
}
|
||||
|
||||
protected RMApp testCreateAppFinishing(
|
||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||
// unmanaged AMs don't use the FINISHING state
|
||||
assert submissionContext == null || !submissionContext.getUnmanagedAM();
|
||||
RMApp application = testCreateAppRunning(submissionContext);
|
||||
// RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING
|
||||
RMAppEvent finishingEvent =
|
||||
new RMAppEvent(application.getApplicationId(),
|
||||
RMAppEventType.ATTEMPT_FINISHING);
|
||||
application.handle(finishingEvent);
|
||||
assertAppState(RMAppState.FINISHING, application);
|
||||
assertTimesAtFinish(application);
|
||||
return application;
|
||||
}
|
||||
|
||||
protected RMApp testCreateAppFinished(
|
||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||
RMApp application = testCreateAppRunning(submissionContext);
|
||||
// RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
|
||||
RMAppEvent event =
|
||||
// unmanaged AMs don't use the FINISHING state
|
||||
RMApp application = null;
|
||||
if (submissionContext != null && submissionContext.getUnmanagedAM()) {
|
||||
application = testCreateAppRunning(submissionContext);
|
||||
} else {
|
||||
application = testCreateAppFinishing(submissionContext);
|
||||
}
|
||||
// RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
|
||||
RMAppEvent finishedEvent =
|
||||
new RMAppEvent(application.getApplicationId(),
|
||||
RMAppEventType.ATTEMPT_FINISHED);
|
||||
application.handle(event);
|
||||
application.handle(finishedEvent);
|
||||
assertAppState(RMAppState.FINISHED, application);
|
||||
assertTimesAtFinish(application);
|
||||
// finished without a proper unregister implies failed
|
||||
@ -468,6 +489,17 @@ public void testAppRunningFailed() throws IOException {
|
||||
assertFailed(application, ".*Failing the application.*");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppFinishingKill() throws IOException {
|
||||
LOG.info("--- START: testAppFinishedFinished ---");
|
||||
|
||||
RMApp application = testCreateAppFinishing(null);
|
||||
// FINISHING => FINISHED event RMAppEventType.KILL
|
||||
RMAppEvent event =
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||
application.handle(event);
|
||||
assertAppState(RMAppState.FINISHED, application);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppFinishedFinished() throws IOException {
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -71,6 +72,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -87,6 +89,7 @@ public class TestRMAppAttemptTransitions {
|
||||
private ApplicationMasterService masterService;
|
||||
private ApplicationMasterLauncher applicationMasterLauncher;
|
||||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private AMLivelinessMonitor amFinishingMonitor;
|
||||
|
||||
private RMApp application;
|
||||
private RMAppAttempt applicationAttempt;
|
||||
@ -150,10 +153,10 @@ public void setUp() throws Exception {
|
||||
ContainerAllocationExpirer containerAllocationExpirer =
|
||||
mock(ContainerAllocationExpirer.class);
|
||||
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||
rmContext =
|
||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null,
|
||||
new ApplicationTokenSecretManager(new Configuration()));
|
||||
amFinishingMonitor = mock(AMLivelinessMonitor.class);
|
||||
rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
null, new ApplicationTokenSecretManager(new Configuration()));
|
||||
|
||||
scheduler = mock(YarnScheduler.class);
|
||||
masterService = mock(ApplicationMasterService.class);
|
||||
@ -366,6 +369,23 @@ private void testAppAttemptRunningState(Container container,
|
||||
// TODO - need to add more checks relevant to this state
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link RMAppAttemptState#FINISHING}
|
||||
*/
|
||||
private void testAppAttemptFinishingState(Container container,
|
||||
FinalApplicationStatus finalStatus,
|
||||
String trackingUrl,
|
||||
String diagnostics) {
|
||||
assertEquals(RMAppAttemptState.FINISHING,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId().
|
||||
getApplicationId()+"/", applicationAttempt.getTrackingUrl());
|
||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link RMAppAttemptState#FINISHED}
|
||||
*/
|
||||
@ -408,6 +428,8 @@ private Container allocateApplicationAttempt() {
|
||||
|
||||
// Mock the allocation of AM container
|
||||
Container container = mock(Container.class);
|
||||
when(container.getId()).thenReturn(
|
||||
BuilderUtils.newContainerId(applicationAttempt.getAppAttemptId(), 1));
|
||||
Allocation allocation = mock(Allocation.class);
|
||||
when(allocation.getContainers()).
|
||||
thenReturn(Collections.singletonList(container));
|
||||
@ -447,6 +469,18 @@ private void runApplicationAttempt(Container container,
|
||||
|
||||
testAppAttemptRunningState(container, host, rpcPort, trackingUrl);
|
||||
}
|
||||
|
||||
private void unregisterApplicationAttempt(Container container,
|
||||
FinalApplicationStatus finalStatus, String trackingUrl,
|
||||
String diagnostics) {
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptUnregistrationEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
trackingUrl, finalStatus, diagnostics));
|
||||
testAppAttemptFinishingState(container, finalStatus,
|
||||
trackingUrl, diagnostics);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUnmanagedAMSuccess() {
|
||||
@ -553,36 +587,99 @@ public void testAllocatedToFailed() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnregisterToKilledFinish() {
|
||||
public void testUnregisterToKilledFinishing() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
String trackingUrl = "newtrackingurl";
|
||||
String diagnostics = "Killed by user";
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.KILLED;
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptUnregistrationEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
trackingUrl, finalStatus, diagnostics));
|
||||
testAppAttemptFinishedState(amContainer, finalStatus,
|
||||
trackingUrl, diagnostics, 0);
|
||||
unregisterApplicationAttempt(amContainer,
|
||||
FinalApplicationStatus.KILLED, "newtrackingurl",
|
||||
"Killed by user");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUnregisterToSuccessfulFinish() {
|
||||
|
||||
@Test
|
||||
public void testUnregisterToSuccessfulFinishing() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
unregisterApplicationAttempt(amContainer,
|
||||
FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishingKill() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
|
||||
String trackingUrl = "newtrackingurl";
|
||||
String diagnostics = "Job failed";
|
||||
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics);
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.KILL));
|
||||
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishingExpire() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
String trackingUrl = "mytrackingurl";
|
||||
String diagnostics = "Successful";
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics);
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptUnregistrationEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
trackingUrl, finalStatus, diagnostics));
|
||||
testAppAttemptFinishedState(amContainer, finalStatus,
|
||||
trackingUrl, diagnostics, 0);
|
||||
new RMAppAttemptEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.EXPIRE));
|
||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishingToFinishing() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
String trackingUrl = "mytrackingurl";
|
||||
String diagnostics = "Successful";
|
||||
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics);
|
||||
// container must be AM container to move from FINISHING to FINISHED
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
BuilderUtils.newContainerStatus(
|
||||
BuilderUtils.newContainerId(
|
||||
applicationAttempt.getAppAttemptId(), 42),
|
||||
ContainerState.COMPLETE, "", 0)));
|
||||
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessfulFinishingToFinished() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
String trackingUrl = "mytrackingurl";
|
||||
String diagnostics = "Successful";
|
||||
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics);
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||
ContainerState.COMPLETE, "", 0)));
|
||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics, 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ public EventHandler getEventHandler() {
|
||||
new ContainerAllocationExpirer(nullDispatcher);
|
||||
|
||||
RMContext rmContext =
|
||||
new RMContextImpl(null, nullDispatcher, cae, null, null,
|
||||
new RMContextImpl(null, nullDispatcher, cae, null, null, null,
|
||||
new ApplicationTokenSecretManager(new Configuration()));
|
||||
|
||||
return rmContext;
|
||||
|
@ -86,8 +86,8 @@ public void testFifoSchedulerCapacityWhenNoNMs() {
|
||||
@Test
|
||||
public void testAppAttemptMetrics() throws Exception {
|
||||
AsyncDispatcher dispatcher = new InlineDispatcher();
|
||||
RMContext rmContext =
|
||||
new RMContextImpl(null, dispatcher, null, null, null, null);
|
||||
RMContext rmContext = new RMContextImpl(null, dispatcher, null,
|
||||
null, null, null, null);
|
||||
|
||||
FifoScheduler schedular = new FifoScheduler();
|
||||
schedular.reinitialize(new Configuration(), null, rmContext);
|
||||
|
@ -158,7 +158,8 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes,
|
||||
for (RMNode node : deactivatedNodes) {
|
||||
deactivatedNodesMap.put(node.getHostName(), node);
|
||||
}
|
||||
return new RMContextImpl(new MemStore(), null, null, null, null, null) {
|
||||
return new RMContextImpl(new MemStore(), null, null, null, null,
|
||||
null, null) {
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return applicationsMaps;
|
||||
|
@ -31,6 +31,7 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
@ -545,6 +546,8 @@ public void testAppsQueryFinishBegin() throws JSONException, Exception {
|
||||
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
||||
am.registerAppAttempt();
|
||||
am.unregisterAppAttempt();
|
||||
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
|
||||
@ -573,6 +576,8 @@ public void testAppsQueryFinishEnd() throws JSONException, Exception {
|
||||
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
||||
am.registerAppAttempt();
|
||||
am.unregisterAppAttempt();
|
||||
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
@ -605,6 +610,8 @@ public void testAppsQueryFinishBeginEnd() throws JSONException, Exception {
|
||||
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
||||
am.registerAppAttempt();
|
||||
am.unregisterAppAttempt();
|
||||
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
|
||||
rm.submitApp(1024);
|
||||
rm.submitApp(1024);
|
||||
|
Loading…
Reference in New Issue
Block a user