YARN-9071. Improved status update for reinitialized containers.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-12-05 17:00:56 -05:00
parent 1dabb31cdf
commit 1b790f4dd1
7 changed files with 131 additions and 36 deletions

View File

@ -152,10 +152,14 @@ CANCEL_UPGRADE, new CancelUpgradeTransition())
REINITIALIZED), START, new StartedAfterUpgradeTransition())
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
STOP, new StoppedAfterCancelUpgradeTransition())
// FROM REINITIALIZED
.addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE,
new CancelledAfterReinitTransition())
.addTransition(REINITIALIZED, READY, BECOME_READY,
new ContainerBecomeReadyTransition(true))
.addTransition(REINITIALIZED, REINITIALIZED, STOP,
new StoppedAfterUpgradeTransition())
.installTopology();
public ComponentInstance(Component component,
@ -182,20 +186,7 @@ private static class ContainerStartedTransition extends BaseTransition {
@Override public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
// Query container status for ip and host
boolean cancelOnSuccess = true;
if (compInstance.getCompSpec().getArtifact() != null && compInstance
.getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
// A docker container might get a different IP if the container is
// relaunched by the NM, so we need to keep checking the status.
// This is a temporary fix until the NM provides a callback for
// container relaunch (see YARN-8265).
cancelOnSuccess = false;
}
compInstance.containerStatusFuture =
compInstance.scheduler.executorService.scheduleAtFixedRate(
new ContainerStatusRetriever(compInstance.scheduler,
event.getContainerId(), compInstance, cancelOnSuccess), 0, 1,
TimeUnit.SECONDS);
compInstance.initializeStatusRetriever(event);
long containerStartTime = System.currentTimeMillis();
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
@ -275,6 +266,7 @@ public ComponentInstanceState transition(ComponentInstance instance,
instance.upgradeInProgress.set(false);
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
instance.initializeStatusRetriever(event);
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
instance.component.getUpgradeStatus() :
@ -570,13 +562,9 @@ public void transition(ComponentInstance instance,
instance.setContainerState(ContainerState.UPGRADING);
instance.component.decContainersReady(false);
Component.UpgradeStatus status = instance.component.getUpgradeStatus();
instance.scheduler.getContainerLaunchService()
.reInitCompInstance(instance.scheduler.getApp(), instance,
instance.container,
instance.component.createLaunchContext(
status.getTargetSpec(),
status.getTargetVersion()));
Component.UpgradeStatus upgradeStatus = instance.component.
getUpgradeStatus();
instance.reInitHelper(upgradeStatus);
}
}
@ -632,11 +620,35 @@ private void cancelUpgrade() {
LOG.info("{} cancelling upgrade", container.getId());
setContainerState(ContainerState.UPGRADING);
Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus();
reInitHelper(cancelStatus);
}
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
cancelContainerStatusRetriever();
setContainerStatus(null);
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
scheduler.getContainerLaunchService()
.reInitCompInstance(scheduler.getApp(), this,
this.container, this.component.createLaunchContext(
cancelStatus.getTargetSpec(),
cancelStatus.getTargetVersion()));
upgradeStatus.getTargetSpec(),
upgradeStatus.getTargetVersion()));
}
private void initializeStatusRetriever(ComponentInstanceEvent event) {
boolean cancelOnSuccess = true;
if (getCompSpec().getArtifact() != null &&
getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
// A docker container might get a different IP if the container is
// relaunched by the NM, so we need to keep checking the status.
// This is a temporary fix until the NM provides a callback for
// container relaunch (see YARN-8265).
cancelOnSuccess = false;
}
containerStatusFuture =
scheduler.executorService.scheduleAtFixedRate(
new ContainerStatusRetriever(scheduler, event.getContainerId(),
this, cancelOnSuccess), 0, 1,
TimeUnit.SECONDS);
}
public ComponentInstanceState getState() {
@ -723,11 +735,25 @@ public String getCompInstanceName() {
}
public ContainerStatus getContainerStatus() {
return status;
try {
readLock.lock();
return status;
} finally {
readLock.unlock();
}
}
private void setContainerStatus(ContainerStatus latestStatus) {
try {
writeLock.lock();
this.status = latestStatus;
} finally {
writeLock.unlock();
}
}
public void updateContainerStatus(ContainerStatus status) {
this.status = status;
setContainerStatus(status);
org.apache.hadoop.yarn.service.api.records.Container container =
getCompSpec().getContainer(status.getContainerId().toString());
boolean doRegistryUpdate = true;

View File

@ -140,6 +140,42 @@ public void testContainerUpgradeFailed() throws Exception {
.getId().toString()).getState());
}
@Test
public void testFailureAfterReinit() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerUpgradeFailed");
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
upgradeComponent(component);
ComponentInstance instance = component.getAllComponentInstances().iterator()
.next();
ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
instance.handle(upgradeEvent);
// NM finished updgrae
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
ComponentInstanceEventType.START));
Assert.assertEquals("instance not running",
ContainerState.RUNNING_BUT_UNREADY,
component.getComponentSpec().getContainer(instance.getContainer()
.getId().toString()).getState());
ContainerStatus containerStatus = mock(ContainerStatus.class);
when(containerStatus.getExitStatus()).thenReturn(
ContainerExitStatus.ABORTED);
ComponentInstanceEvent stopEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.STOP)
.setStatus(containerStatus);
// this is the call back from NM for the upgrade
instance.handle(stopEvent);
Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE,
component.getComponentSpec().getContainer(instance.getContainer()
.getId().toString()).getState());
}
@Test
public void testCancelNothingToUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,

