YARN-5637. Changes in NodeManager to support Container rollback and commit. (asuresh)

This commit is contained in:
Arun Suresh 2016-09-16 16:53:18 -07:00
parent ea839bd48e
commit 3552c2b99d
8 changed files with 401 additions and 84 deletions

View File

@ -165,8 +165,8 @@
public class ContainerManagerImpl extends CompositeService implements public class ContainerManagerImpl extends CompositeService implements
ContainerManager { ContainerManager {
private enum ReinitOp { private enum ReInitOp {
UPGRADE, COMMIT, ROLLBACK, LOCALIZE; RE_INIT, COMMIT, ROLLBACK, LOCALIZE;
} }
/** /**
* Extra duration to wait for applications to be killed on shutdown. * Extra duration to wait for applications to be killed on shutdown.
@ -1535,7 +1535,7 @@ public ResourceLocalizationResponse localize(
ContainerId containerId = request.getContainerId(); ContainerId containerId = request.getContainerId();
Container container = preUpgradeOrLocalizeCheck(containerId, Container container = preUpgradeOrLocalizeCheck(containerId,
ReinitOp.LOCALIZE); ReInitOp.LOCALIZE);
try { try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req = Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.getResourceSet().addResources(request.getLocalResources()); container.getResourceSet().addResources(request.getLocalResources());
@ -1551,16 +1551,31 @@ public ResourceLocalizationResponse localize(
return ResourceLocalizationResponse.newInstance(); return ResourceLocalizationResponse.newInstance();
} }
public void upgradeContainer(ContainerId containerId, /**
ContainerLaunchContext upgradeLaunchContext) throws YarnException { * ReInitialize a container using a new Launch Context. If the
* retryFailureContext is not provided, The container is
* terminated on Failure.
*
* NOTE: Auto-Commit is true by default. This also means that the rollback
* context is purged as soon as the command to start the new process
* is sent. (The Container moves to RUNNING state)
*
* @param containerId Container Id.
* @param autoCommit Auto Commit flag.
* @param reInitLaunchContext Target Launch Context.
* @throws YarnException Yarn Exception.
*/
public void reInitializeContainer(ContainerId containerId,
ContainerLaunchContext reInitLaunchContext, boolean autoCommit)
throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId, Container container = preUpgradeOrLocalizeCheck(containerId,
ReinitOp.UPGRADE); ReInitOp.RE_INIT);
ResourceSet resourceSet = new ResourceSet(); ResourceSet resourceSet = new ResourceSet();
try { try {
resourceSet.addResources(upgradeLaunchContext.getLocalResources()); resourceSet.addResources(reInitLaunchContext.getLocalResources());
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ContainerReInitEvent(containerId, upgradeLaunchContext, new ContainerReInitEvent(containerId, reInitLaunchContext,
resourceSet)); resourceSet, autoCommit));
container.setIsReInitializing(true); container.setIsReInitializing(true);
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.info("Error when parsing local resource URI for upgrade of" + LOG.info("Error when parsing local resource URI for upgrade of" +
@ -1569,8 +1584,41 @@ public void upgradeContainer(ContainerId containerId,
} }
} }
/**
* Rollback the last reInitialization, if possible.
* @param containerId Container ID.
* @throws YarnException Yarn Exception.
*/
public void rollbackReInitialization(ContainerId containerId)
throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId,
ReInitOp.ROLLBACK);
if (container.canRollback()) {
dispatcher.getEventHandler().handle(
new ContainerEvent(containerId, ContainerEventType.ROLLBACK_REINIT));
} else {
throw new YarnException("Nothing to rollback to !!");
}
}
/**
* Commit last reInitialization after which no rollback will be possible.
* @param containerId Container ID.
* @throws YarnException Yarn Exception.
*/
public void commitReInitialization(ContainerId containerId)
throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId,
ReInitOp.COMMIT);
if (container.canRollback()) {
container.commitUpgrade();
} else {
throw new YarnException("Nothing to Commit !!");
}
}
private Container preUpgradeOrLocalizeCheck(ContainerId containerId, private Container preUpgradeOrLocalizeCheck(ContainerId containerId,
ReinitOp op) throws YarnException { ReInitOp op) throws YarnException {
Container container = context.getContainers().get(containerId); Container container = context.getContainers().get(containerId);
if (container == null) { if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!"); throw new YarnException("Specified " + containerId + " does not exist!");

View File

@ -82,4 +82,8 @@ public interface Container extends EventHandler<ContainerEvent> {
void setIsReInitializing(boolean isReInitializing); void setIsReInitializing(boolean isReInitializing);
boolean isReInitializing(); boolean isReInitializing();
boolean canRollback();
void commitUpgrade();
} }

View File

@ -26,6 +26,7 @@ public enum ContainerEventType {
UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE, CONTAINER_DONE,
REINITIALIZE_CONTAINER, REINITIALIZE_CONTAINER,
ROLLBACK_REINIT,
// DownloadManager // DownloadManager
CONTAINER_INITED, CONTAINER_INITED,

View File

@ -91,14 +91,42 @@
public class ContainerImpl implements Container { public class ContainerImpl implements Container {
private final static class ReInitializationContext { private static final class ReInitializationContext {
private final ResourceSet resourceSet;
private final ContainerLaunchContext newLaunchContext; private final ContainerLaunchContext newLaunchContext;
private final ResourceSet newResourceSet;
// Rollback state
private final ContainerLaunchContext oldLaunchContext;
private final ResourceSet oldResourceSet;
private ReInitializationContext(ContainerLaunchContext newLaunchContext, private ReInitializationContext(ContainerLaunchContext newLaunchContext,
ResourceSet resourceSet) { ResourceSet newResourceSet,
ContainerLaunchContext oldLaunchContext,
ResourceSet oldResourceSet) {
this.newLaunchContext = newLaunchContext; this.newLaunchContext = newLaunchContext;
this.resourceSet = resourceSet; this.newResourceSet = newResourceSet;
this.oldLaunchContext = oldLaunchContext;
this.oldResourceSet = oldResourceSet;
}
private boolean canRollback() {
return (oldLaunchContext != null);
}
private ResourceSet mergedResourceSet() {
if (oldLaunchContext == null) {
return newResourceSet;
}
return ResourceSet.merge(oldResourceSet, newResourceSet);
}
private ReInitializationContext createContextForRollback() {
if (oldLaunchContext == null) {
return null;
} else {
return new ReInitializationContext(
oldLaunchContext, oldResourceSet, null, null);
}
} }
} }
@ -129,7 +157,7 @@ private ReInitializationContext(ContainerLaunchContext newLaunchContext,
private String logDir; private String logDir;
private String host; private String host;
private String ips; private String ips;
private ReInitializationContext reInitContext; private volatile ReInitializationContext reInitContext;
private volatile boolean isReInitializing = false; private volatile boolean isReInitializing = false;
/** The NM-wide configuration - not specific to this container */ /** The NM-wide configuration - not specific to this container */
@ -187,8 +215,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
} }
// Configure the Retry Context // Configure the Retry Context
this.containerRetryContext = this.containerRetryContext = configureRetryContext(
configureRetryContext(conf, launchContext, this.containerId); conf, launchContext, this.containerId);
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries(); this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
stateMachine = stateMachineFactory.make(this); stateMachine = stateMachineFactory.make(this);
this.context = context; this.context = context;
@ -320,12 +348,16 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
new ExitedWithSuccessTransition(true)) new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING, .addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RELAUNCHING, EnumSet.of(ContainerState.RELAUNCHING,
ContainerState.LOCALIZED,
ContainerState.EXITED_WITH_FAILURE), ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition()) new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING, .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
ContainerEventType.REINITIALIZE_CONTAINER, ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition()) new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
ContainerEventType.ROLLBACK_REINIT,
new RollbackContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING, .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_LOCALIZED, ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition()) new ResourceLocalizedWhileRunningTransition())
@ -884,15 +916,15 @@ static class ReInitializeContainerTransition extends ContainerTransition {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
container.reInitContext = createReInitContext(event); container.reInitContext = createReInitContext(container, event);
try { try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
pendingResources = resByVisibility = container.reInitContext.newResourceSet
container.reInitContext.resourceSet.getAllResourcesByVisibility(); .getAllResourcesByVisibility();
if (!pendingResources.isEmpty()) { if (!resByVisibility.isEmpty()) {
container.dispatcher.getEventHandler().handle( container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent( new ContainerLocalizationRequestEvent(
container, pendingResources)); container, resByVisibility));
} else { } else {
// We are not waiting on any resources, so... // We are not waiting on any resources, so...
// Kill the current container. // Kill the current container.
@ -909,10 +941,30 @@ public void transition(ContainerImpl container, ContainerEvent event) {
} }
protected ReInitializationContext createReInitContext( protected ReInitializationContext createReInitContext(
ContainerEvent event) { ContainerImpl container, ContainerEvent event) {
ContainerReInitEvent rEvent = (ContainerReInitEvent)event; ContainerReInitEvent reInitEvent = (ContainerReInitEvent)event;
return new ReInitializationContext(rEvent.getReInitLaunchContext(), return new ReInitializationContext(
rEvent.getResourceSet()); reInitEvent.getReInitLaunchContext(),
reInitEvent.getResourceSet(),
// If AutoCommit is turned on, then no rollback can happen...
// So don't need to store the previous context.
(reInitEvent.isAutoCommit() ? null : container.launchContext),
(reInitEvent.isAutoCommit() ? null : container.resourceSet));
}
}
/**
* Transition to start the Rollback process.
*/
static class RollbackContainerTransition extends
ReInitializeContainerTransition {
@Override
protected ReInitializationContext createReInitContext(ContainerImpl
container, ContainerEvent event) {
LOG.warn("Container [" + container.getContainerId() + "]" +
" about to be explicitly Rolledback !!");
return container.reInitContext.createContextForRollback();
} }
} }
@ -928,10 +980,10 @@ static class ResourceLocalizedWhileReInitTransition
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event; (ContainerResourceLocalizedEvent) event;
container.reInitContext.resourceSet.resourceLocalized( container.reInitContext.newResourceSet.resourceLocalized(
rsrcEvent.getResource(), rsrcEvent.getLocation()); rsrcEvent.getResource(), rsrcEvent.getLocation());
// Check if all ResourceLocalization has completed // Check if all ResourceLocalization has completed
if (container.reInitContext.resourceSet.getPendingResources() if (container.reInitContext.newResourceSet.getPendingResources()
.isEmpty()) { .isEmpty()) {
// Kill the current container. // Kill the current container.
container.dispatcher.getEventHandler().handle( container.dispatcher.getEventHandler().handle(
@ -1028,10 +1080,13 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.runningContainer(); container.metrics.runningContainer();
container.wasLaunched = true; container.wasLaunched = true;
if (container.reInitContext != null) { container.setIsReInitializing(false);
// Check if this launch was due to a re-initialization.
// If autocommit == true, then wipe the re-init context. This ensures
// that any subsequent failures do not trigger a rollback.
if (container.reInitContext != null
&& !container.reInitContext.canRollback()) {
container.reInitContext = null; container.reInitContext = null;
// Set rollback context here..
container.setIsReInitializing(false);
} }
if (container.recoveredAsKilled) { if (container.recoveredAsKilled) {
@ -1148,36 +1203,50 @@ public ContainerState transition(final ContainerImpl container,
+ container.getContainerId(), e); + container.getContainerId(), e);
} }
} }
LOG.info("Relaunching Container " + container.getContainerId() doRelaunch(container, container.remainingRetryAttempts,
+ ". Remaining retry attempts(after relaunch) : " container.containerRetryContext.getRetryInterval());
+ container.remainingRetryAttempts
+ ". Interval between retries is "
+ container.containerRetryContext.getRetryInterval() + "ms");
container.wasLaunched = false;
container.metrics.endRunningContainer();
if (container.containerRetryContext.getRetryInterval() == 0) {
container.sendRelaunchEvent();
} else {
// wait for some time, then send launch event
new Thread() {
@Override
public void run() {
try {
Thread.sleep(
container.containerRetryContext.getRetryInterval());
container.sendRelaunchEvent();
} catch (InterruptedException e) {
return;
}
}
}.start();
}
return ContainerState.RELAUNCHING; return ContainerState.RELAUNCHING;
} else if (container.canRollback()) {
// Rollback is possible only if the previous launch context is
// available.
container.addDiagnostics("Container Re-init Auto Rolled-Back.");
LOG.info("Rolling back Container reInitialization for [" +
container.getContainerId() + "] !!");
container.reInitContext =
container.reInitContext.createContextForRollback();
new KilledForReInitializationTransition().transition(container, event);
return ContainerState.LOCALIZED;
} else { } else {
new ExitedWithFailureTransition(true).transition(container, event); new ExitedWithFailureTransition(true).transition(container, event);
return ContainerState.EXITED_WITH_FAILURE; return ContainerState.EXITED_WITH_FAILURE;
} }
} }
private void doRelaunch(final ContainerImpl container,
int remainingRetryAttempts, final int retryInterval) {
LOG.info("Relaunching Container " + container.getContainerId()
+ ". Remaining retry attempts(after relaunch) : "
+ remainingRetryAttempts + ". Interval between retries is "
+ retryInterval + "ms");
container.wasLaunched = false;
container.metrics.endRunningContainer();
if (retryInterval == 0) {
container.sendRelaunchEvent();
} else {
// wait for some time, then send launch event
new Thread() {
@Override
public void run() {
try {
Thread.sleep(retryInterval);
container.sendRelaunchEvent();
} catch (InterruptedException e) {
return;
}
}
}.start();
}
}
} }
@Override @Override
@ -1188,24 +1257,29 @@ public boolean isRetryContextSet() {
@Override @Override
public boolean shouldRetry(int errorCode) { public boolean shouldRetry(int errorCode) {
return shouldRetry(errorCode, containerRetryContext,
remainingRetryAttempts);
}
public static boolean shouldRetry(int errorCode,
ContainerRetryContext retryContext, int remainingRetryAttempts) {
if (errorCode == ExitCode.SUCCESS.getExitCode() if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode() || errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) { || errorCode == ExitCode.TERMINATED.getExitCode()) {
return false; return false;
} }
ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy(); ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
&& containerRetryContext.getErrorCodes() != null && retryContext.getErrorCodes() != null
&& containerRetryContext.getErrorCodes().contains(errorCode))) { && retryContext.getErrorCodes().contains(errorCode))) {
return remainingRetryAttempts > 0 return remainingRetryAttempts > 0
|| remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
} }
return false; return false;
} }
/** /**
* Transition to EXITED_WITH_FAILURE * Transition to EXITED_WITH_FAILURE
*/ */
@ -1240,13 +1314,12 @@ public void transition(ContainerImpl container,
// Re configure the Retry Context // Re configure the Retry Context
container.containerRetryContext = container.containerRetryContext =
configureRetryContext(container.context.getConf(), configureRetryContext(container.context.getConf(),
container.launchContext, container.containerId); container.launchContext, container.containerId);
// Reset the retry attempts since its a fresh start // Reset the retry attempts since its a fresh start
container.remainingRetryAttempts = container.remainingRetryAttempts =
container.containerRetryContext.getMaxRetries(); container.containerRetryContext.getMaxRetries();
container.resourceSet = ResourceSet.merge( container.resourceSet = container.reInitContext.mergedResourceSet();
container.resourceSet, container.reInitContext.resourceSet);
container.sendLaunchEvent(); container.sendLaunchEvent();
} }
@ -1589,4 +1662,15 @@ public void setIsReInitializing(boolean isReInitializing) {
public boolean isReInitializing() { public boolean isReInitializing() {
return this.isReInitializing; return this.isReInitializing;
} }
@Override
public boolean canRollback() {
return (this.reInitContext != null)
&& (this.reInitContext.canRollback());
}
@Override
public void commitUpgrade() {
this.reInitContext = null;
}
} }

