From 921ca1f554e128e3b187e4124beaf264e3d1c66a Mon Sep 17 00:00:00 2001 From: hemanthboyina Date: Wed, 7 Oct 2020 09:58:53 +0530 Subject: [PATCH] HDFS-15543. RBF: Write Should allow, when a subcluster is unavailable for RANDOM mount points with fault Tolerance enabled. Contributed by Hemanth Boyina. --- .../router/RouterClientProtocol.java | 2 +- .../federation/router/RouterRpcServer.java | 57 +++++++++++++++++-- ...MultipleDestinationMountTableResolver.java | 39 +++++++++++++ 3 files changed, 91 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 2c7e90bbdf..cfba7f1d9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -306,7 +306,7 @@ public HdfsFileStatus create(String src, FsPermission masked, * @return If caused by an unavailable subcluster. False if the should not be * retried (e.g., NSQuotaExceededException). */ - private static boolean isUnavailableSubclusterException( + protected static boolean isUnavailableSubclusterException( final IOException ioe) { if (ioe instanceof ConnectException || ioe instanceof ConnectTimeoutException || diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 97b146c947..ce10bfdec8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -587,6 +588,7 @@ static String getMethodName() { /** * Invokes the method at default namespace, if default namespace is not * available then at the first available namespace. + * If the namespace is unavailable, retry once with other namespace. * @param expected return type. * @param method the remote method. * @return the response received after invoking method. @@ -595,18 +597,61 @@ static String getMethodName() { T invokeAtAvailableNs(RemoteMethod method, Class clazz) throws IOException { String nsId = subclusterResolver.getDefaultNamespace(); - if (!nsId.isEmpty()) { - return rpcClient.invokeSingle(nsId, method, clazz); - } // If default Ns is not present return result from first namespace. Set nss = namenodeResolver.getNamespaces(); - if (nss.isEmpty()) { - throw new IOException("No namespace available."); + try { + if (!nsId.isEmpty()) { + return rpcClient.invokeSingle(nsId, method, clazz); + } + // If no namespace is available, throw IOException. + IOException io = new IOException("No namespace available."); + return invokeOnNs(method, clazz, io, nss); + } catch (IOException ioe) { + if (!clientProto.isUnavailableSubclusterException(ioe)) { + LOG.debug("{} exception cannot be retried", + ioe.getClass().getSimpleName()); + throw ioe; + } + Set nssWithoutFailed = getNameSpaceInfo(nsId); + return invokeOnNs(method, clazz, ioe, nssWithoutFailed); } - nsId = nss.iterator().next().getNameserviceId(); + } + + /** + * Invoke the method on first available namespace, + * throw no namespace available exception, if no namespaces are available. + * @param method the remote method. + * @param clazz Class for the return type. + * @param ioe IOException . + * @param nss List of name spaces in the federation + * @return the response received after invoking method. + * @throws IOException + */ + T invokeOnNs(RemoteMethod method, Class clazz, IOException ioe, + Set nss) throws IOException { + if (nss.isEmpty()) { + throw ioe; + } + String nsId = nss.iterator().next().getNameserviceId(); return rpcClient.invokeSingle(nsId, method, clazz); } + /** + * Get set of namespace info's removing the already invoked namespaceinfo. + * @param nsId already invoked namespace id + * @return List of name spaces in the federation on + * removing the already invoked namespaceinfo. + */ + private Set getNameSpaceInfo(String nsId) { + Set namespaceInfos = new HashSet<>(); + for (FederationNamespaceInfo ns : namespaceInfos) { + if (!nsId.equals(ns.getNameserviceId())) { + namespaceInfos.add(ns); + } + } + return namespaceInfos; + } + @Override // ClientProtocol public Token getDelegationToken(Text renewer) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java index ebb62d410d..2887c08f15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AclEntry; @@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.SnapshotStatus; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; @@ -60,6 +62,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.ToolRunner; import org.junit.After; @@ -640,6 +643,42 @@ public void testContentSummaryMultipleDestWithMaxValue() assertEquals(ssQuota, cs.getSpaceQuota()); } + /** + * Test write on mount point with multiple destinations + * and making a one of the destination's subcluster unavailable. + */ + @Test + public void testWriteWithUnavailableSubCluster() throws IOException { + //create a mount point with multiple destinations + Path path = new Path("/testWriteWithUnavailableSubCluster"); + Map destMap = new HashMap<>(); + destMap.put("ns0", "/testWriteWithUnavailableSubCluster"); + destMap.put("ns1", "/testWriteWithUnavailableSubCluster"); + nnFs0.mkdirs(path); + nnFs1.mkdirs(path); + MountTable addEntry = + MountTable.newInstance("/testWriteWithUnavailableSubCluster", destMap); + addEntry.setQuota(new RouterQuotaUsage.Builder().build()); + addEntry.setDestOrder(DestinationOrder.RANDOM); + addEntry.setFaultTolerant(true); + assertTrue(addMountTable(addEntry)); + + //make one subcluster unavailable and perform write on mount point + MiniDFSCluster dfsCluster = cluster.getCluster(); + dfsCluster.shutdownNameNode(0); + FSDataOutputStream out = null; + Path filePath = new Path(path, "aa"); + try { + out = routerFs.create(filePath); + out.write("hello".getBytes()); + out.hflush(); + assertTrue(routerFs.exists(filePath)); + } finally { + IOUtils.closeStream(out); + dfsCluster.restartNameNode(0); + } + } + /** * Test to verify rename operation on directories in case of multiple * destinations.