HDFS-14114. RBF: MIN_ACTIVE_RATIO should be configurable. Contributed by Fei Hui.

This commit is contained in:
Yiqun Lin 2018-12-05 11:44:38 +08:00 committed by Brahma Reddy Battula
parent 0ca7142c11
commit 94a8dec168
5 changed files with 85 additions and 17 deletions

View File

@ -49,10 +49,6 @@ public class ConnectionManager {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ConnectionManager.class); LoggerFactory.getLogger(ConnectionManager.class);
/** Minimum amount of active connections: 50%. */
protected static final float MIN_ACTIVE_RATIO = 0.5f;
/** Configuration for the connection manager, pool and sockets. */ /** Configuration for the connection manager, pool and sockets. */
private final Configuration conf; private final Configuration conf;
@ -60,6 +56,8 @@ public class ConnectionManager {
private final int minSize = 1; private final int minSize = 1;
/** Max number of connections per user + nn. */ /** Max number of connections per user + nn. */
private final int maxSize; private final int maxSize;
/** Min ratio of active connections per user + nn. */
private final float minActiveRatio;
/** How often we close a pool for a particular user + nn. */ /** How often we close a pool for a particular user + nn. */
private final long poolCleanupPeriodMs; private final long poolCleanupPeriodMs;
@ -96,10 +94,13 @@ public class ConnectionManager {
public ConnectionManager(Configuration config) { public ConnectionManager(Configuration config) {
this.conf = config; this.conf = config;
// Configure minimum and maximum connection pools // Configure minimum, maximum and active connection pools
this.maxSize = this.conf.getInt( this.maxSize = this.conf.getInt(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT); RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
this.minActiveRatio = this.conf.getFloat(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO,
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT);
// Map with the connections indexed by UGI and Namenode // Map with the connections indexed by UGI and Namenode
this.pools = new HashMap<>(); this.pools = new HashMap<>();
@ -203,7 +204,8 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
pool = this.pools.get(connectionId); pool = this.pools.get(connectionId);
if (pool == null) { if (pool == null) {
pool = new ConnectionPool( pool = new ConnectionPool(
this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol); this.conf, nnAddress, ugi, this.minSize, this.maxSize,
this.minActiveRatio, protocol);
this.pools.put(connectionId, pool); this.pools.put(connectionId, pool);
} }
} finally { } finally {
@ -326,8 +328,9 @@ void cleanup(ConnectionPool pool) {
long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
int total = pool.getNumConnections(); int total = pool.getNumConnections();
int active = pool.getNumActiveConnections(); int active = pool.getNumActiveConnections();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (timeSinceLastActive > connectionCleanupPeriodMs || if (timeSinceLastActive > connectionCleanupPeriodMs ||
active < MIN_ACTIVE_RATIO * total) { active < poolMinActiveRatio * total) {
// Remove and close 1 connection // Remove and close 1 connection
List<ConnectionContext> conns = pool.removeConnections(1); List<ConnectionContext> conns = pool.removeConnections(1);
for (ConnectionContext conn : conns) { for (ConnectionContext conn : conns) {
@ -412,8 +415,9 @@ public void run() {
try { try {
int total = pool.getNumConnections(); int total = pool.getNumConnections();
int active = pool.getNumActiveConnections(); int active = pool.getNumActiveConnections();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (pool.getNumConnections() < pool.getMaxSize() && if (pool.getNumConnections() < pool.getMaxSize() &&
active >= MIN_ACTIVE_RATIO * total) { active >= poolMinActiveRatio * total) {
ConnectionContext conn = pool.newConnection(); ConnectionContext conn = pool.newConnection();
pool.addConnection(conn); pool.addConnection(conn);
} else { } else {

View File

@ -91,6 +91,8 @@ public class ConnectionPool {
private final int minSize; private final int minSize;
/** Max number of connections per user. */ /** Max number of connections per user. */
private final int maxSize; private final int maxSize;
/** Min ratio of active connections per user. */
private final float minActiveRatio;
/** The last time a connection was active. */ /** The last time a connection was active. */
private volatile long lastActiveTime = 0; private volatile long lastActiveTime = 0;
@ -98,7 +100,7 @@ public class ConnectionPool {
protected ConnectionPool(Configuration config, String address, protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize, UserGroupInformation user, int minPoolSize, int maxPoolSize,
Class<?> proto) throws IOException { float minActiveRatio, Class<?> proto) throws IOException {
this.conf = config; this.conf = config;
@ -112,6 +114,7 @@ protected ConnectionPool(Configuration config, String address,
// Set configuration parameters for the pool // Set configuration parameters for the pool
this.minSize = minPoolSize; this.minSize = minPoolSize;
this.maxSize = maxPoolSize; this.maxSize = maxPoolSize;
this.minActiveRatio = minActiveRatio;
// Add minimum connections to the pool // Add minimum connections to the pool
for (int i=0; i<this.minSize; i++) { for (int i=0; i<this.minSize; i++) {
@ -140,6 +143,15 @@ protected int getMinSize() {
return this.minSize; return this.minSize;
} }
/**
* Get the minimum ratio of active connections in this pool.
*
* @return Minimum ratio of active connections.
*/
protected float getMinActiveRatio() {
return this.minActiveRatio;
}
/** /**
* Get the connection pool identifier. * Get the connection pool identifier.
* *

View File

@ -102,6 +102,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "connection.creator.queue-size"; FEDERATION_ROUTER_PREFIX + "connection.creator.queue-size";
public static final int public static final int
DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT = 100; DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT = 100;
public static final String
DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO =
FEDERATION_ROUTER_PREFIX + "connection.min-active-ratio";
public static final float
DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT = 0.5f;
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
FEDERATION_ROUTER_PREFIX + "connection.pool-size"; FEDERATION_ROUTER_PREFIX + "connection.pool-size";
public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT = public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =

View File

@ -117,6 +117,14 @@
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.connection.min-active-ratio</name>
<value>0.5f</value>
<description>
Minimum active ratio of connections from the router to namenodes.
</description>
</property>
<property> <property>
<name>dfs.federation.router.connection.clean.ms</name> <name>dfs.federation.router.connection.clean.ms</name>
<value>10000</value> <value>10000</value>

View File

@ -80,14 +80,14 @@ public void testCleanup() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools(); Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
ConnectionPool pool1 = new ConnectionPool( ConnectionPool pool1 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class); conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool1, 9, 4); addConnectionsToPool(pool1, 9, 4);
poolMap.put( poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool1); pool1);
ConnectionPool pool2 = new ConnectionPool( ConnectionPool pool2 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class); conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool2, 10, 10); addConnectionsToPool(pool2, 10, 10);
poolMap.put( poolMap.put(
new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
@ -110,7 +110,7 @@ public void testCleanup() throws Exception {
// Make sure the number of connections doesn't go below minSize // Make sure the number of connections doesn't go below minSize
ConnectionPool pool3 = new ConnectionPool( ConnectionPool pool3 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class); conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool3, 8, 0); addConnectionsToPool(pool3, 8, 0);
poolMap.put( poolMap.put(
new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
@ -134,7 +134,7 @@ public void testCleanup() throws Exception {
public void testConnectionCreatorWithException() throws Exception { public void testConnectionCreatorWithException() throws Exception {
// Create a bad connection pool pointing to unresolvable namenode address. // Create a bad connection pool pointing to unresolvable namenode address.
ConnectionPool badPool = new ConnectionPool( ConnectionPool badPool = new ConnectionPool(
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f,
ClientProtocol.class); ClientProtocol.class);
BlockingQueue<ConnectionPool> queue = new ArrayBlockingQueue<>(1); BlockingQueue<ConnectionPool> queue = new ArrayBlockingQueue<>(1);
queue.add(badPool); queue.add(badPool);
@ -160,7 +160,7 @@ public void testGetConnectionWithException() throws Exception {
// Create a bad connection pool pointing to unresolvable namenode address. // Create a bad connection pool pointing to unresolvable namenode address.
ConnectionPool badPool = new ConnectionPool( ConnectionPool badPool = new ConnectionPool(
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
ClientProtocol.class); ClientProtocol.class);
} }
@ -171,7 +171,7 @@ public void testGetConnection() throws Exception {
int activeConns = 5; int activeConns = 5;
ConnectionPool pool = new ConnectionPool( ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class); conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns); addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put( poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
@ -196,7 +196,7 @@ public void testGetConnection() throws Exception {
@Test @Test
public void testValidClientIndex() throws Exception { public void testValidClientIndex() throws Exception {
ConnectionPool pool = new ConnectionPool( ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, ClientProtocol.class); conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class);
for(int i = -3; i <= 3; i++) { for(int i = -3; i <= 3; i++) {
pool.getClientIndex().set(i); pool.getClientIndex().set(i);
ConnectionContext conn = pool.getConnection(); ConnectionContext conn = pool.getConnection();
@ -212,7 +212,7 @@ public void getGetConnectionNamenodeProtocol() throws Exception {
int activeConns = 5; int activeConns = 5;
ConnectionPool pool = new ConnectionPool( ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class); conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns); addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put( poolMap.put(
new ConnectionPoolId( new ConnectionPoolId(
@ -262,4 +262,43 @@ private void checkPoolConnections(UserGroupInformation ugi,
} }
} }
@Test
public void testConfigureConnectionActiveRatio() throws IOException {
final int totalConns = 10;
int activeConns = 7;
Configuration tmpConf = new Configuration();
// Set dfs.federation.router.connection.min-active-ratio 0.8f
tmpConf.setFloat(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, 0.8f);
ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
tmpConnManager.start();
// Create one new connection pool
tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS,
NamenodeProtocol.class);
Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
TEST_NN_ADDRESS, NamenodeProtocol.class);
ConnectionPool pool = poolMap.get(connectionPoolId);
// Test min active ratio is 0.8f
assertEquals(0.8f, pool.getMinActiveRatio(), 0.001f);
pool.getConnection().getClient();
// Test there is one active connection in pool
assertEquals(1, pool.getNumActiveConnections());
// Add other 6 active/9 total connections to pool
addConnectionsToPool(pool, totalConns - 1, activeConns - 1);
// There are 7 active connections.
// The active number is less than totalConns(10) * minActiveRatio(0.8f).
// We can cleanup the pool
tmpConnManager.cleanup(pool);
assertEquals(totalConns - 1, pool.getNumConnections());
tmpConnManager.close();
}
} }