YARN-8901. Fixed restart policy NEVER/ON_FAILURE with component dependency.
Contributed by Suma Shivaprasad
This commit is contained in:
parent
2e636dd3c4
commit
f5a95f7998
@ -59,11 +59,14 @@ public static NeverRestartPolicy getInstance() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean isReadyForDownStream(Component component) {
|
@Override public boolean isReadyForDownStream(Component dependentComponent) {
|
||||||
if (hasCompleted(component)) {
|
if (dependentComponent.getNumReadyInstances()
|
||||||
return true;
|
+ dependentComponent.getNumSucceededInstances()
|
||||||
|
+ dependentComponent.getNumFailedInstances()
|
||||||
|
< dependentComponent.getNumDesiredInstances()) {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean allowUpgrades() {
|
@Override public boolean allowUpgrades() {
|
||||||
|
@ -65,12 +65,14 @@ public static OnFailureRestartPolicy getInstance() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean isReadyForDownStream(Component component) {
|
@Override public boolean isReadyForDownStream(Component dependentComponent) {
|
||||||
if (hasCompletedSuccessfully(component)) {
|
if (dependentComponent.getNumReadyInstances()
|
||||||
return true;
|
+ dependentComponent.getNumSucceededInstances()
|
||||||
|
+ dependentComponent.getNumFailedInstances()
|
||||||
|
< dependentComponent.getNumDesiredInstances()) {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean allowUpgrades() {
|
@Override public boolean allowUpgrades() {
|
||||||
|
@ -65,6 +65,7 @@ public void testNeverRestartPolicy() throws Exception {
|
|||||||
when(component.getNumSucceededInstances()).thenReturn(new Long(1));
|
when(component.getNumSucceededInstances()).thenReturn(new Long(1));
|
||||||
when(component.getNumFailedInstances()).thenReturn(new Long(2));
|
when(component.getNumFailedInstances()).thenReturn(new Long(2));
|
||||||
when(component.getNumDesiredInstances()).thenReturn(3);
|
when(component.getNumDesiredInstances()).thenReturn(3);
|
||||||
|
when(component.getNumReadyInstances()).thenReturn(3);
|
||||||
|
|
||||||
ComponentInstance instance = mock(ComponentInstance.class);
|
ComponentInstance instance = mock(ComponentInstance.class);
|
||||||
when(instance.getComponent()).thenReturn(component);
|
when(instance.getComponent()).thenReturn(component);
|
||||||
@ -92,6 +93,7 @@ public void testOnFailureRestartPolicy() throws Exception {
|
|||||||
when(component.getNumSucceededInstances()).thenReturn(new Long(3));
|
when(component.getNumSucceededInstances()).thenReturn(new Long(3));
|
||||||
when(component.getNumFailedInstances()).thenReturn(new Long(0));
|
when(component.getNumFailedInstances()).thenReturn(new Long(0));
|
||||||
when(component.getNumDesiredInstances()).thenReturn(3);
|
when(component.getNumDesiredInstances()).thenReturn(3);
|
||||||
|
when(component.getNumReadyInstances()).thenReturn(3);
|
||||||
|
|
||||||
ComponentInstance instance = mock(ComponentInstance.class);
|
ComponentInstance instance = mock(ComponentInstance.class);
|
||||||
when(instance.getComponent()).thenReturn(component);
|
when(instance.getComponent()).thenReturn(component);
|
||||||
@ -123,7 +125,7 @@ public void testOnFailureRestartPolicy() throws Exception {
|
|||||||
assertEquals(true,
|
assertEquals(true,
|
||||||
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
|
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
|
||||||
|
|
||||||
assertEquals(false, restartPolicy.isReadyForDownStream(component));
|
assertEquals(true, restartPolicy.isReadyForDownStream(component));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,11 +85,15 @@ public void testComponentDependency() throws Exception{
|
|||||||
exampleApp.setId(applicationId.toString());
|
exampleApp.setId(applicationId.toString());
|
||||||
exampleApp.setName("testComponentDependency");
|
exampleApp.setName("testComponentDependency");
|
||||||
exampleApp.addComponent(createComponent("compa", 1, "sleep 1000"));
|
exampleApp.addComponent(createComponent("compa", 1, "sleep 1000"));
|
||||||
Component compb = createComponent("compb", 1, "sleep 1000");
|
|
||||||
|
|
||||||
// Let compb depends on compa;
|
// Let compb depends on compa;
|
||||||
compb.setDependencies(Collections.singletonList("compa"));
|
Component compb = createComponent("compb", 1, "sleep 1000", Component
|
||||||
|
.RestartPolicyEnum.ON_FAILURE, Collections.singletonList("compa"));
|
||||||
|
// Let compb depends on compb;
|
||||||
|
Component compc = createComponent("compc", 1, "sleep 1000", Component
|
||||||
|
.RestartPolicyEnum.NEVER, Collections.singletonList("compb"));
|
||||||
|
|
||||||
exampleApp.addComponent(compb);
|
exampleApp.addComponent(compb);
|
||||||
|
exampleApp.addComponent(compc);
|
||||||
|
|
||||||
MockServiceAM am = new MockServiceAM(exampleApp);
|
MockServiceAM am = new MockServiceAM(exampleApp);
|
||||||
am.init(conf);
|
am.init(conf);
|
||||||
@ -105,6 +109,11 @@ public void testComponentDependency() throws Exception{
|
|||||||
// waiting for compb's dependencies are satisfied
|
// waiting for compb's dependencies are satisfied
|
||||||
am.waitForDependenciesSatisfied("compb");
|
am.waitForDependenciesSatisfied("compb");
|
||||||
|
|
||||||
|
// feed 1 container to compb,
|
||||||
|
am.feedContainerToComp(exampleApp, 2, "compb");
|
||||||
|
// waiting for compc's dependencies are satisfied
|
||||||
|
am.waitForDependenciesSatisfied("compc");
|
||||||
|
|
||||||
// feed 1 container to compb
|
// feed 1 container to compb
|
||||||
am.feedContainerToComp(exampleApp, 2, "compb");
|
am.feedContainerToComp(exampleApp, 2, "compb");
|
||||||
am.flexComponent("compa", 2);
|
am.flexComponent("compa", 2);
|
||||||
|
Loading…
Reference in New Issue
Block a user