YARN-4051. ContainerKillEvent lost when container is still recovering and application finishes. Contributed by sandflee
This commit is contained in:
parent
cd976b263b
commit
7114baddb6
@ -402,8 +402,9 @@ private void recoverContainer(RecoveredContainerState rcs)
|
||||
LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
|
||||
+ " with exit code " + rcs.getExitCode());
|
||||
|
||||
if (context.getApplications().containsKey(appId)) {
|
||||
recoverActiveContainer(launchContext, token, rcs);
|
||||
Application app = context.getApplications().get(appId);
|
||||
if (app != null) {
|
||||
recoverActiveContainer(app, launchContext, token, rcs);
|
||||
if (rcs.getRecoveryType() == RecoveredContainerType.KILL) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED,
|
||||
@ -423,7 +424,7 @@ private void recoverContainer(RecoveredContainerState rcs)
|
||||
* Recover a running container.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void recoverActiveContainer(
|
||||
protected void recoverActiveContainer(Application app,
|
||||
ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
|
||||
RecoveredContainerState rcs) throws IOException {
|
||||
Credentials credentials = YarnServerSecurityUtils.parseCredentials(
|
||||
@ -431,8 +432,7 @@ protected void recoverActiveContainer(
|
||||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||
launchContext, credentials, metrics, token, context, rcs);
|
||||
context.getContainers().put(token.getContainerID(), container);
|
||||
dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
|
||||
container));
|
||||
app.handle(new ApplicationContainerInitEvent(container));
|
||||
}
|
||||
|
||||
private void waitForRecoveredContainers() throws InterruptedException {
|
||||
@ -1286,6 +1286,10 @@ protected void stopContainerInternal(ContainerId containerID)
|
||||
+ " is not handled by this NodeManager");
|
||||
}
|
||||
} else {
|
||||
if (container.isRecovering()) {
|
||||
throw new NMNotYetReadyException("Container " + containerIDStr
|
||||
+ " is recovering, try later");
|
||||
}
|
||||
context.getNMStateStore().storeContainerKilled(containerID);
|
||||
container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER,
|
||||
"Container killed by the ApplicationMaster.");
|
||||
@ -1455,6 +1459,21 @@ public void handle(ContainerManagerEvent event) {
|
||||
+ " FINISH_APPS event");
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean shouldDropEvent = false;
|
||||
for (Container container : app.getContainers().values()) {
|
||||
if (container.isRecovering()) {
|
||||
LOG.info("drop FINISH_APPS event to " + appID + " because "
|
||||
+ "container " + container.getContainerId()
|
||||
+ " is recovering");
|
||||
shouldDropEvent = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (shouldDropEvent) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String diagnostic = "";
|
||||
if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
|
||||
diagnostic = "Application killed on shutdown";
|
||||
@ -1469,10 +1488,32 @@ public void handle(ContainerManagerEvent event) {
|
||||
case FINISH_CONTAINERS:
|
||||
CMgrCompletedContainersEvent containersFinishedEvent =
|
||||
(CMgrCompletedContainersEvent) event;
|
||||
for (ContainerId container : containersFinishedEvent
|
||||
for (ContainerId containerId : containersFinishedEvent
|
||||
.getContainersToCleanup()) {
|
||||
this.dispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(container,
|
||||
ApplicationId appId =
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
Application app = this.context.getApplications().get(appId);
|
||||
if (app == null) {
|
||||
LOG.warn("couldn't find app " + appId + " while processing"
|
||||
+ " FINISH_CONTAINERS event");
|
||||
continue;
|
||||
}
|
||||
|
||||
Container container = app.getContainers().get(containerId);
|
||||
if (container == null) {
|
||||
LOG.warn("couldn't find container " + containerId
|
||||
+ " while processing FINISH_CONTAINERS event");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (container.isRecovering()) {
|
||||
LOG.info("drop FINISH_CONTAINERS event to " + containerId
|
||||
+ " because container is recovering");
|
||||
continue;
|
||||
}
|
||||
|
||||
this.dispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(containerId,
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
||||
"Container Killed by ResourceManager"));
|
||||
}
|
||||
|
@ -20,8 +20,8 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
@ -89,7 +89,7 @@ public class ApplicationImpl implements Application {
|
||||
private LogAggregationContext logAggregationContext;
|
||||
|
||||
Map<ContainerId, Container> containers =
|
||||
new HashMap<ContainerId, Container>();
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* The timestamp when the log aggregation has started for this application.
|
||||
|
@ -92,4 +92,6 @@ public interface Container extends EventHandler<ContainerEvent> {
|
||||
void sendLaunchEvent();
|
||||
|
||||
void sendKillEvent(int exitStatus, String description);
|
||||
|
||||
boolean isRecovering();
|
||||
}
|
||||
|
@ -1756,4 +1756,12 @@ public boolean canRollback() {
|
||||
public void commitUpgrade() {
|
||||
this.reInitContext = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRecovering() {
|
||||
boolean isRecovering = (
|
||||
recoveredStatus != RecoveredContainerStatus.REQUESTED &&
|
||||
getContainerState() == ContainerState.NEW);
|
||||
return isRecovering;
|
||||
}
|
||||
}
|
||||
|
@ -86,6 +86,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
@ -248,8 +249,8 @@ public void testApplicationRecovery() throws Exception {
|
||||
// simulate application completion
|
||||
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
|
||||
finishedApps.add(appId);
|
||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||
app.handle(new ApplicationFinishEvent(
|
||||
appId, "Application killed by ResourceManager"));
|
||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||
|
||||
// restart and verify app is marked for finishing
|
||||
@ -263,8 +264,8 @@ public void testApplicationRecovery() throws Exception {
|
||||
assertNotNull(app);
|
||||
// no longer saving FINISH_APP event in NM stateStore,
|
||||
// simulate by resending FINISH_APP event
|
||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||
app.handle(new ApplicationFinishEvent(
|
||||
appId, "Application killed by ResourceManager"));
|
||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||
UserGroupInformation.createRemoteUser(modUser),
|
||||
@ -335,8 +336,8 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure()
|
||||
// simulate application completion
|
||||
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
|
||||
finishedApps.add(appId);
|
||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||
app.handle(new ApplicationFinishEvent(
|
||||
appId, "Application killed by ResourceManager"));
|
||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||
|
||||
app.handle(new ApplicationEvent(app.getAppId(),
|
||||
@ -357,8 +358,9 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure()
|
||||
|
||||
// no longer saving FINISH_APP event in NM stateStore,
|
||||
// simulate by resending FINISH_APP event
|
||||
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||
app.handle(new ApplicationFinishEvent(
|
||||
appId, "Application killed by ResourceManager"));
|
||||
|
||||
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||
// TODO need to figure out why additional APPLICATION_RESOURCES_CLEANEDUP
|
||||
// is needed.
|
||||
|
@ -230,4 +230,9 @@ public void sendLaunchEvent() {
|
||||
public void sendKillEvent(int exitStatus, String description) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRecovering() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user