diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index e1a7c2f803..11143d7ef5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -24,7 +24,15 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.URI; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeoutException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -88,6 +96,17 @@ public class ObserverReadProxyProvider /** Observer probe retry period default to 10 min. */ static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000; + /** + * Timeout in ms to cancel the ha-state probe rpc request for a namenode. + * To disable timeout, set it to 0 or a negative value. + */ + static final String NAMENODE_HA_STATE_PROBE_TIMEOUT = + HdfsClientConfigKeys.Failover.PREFIX + "namenode.ha-state.probe.timeout"; + /** + * By default (0) the namenode ha-state probe timeout is disabled. + */ + static final long NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT = 0; + /** The inner proxy provider used for active/standby failover. */ private final AbstractNNFailoverProxyProvider failoverProxy; /** List of all NameNode proxies. 
*/ @@ -155,12 +174,32 @@ public class ObserverReadProxyProvider */ private long observerProbeRetryPeriodMs; + /** + * Timeout in ms when we try to get the HA state of a namenode. + */ + private long namenodeHAStateProbeTimeoutMs; + /** * The previous time where zero observer were found. If there was observer, * or it is initialization, this is set to 0. */ private long lastObserverProbeTime; + /** + * Threadpool to send the getHAServiceState requests. + * + * One core thread is kept alive. Extra threads (4 max) start only when the 1024-entry queue is + * full and die after idling 1 minute. Once queue and pool are both full, requests are rejected. + * + * Each hdfs client will have its own ObserverReadProxyProvider. Thus, + * having 1 thread running should be sufficient in most cases. + * We are not expecting to receive a lot of outstanding RPC calls + * from a single hdfs client, thus setting the queue size to 1024. + */ + private final ExecutorService nnProbingThreadPool = + new ThreadPoolExecutor(1, 4, 1L, TimeUnit.MINUTES, + new ArrayBlockingQueue(1024)); + /** * By default ObserverReadProxyProvider uses * {@link ConfiguredFailoverProxyProvider} for failover. 
@@ -213,6 +252,8 @@ public ObserverReadProxyProvider( observerProbeRetryPeriodMs = conf.getTimeDuration( OBSERVER_PROBE_RETRY_PERIOD_KEY, OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); + namenodeHAStateProbeTimeoutMs = conf.getTimeDuration(NAMENODE_HA_STATE_PROBE_TIMEOUT, + NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); if (wrappedProxy instanceof ClientProtocol) { this.observerReadEnabled = true; @@ -284,13 +325,67 @@ private synchronized NNProxyInfo changeProxy(NNProxyInfo initial) { } currentIndex = (currentIndex + 1) % nameNodeProxies.size(); currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex)); - currentProxy.setCachedState(getHAServiceState(currentProxy)); + currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy)); LOG.debug("Changed current proxy from {} to {}", initial == null ? "none" : initial.proxyInfo, currentProxy.proxyInfo); return currentProxy; } + /** + * Execute getHAServiceState() call with a timeout, to avoid a long wait when + * an NN becomes unresponsive to rpc requests + * (e.g. when a thread/heap dump is being taken). + * + * For each getHAServiceState() call, a task is created and submitted to a + * threadpool for execution. We will wait for a response up to + * namenodeHAStateProbeTimeoutMs and cancel these requests if they time out. + * Null is returned if the probe is rejected, times out, fails or is interrupted + * (the interrupt status of the calling thread is not restored). + * The implementation is split into two functions so that we can unit test the second function. + */ + HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo proxyInfo) { + Callable getHAServiceStateTask = () -> getHAServiceState(proxyInfo); + + try { + Future task = + nnProbingThreadPool.submit(getHAServiceStateTask); + return getHAServiceStateWithTimeout(proxyInfo, task); + } catch (RejectedExecutionException e) { + LOG.warn("Run out of threads to submit the request to query HA state. 
" + + "Ok to return null and we will fallback to use active NN to serve " + + "this request."); + return null; + } + } + + HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo proxyInfo, + Future task) { + HAServiceState state = null; + try { + if (namenodeHAStateProbeTimeoutMs > 0) { + state = task.get(namenodeHAStateProbeTimeoutMs, TimeUnit.MILLISECONDS); + } else { + // Disable timeout by waiting indefinitely when namenodeHAStateProbeTimeoutMs is set to 0 + // or a negative value. + state = task.get(); + } + LOG.debug("HA State for {} is {}", proxyInfo.proxyInfo, state); + } catch (TimeoutException e) { + // Cancel the task on timeout + String msg = String.format("Cancel NN probe task due to timeout for %s", proxyInfo.proxyInfo); + LOG.warn(msg, e); + if (task != null) { + task.cancel(true); + } + } catch (InterruptedException|ExecutionException e) { + String msg = String.format("Exception in NN probe task for %s", proxyInfo.proxyInfo); + LOG.warn(msg, e); + } + + return state; + } + /** * Fetch the service state from a proxy. If it is unable to be fetched, * assume it is in standby state, but log the exception. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index 54b1159899..e3c1a0388b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import java.io.IOException; import java.net.InetSocketAddress; @@ -24,7 +26,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; @@ -36,20 +41,30 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.util.StopWatch; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.slf4j.event.Level; import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import 
static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; /** @@ -58,30 +73,42 @@ * NameNode to communicate with. */ public class TestObserverReadProxyProvider { + private final static long SLOW_RESPONSE_SLEEP_TIME = TimeUnit.SECONDS.toMillis(5); // 5 s + private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT = TimeUnit.SECONDS.toMillis(2); + private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG = TimeUnit.SECONDS.toMillis(25); + private final GenericTestUtils.LogCapturer proxyLog = + GenericTestUtils.LogCapturer.captureLogs(ObserverReadProxyProvider.LOG); private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0]; private String ns; private URI nnURI; - private Configuration conf; private ObserverReadProxyProvider proxyProvider; private NameNodeAnswer[] namenodeAnswers; private String[] namenodeAddrs; + @BeforeClass + public static void setLogLevel() { + GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG); + } + @Before public void setup() throws Exception { ns = "testcluster"; nnURI = URI.create("hdfs://" + ns); - conf = new Configuration(); - conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); - // Set observer probe retry period to 0. 
Required by the tests that - // transition observer back and forth - conf.setTimeDuration( - OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); - conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, false); } private void setupProxyProvider(int namenodeCount) throws Exception { + setupProxyProvider(namenodeCount, new Configuration()); + } + + private void setupProxyProvider(int namenodeCount, long nnHAStateProbeTimeout) throws Exception { + Configuration conf = new Configuration(); + conf.setLong(NAMENODE_HA_STATE_PROBE_TIMEOUT, nnHAStateProbeTimeout); + setupProxyProvider(namenodeCount, conf); + } + + private void setupProxyProvider(int namenodeCount, Configuration conf) throws Exception { String[] namenodeIDs = new String[namenodeCount]; namenodeAddrs = new String[namenodeCount]; namenodeAnswers = new NameNodeAnswer[namenodeCount]; @@ -104,6 +131,12 @@ private void setupProxyProvider(int namenodeCount) throws Exception { } conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, Joiner.on(",").join(namenodeIDs)); + conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); + // Set observer probe retry period to 0. 
Required by the tests that + // transition observer back and forth + conf.setTimeDuration( + OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); + conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, false); proxyProvider = new ObserverReadProxyProvider(conf, nnURI, ClientProtocol.class, new ClientHAProxyFactory() { @@ -145,7 +178,7 @@ public GetUserMappingsProtocol createProxy(Configuration config, } }; ObserverReadProxyProvider userProxyProvider = - new ObserverReadProxyProvider<>(conf, nnURI, + new ObserverReadProxyProvider<>(proxyProvider.conf, nnURI, GetUserMappingsProtocol.class, proxyFactory); assertArrayEquals(fakeGroups, userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser)); @@ -325,6 +358,160 @@ public void testObserverRetriableException() throws Exception { assertHandledBy(1); } + /** + * Happy case for GetHAServiceStateWithTimeout. + */ + @Test + public void testGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + final HAServiceState state = HAServiceState.STANDBY; + NNProxyInfo dummyNNProxyInfo = + (NNProxyInfo) mock(NNProxyInfo.class); + Future task = mock(Future.class); + when(task.get(anyLong(), any(TimeUnit.class))).thenReturn(state); + + HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertEquals(state, state2); + verify(task).get(anyLong(), any(TimeUnit.class)); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state)); + proxyLog.clearOutput(); + } + + /** + * Test TimeoutException for GetHAServiceStateWithTimeout. 
+ */ + @Test + public void testTimeoutExceptionGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + NNProxyInfo dummyNNProxyInfo = + (NNProxyInfo) Mockito.mock(NNProxyInfo.class); + Future task = mock(Future.class); + TimeoutException e = new TimeoutException("Timeout"); + when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e); + + HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertNull(state); + verify(task).get(anyLong(), any(TimeUnit.class)); + verify(task).cancel(true); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "Cancel NN probe task due to timeout for " + dummyNNProxyInfo.proxyInfo)); + proxyLog.clearOutput(); + } + + /** + * Test InterruptedException for GetHAServiceStateWithTimeout. + */ + @Test + public void testInterruptedExceptionGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + NNProxyInfo dummyNNProxyInfo = + (NNProxyInfo) Mockito.mock(NNProxyInfo.class); + Future task = mock(Future.class); + InterruptedException e = new InterruptedException("Interrupted"); + when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e); + + HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertNull(state); + verify(task).get(anyLong(), any(TimeUnit.class)); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo)); + proxyLog.clearOutput(); + } + + /** + * Test ExecutionException for GetHAServiceStateWithTimeout. 
+ */ + @Test + public void testExecutionExceptionGetHAServiceStateWithTimeout() throws Exception { + proxyLog.clearOutput(); + + setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + NNProxyInfo dummyNNProxyInfo = + (NNProxyInfo) Mockito.mock(NNProxyInfo.class); + Future task = mock(Future.class); + Exception e = new ExecutionException(new InterruptedException("Interrupted")); + when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e); + + HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertNull(state); + verify(task).get(anyLong(), any(TimeUnit.class)); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo)); + proxyLog.clearOutput(); + } + + /** + * Test GetHAServiceState when timeout is disabled (test the else { task.get() } code path) + */ + @Test + public void testGetHAServiceStateWithoutTimeout() throws Exception { + proxyLog.clearOutput(); + setupProxyProvider(1, 0); + + final HAServiceState state = HAServiceState.STANDBY; + NNProxyInfo dummyNNProxyInfo = + (NNProxyInfo) mock(NNProxyInfo.class); + Future task = mock(Future.class); + when(task.get()).thenReturn(state); + + HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task); + assertEquals(state, state2); + verify(task).get(); + verifyNoMoreInteractions(task); + assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(), + "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state)); + proxyLog.clearOutput(); + } + + /** + * Test getHAServiceState when we have a slow NN, using a 25s timeout. + * This is to verify the old behavior without being able to fast-fail (we can also set + * namenodeHAStateProbeTimeoutMs to 0 or a negative value and the rest of the test can stay + * the same). 
+ * + * 5-second (SLOW_RESPONSE_SLEEP_TIME) latency is introduced and we expect that latency is added + * to the READ operation. + */ + @Test + public void testStandbyGetHAServiceStateLongTimeout() throws Exception { + setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setSlowNode(true); + namenodeAnswers[3].setObserverState(); + + StopWatch watch = new StopWatch(); + watch.start(); + doRead(); + long runtime = watch.now(TimeUnit.MILLISECONDS); + assertTrue("Read operation finished earlier than we expected", + runtime > SLOW_RESPONSE_SLEEP_TIME); + } + + /** + * Test getHAServiceState using a 2s timeout with a slow standby. + * Fail the test if we don't complete it in 4s. + */ + @Test(timeout = 4000) + public void testStandbyGetHAServiceStateTimeout() throws Exception { + setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT); + namenodeAnswers[0].setActiveState(); + namenodeAnswers[1].setSlowNode(true); + namenodeAnswers[3].setObserverState(); + + doRead(); + } + private void doRead() throws Exception { doRead(proxyProvider.getProxy().proxy); } @@ -357,6 +544,7 @@ private static class NameNodeAnswer { private volatile boolean unreachable = false; private volatile boolean retryActive = false; + private volatile boolean slowNode = false; // Standby state by default private volatile boolean allowWrites = false; @@ -370,6 +558,12 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { if (unreachable) { throw new IOException("Unavailable"); } + + // sleep to simulate slow rpc responses. + if (slowNode) { + Thread.sleep(SLOW_RESPONSE_SLEEP_TIME); + } + // retryActive should be checked before getHAServiceState. 
// Check getHAServiceState first here only because in test, // it relies read call, which relies on getHAServiceState @@ -416,6 +610,11 @@ void setUnreachable(boolean unreachable) { this.unreachable = unreachable; } + // Whether this node should be slow in rpc response. + void setSlowNode(boolean slowNode) { + this.slowNode = slowNode; + } + void setActiveState() { allowReads = true; allowWrites = true;