View File

@ -30,18 +30,22 @@ public class ContainerReInitEvent extends ContainerEvent {
private final ContainerLaunchContext reInitLaunchContext; private final ContainerLaunchContext reInitLaunchContext;
private final ResourceSet resourceSet; private final ResourceSet resourceSet;
private final boolean autoCommit;
/** /**
* Container Re-Init Event. * Container Re-Init Event.
* @param cID Container Id * @param cID Container Id.
* @param upgradeContext Upgrade context * @param upgradeContext Upgrade Context.
* @param resourceSet Resource Set * @param resourceSet Resource Set.
* @param autoCommit Auto Commit.
*/ */
public ContainerReInitEvent(ContainerId cID, public ContainerReInitEvent(ContainerId cID,
ContainerLaunchContext upgradeContext, ResourceSet resourceSet){ ContainerLaunchContext upgradeContext,
ResourceSet resourceSet, boolean autoCommit){
super(cID, ContainerEventType.REINITIALIZE_CONTAINER); super(cID, ContainerEventType.REINITIALIZE_CONTAINER);
this.reInitLaunchContext = upgradeContext; this.reInitLaunchContext = upgradeContext;
this.resourceSet = resourceSet; this.resourceSet = resourceSet;
this.autoCommit = autoCommit;
} }
/** /**
@ -59,4 +63,12 @@ public ContainerLaunchContext getReInitLaunchContext() {
public ResourceSet getResourceSet() { public ResourceSet getResourceSet() {
return resourceSet; return resourceSet;
} }
/**
* Should this re-Initialization be auto-committed.
* @return AutoCommit.
*/
public boolean isAutoCommit() {
return autoCommit;
}
} }

