diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java index cd44a58568..a3d6714422 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java @@ -59,11 +59,14 @@ public static NeverRestartPolicy getInstance() { return false; } - @Override public boolean isReadyForDownStream(Component component) { - if (hasCompleted(component)) { - return true; + @Override public boolean isReadyForDownStream(Component dependentComponent) { + if (dependentComponent.getNumReadyInstances() + + dependentComponent.getNumSucceededInstances() + + dependentComponent.getNumFailedInstances() + < dependentComponent.getNumDesiredInstances()) { + return false; } - return false; + return true; } @Override public boolean allowUpgrades() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java index b939ba0428..28ebf9ef32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java @@ -65,12 +65,14 @@ public static OnFailureRestartPolicy getInstance() { return false; } - @Override public boolean isReadyForDownStream(Component component) { - if (hasCompletedSuccessfully(component)) { - return true; + @Override public boolean isReadyForDownStream(Component dependentComponent) { + if (dependentComponent.getNumReadyInstances() + + dependentComponent.getNumSucceededInstances() + + dependentComponent.getNumFailedInstances() + < dependentComponent.getNumDesiredInstances()) { + return false; } - - return false; + return true; } @Override public boolean allowUpgrades() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java index 03158cfc18..3e3b4a1a3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java @@ -65,6 +65,7 @@ public void testNeverRestartPolicy() throws Exception { when(component.getNumSucceededInstances()).thenReturn(new Long(1)); when(component.getNumFailedInstances()).thenReturn(new Long(2)); when(component.getNumDesiredInstances()).thenReturn(3); + when(component.getNumReadyInstances()).thenReturn(3); ComponentInstance instance = mock(ComponentInstance.class); when(instance.getComponent()).thenReturn(component); @@ -92,6 +93,7 @@ public void testOnFailureRestartPolicy() throws Exception { when(component.getNumSucceededInstances()).thenReturn(new Long(3)); when(component.getNumFailedInstances()).thenReturn(new Long(0)); when(component.getNumDesiredInstances()).thenReturn(3); + when(component.getNumReadyInstances()).thenReturn(3); ComponentInstance instance = mock(ComponentInstance.class); when(instance.getComponent()).thenReturn(component); @@ -123,7 +125,7 @@ public void testOnFailureRestartPolicy() throws Exception { assertEquals(true, restartPolicy.shouldRelaunchInstance(instance, containerStatus)); - assertEquals(false, restartPolicy.isReadyForDownStream(component)); + assertEquals(true, restartPolicy.isReadyForDownStream(component)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java index 7cef91e25d..c758e6fbea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java @@ -85,11 +85,15 @@ public void testComponentDependency() throws Exception{ exampleApp.setId(applicationId.toString()); exampleApp.setName("testComponentDependency"); exampleApp.addComponent(createComponent("compa", 1, "sleep 1000")); - Component compb = createComponent("compb", 1, "sleep 1000"); - // Let compb depends on compa; - compb.setDependencies(Collections.singletonList("compa")); + Component compb = createComponent("compb", 1, "sleep 1000", Component + .RestartPolicyEnum.ON_FAILURE, Collections.singletonList("compa")); + // Let compb depends on compb; + Component compc = createComponent("compc", 1, "sleep 1000", Component + .RestartPolicyEnum.NEVER, Collections.singletonList("compb")); + exampleApp.addComponent(compb); + exampleApp.addComponent(compc); MockServiceAM am = new MockServiceAM(exampleApp); am.init(conf); @@ -105,6 +109,11 @@ public void testComponentDependency() throws Exception{ // waiting for compb's dependencies are satisfied am.waitForDependenciesSatisfied("compb"); + // feed 1 container to compb, + am.feedContainerToComp(exampleApp, 2, "compb"); + // waiting for compc's dependencies are satisfied + am.waitForDependenciesSatisfied("compc"); + // feed 1 container to compb am.feedContainerToComp(exampleApp, 2, "compb"); am.flexComponent("compa", 2);