diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java index 696fe062a5..45e5bd4df6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java @@ -33,11 +33,14 @@ public enum ContainerState { /** Running container */ RUNNING, - + /** Completed container */ COMPLETE, /** Scheduled (awaiting resources) at the NM. */ @InterfaceStability.Unstable - SCHEDULED + SCHEDULED, + + /** Paused at the NM. */ + PAUSED } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9933e9ea8e..066441cc3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -110,6 +110,7 @@ enum ContainerStateProto { C_RUNNING = 2; C_COMPLETE = 3; C_SCHEDULED = 4; + C_PAUSED = 5; } message ContainerProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 072cca7a3f..da50d7a3a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -699,6 +699,28 @@ public void deactivateContainer(ContainerId containerId) { } } + /** + * Pause the container. The default implementation is to raise a kill event. + * Specific executor implementations can override this behavior. + * @param container + * the Container + */ + public void pauseContainer(Container container) { + LOG.warn(container.getContainerId() + " doesn't support pausing."); + throw new UnsupportedOperationException(); + } + + /** + * Resume the container from pause state. The default implementation ignores + * this event. Specific implementations can override this behavior. + * @param container + * the Container + */ + public void resumeContainer(Container container) { + LOG.warn(container.getContainerId() + " doesn't support resume."); + throw new UnsupportedOperationException(); + } + /** * Get the process-identifier for the container. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index afea0e6cbd..147543567d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -27,6 +27,8 @@ public enum ContainerEventType { CONTAINER_DONE, REINITIALIZE_CONTAINER, ROLLBACK_REINIT, + PAUSE_CONTAINER, + RESUME_CONTAINER, // DownloadManager CONTAINER_INITED, @@ -38,5 +40,7 @@ public enum ContainerEventType { CONTAINER_LAUNCHED, CONTAINER_EXITED_WITH_SUCCESS, CONTAINER_EXITED_WITH_FAILURE, - CONTAINER_KILLED_ON_REQUEST + CONTAINER_KILLED_ON_REQUEST, + CONTAINER_PAUSED, + CONTAINER_RESUMED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 1a48b12d36..7a1237171f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -307,6 +307,8 @@ ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.NEW, ContainerState.DONE, ContainerEventType.KILL_CONTAINER, new KillOnNewTransition()) + .addTransition(ContainerState.NEW, ContainerState.DONE, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From LOCALIZING State .addTransition(ContainerState.LOCALIZING, @@ -322,6 +324,8 @@ ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition()) .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillBeforeRunningTransition()) + .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From LOCALIZATION_FAILED State .addTransition(ContainerState.LOCALIZATION_FAILED, @@ -335,7 +339,8 @@ ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition()) // container not launched so kill is a no-op .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // container cleanup triggers a release of all resources // regardless of whether they were localized or not // LocalizedResource handles release event in all states @@ -391,6 +396,76 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.PAUSING, + ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition()) + + // From PAUSING State + .addTransition(ContainerState.PAUSING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSED, + ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition()) + // In case something goes wrong then container will exit from the + // PAUSING state + .addTransition(ContainerState.PAUSING, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS) + .addTransition(ContainerState.PAUSING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + + // From PAUSED State + .addTransition(ContainerState.PAUSED, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.PAUSE_CONTAINER) + .addTransition(ContainerState.PAUSED, ContainerState.RESUMING, + ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition()) + // In case something goes wrong then container will exit from the + // PAUSED state + .addTransition(ContainerState.PAUSED, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + .addTransition(ContainerState.PAUSED, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) + + // From RESUMING State + .addTransition(ContainerState.RESUMING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.RESUMING, ContainerState.RUNNING, + ContainerEventType.CONTAINER_RESUMED) + .addTransition(ContainerState.RESUMING, ContainerState.RESUMING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + // In case something goes wrong then container will exit from the + // RESUMING state + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) // From REINITIALIZING State .addTransition(ContainerState.REINITIALIZING, @@ -414,6 +489,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) .addTransition(ContainerState.REINITIALIZING, ContainerState.SCHEDULED, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, @@ -431,6 +508,8 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From CONTAINER_EXITED_WITH_SUCCESS State .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, @@ -442,7 +521,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // From EXITED_WITH_FAILURE State .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE, @@ -454,7 +534,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // From KILLING State. .addTransition(ContainerState.KILLING, @@ -488,7 +569,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) // in the container launcher .addTransition(ContainerState.KILLING, ContainerState.KILLING, - ContainerEventType.CONTAINER_LAUNCHED) + EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.PAUSE_CONTAINER)) // From CONTAINER_CLEANEDUP_AFTER_KILL State. .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, @@ -504,11 +586,13 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) EnumSet.of(ContainerEventType.KILL_CONTAINER, ContainerEventType.RESOURCE_FAILED, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, - ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.PAUSE_CONTAINER)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) .addTransition(ContainerState.DONE, ContainerState.DONE, ContainerEventType.INIT_CONTAINER) .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -534,6 +618,8 @@ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { case LOCALIZING: case LOCALIZATION_FAILED: case SCHEDULED: + case PAUSED: + case RESUMING: return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED; case RUNNING: case RELAUNCHING: @@ -543,6 +629,7 @@ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { case KILLING: case CONTAINER_CLEANEDUP_AFTER_KILL: case CONTAINER_RESOURCES_CLEANINGUP: + case PAUSING: return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING; case DONE: default: @@ -1500,6 +1587,26 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transitions upon receiving PAUSE_CONTAINER. + * - LOCALIZED -> KILLING. + * - REINITIALIZING -> KILLING. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class KillOnPauseTransition implements + SingleArcTransition { + + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Kill the process/process-grp + container.setIsReInitializing(false); + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + } + /** * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL * upon receiving CONTAINER_KILLED_ON_REQUEST. @@ -1690,6 +1797,57 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transitions upon receiving PAUSE_CONTAINER. + * - RUNNING -> PAUSED + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class PauseContainerTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Pause the process/process-grp if it is supported by the container + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.PAUSE_CONTAINER)); + ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event; + container.addDiagnostics(pauseEvent.getDiagnostic(), "\n"); + } + } + + /** + * Transitions upon receiving PAUSED_CONTAINER. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class PausedContainerTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Container was PAUSED so tell the scheduler + container.dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(container, + ContainerSchedulerEventType.CONTAINER_PAUSED)); + } + } + + /** + * Transitions upon receiving RESUME_CONTAINER. + * - PAUSED -> RUNNING + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class ResumeContainerTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Pause the process/process-grp if it is supported by the container + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.RESUME_CONTAINER)); + ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event; + container.addDiagnostics(resumeEvent.getDiagnostic(), "\n"); + } + } + @Override public void handle(ContainerEvent event) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java new file mode 100644 index 0000000000..898304e5e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * ContainerEvent for ContainerEventType.PAUSE_CONTAINER. + */ +public class ContainerPauseEvent extends ContainerEvent { + + private final String diagnostic; + + public ContainerPauseEvent(ContainerId cId, + String diagnostic) { + super(cId, ContainerEventType.PAUSE_CONTAINER); + this.diagnostic = diagnostic; + } + + public String getDiagnostic() { + return this.diagnostic; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java new file mode 100644 index 0000000000..d7c9e9ae79 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * ContainerEvent for ContainerEventType.RESUME_CONTAINER. + */ +public class ContainerResumeEvent extends ContainerEvent { + + private final String diagnostic; + + public ContainerResumeEvent(ContainerId cId, + String diagnostic) { + super(cId, ContainerEventType.RESUME_CONTAINER); + this.diagnostic = diagnostic; + } + + public String getDiagnostic() { + return this.diagnostic; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 91d1356934..7c3fea805b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -21,5 +21,6 @@ public enum ContainerState { NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING, REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, - CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE + CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE, + PAUSING, PAUSED, RESUMING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index d0ce787aaa..89dfdd1736 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.ConverterUtils; public class ContainerLaunch implements Callable { @@ -106,8 +108,10 @@ public class ContainerLaunch implements Callable { private final Configuration conf; private final Context context; private final ContainerManagerImpl containerManager; - + protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false); + protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false); + protected AtomicBoolean completed = new AtomicBoolean(false); private volatile boolean killedBeforeStart = false; @@ -802,6 +806,90 @@ public static Signal translateCommandToSignal( return signal; } + /** + * Pause the container. + * Cancels the launch if the container isn't launched yet. Otherwise asks the + * executor to pause the container. + * @throws IOException in case of errors. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void pauseContainer() throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + LOG.info("Pausing the container " + containerIdStr); + + // The pause event is only handled if the container is in the running state + // (the container state machine), so we don't check for + // shouldLaunchContainer over here + + if (!shouldPauseContainer.compareAndSet(false, true)) { + LOG.info("Container " + containerId + " not paused as " + + "resume already called"); + return; + } + + try { + // Pause the container + exec.pauseContainer(container); + + // PauseContainer is a blocking call. We are here almost means the + // container is paused, so send out the event. + dispatcher.getEventHandler().handle(new ContainerEvent( + containerId, + ContainerEventType.CONTAINER_PAUSED)); + } catch (Exception e) { + String message = + "Exception when trying to pause container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.info(message); + container.handle(new ContainerKillEvent(container.getContainerId(), + ContainerExitStatus.PREEMPTED, "Container preempted as there was " + + " an exception in pausing it.")); + } + } + + /** + * Resume the container. + * Cancels the launch if the container isn't launched yet. Otherwise asks the + * executor to pause the container. + * @throws IOException in case of error. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void resumeContainer() throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + LOG.info("Resuming the container " + containerIdStr); + + // The resume event is only handled if the container is in a paused state + // so we don't check for the launched flag here. + + // paused flag will be set to true if process already paused + boolean alreadyPaused = !shouldPauseContainer.compareAndSet(false, true); + if (!alreadyPaused) { + LOG.info("Container " + containerIdStr + " not paused." + + " No resume necessary"); + return; + } + + // If the container has already started + try { + exec.resumeContainer(container); + // ResumeContainer is a blocking call. We are here almost means the + // container is resumed, so send out the event. + dispatcher.getEventHandler().handle(new ContainerEvent( + containerId, + ContainerEventType.CONTAINER_RESUMED)); + } catch (Exception e) { + String message = + "Exception when trying to resume container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.info(message); + container.handle(new ContainerKillEvent(container.getContainerId(), + ContainerExitStatus.PREEMPTED, "Container preempted as there was " + + " an exception in pausing it.")); + } + } + /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 25909b9a2d..ca69712421 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -171,6 +173,36 @@ public void handle(ContainersLauncherEvent event) { + " with command " + signalEvent.getCommand()); } break; + case PAUSE_CONTAINER: + ContainerLaunch launchedContainer = running.get(containerId); + if (launchedContainer == null) { + // Container not launched. So nothing needs to be done. + return; + } + + // Pause the container + try { + launchedContainer.pauseContainer(); + } catch (Exception e) { + LOG.info("Got exception while pausing container: " + + StringUtils.stringifyException(e)); + } + break; + case RESUME_CONTAINER: + ContainerLaunch launchCont = running.get(containerId); + if (launchCont == null) { + // Container not launched. So nothing needs to be done. + return; + } + + // Resume the container. + try { + launchCont.resumeContainer(); + } catch (Exception e) { + LOG.info("Got exception while resuming container: " + + StringUtils.stringifyException(e)); + } + break; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 380a032ca7..1054e06cb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -25,4 +25,7 @@ public enum ContainersLauncherEventType { CLEANUP_CONTAINER, // The process(grp) itself. CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself. SIGNAL_CONTAINER, + PAUSE_CONTAINER, + RESUME_CONTAINER + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 917eda09af..a9cbf745a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -27,4 +27,5 @@ public enum ContainerSchedulerEventType { UPDATE_CONTAINER, // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, + CONTAINER_PAUSED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 33f460972f..8909088020 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -103,6 +103,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; public class TestContainer { @@ -205,6 +206,42 @@ public void testExternalKill() throws Exception { } } + @Test + @SuppressWarnings("unchecked") // mocked generic + public void testContainerPauseAndResume() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(13, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + int running = metrics.getRunningContainers(); + wc.launchContainer(); + assertEquals(running + 1, metrics.getRunningContainers()); + reset(wc.localizerBus); + wc.pauseContainer(); + assertEquals(ContainerState.PAUSED, + wc.c.getContainerState()); + wc.resumeContainer(); + assertEquals(ContainerState.RUNNING, + wc.c.getContainerState()); + wc.containerKilledOnRequest(); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + assertNull(wc.c.getLocalizedResources()); + verifyCleanupCall(wc); + int failed = metrics.getFailedContainers(); + wc.containerResourcesCleanup(); + assertEquals(ContainerState.DONE, wc.c.getContainerState()); + assertEquals(failed + 1, metrics.getFailedContainers()); + assertEquals(running, metrics.getRunningContainers()); + } + finally { + if (wc != null) { + wc.finished(); + } + } + } + @Test @SuppressWarnings("unchecked") // mocked generic public void testCleanupOnFailure() throws Exception { @@ -955,6 +992,8 @@ protected void scheduleContainer(Container container) { NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class); when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater); ContainerExecutor executor = mock(ContainerExecutor.class); + Mockito.doNothing().when(executor).pauseContainer(any(Container.class)); + Mockito.doNothing().when(executor).resumeContainer(any(Container.class)); launcher = new ContainersLauncher(context, dispatcher, executor, null, null); // create a mock ExecutorService, which will not really launch @@ -1143,6 +1182,18 @@ public void killContainer() { drainDispatcherEvents(); } + public void pauseContainer() { + c.handle(new ContainerPauseEvent(cId, + "PauseRequest")); + drainDispatcherEvents(); + } + + public void resumeContainer() { + c.handle(new ContainerResumeEvent(cId, + "ResumeRequest")); + drainDispatcherEvents(); + } + public void containerKilledOnRequest() { int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER; String diagnosticMsg = "Container completed with exit code " + exitCode;