diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 9fb83e430b..fa2bf944dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -393,7 +393,7 @@ public class ConnectionManager { /** * Thread that creates connections asynchronously. */ - private static class ConnectionCreator extends Thread { + static class ConnectionCreator extends Thread { /** If the creator is running. */ private boolean running = true; /** Queue to push work to. */ @@ -426,6 +426,8 @@ public class ConnectionManager { } catch (InterruptedException e) { LOG.error("The connection creator was interrupted"); this.running = false; + } catch (Throwable e) { + LOG.error("Fatal error caught by connection creator ", e); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index 0e1eb40783..765f6c84e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -22,12 +22,17 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -49,6 +54,7 @@ public class TestConnectionManager { private static final UserGroupInformation TEST_USER3 = UserGroupInformation.createUserForTesting("user3", TEST_GROUP); private static final String TEST_NN_ADDRESS = "nn1:8080"; + private static final String UNRESOLVED_TEST_NN_ADDRESS = "unknownhost:8080"; @Before public void setup() throws Exception { @@ -59,6 +65,9 @@ public class TestConnectionManager { connManager.start(); } + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + @After public void shutdown() { if (connManager != null) { @@ -121,6 +130,40 @@ public class TestConnectionManager { checkPoolConnections(TEST_USER3, 4, 2); } + @Test + public void testConnectionCreatorWithException() throws Exception { + // Create a bad connection pool pointing to unresolvable namenode address. + ConnectionPool badPool = new ConnectionPool( + conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, + ClientProtocol.class); + BlockingQueue queue = new ArrayBlockingQueue<>(1); + queue.add(badPool); + ConnectionManager.ConnectionCreator connectionCreator = + new ConnectionManager.ConnectionCreator(queue); + connectionCreator.setDaemon(true); + connectionCreator.start(); + // Wait to make sure async thread is scheduled and picks + GenericTestUtils.waitFor(()->queue.isEmpty(), 50, 5000); + // At this point connection creation task should be definitely picked up. + assertTrue(queue.isEmpty()); + // At this point connection thread should still be alive. + assertTrue(connectionCreator.isAlive()); + // Stop the thread as test is successful at this point + connectionCreator.interrupt(); + } + + @Test + public void testGetConnectionWithException() throws Exception { + String exceptionCause = "java.net.UnknownHostException: unknownhost"; + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage(exceptionCause); + + // Create a bad connection pool pointing to unresolvable namenode address. + ConnectionPool badPool = new ConnectionPool( + conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, + ClientProtocol.class); + } + @Test public void testGetConnection() throws Exception { Map poolMap = connManager.getPools();