YARN-7616. Map YARN application status to Service Status more accurately. (Contributed by Gour Saha)

This commit is contained in:
Eric Yang 2017-12-19 19:14:45 -05:00
parent 94a2ac6b71
commit 41b581012a
6 changed files with 137 additions and 23 deletions

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.monitor.ServiceMonitor; import org.apache.hadoop.yarn.service.monitor.ServiceMonitor;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
@ -237,6 +238,7 @@ protected void loadApplicationJson(ServiceContext context,
SliderFileSystem fs) throws IOException { SliderFileSystem fs) throws IOException {
context.service = ServiceApiUtil context.service = ServiceApiUtil
.loadServiceFrom(fs, new Path(serviceDefPath)); .loadServiceFrom(fs, new Path(serviceDefPath));
context.service.setState(ServiceState.ACCEPTED);
LOG.info(context.service.toString()); LOG.info(context.service.toString());
} }
@ -257,6 +259,41 @@ protected void serviceStop() throws Exception {
super.serviceStop(); super.serviceStop();
} }
// This method should be called whenever there is an increment or decrement
// of a READY state component of a service
public static synchronized void checkAndUpdateServiceState(
ServiceScheduler scheduler, boolean isIncrement) {
ServiceState curState = scheduler.getApp().getState();
if (!isIncrement) {
// set it to STARTED every time a component moves out of STABLE state
scheduler.getApp().setState(ServiceState.STARTED);
} else {
// otherwise check the state of all components
boolean isStable = true;
for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler
.getApp().getComponents()) {
if (comp.getState() !=
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE) {
isStable = false;
break;
}
}
if (isStable) {
scheduler.getApp().setState(ServiceState.STABLE);
} else {
// mark new state as started only if current state is stable, otherwise
// leave it as is
if (curState == ServiceState.STABLE) {
scheduler.getApp().setState(ServiceState.STARTED);
}
}
}
if (curState != scheduler.getApp().getState()) {
LOG.info("Service state changed from {} -> {}", curState,
scheduler.getApp().getState());
}
}
private void printSystemEnv() { private void printSystemEnv() {
for (Map.Entry<String, String> envs : System.getenv().entrySet()) { for (Map.Entry<String, String> envs : System.getenv().entrySet()) {
LOG.info("{} = {}", envs.getKey(), envs.getValue()); LOG.info("{} = {}", envs.getKey(), envs.getValue());

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.service.api.ServiceApiConstants; import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
@ -284,6 +285,9 @@ public void serviceStart() throws Exception {
} }
registerServiceInstance(context.attemptId, app); registerServiceInstance(context.attemptId, app);
// Since AM has been started and registered, the service is in STARTED state
app.setState(ServiceState.STARTED);
// recover components based on containers sent from RM // recover components based on containers sent from RM
recoverComponents(response); recoverComponents(response);

View File

@ -268,7 +268,8 @@ private long parseNumberOfContainers(Component component, String newNumber) {
long ret = orig - Long.parseLong(newNumber.substring(1)); long ret = orig - Long.parseLong(newNumber.substring(1));
if (ret < 0) { if (ret < 0) {
LOG.warn(MessageFormat.format( LOG.warn(MessageFormat.format(
"[COMPONENT {}]: component count goes to negative ({}{} = {}), reset it to 0.", "[COMPONENT {0}]: component count goes to negative ({1}{2} = {3}),"
+ " ignore and reset it to 0.",
component.getName(), orig, newNumber, ret)); component.getName(), orig, newNumber, ret));
ret = 0; ret = 0;
} }
@ -878,18 +879,23 @@ public String updateLifetime(String serviceName, long lifetime)
return newTimeout; return newTimeout;
} }
public ServiceState convertState(FinalApplicationStatus status) { public ServiceState convertState(YarnApplicationState state) {
switch (status) { switch (state) {
case UNDEFINED: case NEW:
case NEW_SAVING:
case SUBMITTED:
case ACCEPTED:
return ServiceState.ACCEPTED; return ServiceState.ACCEPTED;
case FAILED: case RUNNING:
return ServiceState.STARTED;
case FINISHED:
case KILLED: case KILLED:
return ServiceState.FAILED;
case ENDED:
case SUCCEEDED:
return ServiceState.STOPPED; return ServiceState.STOPPED;
case FAILED:
return ServiceState.FAILED;
default:
return ServiceState.ACCEPTED;
} }
return ServiceState.ACCEPTED;
} }
public String getStatusString(String appId) public String getStatusString(String appId)
@ -917,7 +923,7 @@ public Service getStatus(String serviceName)
ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId);
Service appSpec = new Service(); Service appSpec = new Service();
appSpec.setName(serviceName); appSpec.setName(serviceName);
appSpec.setState(convertState(appReport.getFinalApplicationStatus())); appSpec.setState(convertState(appReport.getYarnApplicationState()));
ApplicationTimeout lifetime = ApplicationTimeout lifetime =
appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
if (lifetime != null) { if (lifetime != null) {

View File

@ -31,7 +31,9 @@
import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ContainerFailureTracker;
import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.ServiceMetrics; import org.apache.hadoop.yarn.service.ServiceMetrics;
import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -209,6 +211,7 @@ public ComponentState transition(Component component,
component.createNumCompInstances(delta); component.createNumCompInstances(delta);
component.componentSpec.setState( component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
component.getScheduler().getApp().setState(ServiceState.STARTED);
return FLEXING; return FLEXING;
} else if (delta < 0){ } else if (delta < 0){
delta = 0 - delta; delta = 0 - delta;
@ -229,14 +232,11 @@ public ComponentState transition(Component component,
component.instanceIdCounter.decrementAndGet(); component.instanceIdCounter.decrementAndGet();
instance.destroy(); instance.destroy();
} }
component.componentSpec.setState( checkAndUpdateComponentState(component, false);
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE; return STABLE;
} else { } else {
LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " + LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " +
event.getDesired() + " instances, ignoring"); event.getDesired() + " instances, ignoring");
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE; return STABLE;
} }
} }
@ -289,7 +289,7 @@ private static class ContainerStartedTransition implements
private static ComponentState checkIfStable(Component component) { private static ComponentState checkIfStable(Component component) {
// if desired == running // if desired == running
if (component.componentMetrics.containersRunning.value() == component if (component.componentMetrics.containersReady.value() == component
.getComponentSpec().getNumberOfContainers()) { .getComponentSpec().getNumberOfContainers()) {
component.componentSpec.setState( component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
@ -301,6 +301,46 @@ private static ComponentState checkIfStable(Component component) {
} }
} }
// This method should be called whenever there is an increment or decrement
// of a READY state container of a component
public static synchronized void checkAndUpdateComponentState(
Component component, boolean isIncrement) {
org.apache.hadoop.yarn.service.api.records.ComponentState curState =
component.componentSpec.getState();
if (isIncrement) {
// check if all containers are in READY state
if (component.componentMetrics.containersReady
.value() == component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
if (curState != component.componentSpec.getState()) {
LOG.info("[COMPONENT {}] state changed from {} -> {}",
component.componentSpec.getName(), curState,
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
ServiceMaster.checkAndUpdateServiceState(component.scheduler,
isIncrement);
}
} else {
// container moving out of READY state could be because of FLEX down so
// still need to verify the count before changing the component state
if (component.componentMetrics.containersReady
.value() < component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
if (curState != component.componentSpec.getState()) {
LOG.info("[COMPONENT {}] state changed from {} -> {}",
component.componentSpec.getName(), curState,
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
ServiceMaster.checkAndUpdateServiceState(component.scheduler,
isIncrement);
}
}
}
private static class ContainerCompletedTransition extends BaseTransition { private static class ContainerCompletedTransition extends BaseTransition {
@Override @Override
public void transition(Component component, ComponentEvent event) { public void transition(Component component, ComponentEvent event) {
@ -310,6 +350,7 @@ public void transition(Component component, ComponentEvent event) {
STOP).setStatus(event.getStatus())); STOP).setStatus(event.getStatus()));
component.componentSpec.setState( component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
component.getScheduler().getApp().setState(ServiceState.STARTED);
} }
} }
@ -472,11 +513,13 @@ public void decRunningContainers() {
public void incContainersReady() { public void incContainersReady() {
componentMetrics.containersReady.incr(); componentMetrics.containersReady.incr();
scheduler.getServiceMetrics().containersReady.incr(); scheduler.getServiceMetrics().containersReady.incr();
checkAndUpdateComponentState(this, true);
} }
public void decContainersReady() { public void decContainersReady() {
componentMetrics.containersReady.decr(); componentMetrics.containersReady.decr();
scheduler.getServiceMetrics().containersReady.decr(); scheduler.getServiceMetrics().containersReady.decr();
checkAndUpdateComponentState(this, false);
} }
public int getNumReadyInstances() { public int getNumReadyInstances() {

View File

@ -147,7 +147,6 @@ private static class ContainerStartedTransition extends BaseTransition {
new ContainerStatusRetriever(compInstance.scheduler, new ContainerStatusRetriever(compInstance.scheduler,
event.getContainerId(), compInstance), 0, 1, event.getContainerId(), compInstance), 0, 1,
TimeUnit.SECONDS); TimeUnit.SECONDS);
compInstance.component.incRunningContainers();
long containerStartTime = System.currentTimeMillis(); long containerStartTime = System.currentTimeMillis();
try { try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
@ -171,6 +170,7 @@ private static class ContainerStartedTransition extends BaseTransition {
compInstance.containerSpec = container; compInstance.containerSpec = container;
compInstance.getCompSpec().addContainer(container); compInstance.getCompSpec().addContainer(container);
compInstance.containerStartedTime = containerStartTime; compInstance.containerStartedTime = containerStartTime;
compInstance.component.incRunningContainers();
if (compInstance.timelineServiceEnabled) { if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher compInstance.serviceTimelinePublisher
@ -183,8 +183,8 @@ private static class ContainerBecomeReadyTransition extends BaseTransition {
@Override @Override
public void transition(ComponentInstance compInstance, public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) { ComponentInstanceEvent event) {
compInstance.component.incContainersReady();
compInstance.containerSpec.setState(ContainerState.READY); compInstance.containerSpec.setState(ContainerState.READY);
compInstance.component.incContainersReady();
if (compInstance.timelineServiceEnabled) { if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher compInstance.serviceTimelinePublisher
.componentInstanceBecomeReady(compInstance.containerSpec); .componentInstanceBecomeReady(compInstance.containerSpec);
@ -196,8 +196,8 @@ private static class ContainerBecomeNotReadyTransition extends BaseTransition {
@Override @Override
public void transition(ComponentInstance compInstance, public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) { ComponentInstanceEvent event) {
compInstance.component.decContainersReady();
compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY); compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
compInstance.component.decContainersReady();
} }
} }

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ContainerState;
@ -90,25 +91,25 @@ public void testCreateFlexStopDestroyService() throws Exception {
// check app.json is persisted. // check app.json is persisted.
Assert.assertTrue( Assert.assertTrue(
getFS().exists(new Path(appDir, exampleApp.getName() + ".json"))); getFS().exists(new Path(appDir, exampleApp.getName() + ".json")));
waitForAllCompToBeReady(client, exampleApp); waitForServiceToBeStable(client, exampleApp);
// Flex two components, each from 2 container to 3 containers. // Flex two components, each from 2 container to 3 containers.
flexComponents(client, exampleApp, 3L); flexComponents(client, exampleApp, 3L);
// wait for flex to be completed, increase from 2 to 3 containers. // wait for flex to be completed, increase from 2 to 3 containers.
waitForAllCompToBeReady(client, exampleApp); waitForServiceToBeStable(client, exampleApp);
// check all instances name for each component are in sequential order. // check all instances name for each component are in sequential order.
checkCompInstancesInOrder(client, exampleApp); checkCompInstancesInOrder(client, exampleApp);
// flex down to 1 // flex down to 1
flexComponents(client, exampleApp, 1L); flexComponents(client, exampleApp, 1L);
waitForAllCompToBeReady(client, exampleApp); waitForServiceToBeStable(client, exampleApp);
checkCompInstancesInOrder(client, exampleApp); checkCompInstancesInOrder(client, exampleApp);
// check component dir and registry are cleaned up. // check component dir and registry are cleaned up.
// flex up again to 2 // flex up again to 2
flexComponents(client, exampleApp, 2L); flexComponents(client, exampleApp, 2L);
waitForAllCompToBeReady(client, exampleApp); waitForServiceToBeStable(client, exampleApp);
checkCompInstancesInOrder(client, exampleApp); checkCompInstancesInOrder(client, exampleApp);
// stop the service // stop the service
@ -145,7 +146,7 @@ public void testComponentStartOrder() throws Exception {
exampleApp.addComponent(compb); exampleApp.addComponent(compb);
client.actionCreate(exampleApp); client.actionCreate(exampleApp);
waitForAllCompToBeReady(client, exampleApp); waitForServiceToBeStable(client, exampleApp);
// check that containers for compa are launched before containers for compb // check that containers for compa are launched before containers for compb
checkContainerLaunchDependencies(client, exampleApp, "compa", "compb"); checkContainerLaunchDependencies(client, exampleApp, "compa", "compb");
@ -372,6 +373,29 @@ private Multimap<String, String> waitForAllCompToBeReady(ServiceClient client,
return allContainers; return allContainers;
} }
/**
* Wait until service state becomes stable. A service is stable when all
* requested containers of all components are running and in ready state.
*
* @param client
* @param exampleApp
* @throws TimeoutException
* @throws InterruptedException
*/
private void waitForServiceToBeStable(ServiceClient client,
Service exampleApp) throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
try {
Service retrievedApp = client.getStatus(exampleApp.getName());
System.out.println(retrievedApp);
return retrievedApp.getState() == ServiceState.STABLE;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}, 2000, 200000);
}
private ServiceClient createClient() throws Exception { private ServiceClient createClient() throws Exception {
ServiceClient client = new ServiceClient() { ServiceClient client = new ServiceClient() {
@Override protected Path addJarResource(String appName, @Override protected Path addJarResource(String appName,