YARN-8360. Improve YARN service restart policy and node manager auto restart policy.
Contributed by Suma Shivaprasad
This commit is contained in:
parent
bbe2f6225e
commit
84d7bf1eef
@ -79,4 +79,9 @@ public static AlwaysRestartPolicy getInstance() {
|
|||||||
@Override public boolean shouldTerminate(Component component) {
|
@Override public boolean shouldTerminate(Component component) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public boolean allowContainerRetriesForInstance(
|
||||||
|
ComponentInstance componentInstance) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,4 +42,6 @@ boolean shouldRelaunchInstance(ComponentInstance componentInstance,
|
|||||||
|
|
||||||
boolean shouldTerminate(Component component);
|
boolean shouldTerminate(Component component);
|
||||||
|
|
||||||
|
boolean allowContainerRetriesForInstance(ComponentInstance componentInstance);
|
||||||
|
|
||||||
}
|
}
|
@ -79,4 +79,9 @@ public static NeverRestartPolicy getInstance() {
|
|||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public boolean allowContainerRetriesForInstance(
|
||||||
|
ComponentInstance componentInstance) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -84,4 +84,9 @@ public static OnFailureRestartPolicy getInstance() {
|
|||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public boolean allowContainerRetriesForInstance(
|
||||||
|
ComponentInstance componentInstance) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
|
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
||||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
||||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
|
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
|
||||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||||
@ -116,18 +117,22 @@ public void buildContainerLaunchCommand(AbstractLauncher launcher,
|
|||||||
|
|
||||||
public void buildContainerRetry(AbstractLauncher launcher,
|
public void buildContainerRetry(AbstractLauncher launcher,
|
||||||
Configuration yarnConf,
|
Configuration yarnConf,
|
||||||
ContainerLaunchService.ComponentLaunchContext compLaunchContext) {
|
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
|
||||||
|
ComponentInstance instance) {
|
||||||
// By default retry forever every 30 seconds
|
// By default retry forever every 30 seconds
|
||||||
launcher.setRetryContext(
|
|
||||||
YarnServiceConf.getInt(CONTAINER_RETRY_MAX,
|
ComponentRestartPolicy restartPolicy = instance.getComponent()
|
||||||
DEFAULT_CONTAINER_RETRY_MAX,
|
.getRestartPolicyHandler();
|
||||||
compLaunchContext.getConfiguration(), yarnConf),
|
if (restartPolicy.allowContainerRetriesForInstance(instance)) {
|
||||||
YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL,
|
launcher.setRetryContext(YarnServiceConf
|
||||||
DEFAULT_CONTAINER_RETRY_INTERVAL,
|
.getInt(CONTAINER_RETRY_MAX, DEFAULT_CONTAINER_RETRY_MAX,
|
||||||
compLaunchContext.getConfiguration(), yarnConf),
|
compLaunchContext.getConfiguration(), yarnConf), YarnServiceConf
|
||||||
YarnServiceConf.getLong(CONTAINER_FAILURES_VALIDITY_INTERVAL,
|
.getInt(CONTAINER_RETRY_INTERVAL, DEFAULT_CONTAINER_RETRY_INTERVAL,
|
||||||
DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL,
|
compLaunchContext.getConfiguration(), yarnConf), YarnServiceConf
|
||||||
compLaunchContext.getConfiguration(), yarnConf));
|
.getLong(CONTAINER_FAILURES_VALIDITY_INTERVAL,
|
||||||
|
DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL,
|
||||||
|
compLaunchContext.getConfiguration(), yarnConf));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void buildContainerLaunchContext(AbstractLauncher launcher,
|
public void buildContainerLaunchContext(AbstractLauncher launcher,
|
||||||
@ -161,6 +166,6 @@ public void buildContainerLaunchContext(AbstractLauncher launcher,
|
|||||||
yarnConf, container, compLaunchContext, tokensForSubstitution);
|
yarnConf, container, compLaunchContext, tokensForSubstitution);
|
||||||
|
|
||||||
// Setup container retry settings
|
// Setup container retry settings
|
||||||
buildContainerRetry(launcher, yarnConf, compLaunchContext);
|
buildContainerRetry(launcher, yarnConf, compLaunchContext, instance);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,7 +115,7 @@ public static Service createTerminatingJobExample(String serviceName) {
|
|||||||
exampleApp.setName(serviceName);
|
exampleApp.setName(serviceName);
|
||||||
exampleApp.setVersion("v1");
|
exampleApp.setVersion("v1");
|
||||||
exampleApp.addComponent(
|
exampleApp.addComponent(
|
||||||
createComponent("terminating-comp1", 2, "sleep " + "1000",
|
createComponent("terminating-comp1", 2, "sleep 1000",
|
||||||
Component.RestartPolicyEnum.NEVER, null));
|
Component.RestartPolicyEnum.NEVER, null));
|
||||||
exampleApp.addComponent(
|
exampleApp.addComponent(
|
||||||
createComponent("terminating-comp2", 2, "sleep 1000",
|
createComponent("terminating-comp2", 2, "sleep 1000",
|
||||||
|
@ -19,13 +19,33 @@
|
|||||||
package org.apache.hadoop.yarn.service.containerlaunch;
|
package org.apache.hadoop.yarn.service.containerlaunch;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.service.component.AlwaysRestartPolicy;
|
||||||
|
import org.apache.hadoop.yarn.service.component.Component;
|
||||||
|
import org.apache.hadoop.yarn.service.component.NeverRestartPolicy;
|
||||||
|
import org.apache.hadoop.yarn.service.component.OnFailureRestartPolicy;
|
||||||
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
|
import org.apache.hadoop.yarn.service.provider.defaultImpl
|
||||||
|
.DefaultProviderService;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fi.FiConfig.getConfig;
|
||||||
|
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf
|
||||||
|
.DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL;
|
||||||
|
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf
|
||||||
|
.DEFAULT_CONTAINER_RETRY_INTERVAL;
|
||||||
|
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf
|
||||||
|
.DEFAULT_CONTAINER_RETRY_MAX;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.reset;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for {@link AbstractLauncher}.
|
* Tests for {@link AbstractLauncher}.
|
||||||
@ -51,4 +71,50 @@ public void testDockerContainerMounts() throws IOException {
|
|||||||
|
|
||||||
Assert.assertEquals("s1:t1:ro,s2:t2:ro", dockerContainerMounts);
|
Assert.assertEquals("s1:t1:ro,s2:t2:ro", dockerContainerMounts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerRetries() throws Exception {
|
||||||
|
|
||||||
|
DefaultProviderService providerService = new DefaultProviderService();
|
||||||
|
AbstractLauncher mockLauncher = mock(AbstractLauncher.class);
|
||||||
|
ContainerLaunchService.ComponentLaunchContext componentLaunchContext =
|
||||||
|
mock(ContainerLaunchService.ComponentLaunchContext.class);
|
||||||
|
|
||||||
|
ComponentInstance componentInstance = mock(ComponentInstance.class);
|
||||||
|
|
||||||
|
//Never Restart Policy
|
||||||
|
Component component = mock(Component.class);
|
||||||
|
when(componentInstance.getComponent()).thenReturn(component);
|
||||||
|
|
||||||
|
when(component.getRestartPolicyHandler()).thenReturn(NeverRestartPolicy
|
||||||
|
.getInstance());
|
||||||
|
|
||||||
|
providerService.buildContainerRetry(mockLauncher, getConfig(),
|
||||||
|
componentLaunchContext, componentInstance);
|
||||||
|
verifyZeroInteractions(mockLauncher);
|
||||||
|
|
||||||
|
|
||||||
|
//OnFailure restart policy
|
||||||
|
when(component.getRestartPolicyHandler()).thenReturn(OnFailureRestartPolicy
|
||||||
|
.getInstance());
|
||||||
|
when(componentLaunchContext.getConfiguration()).thenReturn(new
|
||||||
|
Configuration());
|
||||||
|
providerService.buildContainerRetry(mockLauncher, getConfig(),
|
||||||
|
componentLaunchContext, componentInstance);
|
||||||
|
verify(mockLauncher).setRetryContext(DEFAULT_CONTAINER_RETRY_MAX,
|
||||||
|
DEFAULT_CONTAINER_RETRY_INTERVAL,
|
||||||
|
DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL);
|
||||||
|
|
||||||
|
reset(mockLauncher);
|
||||||
|
|
||||||
|
//Always restart policy
|
||||||
|
when(component.getRestartPolicyHandler()).thenReturn(AlwaysRestartPolicy
|
||||||
|
.getInstance());
|
||||||
|
providerService.buildContainerRetry(mockLauncher, getConfig(),
|
||||||
|
componentLaunchContext, componentInstance);
|
||||||
|
|
||||||
|
verify(mockLauncher).setRetryContext(DEFAULT_CONTAINER_RETRY_MAX,
|
||||||
|
DEFAULT_CONTAINER_RETRY_INTERVAL,
|
||||||
|
DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user