YARN-5016. Add support for a minimum retry interval for container retries. Contributed by Jun Gong.
This commit is contained in:
parent
93258459fa
commit
0287c49107
@ -893,6 +893,11 @@ public static boolean isAclEnabled(Configuration conf) {
|
|||||||
NM_PREFIX + "container-diagnostics-maximum-size";
|
NM_PREFIX + "container-diagnostics-maximum-size";
|
||||||
public static final int DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE = 10000;
|
public static final int DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE = 10000;
|
||||||
|
|
||||||
|
/** Minimum container restart interval, in milliseconds. */
|
||||||
|
public static final String NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS =
|
||||||
|
NM_PREFIX + "container-retry-minimum-interval-ms";
|
||||||
|
public static final int DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS = 1000;
|
||||||
|
|
||||||
/** Interval at which the delayed token removal thread runs */
|
/** Interval at which the delayed token removal thread runs */
|
||||||
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
|
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
|
||||||
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
|
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
|
||||||
|
@ -1588,6 +1588,12 @@
|
|||||||
<value>10000</value>
|
<value>10000</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
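<description>Minimum container restart interval in milliseconds. For containers that may be retried, a requested retry interval shorter than this value is raised to it.</description>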
|
||||||
|
<name>yarn.nodemanager.container-retry-minimum-interval-ms</name>
|
||||||
|
<value>1000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Max number of threads in NMClientAsync to process container
|
<description>Max number of threads in NMClientAsync to process container
|
||||||
management events</description>
|
management events</description>
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -155,6 +156,16 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
|||||||
this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
|
this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
|
||||||
}
|
}
|
||||||
this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
|
this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
|
||||||
|
int minimumRestartInterval = conf.getInt(
|
||||||
|
YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
|
||||||
|
if (containerRetryContext.getRetryPolicy()
|
||||||
|
!= ContainerRetryPolicy.NEVER_RETRY
|
||||||
|
&& containerRetryContext.getRetryInterval() < minimumRestartInterval) {
|
||||||
|
LOG.info("Set restart interval to minimum value " + minimumRestartInterval
|
||||||
|
+ "ms for container " + containerTokenIdentifier.getContainerID());
|
||||||
|
this.containerRetryContext.setRetryInterval(minimumRestartInterval);
|
||||||
|
}
|
||||||
this.diagnosticsMaxSize = conf.getInt(
|
this.diagnosticsMaxSize = conf.getInt(
|
||||||
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
|
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
|
||||||
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
|
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
|
||||||
@ -1368,4 +1379,9 @@ private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
|
|||||||
LocalResourceRequest resource) {
|
LocalResourceRequest resource) {
|
||||||
return container.resourcesUploadPolicies.get(resource);
|
return container.resourcesUploadPolicies.get(resource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ContainerRetryContext getContainerRetryContext() {
|
||||||
|
return containerRetryContext;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -725,6 +725,40 @@ private void testContainerRetry(ContainerRetryContext containerRetryContext,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerRestartInterval() throws IOException {
|
||||||
|
conf.setInt(YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS, 2000);
|
||||||
|
|
||||||
|
ContainerRetryContext containerRetryContext1 = ContainerRetryContext
|
||||||
|
.newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 3, 0);
|
||||||
|
testContainerRestartInterval(containerRetryContext1, 0);
|
||||||
|
|
||||||
|
ContainerRetryContext containerRetryContext2 = ContainerRetryContext
|
||||||
|
.newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 3, 0);
|
||||||
|
testContainerRestartInterval(containerRetryContext2, 2000);
|
||||||
|
|
||||||
|
ContainerRetryContext containerRetryContext3 = ContainerRetryContext
|
||||||
|
.newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 3, 4000);
|
||||||
|
testContainerRestartInterval(containerRetryContext3, 4000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testContainerRestartInterval(
|
||||||
|
ContainerRetryContext containerRetryContext,
|
||||||
|
int expectedRestartInterval) throws IOException {
|
||||||
|
WrappedContainer wc = null;
|
||||||
|
try {
|
||||||
|
wc = new WrappedContainer(25, 314159265358980L, 4345,
|
||||||
|
"yak", containerRetryContext);
|
||||||
|
Assert.assertEquals(
|
||||||
|
((ContainerImpl)wc.c).getContainerRetryContext().getRetryInterval(),
|
||||||
|
expectedRestartInterval);
|
||||||
|
} finally {
|
||||||
|
if (wc != null) {
|
||||||
|
wc.finished();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
|
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
|
||||||
ResourcesReleasedMatcher matchesReq =
|
ResourcesReleasedMatcher matchesReq =
|
||||||
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
|
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
|
||||||
|
Loading…
Reference in New Issue
Block a user