View File

@ -270,15 +270,15 @@ public void testForcefulShutdownSignal() throws IOException,
} }
@Override @Override
public void testContainerUpgradeSuccess() throws IOException, public void testContainerUpgradeSuccessAutoCommit() throws IOException,
InterruptedException, YarnException { InterruptedException, YarnException {
// Don't run the test if the binary is not available. // Don't run the test if the binary is not available.
if (!shouldRunTest()) { if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test"); LOG.info("LCE binary path is not passed. Not running the test");
return; return;
} }
LOG.info("Running testContainerUpgradeSuccess"); LOG.info("Running testContainerUpgradeSuccessAutoCommit");
super.testContainerUpgradeSuccess(); super.testContainerUpgradeSuccessAutoCommit();
} }
@Override @Override
@ -293,6 +293,42 @@ public void testContainerUpgradeLocalizationFailure() throws IOException,
super.testContainerUpgradeLocalizationFailure(); super.testContainerUpgradeLocalizationFailure();
} }
@Override
public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeSuccessExplicitCommit");
super.testContainerUpgradeSuccessExplicitCommit();
}
@Override
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeSuccessExplicitRollback");
super.testContainerUpgradeSuccessExplicitRollback();
}
@Override
public void testContainerUpgradeRollbackDueToFailure() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testContainerUpgradeRollbackDueToFailure");
super.testContainerUpgradeRollbackDueToFailure();
}
@Override @Override
public void testContainerUpgradeProcessFailure() throws IOException, public void testContainerUpgradeProcessFailure() throws IOException,
InterruptedException, YarnException { InterruptedException, YarnException {

View File

@ -369,9 +369,8 @@ public void testContainerLaunchAndStop() throws IOException,
DefaultContainerExecutor.containerIsAlive(pid)); DefaultContainerExecutor.containerIsAlive(pid));
} }
@Test private String[] testContainerUpgradeSuccess(boolean autoCommit)
public void testContainerUpgradeSuccess() throws IOException, throws IOException, InterruptedException, YarnException {
InterruptedException, YarnException {
containerManager.start(); containerManager.start();
// ////// Construct the Container-id // ////// Construct the Container-id
ContainerId cId = createContainerId(0); ContainerId cId = createContainerId(0);
@ -381,7 +380,7 @@ public void testContainerUpgradeSuccess() throws IOException,
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(false, false, cId, newStartFile); prepareContainerUpgrade(autoCommit, false, false, cId, newStartFile);
// Assert that the First process is not alive anymore // Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!", Assert.assertFalse("Process is still alive!",
@ -407,6 +406,80 @@ public void testContainerUpgradeSuccess() throws IOException,
// Assert that the New process is alive // Assert that the New process is alive
Assert.assertTrue("New Process is not alive!", Assert.assertTrue("New Process is not alive!",
DefaultContainerExecutor.containerIsAlive(newPid)); DefaultContainerExecutor.containerIsAlive(newPid));
return new String[]{pid, newPid};
}
@Test
public void testContainerUpgradeSuccessAutoCommit() throws IOException,
InterruptedException, YarnException {
testContainerUpgradeSuccess(true);
// Should not be able to Commit (since already auto committed)
try {
containerManager.commitReInitialization(createContainerId(0));
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
}
}
@Test
public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
InterruptedException, YarnException {
testContainerUpgradeSuccess(false);
ContainerId cId = createContainerId(0);
containerManager.commitReInitialization(cId);
// Should not be able to Rollback once committed
try {
containerManager.rollbackReInitialization(cId);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Nothing to rollback to"));
}
}
@Test
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
InterruptedException, YarnException {
String[] pids = testContainerUpgradeSuccess(false);
// Delete the old start File..
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
oldStartFile.delete();
ContainerId cId = createContainerId(0);
// Explicit Rollback
containerManager.rollbackReInitialization(cId);
// Original should be dead anyway
Assert.assertFalse("Original Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pids[0]));
// Wait for upgraded process to die
int timeoutSecs = 0;
while (!DefaultContainerExecutor.containerIsAlive(pids[1])
&& timeoutSecs++ < 20) {
Thread.sleep(1000);
LOG.info("Waiting for Upgraded process to die..");
}
timeoutSecs = 0;
// Wait for new processStartfile to be created
while (!oldStartFile.exists() && timeoutSecs++ < 20) {
Thread.sleep(1000);
LOG.info("Waiting for New process start-file to be created");
}
// Now verify the contents of the file
BufferedReader reader =
new BufferedReader(new FileReader(oldStartFile));
Assert.assertEquals("Hello World!", reader.readLine());
// Get the pid of the process
String rolledBackPid = reader.readLine().trim();
// No more lines
Assert.assertEquals(null, reader.readLine());
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pids[0], rolledBackPid);
} }
@Test @Test
@ -424,7 +497,7 @@ public void testContainerUpgradeLocalizationFailure() throws IOException,
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(true, true, cId, newStartFile); prepareContainerUpgrade(false, true, true, cId, newStartFile);
// Assert that the First process is STILL alive // Assert that the First process is STILL alive
// since upgrade was terminated.. // since upgrade was terminated..
@ -447,22 +520,69 @@ public void testContainerUpgradeProcessFailure() throws IOException,
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(true, false, cId, newStartFile); // Since Autocommit is true, there is also no rollback context...
// which implies that if the new process fails, since there is no
// rollback, it is terminated.
prepareContainerUpgrade(true, true, false, cId, newStartFile);
// Assert that the First process is not alive anymore // Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!", Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid)); DefaultContainerExecutor.containerIsAlive(pid));
} }
@Test
public void testContainerUpgradeRollbackDueToFailure() throws IOException,
InterruptedException, YarnException {
if (Shell.WINDOWS) {
return;
}
containerManager.start();
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
String pid = prepareInitialContainer(cId, oldStartFile);
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
prepareContainerUpgrade(false, true, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Original Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
int timeoutSecs = 0;
// Wait for oldStartFile to be created
while (!oldStartFile.exists() && timeoutSecs++ < 20) {
System.out.println("\nFiles: " +
Arrays.toString(oldStartFile.getParentFile().list()));
Thread.sleep(1000);
LOG.info("Waiting for New process start-file to be created");
}
// Now verify the contents of the file
BufferedReader reader =
new BufferedReader(new FileReader(oldStartFile));
Assert.assertEquals("Hello World!", reader.readLine());
// Get the pid of the process
String rolledBackPid = reader.readLine().trim();
// No more lines
Assert.assertEquals(null, reader.readLine());
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pid, rolledBackPid);
}
/** /**
* Prepare a launch Context for container upgrade and request the * Prepare a launch Context for container upgrade and request the
* Container Manager to re-initialize a running container using the * Container Manager to re-initialize a running container using the
* new launch context. * new launch context.
* @param autoCommit Enable autoCommit.
* @param failCmd injects a start script that intentionally fails. * @param failCmd injects a start script that intentionally fails.
* @param failLoc injects a bad file Location that will fail localization. * @param failLoc injects a bad file Location that will fail localization.
*/ */
private void prepareContainerUpgrade(boolean failCmd, boolean failLoc, private void prepareContainerUpgrade(boolean autoCommit, boolean failCmd,
ContainerId cId, File startFile) boolean failLoc, ContainerId cId, File startFile)
throws FileNotFoundException, YarnException, InterruptedException { throws FileNotFoundException, YarnException, InterruptedException {
// Re-write scriptfile and processStartFile // Re-write scriptfile and processStartFile
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new"); File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
@ -471,13 +591,15 @@ private void prepareContainerUpgrade(boolean failCmd, boolean failLoc,
writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd); writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd);
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc); prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc, 0);
containerManager.upgradeContainer(cId, containerLaunchContext); containerManager.reInitializeContainer(cId, containerLaunchContext,
autoCommit);
try { try {
containerManager.upgradeContainer(cId, containerLaunchContext); containerManager.reInitializeContainer(cId, containerLaunchContext,
autoCommit);
} catch (Exception e) { } catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Cannot perform UPGRADE")); Assert.assertTrue(e.getMessage().contains("Cannot perform RE_INIT"));
} }
int timeoutSecs = 0; int timeoutSecs = 0;
int maxTimeToWait = failLoc ? 10 : 20; int maxTimeToWait = failLoc ? 10 : 20;
@ -501,7 +623,7 @@ private String prepareInitialContainer(ContainerId cId, File startFile)
writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false); writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false);
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
prepareContainerLaunchContext(scriptFileOld, "dest_file", false); prepareContainerLaunchContext(scriptFileOld, "dest_file", false, 4);
StartContainerRequest scRequest = StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext, StartContainerRequest.newInstance(containerLaunchContext,
@ -562,7 +684,7 @@ private void writeScriptFile(PrintWriter fileWriter, String startLine,
} }
private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile, private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
String destFName, boolean putBadFile) { String destFName, boolean putBadFile, int numRetries) {
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resourceAlpha = null; URL resourceAlpha = null;
@ -592,7 +714,7 @@ private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
ContainerRetryContext containerRetryContext = ContainerRetryContext ContainerRetryContext containerRetryContext = ContainerRetryContext
.newInstance( .newInstance(
ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0); new HashSet<>(Arrays.asList(Integer.valueOf(111))), numRetries, 0);
containerLaunchContext.setContainerRetryContext(containerRetryContext); containerLaunchContext.setContainerRetryContext(containerRetryContext);
List<String> commands = Arrays.asList( List<String> commands = Arrays.asList(
Shell.getRunScriptCommand(scriptFile)); Shell.getRunScriptCommand(scriptFile));

View File

@ -205,4 +205,14 @@ public void setIsReInitializing(boolean isReInitializing) {
public boolean isReInitializing() { public boolean isReInitializing() {
return false; return false;
} }
@Override
public boolean canRollback() {
return false;
}
@Override
public void commitUpgrade() {
}
} }