diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java index 1283604191..75cc9c59a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; import org.apache.hadoop.yarn.service.monitor.ServiceMonitor; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; @@ -237,6 +238,7 @@ protected void loadApplicationJson(ServiceContext context, SliderFileSystem fs) throws IOException { context.service = ServiceApiUtil .loadServiceFrom(fs, new Path(serviceDefPath)); + context.service.setState(ServiceState.ACCEPTED); LOG.info(context.service.toString()); } @@ -257,6 +259,41 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + // This method should be called whenever there is an increment or decrement + // of a READY state component of a service + public static synchronized void checkAndUpdateServiceState( + ServiceScheduler scheduler, boolean isIncrement) { + ServiceState curState = scheduler.getApp().getState(); + if (!isIncrement) { + // set it to STARTED every time a component moves out of STABLE state + scheduler.getApp().setState(ServiceState.STARTED); + } else { + // otherwise check the state of all components + boolean isStable = true; + for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler + .getApp().getComponents()) { + if (comp.getState() != + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE) { + isStable = false; + break; + } + } + if (isStable) { + scheduler.getApp().setState(ServiceState.STABLE); + } else { + // mark new state as started only if current state is stable, otherwise + // leave it as is + if (curState == ServiceState.STABLE) { + scheduler.getApp().setState(ServiceState.STARTED); + } + } + } + if (curState != scheduler.getApp().getState()) { + LOG.info("Service state changed from {} -> {}", curState, + scheduler.getApp().getState()); + } + } + private void printSystemEnv() { for (Map.Entry envs : System.getenv().entrySet()) { LOG.info("{} = {}", envs.getKey(), envs.getValue()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 26970508de..45cdd28945 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.service.api.ServiceApiConstants; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; @@ -284,6 +285,9 @@ public void serviceStart() throws Exception { } registerServiceInstance(context.attemptId, app); + // Since AM has been started and registered, the service is in STARTED state + app.setState(ServiceState.STARTED); + // recover components based on containers sent from RM recoverComponents(response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 81c56d2429..d1ccc4f6f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -268,7 +268,8 @@ private long parseNumberOfContainers(Component component, String newNumber) { long ret = orig - Long.parseLong(newNumber.substring(1)); if (ret < 0) { LOG.warn(MessageFormat.format( - "[COMPONENT {}]: component count goes to negative ({}{} = {}), reset it to 0.", + "[COMPONENT {0}]: component count goes to negative ({1}{2} = {3})," + + " ignore and reset it to 0.", component.getName(), orig, newNumber, ret)); ret = 0; } @@ -878,18 +879,23 @@ public String updateLifetime(String serviceName, long lifetime) return newTimeout; } - public ServiceState convertState(FinalApplicationStatus status) { - switch (status) { - case UNDEFINED: + public ServiceState convertState(YarnApplicationState state) { + switch (state) { + case NEW: + case NEW_SAVING: + case SUBMITTED: + case ACCEPTED: return ServiceState.ACCEPTED; - case FAILED: + case RUNNING: + return ServiceState.STARTED; + case FINISHED: case KILLED: - return ServiceState.FAILED; - case ENDED: - case SUCCEEDED: return ServiceState.STOPPED; + case FAILED: + return ServiceState.FAILED; + default: + return ServiceState.ACCEPTED; } - return ServiceState.ACCEPTED; } public String getStatusString(String appId) @@ -917,7 +923,7 @@ public Service getStatus(String serviceName) ApplicationReport appReport = yarnClient.getApplicationReport(currentAppId); Service appSpec = new Service(); appSpec.setName(serviceName); - appSpec.setState(convertState(appReport.getFinalApplicationStatus())); + appSpec.setState(convertState(appReport.getYarnApplicationState())); ApplicationTimeout lifetime = appReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); if (lifetime != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 9c5cbaef86..a84c1b164b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -31,7 +31,9 @@ import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.ServiceMetrics; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -209,6 +211,7 @@ public ComponentState transition(Component component, component.createNumCompInstances(delta); component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + component.getScheduler().getApp().setState(ServiceState.STARTED); return FLEXING; } else if (delta < 0){ delta = 0 - delta; @@ -229,14 +232,11 @@ public ComponentState transition(Component component, component.instanceIdCounter.decrementAndGet(); instance.destroy(); } - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + checkAndUpdateComponentState(component, false); return STABLE; } else { LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " + event.getDesired() + " instances, ignoring"); - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; } } @@ -289,7 +289,7 @@ private static class ContainerStartedTransition implements private static ComponentState checkIfStable(Component component) { // if desired == running - if (component.componentMetrics.containersRunning.value() == component + if (component.componentMetrics.containersReady.value() == component .getComponentSpec().getNumberOfContainers()) { component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); @@ -301,6 +301,46 @@ private static ComponentState checkIfStable(Component component) { } } + // This method should be called whenever there is an increment or decrement + // of a READY state container of a component + public static synchronized void checkAndUpdateComponentState( + Component component, boolean isIncrement) { + org.apache.hadoop.yarn.service.api.records.ComponentState curState = + component.componentSpec.getState(); + if (isIncrement) { + // check if all containers are in READY state + if (component.componentMetrics.containersReady + .value() == component.componentMetrics.containersDesired.value()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + if (curState != component.componentSpec.getState()) { + LOG.info("[COMPONENT {}] state changed from {} -> {}", + component.componentSpec.getName(), curState, + component.componentSpec.getState()); + } + // component state change will trigger re-check of service state + ServiceMaster.checkAndUpdateServiceState(component.scheduler, + isIncrement); + } + } else { + // container moving out of READY state could be because of FLEX down so + // still need to verify the count before changing the component state + if (component.componentMetrics.containersReady + .value() < component.componentMetrics.containersDesired.value()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + if (curState != component.componentSpec.getState()) { + LOG.info("[COMPONENT {}] state changed from {} -> {}", + component.componentSpec.getName(), curState, + component.componentSpec.getState()); + } + // component state change will trigger re-check of service state + ServiceMaster.checkAndUpdateServiceState(component.scheduler, + isIncrement); + } + } + } + private static class ContainerCompletedTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { @@ -310,6 +350,7 @@ public void transition(Component component, ComponentEvent event) { STOP).setStatus(event.getStatus())); component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + component.getScheduler().getApp().setState(ServiceState.STARTED); } } @@ -472,11 +513,13 @@ public void decRunningContainers() { public void incContainersReady() { componentMetrics.containersReady.incr(); scheduler.getServiceMetrics().containersReady.incr(); + checkAndUpdateComponentState(this, true); } public void decContainersReady() { componentMetrics.containersReady.decr(); scheduler.getServiceMetrics().containersReady.decr(); + checkAndUpdateComponentState(this, false); } public int getNumReadyInstances() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 31fa5c719c..0e3e11bc72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -147,7 +147,6 @@ private static class ContainerStartedTransition extends BaseTransition { new ContainerStatusRetriever(compInstance.scheduler, event.getContainerId(), compInstance), 0, 1, TimeUnit.SECONDS); - compInstance.component.incRunningContainers(); long containerStartTime = System.currentTimeMillis(); try { ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils @@ -171,6 +170,7 @@ private static class ContainerStartedTransition extends BaseTransition { compInstance.containerSpec = container; compInstance.getCompSpec().addContainer(container); compInstance.containerStartedTime = containerStartTime; + compInstance.component.incRunningContainers(); if (compInstance.timelineServiceEnabled) { compInstance.serviceTimelinePublisher @@ -183,8 +183,8 @@ private static class ContainerBecomeReadyTransition extends BaseTransition { @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - compInstance.component.incContainersReady(); compInstance.containerSpec.setState(ContainerState.READY); + compInstance.component.incContainersReady(); if (compInstance.timelineServiceEnabled) { compInstance.serviceTimelinePublisher .componentInstanceBecomeReady(compInstance.containerSpec); @@ -196,8 +196,8 @@ private static class ContainerBecomeNotReadyTransition extends BaseTransition { @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - compInstance.component.decContainersReady(); compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY); + compInstance.component.decContainersReady(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 1c517d941b..debab8b8b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; @@ -90,25 +91,25 @@ public void testCreateFlexStopDestroyService() throws Exception { // check app.json is persisted. Assert.assertTrue( getFS().exists(new Path(appDir, exampleApp.getName() + ".json"))); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); // Flex two components, each from 2 container to 3 containers. flexComponents(client, exampleApp, 3L); // wait for flex to be completed, increase from 2 to 3 containers. - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); // check all instances name for each component are in sequential order. checkCompInstancesInOrder(client, exampleApp); // flex down to 1 flexComponents(client, exampleApp, 1L); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); checkCompInstancesInOrder(client, exampleApp); // check component dir and registry are cleaned up. // flex up again to 2 flexComponents(client, exampleApp, 2L); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); checkCompInstancesInOrder(client, exampleApp); // stop the service @@ -145,7 +146,7 @@ public void testComponentStartOrder() throws Exception { exampleApp.addComponent(compb); client.actionCreate(exampleApp); - waitForAllCompToBeReady(client, exampleApp); + waitForServiceToBeStable(client, exampleApp); // check that containers for compa are launched before containers for compb checkContainerLaunchDependencies(client, exampleApp, "compa", "compb"); @@ -372,6 +373,29 @@ private Multimap waitForAllCompToBeReady(ServiceClient client, return allContainers; } + /** + * Wait until service state becomes stable. A service is stable when all + * requested containers of all components are running and in ready state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + private void waitForServiceToBeStable(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + System.out.println(retrievedApp); + return retrievedApp.getState() == ServiceState.STABLE; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, 2000, 200000); + } + private ServiceClient createClient() throws Exception { ServiceClient client = new ServiceClient() { @Override protected Path addJarResource(String appName,