HDFS-13634. RBF: Configurable value in xml for async connection request queue size. Contributed by CR Hota.

This commit is contained in:
Yiqun Lin 2018-08-29 16:15:22 +08:00
parent 0bd4217194
commit a0ebb6b39f
3 changed files with 24 additions and 7 deletions

View File

@ -49,9 +49,6 @@ public class ConnectionManager {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ConnectionManager.class); LoggerFactory.getLogger(ConnectionManager.class);
/** Number of parallel new connections to create. */
protected static final int MAX_NEW_CONNECTIONS = 100;
/** Minimum amount of active connections: 50%. */ /** Minimum amount of active connections: 50%. */
protected static final float MIN_ACTIVE_RATIO = 0.5f; protected static final float MIN_ACTIVE_RATIO = 0.5f;
@ -77,8 +74,10 @@ public class ConnectionManager {
private final Lock writeLock = readWriteLock.writeLock(); private final Lock writeLock = readWriteLock.writeLock();
/** Queue for creating new connections. */ /** Queue for creating new connections. */
private final BlockingQueue<ConnectionPool> creatorQueue = private final BlockingQueue<ConnectionPool> creatorQueue;
new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS); /** Max size of queue for creating new connections. */
private final int creatorQueueMaxSize;
/** Create new connections asynchronously. */ /** Create new connections asynchronously. */
private final ConnectionCreator creator; private final ConnectionCreator creator;
/** Periodic executor to remove stale connection pools. */ /** Periodic executor to remove stale connection pools. */
@ -106,7 +105,12 @@ public ConnectionManager(Configuration config) {
this.pools = new HashMap<>(); this.pools = new HashMap<>();
// Create connections in a thread asynchronously // Create connections in a thread asynchronously
this.creator = new ConnectionCreator(creatorQueue); this.creatorQueueMaxSize = this.conf.getInt(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE,
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT
);
this.creatorQueue = new ArrayBlockingQueue<>(this.creatorQueueMaxSize);
this.creator = new ConnectionCreator(this.creatorQueue);
this.creator.setDaemon(true); this.creator.setDaemon(true);
// Cleanup periods // Cleanup periods
@ -213,7 +217,7 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
if (conn == null || !conn.isUsable()) { if (conn == null || !conn.isUsable()) {
if (!this.creatorQueue.offer(pool)) { if (!this.creatorQueue.offer(pool)) {
LOG.error("Cannot add more than {} connections at the same time", LOG.error("Cannot add more than {} connections at the same time",
MAX_NEW_CONNECTIONS); this.creatorQueueMaxSize);
} }
} }

View File

@ -93,6 +93,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
TimeUnit.SECONDS.toMillis(5); TimeUnit.SECONDS.toMillis(5);
// HDFS Router NN client // HDFS Router NN client
public static final String
DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE =
FEDERATION_ROUTER_PREFIX + "connection.creator.queue-size";
public static final int
DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT = 100;
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
FEDERATION_ROUTER_PREFIX + "connection.pool-size"; FEDERATION_ROUTER_PREFIX + "connection.pool-size";
public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT = public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =

View File

@ -93,6 +93,14 @@
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.connection.creator.queue-size</name>
<value>100</value>
<description>
Size of async connection creator queue.
</description>
</property>
<property> <property>
<name>dfs.federation.router.connection.pool-size</name> <name>dfs.federation.router.connection.pool-size</name>
<value>1</value> <value>1</value>