YARN-10479. RMProxy should retry on SocketTimeout Exceptions. Contributed by Jim Brennan (Jim_Brennan)
(cherry picked from commit 55339c2bdd)
This commit is contained in:
parent
c6fee0a2c8
commit
1e22929885
@ -24,6 +24,7 @@
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.NoRouteToHostException;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.HashMap;
|
||||
@ -294,6 +295,7 @@ protected static RetryPolicy createRetryPolicy(Configuration conf,
|
||||
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(SocketTimeoutException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(StandbyException.class, retryPolicy);
|
||||
// YARN-4288: local IOException is also possible.
|
||||
exceptionToPolicyMap.put(IOException.class, retryPolicy);
|
||||
|
@ -398,12 +398,15 @@ private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
|
||||
private final long rmStartIntervalMS;
|
||||
private final boolean rmNeverStart;
|
||||
public ResourceTracker resourceTracker;
|
||||
private final boolean useSocketTimeoutEx;
|
||||
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
|
||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||
long rmStartIntervalMS, boolean rmNeverStart) {
|
||||
long rmStartIntervalMS, boolean rmNeverStart,
|
||||
boolean useSocketTimeoutEx) {
|
||||
super(context, dispatcher, healthChecker, metrics);
|
||||
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||
this.rmNeverStart = rmNeverStart;
|
||||
this.useSocketTimeoutEx = useSocketTimeoutEx;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -418,7 +421,8 @@ protected ResourceTracker getRMClient() throws IOException {
|
||||
HAUtil.isHAEnabled(conf));
|
||||
resourceTracker =
|
||||
(ResourceTracker) RetryProxy.create(ResourceTracker.class,
|
||||
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
|
||||
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart,
|
||||
useSocketTimeoutEx),
|
||||
retryPolicy);
|
||||
return resourceTracker;
|
||||
}
|
||||
@ -852,11 +856,14 @@ private class MyResourceTracker6 implements ResourceTracker {
|
||||
private long rmStartIntervalMS;
|
||||
private boolean rmNeverStart;
|
||||
private final long waitStartTime;
|
||||
private final boolean useSocketTimeoutEx;
|
||||
|
||||
public MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart) {
|
||||
MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart,
|
||||
boolean useSocketTimeoutEx) {
|
||||
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||
this.rmNeverStart = rmNeverStart;
|
||||
this.waitStartTime = System.currentTimeMillis();
|
||||
this.useSocketTimeoutEx = useSocketTimeoutEx;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -865,8 +872,13 @@ public RegisterNodeManagerResponse registerNodeManager(
|
||||
IOException {
|
||||
if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
|
||||
|| rmNeverStart) {
|
||||
throw new java.net.ConnectException("Faking RM start failure as start "
|
||||
+ "delay timer has not expired.");
|
||||
if (useSocketTimeoutEx) {
|
||||
throw new java.net.SocketTimeoutException(
|
||||
"Faking RM start failure as start delay timer has not expired.");
|
||||
} else {
|
||||
throw new java.net.ConnectException(
|
||||
"Faking RM start failure as start delay timer has not expired.");
|
||||
}
|
||||
} else {
|
||||
NodeId nodeId = request.getNodeId();
|
||||
Resource resource = request.getResource();
|
||||
@ -1368,8 +1380,8 @@ protected NodeStatusUpdater createUpdater(Context context,
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 150000)
|
||||
public void testNMConnectionToRM() throws Exception {
|
||||
private void testNMConnectionToRMInternal(boolean useSocketTimeoutEx)
|
||||
throws Exception {
|
||||
final long delta = 50000;
|
||||
final long connectionWaitMs = 5000;
|
||||
final long connectionRetryIntervalMs = 1000;
|
||||
@ -1388,7 +1400,7 @@ protected NodeStatusUpdater createUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
||||
context, dispatcher, healthChecker, metrics,
|
||||
rmStartIntervalMS, true);
|
||||
rmStartIntervalMS, true, useSocketTimeoutEx);
|
||||
return nodeStatusUpdater;
|
||||
}
|
||||
};
|
||||
@ -1420,7 +1432,7 @@ protected NodeStatusUpdater createUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
||||
context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
|
||||
false);
|
||||
false, useSocketTimeoutEx);
|
||||
return nodeStatusUpdater;
|
||||
}
|
||||
};
|
||||
@ -1451,6 +1463,16 @@ protected NodeStatusUpdater createUpdater(Context context,
|
||||
(duration < (rmStartIntervalMS + delta)));
|
||||
}
|
||||
|
||||
@Test (timeout = 150000)
|
||||
public void testNMConnectionToRM() throws Exception {
|
||||
testNMConnectionToRMInternal(false);
|
||||
}
|
||||
|
||||
@Test (timeout = 150000)
|
||||
public void testNMConnectionToRMwithSocketTimeout() throws Exception {
|
||||
testNMConnectionToRMInternal(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that if for some reason NM fails to start ContainerManager RPC
|
||||
* server, RM is oblivious to NM's presence. The behaviour is like this
|
||||
|
Loading…
Reference in New Issue
Block a user