YARN-4132. Separate configs for nodemanager to resourcemanager connection timeout and retries. Contributed by Chang Li
This commit is contained in:
parent
f634505d48
commit
4ac6799d4a
@ -571,6 +571,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-3980. Plumb resource-utilization info in node heartbeat through to the
|
YARN-3980. Plumb resource-utilization info in node heartbeat through to the
|
||||||
scheduler. (Inigo Goiri via kasha)
|
scheduler. (Inigo Goiri via kasha)
|
||||||
|
|
||||||
|
YARN-4132. Separate configs for nodemanager to resourcemanager connection
|
||||||
|
timeout and retries (Chang Li via jlowe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
@ -2051,6 +2051,22 @@ private static void addDeprecatedKeys() {
|
|||||||
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
|
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
|
||||||
.name();
|
.name();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Max time to wait for NM to connect to RM.
|
||||||
|
* When not set, proxy will fall back to use value of
|
||||||
|
* RESOURCEMANAGER_CONNECT_MAX_WAIT_MS.
|
||||||
|
*/
|
||||||
|
public static final String NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
||||||
|
YARN_PREFIX + "nodemanager.resourcemanager.connect.max-wait.ms";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time interval between each NM attempt to connect to RM.
|
||||||
|
* When not set, proxy will fall back to use value of
|
||||||
|
* RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS.
|
||||||
|
*/
|
||||||
|
public static final String NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS =
|
||||||
|
YARN_PREFIX + "nodemanager.resourcemanager.connect.retry-interval.ms";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Node-labels configurations
|
* Node-labels configurations
|
||||||
*/
|
*/
|
||||||
|
@ -88,7 +88,32 @@ protected static <T> T createRMProxy(final Configuration configuration,
|
|||||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||||
? (YarnConfiguration) configuration
|
? (YarnConfiguration) configuration
|
||||||
: new YarnConfiguration(configuration);
|
: new YarnConfiguration(configuration);
|
||||||
RetryPolicy retryPolicy = createRetryPolicy(conf);
|
RetryPolicy retryPolicy =
|
||||||
|
createRetryPolicy(conf);
|
||||||
|
return createRMProxy(conf, protocol, instance, retryPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a proxy for the specified protocol. For non-HA,
|
||||||
|
* this is a direct connection to the ResourceManager address. When HA is
|
||||||
|
* enabled, the proxy handles the failover between the ResourceManagers as
|
||||||
|
* well.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
protected static <T> T createRMProxy(final Configuration configuration,
|
||||||
|
final Class<T> protocol, RMProxy instance, final long retryTime,
|
||||||
|
final long retryInterval) throws IOException {
|
||||||
|
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||||
|
? (YarnConfiguration) configuration
|
||||||
|
: new YarnConfiguration(configuration);
|
||||||
|
RetryPolicy retryPolicy =
|
||||||
|
createRetryPolicy(conf, retryTime, retryInterval);
|
||||||
|
return createRMProxy(conf, protocol, instance, retryPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> T createRMProxy(final YarnConfiguration conf,
|
||||||
|
final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
|
||||||
|
throws IOException{
|
||||||
if (HAUtil.isHAEnabled(conf)) {
|
if (HAUtil.isHAEnabled(conf)) {
|
||||||
RMFailoverProxyProvider<T> provider =
|
RMFailoverProxyProvider<T> provider =
|
||||||
instance.createRMFailoverProxyProvider(conf, protocol);
|
instance.createRMFailoverProxyProvider(conf, protocol);
|
||||||
@ -179,6 +204,18 @@ public static RetryPolicy createRetryPolicy(Configuration conf) {
|
|||||||
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||||
YarnConfiguration
|
YarnConfiguration
|
||||||
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
|
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
|
||||||
|
return createRetryPolicy(
|
||||||
|
conf, rmConnectWaitMS, rmConnectionRetryIntervalMS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch retry policy from Configuration and create the
|
||||||
|
* retry policy with specified retryTime and retry interval.
|
||||||
|
*/
|
||||||
|
private static RetryPolicy createRetryPolicy(Configuration conf,
|
||||||
|
long retryTime, long retryInterval) {
|
||||||
|
long rmConnectWaitMS = retryTime;
|
||||||
|
long rmConnectionRetryIntervalMS = retryInterval;
|
||||||
|
|
||||||
boolean waitForEver = (rmConnectWaitMS == -1);
|
boolean waitForEver = (rmConnectWaitMS == -1);
|
||||||
if (!waitForEver) {
|
if (!waitForEver) {
|
||||||
|
@ -1549,6 +1549,26 @@
|
|||||||
<value>10000</value>
|
<value>10000</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Max time to wait for NM to connect to RM.
|
||||||
|
When not set, proxy will fall back to use value of
|
||||||
|
yarn.resourcemanager.connect.max-wait.ms.
|
||||||
|
</description>
|
||||||
|
<name>yarn.nodemanager.resourcemanager.connect.max-wait.ms</name>
|
||||||
|
<value></value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Time interval between each NM attempt to connect to RM.
|
||||||
|
When not set, proxy will fall back to use value of
|
||||||
|
yarn.resourcemanager.connect.retry-interval.ms.
|
||||||
|
</description>
|
||||||
|
<name>yarn.nodemanager.resourcemanager.connect.retry-interval.ms</name>
|
||||||
|
<value></value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>
|
<description>
|
||||||
Maximum number of proxy connections to cache for node managers. If set
|
Maximum number of proxy connections to cache for node managers. If set
|
||||||
|
@ -48,8 +48,26 @@ private ServerRMProxy() {
|
|||||||
*/
|
*/
|
||||||
public static <T> T createRMProxy(final Configuration configuration,
|
public static <T> T createRMProxy(final Configuration configuration,
|
||||||
final Class<T> protocol) throws IOException {
|
final Class<T> protocol) throws IOException {
|
||||||
return createRMProxy(configuration, protocol, INSTANCE);
|
long rmConnectWait =
|
||||||
}
|
configuration.getLong(
|
||||||
|
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
|
||||||
|
long rmRetryInterval =
|
||||||
|
configuration.getLong(
|
||||||
|
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration
|
||||||
|
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
|
||||||
|
long nmRmConnectWait =
|
||||||
|
configuration.getLong(
|
||||||
|
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||||
|
rmConnectWait);
|
||||||
|
long nmRmRetryInterval =
|
||||||
|
configuration.getLong(
|
||||||
|
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||||
|
rmRetryInterval);
|
||||||
|
return createRMProxy(configuration, protocol, INSTANCE,
|
||||||
|
nmRmConnectWait, nmRmRetryInterval);
|
||||||
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,6 +95,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
@ -486,6 +487,35 @@ protected void stopRMProxy() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class MyNodeStatusUpdater6 extends NodeStatusUpdaterImpl {
|
||||||
|
|
||||||
|
private final long rmStartIntervalMS;
|
||||||
|
private final boolean rmNeverStart;
|
||||||
|
public ResourceTracker resourceTracker;
|
||||||
|
public MyNodeStatusUpdater6(Context context, Dispatcher dispatcher,
|
||||||
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||||
|
long rmStartIntervalMS, boolean rmNeverStart) {
|
||||||
|
super(context, dispatcher, healthChecker, metrics);
|
||||||
|
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||||
|
this.rmNeverStart = rmNeverStart;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
//record the startup time
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isTriggered() {
|
||||||
|
return triggered;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void stopRMProxy() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class MyNodeManager extends NodeManager {
|
private class MyNodeManager extends NodeManager {
|
||||||
|
|
||||||
private MyNodeStatusUpdater3 nodeStatusUpdater;
|
private MyNodeStatusUpdater3 nodeStatusUpdater;
|
||||||
@ -1309,6 +1339,59 @@ protected NodeStatusUpdater createUpdater(Context context,
|
|||||||
+ "Message from ResourceManager: RM Shutting Down Node");
|
+ "Message from ResourceManager: RM Shutting Down Node");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 100000)
|
||||||
|
public void testNMRMConnectionConf() throws Exception {
|
||||||
|
final long delta = 50000;
|
||||||
|
final long nmRmConnectionWaitMs = 100;
|
||||||
|
final long nmRmRetryInterval = 100;
|
||||||
|
final long connectionWaitMs = -1;
|
||||||
|
final long connectionRetryIntervalMs = 1000;
|
||||||
|
//Waiting for rmStartIntervalMS, RM will be started
|
||||||
|
final long rmStartIntervalMS = 2*1000;
|
||||||
|
conf.setLong(YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||||
|
nmRmConnectionWaitMs);
|
||||||
|
conf.setLong(
|
||||||
|
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||||
|
nmRmRetryInterval);
|
||||||
|
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||||
|
connectionWaitMs);
|
||||||
|
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||||
|
connectionRetryIntervalMs);
|
||||||
|
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||||
|
1);
|
||||||
|
//Test that NM tries to connect to RM several times, but finally fails
|
||||||
|
NodeManagerWithCustomNodeStatusUpdater nmWithUpdater;
|
||||||
|
nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||||
|
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater6(
|
||||||
|
context, dispatcher, healthChecker, metrics,
|
||||||
|
rmStartIntervalMS, true);
|
||||||
|
return nodeStatusUpdater;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
nm.init(conf);
|
||||||
|
long waitStartTime = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
nm.start();
|
||||||
|
Assert.fail("NM should have failed to start due to RM connect failure");
|
||||||
|
} catch(Exception e) {
|
||||||
|
long t = System.currentTimeMillis();
|
||||||
|
long duration = t - waitStartTime;
|
||||||
|
boolean waitTimeValid = (duration >= nmRmConnectionWaitMs) &&
|
||||||
|
(duration < (connectionWaitMs + delta));
|
||||||
|
|
||||||
|
if(!waitTimeValid) {
|
||||||
|
// throw exception if NM doesn't retry long enough
|
||||||
|
throw new Exception("NM should have tried re-connecting to RM during " +
|
||||||
|
"period of at least " + connectionWaitMs + " ms, but " +
|
||||||
|
"stopped retrying within " + (connectionWaitMs + delta) +
|
||||||
|
" ms: " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 150000)
|
@Test (timeout = 150000)
|
||||||
public void testNMConnectionToRM() throws Exception {
|
public void testNMConnectionToRM() throws Exception {
|
||||||
final long delta = 50000;
|
final long delta = 50000;
|
||||||
|
Loading…
Reference in New Issue
Block a user