Fixed TimelineClient to retry SocketTimeoutException too. Contributed by Xuan Gong.
This commit is contained in:
parent
d8b729e16f
commit
477003730e
@ -24,6 +24,7 @@
|
||||
import java.net.ConnectException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
@ -116,7 +117,9 @@ public class TimelineClientImpl extends TimelineClient {
|
||||
TimelineClientConnectionRetry connectionRetry;
|
||||
|
||||
// Abstract class for an operation that should be retried by timeline client
|
||||
private static abstract class TimelineClientRetryOp {
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static abstract class TimelineClientRetryOp {
|
||||
// The operation that should be retried
|
||||
public abstract Object run() throws IOException;
|
||||
// The method to indicate if we should retry given the incoming exception
|
||||
@ -449,27 +452,8 @@ private Object operateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action)
|
||||
throws IOException, YarnException {
|
||||
// Set up the retry operation
|
||||
TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() {
|
||||
|
||||
@Override
|
||||
public Object run() throws IOException {
|
||||
// Try pass the request, if fail, keep retrying
|
||||
authUgi.checkTGTAndReloginFromKeytab();
|
||||
try {
|
||||
return authUgi.doAs(action);
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldRetryOn(Exception e) {
|
||||
// Only retry on connection exceptions
|
||||
return (e instanceof ConnectException);
|
||||
}
|
||||
};
|
||||
TimelineClientRetryOp tokenRetryOp =
|
||||
createTimelineClientRetryOpForOperateDelegationToken(action);
|
||||
|
||||
return connectionRetry.retryOn(tokenRetryOp);
|
||||
}
|
||||
@ -680,4 +664,50 @@ public void putDomain(ApplicationAttemptId appAttemptId,
|
||||
public void setTimelineWriter(TimelineWriter writer) {
|
||||
this.timelineWriter = writer;
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public TimelineClientRetryOp
|
||||
createTimelineClientRetryOpForOperateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action) throws IOException {
|
||||
return new TimelineClientRetryOpForOperateDelegationToken(
|
||||
this.authUgi, action);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public class TimelineClientRetryOpForOperateDelegationToken
|
||||
extends TimelineClientRetryOp {
|
||||
|
||||
private final UserGroupInformation authUgi;
|
||||
private final PrivilegedExceptionAction<?> action;
|
||||
|
||||
public TimelineClientRetryOpForOperateDelegationToken(
|
||||
UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
|
||||
this.authUgi = authUgi;
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object run() throws IOException {
|
||||
// Try pass the request, if fail, keep retrying
|
||||
authUgi.checkTGTAndReloginFromKeytab();
|
||||
try {
|
||||
return authUgi.doAs(action);
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldRetryOn(Exception e) {
|
||||
// retry on connection exceptions
|
||||
// and SocketTimeoutException
|
||||
return (e instanceof ConnectException
|
||||
|| e instanceof SocketTimeoutException);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -27,7 +27,9 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
@ -234,6 +236,8 @@ public void testDelegationTokenOperationsRetry() throws Exception {
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
TimelineClientImpl client = createTimelineClient(conf);
|
||||
TimelineClientImpl clientFake =
|
||||
createTimelineClientFakeTimelineClientRetryOp(conf);
|
||||
TestTimlineDelegationTokenSecretManager dtManager =
|
||||
new TestTimlineDelegationTokenSecretManager();
|
||||
try {
|
||||
@ -278,8 +282,24 @@ public void testDelegationTokenOperationsRetry() throws Exception {
|
||||
} catch (RuntimeException ce) {
|
||||
assertException(client, ce);
|
||||
}
|
||||
|
||||
// Test DelegationTokenOperationsRetry on SocketTimeoutException
|
||||
try {
|
||||
TimelineDelegationTokenIdentifier timelineDT =
|
||||
new TimelineDelegationTokenIdentifier(
|
||||
new Text("tester"), new Text("tester"), new Text("tester"));
|
||||
clientFake.cancelDelegationToken(
|
||||
new Token<TimelineDelegationTokenIdentifier>(timelineDT.getBytes(),
|
||||
dtManager.createPassword(timelineDT),
|
||||
timelineDT.getKind(),
|
||||
new Text("0.0.0.0:8188")));
|
||||
assertFail();
|
||||
} catch (RuntimeException ce) {
|
||||
assertException(clientFake, ce);
|
||||
}
|
||||
} finally {
|
||||
client.stop();
|
||||
clientFake.stop();
|
||||
dtManager.stopThreads();
|
||||
}
|
||||
}
|
||||
@ -393,6 +413,27 @@ protected TimelineWriter createTimelineWriter(Configuration conf,
|
||||
return client;
|
||||
}
|
||||
|
||||
private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
|
||||
YarnConfiguration conf) {
|
||||
TimelineClientImpl client = new TimelineClientImpl() {
|
||||
|
||||
@Override
|
||||
public TimelineClientRetryOp
|
||||
createTimelineClientRetryOpForOperateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action) throws IOException {
|
||||
TimelineClientRetryOpForOperateDelegationToken op =
|
||||
spy(new TimelineClientRetryOpForOperateDelegationToken(
|
||||
UserGroupInformation.getCurrentUser(), action));
|
||||
doThrow(new SocketTimeoutException("Test socketTimeoutException"))
|
||||
.when(op).run();
|
||||
return op;
|
||||
}
|
||||
};
|
||||
client.init(conf);
|
||||
client.start();
|
||||
return client;
|
||||
}
|
||||
|
||||
private static class TestTimlineDelegationTokenSecretManager extends
|
||||
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user