diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 1c17c297ae..7003f96f04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
 
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -436,8 +437,7 @@ private Object invokeMethod(
             this.rpcMonitor.proxyOpFailureStandby();
           }
           failover = true;
-        } else if (ioe instanceof ConnectException ||
-            ioe instanceof ConnectTimeoutException) {
+        } else if (isUnavailableException(ioe)) {
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpFailureCommunicate();
           }
@@ -503,8 +503,7 @@ private Object invokeMethod(
       if (ioe instanceof StandbyException) {
         LOG.error("{} at {} is in Standby: {}",
             nnKey, addr, ioe.getMessage());
-      } else if (ioe instanceof ConnectException ||
-          ioe instanceof ConnectTimeoutException) {
+      } else if (isUnavailableException(ioe)) {
         exConnect++;
         LOG.error("{} at {} cannot be reached: {}",
             nnKey, addr, ioe.getMessage());
@@ -563,8 +562,7 @@ private Object invoke(String nsId, int retryCount, final Method method,
         // failover, invoker looks for standby exceptions for failover.
         if (ioe instanceof StandbyException) {
           throw ioe;
-        } else if (ioe instanceof ConnectException ||
-            ioe instanceof ConnectTimeoutException) {
+        } else if (isUnavailableException(ioe)) {
           throw ioe;
         } else {
           throw new StandbyException(ioe.getMessage());
@@ -578,6 +576,27 @@ private Object invoke(String nsId, int retryCount, final Method method,
     }
   }
 
+  /**
+   * Check if the exception comes from an unavailable subcluster.
+   * @param ioe IOException to check.
+   * @return If the exception comes from an unavailable subcluster.
+   */
+  public static boolean isUnavailableException(IOException ioe) {
+    if (ioe instanceof ConnectException ||
+        ioe instanceof ConnectTimeoutException ||
+        ioe instanceof EOFException ||
+        ioe instanceof StandbyException) {
+      return true;
+    }
+    if (ioe instanceof RetriableException) {
+      Throwable cause = ioe.getCause();
+      if (cause instanceof NoNamenodesAvailableException) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Check if the cluster of given nameservice id is available.
    * @param nsId nameservice ID.
@@ -833,8 +852,7 @@ public <T> T invokeSequential(
 
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = remoteMethod.getMethod();
-    IOException firstThrownException = null;
-    IOException lastThrownException = null;
+    List<IOException> thrownExceptions = new ArrayList<>();
     Object firstResult = null;
     // Invoke in priority order
     for (final RemoteLocationContext loc : locations) {
@@ -862,29 +880,33 @@ public <T> T invokeSequential(
 
         ioe = processException(ioe, loc);
         // Record it and move on
-        lastThrownException = ioe;
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
+        thrownExceptions.add(ioe);
       } catch (Exception e) {
         // Unusual error, ClientProtocol calls always use IOException (or
         // RemoteException). Re-wrap in IOException for compatibility with
         // ClientProtcol.
         LOG.error("Unexpected exception {} proxying {} to {}", e.getClass(),
             m.getName(), ns, e);
-        lastThrownException = new IOException(
+        IOException ioe = new IOException(
             "Unexpected exception proxying API " + e.getMessage(), e);
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
+        thrownExceptions.add(ioe);
       }
     }
 
-    if (firstThrownException != null) {
-      // re-throw the last exception thrown for compatibility
-      throw firstThrownException;
+    if (!thrownExceptions.isEmpty()) {
+      // An unavailable subcluster may be the actual cause
+      // We cannot surface other exceptions (e.g., FileNotFoundException)
+      for (int i = 0; i < thrownExceptions.size(); i++) {
+        IOException ioe = thrownExceptions.get(i);
+        if (isUnavailableException(ioe)) {
+          throw ioe;
+        }
+      }
+
+      // re-throw the first exception thrown for compatibility
+      throw thrownExceptions.get(0);
     }
-    // Return the last result, whether it is the value we are looking for or a
+    // Return the first result, whether it is the value or not
     @SuppressWarnings("unchecked")
     T ret = (T)firstResult;
     return ret;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 1f2298ddc6..14cd6e7587 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -653,6 +653,12 @@ RemoteLocation getCreateLocation(
         }
       } catch (FileNotFoundException fne) {
         // Ignore if the file is not found
+      } catch (IOException ioe) {
+        if (RouterRpcClient.isUnavailableException(ioe)) {
+          LOG.debug("Ignore unavailable exception: {}", ioe);
+        } else {
+          throw ioe;
+        }
       }
     }
     return createLocation;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
index 39d9561395..8907ce5b91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
@@ -25,6 +25,7 @@
 import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -40,6 +41,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -51,6 +53,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +75,7 @@
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -153,9 +157,6 @@ public void setup() throws Exception {
     registerSubclusters(
         routers, namenodes.values(), Collections.singleton("ns1"));
 
-    LOG.info("Stop ns1 to simulate an unavailable subcluster");
-    namenodes.get("ns1").stop();
-
     service = Executors.newFixedThreadPool(10);
   }
 
@@ -209,6 +210,9 @@ private void updateMountPointFaultTolerant(final String mountPoint)
 
   @Test
   public void testWriteWithFailedSubcluster() throws Exception {
+    LOG.info("Stop ns1 to simulate an unavailable subcluster");
+    namenodes.get("ns1").stop();
+
     // Run the actual tests with each approach
     final List<Callable<Boolean>> tasks = new ArrayList<>();
     final List<DestinationOrder> orders = asList(
@@ -609,4 +613,63 @@ private FileSystem getRandomRouterFileSystem() throws Exception {
     return userUgi.doAs(
         (PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
   }
+
+  @Test
+  public void testReadWithFailedSubcluster() throws Exception {
+
+    DestinationOrder order = DestinationOrder.HASH_ALL;
+    final String mountPoint = "/" + order + "-testread";
+    final Path mountPath = new Path(mountPoint);
+    LOG.info("Setup {} with order {}", mountPoint, order);
+    createMountTableEntry(
+        routers, mountPoint, order, namenodes.keySet());
+
+    FileSystem fs = getRandomRouterFileSystem();
+
+    // Create a file (we don't write because we have no mock Datanodes)
+    final Path fileexisting = new Path(mountPath, "fileexisting");
+    final Path filenotexisting = new Path(mountPath, "filenotexisting");
+    FSDataOutputStream os = fs.create(fileexisting);
+    assertNotNull(os);
+    os.close();
+
+    // We should be able to read existing files
+    FSDataInputStream fsdis = fs.open(fileexisting);
+    assertNotNull("We should be able to read the file", fsdis);
+    // We shouldn't be able to read non-existing files
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> fs.open(filenotexisting));
+
+    // Check the subcluster where the file got created
+    String nsIdWithFile = null;
+    for (Entry<String, MockNamenode> entry : namenodes.entrySet()) {
+      String nsId = entry.getKey();
+      MockNamenode nn = entry.getValue();
+      int rpc = nn.getRPCPort();
+      FileSystem nnfs = getFileSystem(rpc);
+
+      try {
+        FileStatus fileStatus = nnfs.getFileStatus(fileexisting);
+        assertNotNull(fileStatus);
+        assertNull("The file cannot be in two subclusters", nsIdWithFile);
+        nsIdWithFile = nsId;
+      } catch (FileNotFoundException fnfe) {
+        LOG.debug("File not found in {}", nsId);
+      }
+    }
+    assertNotNull("The file has to be in one subcluster", nsIdWithFile);
+
+    LOG.info("Stop {} to simulate an unavailable subcluster", nsIdWithFile);
+    namenodes.get(nsIdWithFile).stop();
+
+    // We should not get FileNotFoundException anymore
+    try {
+      fs.open(fileexisting);
+      fail("It should throw an unavailable cluster exception");
+    } catch(RemoteException re) {
+      IOException ioe = re.unwrapRemoteException();
+      assertTrue("Expected an unavailable exception for:" + ioe.getClass(),
+          RouterRpcClient.isUnavailableException(ioe));
+    }
+  }
 }
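
Note (not part of the patch): a minimal, hypothetical JUnit sketch of how the new RouterRpcClient#isUnavailableException is expected to classify exceptions, based only on the checks added above. Connection-level failures (ConnectException, ConnectTimeoutException, EOFException, StandbyException, or a RetriableException caused by NoNamenodesAvailableException) count as an unavailable subcluster, while application-level errors such as FileNotFoundException are still surfaced to the client. The class name and the exception messages below are made up for illustration.

// Illustration only; assumes the patched RouterRpcClient is on the classpath.
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.net.ConnectException;

import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.ipc.StandbyException;
import org.junit.Test;

public class TestUnavailableExceptionSketch {

  @Test
  public void testClassification() {
    // Connection-level failures mark the subcluster as unavailable
    assertTrue(RouterRpcClient.isUnavailableException(
        new ConnectException("Connection refused")));
    assertTrue(RouterRpcClient.isUnavailableException(
        new EOFException("Connection closed by peer")));
    assertTrue(RouterRpcClient.isUnavailableException(
        new StandbyException("NameNode is in standby state")));
    // Application-level errors are not treated as unavailability
    assertFalse(RouterRpcClient.isUnavailableException(
        new FileNotFoundException("/tmp/missing")));
  }
}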