diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index 0b5084574e..9fb83e430b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -49,9 +49,6 @@ public class ConnectionManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(ConnectionManager.class);
 
-  /** Number of parallel new connections to create. */
-  protected static final int MAX_NEW_CONNECTIONS = 100;
-
   /** Minimum amount of active connections: 50%. */
   protected static final float MIN_ACTIVE_RATIO = 0.5f;
 
@@ -77,8 +74,10 @@ public class ConnectionManager {
   private final Lock writeLock = readWriteLock.writeLock();
 
   /** Queue for creating new connections. */
-  private final BlockingQueue<ConnectionPool> creatorQueue =
-      new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS);
+  private final BlockingQueue<ConnectionPool> creatorQueue;
+  /** Max size of queue for creating new connections. */
+  private final int creatorQueueMaxSize;
+
   /** Create new connections asynchronously. */
   private final ConnectionCreator creator;
   /** Periodic executor to remove stale connection pools. */
@@ -106,7 +105,12 @@ public ConnectionManager(Configuration config) {
     this.pools = new HashMap<>();
 
     // Create connections in a thread asynchronously
-    this.creator = new ConnectionCreator(creatorQueue);
+    this.creatorQueueMaxSize = this.conf.getInt(
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE,
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT
+        );
+    this.creatorQueue = new ArrayBlockingQueue<>(this.creatorQueueMaxSize);
+    this.creator = new ConnectionCreator(this.creatorQueue);
     this.creator.setDaemon(true);
 
     // Cleanup periods
@@ -213,7 +217,7 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
     if (conn == null || !conn.isUsable()) {
       if (!this.creatorQueue.offer(pool)) {
         LOG.error("Cannot add more than {} connections at the same time",
-            MAX_NEW_CONNECTIONS);
+            this.creatorQueueMaxSize);
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 87df5d2cd5..997e1dda2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -93,6 +93,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       TimeUnit.SECONDS.toMillis(5);
 
   // HDFS Router NN client
+  public static final String
+      DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE =
+      FEDERATION_ROUTER_PREFIX + "connection.creator.queue-size";
+  public static final int
+      DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT = 100;
   public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
       FEDERATION_ROUTER_PREFIX + "connection.pool-size";
   public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 8be5b8ab77..ed39d4b5f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -93,6 +93,14 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.connection.creator.queue-size</name>
+    <value>100</value>
+    <description>
+      Size of async connection creator queue.
+    </description>
+  </property>
+
   <property>
     <name>dfs.federation.router.connection.pool-size</name>
     <value>1</value>
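
For reference (not part of the patch): a minimal sketch of how the new key is resolved, assuming a standalone Configuration object and the constants added above; the ConnectionManager constructor performs the equivalent getInt() lookup. The class name CreatorQueueSizeExample and the override value 200 are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

public class CreatorQueueSizeExample {
  public static void main(String[] args) {
    // Hypothetical standalone check; a real Router would pick this up from
    // hdfs-rbf-site.xml rather than a programmatic setInt().
    Configuration conf = new Configuration(false);
    conf.setInt(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE, 200);

    // Same lookup pattern as the ConnectionManager constructor: read the key,
    // falling back to the *_DEFAULT constant (100) when the property is unset.
    int queueSize = conf.getInt(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT);
    System.out.println("connection creator queue size = " + queueSize); // 200
  }
}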