HDFS-13834. RBF: Connection creator thread should catch Throwable. Contributed by CR Hota.
This commit is contained in:
parent
04caaba488
commit
fa55eacd35
@ -393,7 +393,7 @@ public class ConnectionManager {
|
|||||||
/**
|
/**
|
||||||
* Thread that creates connections asynchronously.
|
* Thread that creates connections asynchronously.
|
||||||
*/
|
*/
|
||||||
private static class ConnectionCreator extends Thread {
|
static class ConnectionCreator extends Thread {
|
||||||
/** If the creator is running. */
|
/** If the creator is running. */
|
||||||
private boolean running = true;
|
private boolean running = true;
|
||||||
/** Queue to push work to. */
|
/** Queue to push work to. */
|
||||||
@ -426,6 +426,8 @@ public class ConnectionManager {
|
|||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("The connection creator was interrupted");
|
LOG.error("The connection creator was interrupted");
|
||||||
this.running = false;
|
this.running = false;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error("Fatal error caught by connection creator ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,12 +22,17 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -49,6 +54,7 @@ public class TestConnectionManager {
|
|||||||
private static final UserGroupInformation TEST_USER3 =
|
private static final UserGroupInformation TEST_USER3 =
|
||||||
UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
|
UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
|
||||||
private static final String TEST_NN_ADDRESS = "nn1:8080";
|
private static final String TEST_NN_ADDRESS = "nn1:8080";
|
||||||
|
private static final String UNRESOLVED_TEST_NN_ADDRESS = "unknownhost:8080";
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
@ -59,6 +65,9 @@ public class TestConnectionManager {
|
|||||||
connManager.start();
|
connManager.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException exceptionRule = ExpectedException.none();
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
if (connManager != null) {
|
if (connManager != null) {
|
||||||
@ -121,6 +130,40 @@ public class TestConnectionManager {
|
|||||||
checkPoolConnections(TEST_USER3, 4, 2);
|
checkPoolConnections(TEST_USER3, 4, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectionCreatorWithException() throws Exception {
|
||||||
|
// Create a bad connection pool pointing to unresolvable namenode address.
|
||||||
|
ConnectionPool badPool = new ConnectionPool(
|
||||||
|
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10,
|
||||||
|
ClientProtocol.class);
|
||||||
|
BlockingQueue<ConnectionPool> queue = new ArrayBlockingQueue<>(1);
|
||||||
|
queue.add(badPool);
|
||||||
|
ConnectionManager.ConnectionCreator connectionCreator =
|
||||||
|
new ConnectionManager.ConnectionCreator(queue);
|
||||||
|
connectionCreator.setDaemon(true);
|
||||||
|
connectionCreator.start();
|
||||||
|
// Wait to make sure async thread is scheduled and picks
|
||||||
|
GenericTestUtils.waitFor(()->queue.isEmpty(), 50, 5000);
|
||||||
|
// At this point connection creation task should be definitely picked up.
|
||||||
|
assertTrue(queue.isEmpty());
|
||||||
|
// At this point connection thread should still be alive.
|
||||||
|
assertTrue(connectionCreator.isAlive());
|
||||||
|
// Stop the thread as test is successful at this point
|
||||||
|
connectionCreator.interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetConnectionWithException() throws Exception {
|
||||||
|
String exceptionCause = "java.net.UnknownHostException: unknownhost";
|
||||||
|
exceptionRule.expect(IllegalArgumentException.class);
|
||||||
|
exceptionRule.expectMessage(exceptionCause);
|
||||||
|
|
||||||
|
// Create a bad connection pool pointing to unresolvable namenode address.
|
||||||
|
ConnectionPool badPool = new ConnectionPool(
|
||||||
|
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10,
|
||||||
|
ClientProtocol.class);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetConnection() throws Exception {
|
public void testGetConnection() throws Exception {
|
||||||
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
|
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user