HDFS-17311. RBF: ConnectionManager creatorQueue should offer a pool that is not already in creatorQueue. (#6392) Contributed by liuguanghua.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
LiuGuH 2024-01-20 07:55:23 +08:00 committed by GitHub
parent 15e1789baf
commit 2a1ee8dfcd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 1 deletions

View File

@ -173,6 +173,11 @@ public void close() {
} }
} }
@VisibleForTesting
public void closeConnectionCreator(){
this.creator.shutdown();
}
/** /**
* Fetches the next available proxy client in the pool. Each client connection * Fetches the next available proxy client in the pool. Each client connection
* is reserved for a single user and cannot be reused until free. * is reserved for a single user and cannot be reused until free.
@ -229,7 +234,7 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
// Add a new connection to the pool if it wasn't usable // Add a new connection to the pool if it wasn't usable
if (conn == null || !conn.isUsable()) { if (conn == null || !conn.isUsable()) {
if (!this.creatorQueue.offer(pool)) { if (!this.creatorQueue.contains(pool) && !this.creatorQueue.offer(pool)) {
LOG.error("Cannot add more than {} connections at the same time", LOG.error("Cannot add more than {} connections at the same time",
this.creatorQueueMaxSize); this.creatorQueueMaxSize);
} }

View File

@ -363,6 +363,29 @@ public void testConfigureConnectionActiveRatio() throws IOException {
testConnectionCleanup(0.8f, 10, 6, 8); testConnectionCleanup(0.8f, 10, 6, 8);
} }
@Test
public void testConnectionCreatorWithSamePool() throws IOException {
Configuration tmpConf = new Configuration();
// Set DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY to 0
// for ensuring a pool will be offered in the creatorQueue
tmpConf.setInt(
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY, 0);
ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
tmpConnManager.start();
// Close ConnectionCreator thread to make sure that new connection will not initialize.
tmpConnManager.closeConnectionCreator();
// Create same connection pool for simulating concurrency scenario
for (int i = 0; i < 3; i++) {
tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS,
NamenodeProtocol.class, "ns0");
}
assertEquals(1, tmpConnManager.getNumCreatingConnections());
tmpConnManager.getConnection(TEST_USER2, TEST_NN_ADDRESS,
NamenodeProtocol.class, "ns0");
assertEquals(2, tmpConnManager.getNumCreatingConnections());
}
private void testConnectionCleanup(float ratio, int totalConns, private void testConnectionCleanup(float ratio, int totalConns,
int activeConns, int leftConns) throws IOException { int activeConns, int leftConns) throws IOException {
Configuration tmpConf = new Configuration(); Configuration tmpConf = new Configuration();