diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f26177d0e8..3e84372275 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -519,6 +519,8 @@ Release 2.7.0 - UNRELEASED HADOOP-11441. Hadoop-azure: Change few methods scope to public. (Shashank Khandelwal via cnauroth) + HADOOP-9137. Support connection limiting in IPC server (kihwal) + OPTIMIZATIONS HADOOP-11323. WritableComparator#compare keeps reference to byte array. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 36989bd68d..00c8d78bf7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -227,6 +227,11 @@ public class CommonConfigurationKeysPublic { "ipc.server.tcpnodelay"; /** Default value for IPC_SERVER_TCPNODELAY_KEY */ public static final boolean IPC_SERVER_TCPNODELAY_DEFAULT = true; + /** See core-default.xml */ + public static final String IPC_SERVER_MAX_CONNECTIONS_KEY = + "ipc.server.max.connections"; + /** Default value for IPC_SERVER_MAX_CONNECTIONS_KEY */ + public static final int IPC_SERVER_MAX_CONNECTIONS_DEFAULT = 0; /** See core-default.xml */ public static final String HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index e508d4e01f..905c7dbe17 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -749,6 +749,13 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOf Reader reader = getReader(); Connection c = connectionManager.register(channel); + // If the connectionManager can't take it, close the connection. + if (c == null) { + if (channel.isOpen()) { + IOUtils.cleanup(null, channel); + } + continue; + } key.attach(c); // so closeCurrentConnection can get the object reader.addConnection(c); } @@ -2731,6 +2738,7 @@ private class ConnectionManager { final private int idleScanInterval; final private int maxIdleTime; final private int maxIdleToClose; + final private int maxConnections; ConnectionManager() { this.idleScanTimer = new Timer( @@ -2747,6 +2755,9 @@ private class ConnectionManager { this.maxIdleToClose = conf.getInt( CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT); + this.maxConnections = conf.getInt( + CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_KEY, + CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_DEFAULT); // create a set with concurrency -and- a thread-safe iterator, add 2 // for listener and idle closer threads this.connections = Collections.newSetFromMap( @@ -2774,11 +2785,19 @@ int size() { return count.get(); } + boolean isFull() { + // The check is disabled when maxConnections <= 0. + return ((maxConnections > 0) && (size() >= maxConnections)); + } + Connection[] toArray() { return connections.toArray(new Connection[0]); } Connection register(SocketChannel channel) { + if (isFull()) { + return null; + } Connection connection = new Connection(channel, Time.now()); add(connection); if (LOG.isDebugEnabled()) { diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index bebc2633c1..c11669d11c 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1677,4 +1677,15 @@ for ldap providers in the same way as above does. + + ipc.server.max.connections + 0 + The maximum number of concurrent connections a server is allowed + to accept. If this limit is exceeded, incoming connections will first fill + the listen queue and then may go to an OS-specific listen overflow queue. + The client may fail or timeout, but the server can avoid running out of file + descriptors using this feature. 0 means no limit. + + + diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 02516a183a..04a74120c8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -1184,6 +1184,57 @@ public void run() { } } + @Test + public void testMaxConnections() throws Exception { + conf.setInt("ipc.server.max.connections", 5); + Server server = null; + Thread connectors[] = new Thread[10]; + + try { + server = new TestServer(3, false); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + assertEquals(0, server.getNumOpenConnections()); + + for (int i = 0; i < 10; i++) { + connectors[i] = new Thread() { + @Override + public void run() { + Socket sock = null; + try { + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + NetUtils.connect(sock, addr, 3000); + try { + Thread.sleep(4000); + } catch (InterruptedException ie) { } + } catch (IOException ioe) { + } finally { + if (sock != null) { + try { + sock.close(); + } catch (IOException ioe) { } + } + } + } + }; + connectors[i].start(); + } + + Thread.sleep(1000); + // server should only accept up to 5 connections + assertEquals(5, server.getNumOpenConnections()); + + for (int i = 0; i < 10; i++) { + connectors[i].join(); + } + } finally { + if (server != null) { + server.stop(); + } + conf.setInt("ipc.server.max.connections", 0); + } + } + private void assertRetriesOnSocketTimeouts(Configuration conf, int maxTimeoutRetries) throws IOException { SocketFactory mockFactory = Mockito.mock(SocketFactory.class);