View File

@ -934,10 +934,21 @@ public void setWorkDir(String workDir) {
this.workDir = workDir;
}
private void clearIpAndHost() {
LOG.info("{} clearing ip and host", containerId);
this.ips = null;
this.host = null;
}
@Override
public void setIpAndHost(String[] ipAndHost) {
this.ips = ipAndHost[0];
this.host = ipAndHost[1];
try {
this.writeLock.lock();
this.ips = ipAndHost[0];
this.host = ipAndHost[1];
} finally {
this.writeLock.unlock();
}
}
@Override
@ -1729,7 +1740,11 @@ public void transition(ContainerImpl container,
+ "] for re-initialization !!");
container.wasLaunched = false;
container.metrics.endRunningContainer();
container.clearIpAndHost();
// Remove the container from the resource-monitor. When container
// is launched again, it is added back to monitoring service.
container.dispatcher.getEventHandler().handle(
new ContainerStopMonitoringEvent(container.containerId, true));
container.launchContext = container.reInitContext.newLaunchContext;
// Re configure the Retry Context
@ -1894,7 +1909,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
if (container.containerMetrics != null) {
container.containerMetrics
.recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
container.containerMetrics.finished();
container.containerMetrics.finished(false);
}
container.sendFinishedEvents();

View File

@ -242,14 +242,18 @@ public synchronized void getMetrics(MetricsCollector collector, boolean all) {
}
}
public synchronized void finished() {
public synchronized void finished(boolean unregisterWithoutDelay) {
if (!finished) {
this.finished = true;
if (timer != null) {
timer.cancel();
timer = null;
}
scheduleTimerTaskForUnregistration();
if (!unregisterWithoutDelay) {
scheduleTimerTaskForUnregistration();
} else {
ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
}
this.pMemMBQuantiles.stop();
this.cpuCoreUsagePercentQuantiles.stop();
}

View File

@ -22,8 +22,20 @@
public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
private final boolean forReInit;
public ContainerStopMonitoringEvent(ContainerId containerId) {
super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
forReInit = false;
}
public ContainerStopMonitoringEvent(ContainerId containerId,
boolean forReInit) {
super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
this.forReInit = forReInit;
}
public boolean isForReInit() {
return forReInit;
}
}

View File

@ -533,7 +533,7 @@ public void run() {
} catch (Exception e) {
// Log the exception and proceed to the next container.
LOG.warn("Uncaught exception in ContainersMonitorImpl "
+ "while monitoring resource of " + containerId, e);
+ "while monitoring resource of {}", containerId, e);
}
}
if (LOG.isDebugEnabled()) {
@ -861,10 +861,12 @@ private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
vmemLimitMBs, pmemLimitMBs, cpuVcores);
break;
case STOP_MONITORING_CONTAINER:
ContainerStopMonitoringEvent stopEvent =
(ContainerStopMonitoringEvent) monitoringEvent;
usageMetrics = ContainerMetrics.getContainerMetrics(
containerId);
if (usageMetrics != null) {
usageMetrics.finished();
usageMetrics.finished(stopEvent.isForReInit());
}
break;
case CHANGE_MONITORING_CONTAINER_RESOURCE:

View File

@ -63,7 +63,7 @@ public void testContainerMetricsFlow() throws InterruptedException {
assertEquals(ERR, 1, collector.getRecords().size());
collector.clear();
metrics.finished();
metrics.finished(false);
metrics.getMetrics(collector, true);
assertEquals(ERR, 1, collector.getRecords().size());
collector.clear();
@ -137,8 +137,8 @@ public void testContainerMetricsFinished() throws InterruptedException {
ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
containerId3, 1, 0);
metrics1.finished();
metrics2.finished();
metrics1.finished(false);
metrics2.finished(false);
system.sampleMetrics();
system.sampleMetrics();
Thread.sleep(100);