diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java index e1e5fd09a8..1147b1ac46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -117,7 +117,8 @@ public static class NNProxyInfo extends ProxyInfo { /** * The currently known state of the NameNode represented by this ProxyInfo. * This may be out of date if the NameNode has changed state since the last - * time the state was checked. + * time the state was checked. If the NameNode could not be contacted, this + * will store null to indicate an unknown state. */ private HAServiceState cachedState; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 0df5e1e71d..5780ce3b78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -66,7 +66,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ObserverReadProxyProvider +public class ObserverReadProxyProvider extends AbstractNNFailoverProxyProvider { @VisibleForTesting static final Logger LOG = LoggerFactory.getLogger( @@ -189,7 +189,13 @@ public ObserverReadProxyProvider( AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); // TODO : make this configurable or remove this variable - this.observerReadEnabled = true; + if (wrappedProxy instanceof ClientProtocol) { + this.observerReadEnabled = true; + } else { + LOG.info("Disabling observer reads for {} because the requested proxy " + + "class does not implement {}", uri, ClientProtocol.class.getName()); + this.observerReadEnabled = false; + } } public AlignmentContext getAlignmentContext() { @@ -267,7 +273,7 @@ private synchronized NNProxyInfo changeProxy(NNProxyInfo initial) { private HAServiceState getHAServiceState(NNProxyInfo proxyInfo) { IOException ioe; try { - return proxyInfo.proxy.getHAServiceState(); + return getProxyAsClientProtocol(proxyInfo.proxy).getHAServiceState(); } catch (RemoteException re) { // Though a Standby will allow a getHAServiceState call, it won't allow // delegation token lookup, so if DT is used it throws StandbyException @@ -280,9 +286,21 @@ private HAServiceState getHAServiceState(NNProxyInfo proxyInfo) { } catch (IOException e) { ioe = e; } - LOG.info("Failed to connect to {}. Assuming Standby state", + LOG.warn("Failed to connect to {} while fetching HAServiceState", proxyInfo.getAddress(), ioe); - return HAServiceState.STANDBY; + return null; + } + + /** + * Return the input proxy, cast as a {@link ClientProtocol}. This catches any + * {@link ClassCastException} and wraps it in a more helpful message. This + * should ONLY be called if the caller is certain that the proxy is, in fact, + * a {@link ClientProtocol}. + */ + private ClientProtocol getProxyAsClientProtocol(T proxy) { + assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy " + + "of class " + proxy.getClass() + " as if it was a ClientProtocol."; + return (ClientProtocol) proxy; } /** @@ -297,7 +315,7 @@ private synchronized void initializeMsync() throws IOException { if (msynced) { return; // No need for an msync } - failoverProxy.getProxy().proxy.msync(); + getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); msynced = true; lastMsyncTimeMs = Time.monotonicNow(); } @@ -313,7 +331,7 @@ private synchronized void initializeMsync() throws IOException { private void autoMsyncIfNecessary() throws IOException { if (autoMsyncPeriodMs == 0) { // Always msync - failoverProxy.getProxy().proxy.msync(); + getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); } else if (autoMsyncPeriodMs > 0) { if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { synchronized (this) { @@ -322,7 +340,7 @@ private void autoMsyncIfNecessary() throws IOException { // Re-check the entry criterion since the status may have changed // while waiting for the lock. if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { - failoverProxy.getProxy().proxy.msync(); + getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync(); lastMsyncTimeMs = Time.monotonicNow(); } } @@ -361,6 +379,7 @@ public Object invoke(Object proxy, final Method method, final Object[] args) int failedObserverCount = 0; int activeCount = 0; int standbyCount = 0; + int unreachableCount = 0; for (int i = 0; i < nameNodeProxies.size(); i++) { NNProxyInfo current = getCurrentProxy(); HAServiceState currState = current.getCachedState(); @@ -369,9 +388,12 @@ public Object invoke(Object proxy, final Method method, final Object[] args) activeCount++; } else if (currState == HAServiceState.STANDBY) { standbyCount++; + } else if (currState == null) { + unreachableCount++; } LOG.debug("Skipping proxy {} for {} because it is in state {}", - current.proxyInfo, method.getName(), currState); + current.proxyInfo, method.getName(), + currState == null ? "unreachable" : currState); changeProxy(current); continue; } @@ -414,13 +436,15 @@ public Object invoke(Object proxy, final Method method, final Object[] args) } // If we get here, it means all observers have failed. - LOG.warn("{} observers have failed for read request {}; also found " + - "{} standby and {} active. Falling back to active.", - failedObserverCount, method.getName(), standbyCount, activeCount); + LOG.warn("{} observers have failed for read request {}; also found {} " + + "standby, {} active, and {} unreachable. Falling back to active.", + failedObserverCount, method.getName(), standbyCount, activeCount, + unreachableCount); } - // Either all observers have failed, or that it is a write request. - // In either case, we'll forward the request to active NameNode. + // Either all observers have failed, observer reads are disabled, + // or this is a write request. In any case, forward the request to + // the active NameNode. LOG.debug("Using failoverProxy to service {}", method.getName()); ProxyInfo activeProxy = failoverProxy.getProxy(); try { @@ -442,7 +466,8 @@ public void close() throws IOException {} @Override public ConnectionId getConnectionId() { - return RPC.getConnectionIdForProxy(getCurrentProxy().proxy); + return RPC.getConnectionIdForProxy(observerReadEnabled + ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java index 718d13f124..fb3cc34794 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java @@ -152,7 +152,7 @@ public void testObserverReadProxyProviderWithDT() throws Exception { cluster.shutdownNameNode(0); logCapture.clearOutput(); dfs.access(new Path("/"), FsAction.READ); - assertTrue(logCapture.getOutput().contains("Assuming Standby state")); + assertTrue(logCapture.getOutput().contains("Failed to connect to")); } finally { logCapture.stopCapturing(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 38feec4752..20e0bbdfcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.TestFsck; +import org.apache.hadoop.hdfs.tools.GetGroups; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -113,6 +114,17 @@ public void testNoActiveToObserver() throws Exception { fail("active cannot be transitioned to observer"); } + /** + * Test that non-ClientProtocol proxies such as + * {@link org.apache.hadoop.tools.GetUserMappingsProtocol} still work + * when run in an environment with observers. + */ + @Test + public void testGetGroups() throws Exception { + GetGroups getGroups = new GetGroups(conf); + assertEquals(0, getGroups.run(new String[0])); + } + @Test public void testNoObserverToActive() throws Exception { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index caf7d003ea..13b5774423 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -42,9 +43,12 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Tests for {@link ObserverReadProxyProvider} under various configurations of @@ -115,6 +119,31 @@ protected List> getProxyAddresses( proxyProvider.setObserverReadEnabled(true); } + @Test + public void testWithNonClientProxy() throws Exception { + setupProxyProvider(2); // This will initialize all of the instance fields + final String fakeUser = "fakeUser"; + final String[] fakeGroups = {"fakeGroup"}; + HAProxyFactory proxyFactory = + new NameNodeHAProxyFactory() { + @Override + public GetUserMappingsProtocol createProxy(Configuration config, + InetSocketAddress addr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + GetUserMappingsProtocol proxy = + mock(GetUserMappingsProtocol.class); + when(proxy.getGroupsForUser(fakeUser)).thenReturn(fakeGroups); + return proxy; + } + }; + ObserverReadProxyProvider userProxyProvider = + new ObserverReadProxyProvider<>(conf, nnURI, + GetUserMappingsProtocol.class, proxyFactory); + assertArrayEquals(fakeGroups, + userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser)); + } + @Test public void testReadOperationOnObserver() throws Exception { setupProxyProvider(3);