HADOOP-15129. Datanode caches namenode DNS lookup failure and cannot startup (#3348)
Co-authored-by: Karthik Palaniappan Change-Id: Id079a5319e5e83939d5dcce5fb9ebe3715ee864f
This commit is contained in:
parent
a610f6d9c3
commit
1d808f59d7
@ -655,6 +655,16 @@ private synchronized void setupConnection(
|
|||||||
short timeoutFailures = 0;
|
short timeoutFailures = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
|
if (server.isUnresolved()) {
|
||||||
|
// Jump into the catch block. updateAddress() will re-resolve
|
||||||
|
// the address if this is just a temporary DNS failure. If not,
|
||||||
|
// it will timeout after max ipc client retries
|
||||||
|
throw NetUtils.wrapException(server.getHostName(),
|
||||||
|
server.getPort(),
|
||||||
|
NetUtils.getHostname(),
|
||||||
|
0,
|
||||||
|
new UnknownHostException());
|
||||||
|
}
|
||||||
this.socket = socketFactory.createSocket();
|
this.socket = socketFactory.createSocket();
|
||||||
this.socket.setTcpNoDelay(tcpNoDelay);
|
this.socket.setTcpNoDelay(tcpNoDelay);
|
||||||
this.socket.setKeepAlive(true);
|
this.socket.setKeepAlive(true);
|
||||||
@ -1604,15 +1614,6 @@ Set<ConnectionId> getConnectionIds() {
|
|||||||
private Connection getConnection(ConnectionId remoteId,
|
private Connection getConnection(ConnectionId remoteId,
|
||||||
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
|
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final InetSocketAddress address = remoteId.getAddress();
|
|
||||||
if (address.isUnresolved()) {
|
|
||||||
throw NetUtils.wrapException(address.getHostName(),
|
|
||||||
address.getPort(),
|
|
||||||
null,
|
|
||||||
0,
|
|
||||||
new UnknownHostException());
|
|
||||||
}
|
|
||||||
|
|
||||||
final Consumer<Connection> removeMethod = c -> {
|
final Consumer<Connection> removeMethod = c -> {
|
||||||
final boolean removed = connections.remove(remoteId, c);
|
final boolean removed = connections.remove(remoteId, c);
|
||||||
if (removed && connections.isEmpty()) {
|
if (removed && connections.isEmpty()) {
|
||||||
|
@ -47,6 +47,7 @@
|
|||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@ -54,6 +55,7 @@
|
|||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.BrokenBarrierException;
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
@ -88,6 +90,7 @@
|
|||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
import org.apache.hadoop.test.Whitebox;
|
import org.apache.hadoop.test.Whitebox;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -789,6 +792,55 @@ public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testIpcHostResolutionTimeout() throws Exception {
|
||||||
|
final InetSocketAddress addr = new InetSocketAddress("host.invalid", 80);
|
||||||
|
|
||||||
|
// start client
|
||||||
|
Client.setConnectTimeout(conf, 100);
|
||||||
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
|
// set the rpc timeout to twice the MIN_SLEEP_TIME
|
||||||
|
try {
|
||||||
|
LambdaTestUtils.intercept(UnknownHostException.class,
|
||||||
|
new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException {
|
||||||
|
TestIPC.this.call(client, new LongWritable(RANDOM.nextLong()),
|
||||||
|
addr, MIN_SLEEP_TIME * 2, conf);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testIpcFlakyHostResolution() throws IOException {
|
||||||
|
// start server
|
||||||
|
Server server = new TestServer(5, false);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
// Leave host unresolved to start. Use "localhost" as opposed
|
||||||
|
// to local IP from NetUtils.getConnectAddress(server) to force
|
||||||
|
// resolution later
|
||||||
|
InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved(
|
||||||
|
"localhost", NetUtils.getConnectAddress(server).getPort());
|
||||||
|
|
||||||
|
// start client
|
||||||
|
Client.setConnectTimeout(conf, 100);
|
||||||
|
Client client = new Client(LongWritable.class, conf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Should re-resolve host and succeed
|
||||||
|
call(client, new LongWritable(RANDOM.nextLong()), unresolvedAddr,
|
||||||
|
MIN_SLEEP_TIME * 2, conf);
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check that reader queueing works
|
* Check that reader queueing works
|
||||||
* @throws BrokenBarrierException
|
* @throws BrokenBarrierException
|
||||||
|
Loading…
Reference in New Issue
Block a user