From 6ed01585eb1929497efbafe2f19bda4f1a56575c Mon Sep 17 00:00:00 2001 From: Haoze Wu <18595686+functioner@users.noreply.github.com> Date: Wed, 8 Dec 2021 04:48:43 -0500 Subject: [PATCH] HADOOP-18024. SocketChannel is not closed when IOException happens in Server$Listener.doAccept (#3719) --- .../java/org/apache/hadoop/ipc/Server.java | 25 ++++++++-- .../java/org/apache/hadoop/ipc/TestIPC.java | 47 +++++++++++++++++++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 7a391c4994..78529565a5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -1324,6 +1324,14 @@ public String toString() { } } + @VisibleForTesting + @InterfaceAudience.Private + protected void configureSocketChannel(SocketChannel channel) throws IOException { + channel.configureBlocking(false); + channel.socket().setTcpNoDelay(tcpNoDelay); + channel.socket().setKeepAlive(true); + } + /** Listens on the socket. Creates jobs for the handler threads*/ private class Listener extends Thread { @@ -1530,15 +1538,24 @@ private void closeCurrentConnection(SelectionKey key, Throwable e) { InetSocketAddress getAddress() { return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); } - + void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { - channel.configureBlocking(false); - channel.socket().setTcpNoDelay(tcpNoDelay); - channel.socket().setKeepAlive(true); + try { + configureSocketChannel(channel); + } catch (IOException e) { + LOG.warn("Error in an accepted SocketChannel", e); + try { + channel.socket().close(); + channel.close(); + } catch (IOException ex) { + LOG.warn("Error in closing SocketChannel", ex); + } + continue; + } Reader reader = getReader(); Connection c = connectionManager.register(channel, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 95ff302103..90415b49bc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -34,6 +34,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -48,6 +49,7 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -616,6 +618,51 @@ public void testIOEOnWriteAfterPingClient() throws Exception { WRITABLE_FAULTS_SLEEP = 0; } } + + /** + * Test for HADOOP-18024. + */ + @Test(timeout=60000) + public void testIOEOnListenerAccept() throws Exception { + // start server + Server server = new TestServer(1, false, + LongWritable.class, LongWritable.class) { + @Override + protected void configureSocketChannel(SocketChannel channel) throws IOException { + maybeThrowIOE(); + super.configureSocketChannel(channel); + } + }; + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + // start client + WRITABLE_FAULTS_ENABLED = true; + Client client = new Client(LongWritable.class, conf); + try { + LongWritable param = LongWritable.class.newInstance(); + + try { + call(client, param, addr, 0, conf); + fail("Expected an exception to have been thrown"); + } catch (EOFException e) { + LOG.info("Got expected exception", e); + } catch (Throwable t) { + LOG.warn("Got unexpected error", t); + fail("Expected an EOFException to have been thrown"); + } + + // Doing a second call with faults disabled should return fine -- + // ie the internal state of the client or server should not be broken + // by the failed call + WRITABLE_FAULTS_ENABLED = false; + call(client, param, addr, 0, conf); + + } finally { + client.stop(); + server.stop(); + } + } private static void assertExceptionContains( Throwable t, String substring) {