YARN-10479. RMProxy should retry on SocketTimeout Exceptions. Contributed by Jim Brennan (Jim_Brennan)

This commit is contained in:
Eric E Payne 2020-11-05 21:50:46 +00:00
parent af389d9897
commit 55339c2bdd
2 changed files with 33 additions and 9 deletions

View File

@@ -24,6 +24,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.NoRouteToHostException; import java.net.NoRouteToHostException;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashMap; import java.util.HashMap;
@@ -294,6 +295,7 @@ protected static RetryPolicy createRetryPolicy(Configuration conf,
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy); exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy); exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy); exceptionToPolicyMap.put(SocketException.class, retryPolicy);
exceptionToPolicyMap.put(SocketTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(StandbyException.class, retryPolicy); exceptionToPolicyMap.put(StandbyException.class, retryPolicy);
// YARN-4288: local IOException is also possible. // YARN-4288: local IOException is also possible.
exceptionToPolicyMap.put(IOException.class, retryPolicy); exceptionToPolicyMap.put(IOException.class, retryPolicy);

View File

@@ -405,12 +405,15 @@ private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
private final long rmStartIntervalMS; private final long rmStartIntervalMS;
private final boolean rmNeverStart; private final boolean rmNeverStart;
public ResourceTracker resourceTracker; public ResourceTracker resourceTracker;
private final boolean useSocketTimeoutEx;
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
long rmStartIntervalMS, boolean rmNeverStart) { long rmStartIntervalMS, boolean rmNeverStart,
boolean useSocketTimeoutEx) {
super(context, dispatcher, healthChecker, metrics); super(context, dispatcher, healthChecker, metrics);
this.rmStartIntervalMS = rmStartIntervalMS; this.rmStartIntervalMS = rmStartIntervalMS;
this.rmNeverStart = rmNeverStart; this.rmNeverStart = rmNeverStart;
this.useSocketTimeoutEx = useSocketTimeoutEx;
} }
@Override @Override
@@ -425,7 +428,8 @@ protected ResourceTracker getRMClient() throws IOException {
HAUtil.isHAEnabled(conf)); HAUtil.isHAEnabled(conf));
resourceTracker = resourceTracker =
(ResourceTracker) RetryProxy.create(ResourceTracker.class, (ResourceTracker) RetryProxy.create(ResourceTracker.class,
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart), new MyResourceTracker6(rmStartIntervalMS, rmNeverStart,
useSocketTimeoutEx),
retryPolicy); retryPolicy);
return resourceTracker; return resourceTracker;
} }
@@ -859,11 +863,14 @@ private class MyResourceTracker6 implements ResourceTracker {
private long rmStartIntervalMS; private long rmStartIntervalMS;
private boolean rmNeverStart; private boolean rmNeverStart;
private final long waitStartTime; private final long waitStartTime;
private final boolean useSocketTimeoutEx;
public MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart) { MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart,
boolean useSocketTimeoutEx) {
this.rmStartIntervalMS = rmStartIntervalMS; this.rmStartIntervalMS = rmStartIntervalMS;
this.rmNeverStart = rmNeverStart; this.rmNeverStart = rmNeverStart;
this.waitStartTime = System.currentTimeMillis(); this.waitStartTime = System.currentTimeMillis();
this.useSocketTimeoutEx = useSocketTimeoutEx;
} }
@Override @Override
@@ -872,8 +879,13 @@ public RegisterNodeManagerResponse registerNodeManager(
IOException { IOException {
if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
|| rmNeverStart) { || rmNeverStart) {
throw new java.net.ConnectException("Faking RM start failure as start " if (useSocketTimeoutEx) {
+ "delay timer has not expired."); throw new java.net.SocketTimeoutException(
"Faking RM start failure as start delay timer has not expired.");
} else {
throw new java.net.ConnectException(
"Faking RM start failure as start delay timer has not expired.");
}
} else { } else {
NodeId nodeId = request.getNodeId(); NodeId nodeId = request.getNodeId();
Resource resource = request.getResource(); Resource resource = request.getResource();
@@ -1375,8 +1387,8 @@ protected NodeStatusUpdater createUpdater(Context context,
} }
} }
@Test (timeout = 150000) private void testNMConnectionToRMInternal(boolean useSocketTimeoutEx)
public void testNMConnectionToRM() throws Exception { throws Exception {
final long delta = 50000; final long delta = 50000;
final long connectionWaitMs = 5000; final long connectionWaitMs = 5000;
final long connectionRetryIntervalMs = 1000; final long connectionRetryIntervalMs = 1000;
@@ -1395,7 +1407,7 @@ protected NodeStatusUpdater createUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
context, dispatcher, healthChecker, metrics, context, dispatcher, healthChecker, metrics,
rmStartIntervalMS, true); rmStartIntervalMS, true, useSocketTimeoutEx);
return nodeStatusUpdater; return nodeStatusUpdater;
} }
}; };
@@ -1427,7 +1439,7 @@ protected NodeStatusUpdater createUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
context, dispatcher, healthChecker, metrics, rmStartIntervalMS, context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
false); false, useSocketTimeoutEx);
return nodeStatusUpdater; return nodeStatusUpdater;
} }
}; };
@@ -1458,6 +1470,16 @@ protected NodeStatusUpdater createUpdater(Context context,
(duration < (rmStartIntervalMS + delta))); (duration < (rmStartIntervalMS + delta)));
} }
/**
 * Runs the shared NM-to-RM connection scenario with
 * {@code useSocketTimeoutEx = false}, so the fake resource tracker signals
 * the simulated RM start failure with a {@code java.net.ConnectException}.
 *
 * @throws Exception propagated from the shared scenario helper
 */
@Test (timeout = 150000)
public void testNMConnectionToRM() throws Exception {
testNMConnectionToRMInternal(false);
}
/**
 * Runs the shared NM-to-RM connection scenario with
 * {@code useSocketTimeoutEx = true}, so the fake resource tracker signals
 * the simulated RM start failure with a
 * {@code java.net.SocketTimeoutException} instead of a ConnectException.
 *
 * @throws Exception propagated from the shared scenario helper
 */
@Test (timeout = 150000)
public void testNMConnectionToRMwithSocketTimeout() throws Exception {
testNMConnectionToRMInternal(true);
}
/** /**
* Verifies that if for some reason NM fails to start ContainerManager RPC * Verifies that if for some reason NM fails to start ContainerManager RPC
* server, RM is oblivious to NM's presence. The behaviour is like this * server, RM is oblivious to NM's presence. The behaviour is like this