YARN-6059. Update paused container state in the NM state store. (Hitesh Sharma via asuresh)
This commit is contained in:
parent
4f8194430f
commit
66ca0a6540
@ -836,10 +836,18 @@ public void sendLaunchEvent() {
|
||||
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
private void sendScheduleEvent() {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerSchedulerEvent(this,
|
||||
ContainerSchedulerEventType.SCHEDULE_CONTAINER)
|
||||
);
|
||||
if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
|
||||
// Recovery is not supported for paused container so we raise the
|
||||
// launch event which will proceed to kill the paused container instead
|
||||
// of raising the schedule event.
|
||||
ContainersLauncherEventType launcherEvent;
|
||||
launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
|
||||
dispatcher.getEventHandler()
|
||||
.handle(new ContainersLauncherEvent(this, launcherEvent));
|
||||
} else {
|
||||
dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
|
||||
ContainerSchedulerEventType.SCHEDULE_CONTAINER));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
|
@ -837,6 +837,14 @@ public void pauseContainer() throws IOException {
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(
|
||||
containerId,
|
||||
ContainerEventType.CONTAINER_PAUSED));
|
||||
|
||||
try {
|
||||
this.context.getNMStateStore().storeContainerPaused(
|
||||
container.getContainerId());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not store container [" + container.getContainerId()
|
||||
+ "] state. The Container has been paused.", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String message =
|
||||
"Exception when trying to pause container " + containerIdStr
|
||||
@ -873,12 +881,20 @@ public void resumeContainer() throws IOException {
|
||||
|
||||
// If the container has already started
|
||||
try {
|
||||
exec.resumeContainer(container);
|
||||
// ResumeContainer is a blocking call. We are here almost means the
|
||||
// container is resumed, so send out the event.
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(
|
||||
containerId,
|
||||
ContainerEventType.CONTAINER_RESUMED));
|
||||
exec.resumeContainer(container);
|
||||
// ResumeContainer is a blocking call. We are here almost means the
|
||||
// container is resumed, so send out the event.
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(
|
||||
containerId,
|
||||
ContainerEventType.CONTAINER_RESUMED));
|
||||
|
||||
try {
|
||||
this.context.getNMStateStore().removeContainerPaused(
|
||||
container.getContainerId());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not store container [" + container.getContainerId()
|
||||
+ "] state. The Container has been resumed.", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String message =
|
||||
"Exception when trying to resume container " + containerIdStr
|
||||
|
@ -139,6 +139,16 @@ public void handle(ContainersLauncherEvent event) {
|
||||
containerLauncher.submit(launch);
|
||||
running.put(containerId, launch);
|
||||
break;
|
||||
case RECOVER_PAUSED_CONTAINER:
|
||||
// Recovery for paused containers is not supported, thus here
|
||||
// we locate any paused containers, and terminate them.
|
||||
app = context.getApplications().get(
|
||||
containerId.getApplicationAttemptId().getApplicationId());
|
||||
launch = new RecoverPausedContainerLaunch(context, getConfig(),
|
||||
dispatcher, exec, app, event.getContainer(), dirsHandler,
|
||||
containerManager);
|
||||
containerLauncher.submit(launch);
|
||||
break;
|
||||
case CLEANUP_CONTAINER:
|
||||
case CLEANUP_CONTAINER_FOR_REINIT:
|
||||
ContainerLaunch launcher = running.remove(containerId);
|
||||
|
@ -26,6 +26,7 @@ public enum ContainersLauncherEventType {
|
||||
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
|
||||
SIGNAL_CONTAINER,
|
||||
PAUSE_CONTAINER,
|
||||
RESUME_CONTAINER
|
||||
RESUME_CONTAINER,
|
||||
RECOVER_PAUSED_CONTAINER
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,124 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
|
||||
/**
|
||||
* This is a ContainerLaunch which has been recovered after an NM restart for
|
||||
* pause containers (for rolling upgrades)
|
||||
*/
|
||||
public class RecoverPausedContainerLaunch extends ContainerLaunch {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
RecoveredContainerLaunch.class);
|
||||
|
||||
public RecoverPausedContainerLaunch(Context context,
|
||||
Configuration configuration, Dispatcher dispatcher,
|
||||
ContainerExecutor exec, Application app, Container container,
|
||||
LocalDirsHandlerService dirsHandler,
|
||||
ContainerManagerImpl containerManager) {
|
||||
super(context, configuration, dispatcher, exec, app, container, dirsHandler,
|
||||
containerManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup the paused container by issuing a kill on it.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Integer call() {
|
||||
int retCode = ContainerExecutor.ExitCode.LOST.getExitCode();
|
||||
ContainerId containerId = container.getContainerId();
|
||||
String appIdStr =
|
||||
containerId.getApplicationAttemptId().getApplicationId().toString();
|
||||
String containerIdStr = containerId.toString();
|
||||
|
||||
boolean notInterrupted = true;
|
||||
try {
|
||||
File pidFile = locatePidFile(appIdStr, containerIdStr);
|
||||
if (pidFile != null) {
|
||||
String pidPathStr = pidFile.getPath();
|
||||
pidFilePath = new Path(pidPathStr);
|
||||
exec.activateContainer(containerId, pidFilePath);
|
||||
exec.signalContainer(new ContainerSignalContext.Builder()
|
||||
.setContainer(container)
|
||||
.setUser(container.getUser())
|
||||
.setSignal(ContainerExecutor.Signal.KILL)
|
||||
.build());
|
||||
} else {
|
||||
LOG.warn("Unable to locate pid file for container " + containerIdStr);
|
||||
}
|
||||
|
||||
} catch (InterruptedIOException e) {
|
||||
LOG.warn("Interrupted while waiting for exit code from " + containerId);
|
||||
notInterrupted = false;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to kill the paused container " + containerIdStr, e);
|
||||
} finally {
|
||||
if (notInterrupted) {
|
||||
this.completed.set(true);
|
||||
exec.deactivateContainer(containerId);
|
||||
try {
|
||||
getContext().getNMStateStore()
|
||||
.storeContainerCompleted(containerId, retCode);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to set exit code for container " + containerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.warn("Recovered container exited with a non-zero exit code "
|
||||
+ retCode);
|
||||
this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
|
||||
containerId,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
|
||||
"Container exited with a non-zero exit code " + retCode));
|
||||
|
||||
return retCode;
|
||||
}
|
||||
|
||||
private File locatePidFile(String appIdStr, String containerIdStr) {
|
||||
String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
|
||||
for (String dir : getContext().getLocalDirsHandler().
|
||||
getLocalDirsForRead()) {
|
||||
File pidFile = new File(dir, pidSubpath);
|
||||
if (pidFile.exists()) {
|
||||
return pidFile;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -40,10 +40,9 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
|
||||
|
||||
|
||||
/**
|
||||
* This is a ContainerLaunch which has been recovered after an NM restart (for
|
||||
* rolling upgrades)
|
||||
* rolling upgrades).
|
||||
*/
|
||||
public class RecoveredContainerLaunch extends ContainerLaunch {
|
||||
|
||||
|
@ -119,6 +119,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
|
||||
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
|
||||
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
|
||||
private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
|
||||
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
|
||||
"/resourceChanged";
|
||||
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
|
||||
@ -272,9 +273,16 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
|
||||
if (rcs.status == RecoveredContainerStatus.REQUESTED) {
|
||||
rcs.status = RecoveredContainerStatus.QUEUED;
|
||||
}
|
||||
} else if (suffix.equals(CONTAINER_PAUSED_KEY_SUFFIX)) {
|
||||
if ((rcs.status == RecoveredContainerStatus.LAUNCHED)
|
||||
||(rcs.status == RecoveredContainerStatus.QUEUED)
|
||||
||(rcs.status == RecoveredContainerStatus.REQUESTED)) {
|
||||
rcs.status = RecoveredContainerStatus.PAUSED;
|
||||
}
|
||||
} else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
|
||||
if ((rcs.status == RecoveredContainerStatus.REQUESTED)
|
||||
|| (rcs.status == RecoveredContainerStatus.QUEUED)) {
|
||||
|| (rcs.status == RecoveredContainerStatus.QUEUED)
|
||||
||(rcs.status == RecoveredContainerStatus.PAUSED)) {
|
||||
rcs.status = RecoveredContainerStatus.LAUNCHED;
|
||||
}
|
||||
} else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
|
||||
@ -366,6 +374,37 @@ public void storeContainerQueued(ContainerId containerId) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerPaused(ContainerId containerId) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("storeContainerPaused: containerId=" + containerId);
|
||||
}
|
||||
|
||||
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
|
||||
+ CONTAINER_PAUSED_KEY_SUFFIX;
|
||||
try {
|
||||
db.put(bytes(key), EMPTY_VALUE);
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeContainerPaused(ContainerId containerId)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("removeContainerPaused: containerId=" + containerId);
|
||||
}
|
||||
|
||||
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
|
||||
+ CONTAINER_PAUSED_KEY_SUFFIX;
|
||||
try {
|
||||
db.delete(bytes(key));
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerDiagnostics(ContainerId containerId,
|
||||
StringBuilder diagnostics) throws IOException {
|
||||
@ -510,6 +549,7 @@ public void removeContainer(ContainerId containerId)
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
|
||||
List<String> unknownKeysForContainer = containerUnknownKeySuffixes
|
||||
|
@ -79,6 +79,15 @@ public void storeContainer(ContainerId containerId, int version,
|
||||
public void storeContainerQueued(ContainerId containerId) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerPaused(ContainerId containerId) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeContainerPaused(ContainerId containerId)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerDiagnostics(ContainerId containerId,
|
||||
StringBuilder diagnostics) throws IOException {
|
||||
|
@ -73,7 +73,8 @@ public enum RecoveredContainerStatus {
|
||||
REQUESTED,
|
||||
QUEUED,
|
||||
LAUNCHED,
|
||||
COMPLETED
|
||||
COMPLETED,
|
||||
PAUSED
|
||||
}
|
||||
|
||||
public static class RecoveredContainerState {
|
||||
@ -349,9 +350,9 @@ public boolean isNewlyCreated() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the state of applications
|
||||
* @return recovered state for applications
|
||||
* @throws IOException
|
||||
* Load the state of applications.
|
||||
* @return recovered state for applications.
|
||||
* @throws IOException IO Exception.
|
||||
*/
|
||||
public abstract RecoveredApplicationsState loadApplicationsState()
|
||||
throws IOException;
|
||||
@ -402,6 +403,23 @@ public abstract void storeContainer(ContainerId containerId,
|
||||
public abstract void storeContainerQueued(ContainerId containerId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Record that a container has been paused at the NM.
|
||||
* @param containerId the container ID.
|
||||
* @throws IOException IO Exception.
|
||||
*/
|
||||
public abstract void storeContainerPaused(ContainerId containerId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Record that a container has been resumed at the NM by removing the
|
||||
* fact that it has be paused.
|
||||
* @param containerId the container ID.
|
||||
* @throws IOException IO Exception.
|
||||
*/
|
||||
public abstract void removeContainerPaused(ContainerId containerId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Record that a container has been launched
|
||||
* @param containerId the container ID
|
||||
|
@ -144,6 +144,19 @@ public void storeContainerQueued(ContainerId containerId) throws IOException {
|
||||
rcs.status = RecoveredContainerStatus.QUEUED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerPaused(ContainerId containerId) throws IOException {
|
||||
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
|
||||
rcs.status = RecoveredContainerStatus.PAUSED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeContainerPaused(ContainerId containerId)
|
||||
throws IOException {
|
||||
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
|
||||
rcs.status = RecoveredContainerStatus.LAUNCHED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeContainerDiagnostics(ContainerId containerId,
|
||||
StringBuilder diagnostics) throws IOException {
|
||||
|
@ -289,6 +289,23 @@ public void testContainerStorage() throws IOException {
|
||||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
assertEquals(diags.toString(), rcs.getDiagnostics());
|
||||
|
||||
// pause the container, and verify recovered
|
||||
stateStore.storeContainerPaused(containerId);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
|
||||
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
||||
assertEquals(false, rcs.getKilled());
|
||||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
|
||||
// Resume the container
|
||||
stateStore.removeContainerPaused(containerId);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
|
||||
// increase the container size, and verify recovered
|
||||
stateStore.storeContainerResourceChanged(containerId, 2,
|
||||
Resource.newInstance(2468, 4));
|
||||
|
Loading…
Reference in New Issue
Block a user