HADOOP-18024. SocketChannel is not closed when IOException happens in Server$Listener.doAccept (#3719)

This commit is contained in:
Haoze Wu 2021-12-08 04:48:43 -05:00 committed by GitHub
parent 53edd0de5a
commit 6ed01585eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 4 deletions

View File

@ -1324,6 +1324,14 @@ public String toString() {
} }
} }
@VisibleForTesting
@InterfaceAudience.Private
protected void configureSocketChannel(SocketChannel channel) throws IOException {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
}
/** Listens on the socket. Creates jobs for the handler threads*/ /** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread { private class Listener extends Thread {
@ -1530,15 +1538,24 @@ private void closeCurrentConnection(SelectionKey key, Throwable e) {
InetSocketAddress getAddress() { InetSocketAddress getAddress() {
return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
} }
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel(); ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel; SocketChannel channel;
while ((channel = server.accept()) != null) { while ((channel = server.accept()) != null) {
channel.configureBlocking(false); try {
channel.socket().setTcpNoDelay(tcpNoDelay); configureSocketChannel(channel);
channel.socket().setKeepAlive(true); } catch (IOException e) {
LOG.warn("Error in an accepted SocketChannel", e);
try {
channel.socket().close();
channel.close();
} catch (IOException ex) {
LOG.warn("Error in closing SocketChannel", ex);
}
continue;
}
Reader reader = getReader(); Reader reader = getReader();
Connection c = connectionManager.register(channel, Connection c = connectionManager.register(channel,

View File

@ -34,6 +34,7 @@
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -48,6 +49,7 @@
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -616,6 +618,51 @@ public void testIOEOnWriteAfterPingClient() throws Exception {
WRITABLE_FAULTS_SLEEP = 0; WRITABLE_FAULTS_SLEEP = 0;
} }
} }
/**
* Test for HADOOP-18024.
*/
@Test(timeout=60000)
public void testIOEOnListenerAccept() throws Exception {
// start server
Server server = new TestServer(1, false,
LongWritable.class, LongWritable.class) {
@Override
protected void configureSocketChannel(SocketChannel channel) throws IOException {
maybeThrowIOE();
super.configureSocketChannel(channel);
}
};
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
// start client
WRITABLE_FAULTS_ENABLED = true;
Client client = new Client(LongWritable.class, conf);
try {
LongWritable param = LongWritable.class.newInstance();
try {
call(client, param, addr, 0, conf);
fail("Expected an exception to have been thrown");
} catch (EOFException e) {
LOG.info("Got expected exception", e);
} catch (Throwable t) {
LOG.warn("Got unexpected error", t);
fail("Expected an EOFException to have been thrown");
}
// Doing a second call with faults disabled should return fine --
// ie the internal state of the client or server should not be broken
// by the failed call
WRITABLE_FAULTS_ENABLED = false;
call(client, param, addr, 0, conf);
} finally {
client.stop();
server.stop();
}
}
private static void assertExceptionContains( private static void assertExceptionContains(
Throwable t, String substring) { Throwable t, String substring) {