diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 8982a15a91..2ffc5f0b5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -173,6 +173,11 @@ public void close() { } } + @VisibleForTesting + public void closeConnectionCreator(){ + this.creator.shutdown(); + } + /** * Fetches the next available proxy client in the pool. Each client connection * is reserved for a single user and cannot be reused until free. @@ -229,7 +234,7 @@ public ConnectionContext getConnection(UserGroupInformation ugi, // Add a new connection to the pool if it wasn't usable if (conn == null || !conn.isUsable()) { - if (!this.creatorQueue.offer(pool)) { + if (!this.creatorQueue.contains(pool) && !this.creatorQueue.offer(pool)) { LOG.error("Cannot add more than {} connections at the same time", this.creatorQueueMaxSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index 920c9c4e51..97c55836a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -363,6 +363,29 @@ public void testConfigureConnectionActiveRatio() throws IOException { testConnectionCleanup(0.8f, 10, 6, 8); } + @Test + public void testConnectionCreatorWithSamePool() throws IOException { + Configuration tmpConf = new Configuration(); + // Set DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY to 0 + // for ensuring a pool will be offered in the creatorQueue + tmpConf.setInt( + RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY, 0); + ConnectionManager tmpConnManager = new ConnectionManager(tmpConf); + tmpConnManager.start(); + // Close ConnectionCreator thread to make sure that new connection will not initialize. + tmpConnManager.closeConnectionCreator(); + // Create same connection pool for simulating concurrency scenario + for (int i = 0; i < 3; i++) { + tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, + NamenodeProtocol.class, "ns0"); + } + assertEquals(1, tmpConnManager.getNumCreatingConnections()); + + tmpConnManager.getConnection(TEST_USER2, TEST_NN_ADDRESS, + NamenodeProtocol.class, "ns0"); + assertEquals(2, tmpConnManager.getNumCreatingConnections()); + } + private void testConnectionCleanup(float ratio, int totalConns, int activeConns, int leftConns) throws IOException { Configuration tmpConf = new Configuration();