YARN-9084. Reset container state and defer readiness check for upgrade.
Contributed by Chandni Singh
This commit is contained in:
parent
c7a5a4435e
commit
ccdd982e51
@ -68,12 +68,6 @@ public List<Component> findTargetComponentSpecs(Service currentDef,
|
|||||||
"not supported by upgrade");
|
"not supported by upgrade");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!Objects.equals(currentDef.getQuicklinks(),
|
|
||||||
targetDef.getQuicklinks())) {
|
|
||||||
throw new UnsupportedOperationException("changes to quick links " +
|
|
||||||
"not supported by upgrade");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!Objects.equals(currentDef.getLaunchTime(),
|
if (!Objects.equals(currentDef.getLaunchTime(),
|
||||||
targetDef.getLaunchTime())) {
|
targetDef.getLaunchTime())) {
|
||||||
throw new UnsupportedOperationException("changes to launch time " +
|
throw new UnsupportedOperationException("changes to launch time " +
|
||||||
|
@ -47,6 +47,7 @@
|
|||||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
||||||
|
import org.apache.hadoop.yarn.service.monitor.probe.DefaultProbe;
|
||||||
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
||||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||||
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
||||||
@ -186,7 +187,7 @@ private static class ContainerStartedTransition extends BaseTransition {
|
|||||||
@Override public void transition(ComponentInstance compInstance,
|
@Override public void transition(ComponentInstance compInstance,
|
||||||
ComponentInstanceEvent event) {
|
ComponentInstanceEvent event) {
|
||||||
// Query container status for ip and host
|
// Query container status for ip and host
|
||||||
compInstance.initializeStatusRetriever(event);
|
compInstance.initializeStatusRetriever(event, 0);
|
||||||
long containerStartTime = System.currentTimeMillis();
|
long containerStartTime = System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
||||||
@ -266,7 +267,12 @@ public ComponentInstanceState transition(ComponentInstance instance,
|
|||||||
|
|
||||||
instance.upgradeInProgress.set(false);
|
instance.upgradeInProgress.set(false);
|
||||||
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
|
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
|
||||||
instance.initializeStatusRetriever(event);
|
if (instance.component.getProbe() != null &&
|
||||||
|
instance.component.getProbe() instanceof DefaultProbe) {
|
||||||
|
instance.initializeStatusRetriever(event, 30);
|
||||||
|
} else {
|
||||||
|
instance.initializeStatusRetriever(event, 0);
|
||||||
|
}
|
||||||
|
|
||||||
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
|
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
|
||||||
instance.component.getUpgradeStatus() :
|
instance.component.getUpgradeStatus() :
|
||||||
@ -625,7 +631,7 @@ private void cancelUpgrade() {
|
|||||||
|
|
||||||
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
|
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
|
||||||
cancelContainerStatusRetriever();
|
cancelContainerStatusRetriever();
|
||||||
setContainerStatus(null);
|
setContainerStatus(container.getId(), null);
|
||||||
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
|
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
|
||||||
scheduler.getContainerLaunchService()
|
scheduler.getContainerLaunchService()
|
||||||
.reInitCompInstance(scheduler.getApp(), this,
|
.reInitCompInstance(scheduler.getApp(), this,
|
||||||
@ -634,7 +640,8 @@ private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
|
|||||||
upgradeStatus.getTargetVersion()));
|
upgradeStatus.getTargetVersion()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeStatusRetriever(ComponentInstanceEvent event) {
|
private void initializeStatusRetriever(ComponentInstanceEvent event,
|
||||||
|
long initialDelay) {
|
||||||
boolean cancelOnSuccess = true;
|
boolean cancelOnSuccess = true;
|
||||||
if (getCompSpec().getArtifact() != null &&
|
if (getCompSpec().getArtifact() != null &&
|
||||||
getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
||||||
@ -644,10 +651,11 @@ private void initializeStatusRetriever(ComponentInstanceEvent event) {
|
|||||||
// container relaunch (see YARN-8265).
|
// container relaunch (see YARN-8265).
|
||||||
cancelOnSuccess = false;
|
cancelOnSuccess = false;
|
||||||
}
|
}
|
||||||
|
LOG.info("{} retrieve status after {}", compInstanceId, initialDelay);
|
||||||
containerStatusFuture =
|
containerStatusFuture =
|
||||||
scheduler.executorService.scheduleAtFixedRate(
|
scheduler.executorService.scheduleAtFixedRate(
|
||||||
new ContainerStatusRetriever(scheduler, event.getContainerId(),
|
new ContainerStatusRetriever(scheduler, event.getContainerId(),
|
||||||
this, cancelOnSuccess), 0, 1,
|
this, cancelOnSuccess), initialDelay, 1,
|
||||||
TimeUnit.SECONDS);
|
TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -743,32 +751,44 @@ public ContainerStatus getContainerStatus() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setContainerStatus(ContainerStatus latestStatus) {
|
private void setContainerStatus(ContainerId containerId,
|
||||||
|
ContainerStatus latestStatus) {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
this.status = latestStatus;
|
this.status = latestStatus;
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Container containerRec =
|
||||||
|
getCompSpec().getContainer(containerId.toString());
|
||||||
|
|
||||||
|
if (containerRec != null) {
|
||||||
|
if (latestStatus != null) {
|
||||||
|
containerRec.setIp(StringUtils.join(",", latestStatus.getIPs()));
|
||||||
|
containerRec.setHostname(latestStatus.getHost());
|
||||||
|
} else {
|
||||||
|
containerRec.setIp(null);
|
||||||
|
containerRec.setHostname(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateContainerStatus(ContainerStatus status) {
|
public void updateContainerStatus(ContainerStatus status) {
|
||||||
setContainerStatus(status);
|
org.apache.hadoop.yarn.service.api.records.Container containerRec =
|
||||||
org.apache.hadoop.yarn.service.api.records.Container container =
|
|
||||||
getCompSpec().getContainer(status.getContainerId().toString());
|
getCompSpec().getContainer(status.getContainerId().toString());
|
||||||
boolean doRegistryUpdate = true;
|
boolean doRegistryUpdate = true;
|
||||||
if (container != null) {
|
if (containerRec != null) {
|
||||||
String existingIP = container.getIp();
|
String existingIP = containerRec.getIp();
|
||||||
String newIP = StringUtils.join(",", status.getIPs());
|
String newIP = StringUtils.join(",", status.getIPs());
|
||||||
container.setIp(newIP);
|
|
||||||
container.setHostname(status.getHost());
|
|
||||||
if (existingIP != null && newIP.equals(existingIP)) {
|
if (existingIP != null && newIP.equals(existingIP)) {
|
||||||
doRegistryUpdate = false;
|
doRegistryUpdate = false;
|
||||||
}
|
}
|
||||||
if (timelineServiceEnabled && doRegistryUpdate) {
|
|
||||||
serviceTimelinePublisher.componentInstanceIPHostUpdated(container);
|
|
||||||
}
|
}
|
||||||
|
setContainerStatus(status.getContainerId(), status);
|
||||||
|
if (containerRec != null && timelineServiceEnabled && doRegistryUpdate) {
|
||||||
|
serviceTimelinePublisher.componentInstanceIPHostUpdated(containerRec);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (doRegistryUpdate) {
|
if (doRegistryUpdate) {
|
||||||
cleanupRegistry(status.getContainerId());
|
cleanupRegistry(status.getContainerId());
|
||||||
LOG.info(
|
LOG.info(
|
||||||
|
@ -363,8 +363,12 @@ public void testRecoverComponentsAfterRMRestart() throws Exception {
|
|||||||
|
|
||||||
Multimap<String, String> containersAfterFailure = waitForAllCompToBeReady(
|
Multimap<String, String> containersAfterFailure = waitForAllCompToBeReady(
|
||||||
client, exampleApp);
|
client, exampleApp);
|
||||||
Assert.assertEquals("component container affected by restart",
|
containersBeforeFailure.keys().forEach(compName -> {
|
||||||
containersBeforeFailure, containersAfterFailure);
|
Assert.assertEquals("num containers after by restart for " + compName,
|
||||||
|
containersBeforeFailure.get(compName).size(),
|
||||||
|
containersAfterFailure.get(compName) == null ? 0 :
|
||||||
|
containersAfterFailure.get(compName).size());
|
||||||
|
});
|
||||||
|
|
||||||
LOG.info("Stop/destroy service {}", exampleApp);
|
LOG.info("Stop/destroy service {}", exampleApp);
|
||||||
client.actionStop(exampleApp.getName(), true);
|
client.actionStop(exampleApp.getName(), true);
|
||||||
|
Loading…
Reference in New Issue
Block a user