From 5567154f71e84ae45e37d4840cf09e64fc4c4df5 Mon Sep 17 00:00:00 2001 From: ZanderXu <15040255127@163.com> Date: Sat, 27 Aug 2022 07:04:33 +0800 Subject: [PATCH] HDFS-16734. RBF: fix some bugs when handling getContentSummary RPC (#4763) --- .../router/RouterClientProtocol.java | 102 ++++++++++++--- .../TestRouterMountTableWithoutDefaultNS.java | 121 ++++++++++++++++++ 2 files changed, 203 insertions(+), 20 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index e1a83d0b41..9d3973d450 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -115,6 +115,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Module that implements all the RPC calls in {@link ClientProtocol} in the @@ -1251,14 +1252,93 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { rpcClient.invokeConcurrent(nss, method, true, false); } + /** + * Recursively get all the locations for the path. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns1 -> /a/b + * /a/b/c -> ns2 -> /a/b/c + * When the path is '/a', the result of locations should be + * {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]} + * @param path the path to get the locations. + * @return a map to store all the locations and key is namespace id. + * @throws IOException + */ + @VisibleForTesting + Map> getAllLocations(String path) throws IOException { + Map> locations = new HashMap<>(); + try { + List parentLocations = rpcServer.getLocationsForPath(path, false, false); + parentLocations.forEach( + l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l)); + } catch (NoLocationException | RouterResolveException e) { + LOG.debug("Cannot find locations for {}.", path); + } + + final List children = subclusterResolver.getMountPoints(path); + if (children != null) { + for (String child : children) { + String childPath = new Path(path, child).toUri().getPath(); + Map> childLocations = getAllLocations(childPath); + childLocations.forEach( + (k, v) -> locations.computeIfAbsent(k, l -> new ArrayList<>()).addAll(v)); + } + } + return locations; + } + + /** + * Get all the locations of the path for {@link this#getContentSummary(String)}. + * For example, there are some mount points: + * /a -> ns0 -> /a + * /a/b -> ns0 -> /a/b + * /a/b/c -> ns1 -> /a/b/c + * When the path is '/a', the result of locations should be + * [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')] + * When the path is '/b', will throw NoLocationException. + * @param path the path to get content summary + * @return one list contains all the remote location + * @throws IOException + */ + @VisibleForTesting + List getLocationsForContentSummary(String path) throws IOException { + // Try to get all the locations of the path. + final Map> ns2Locations = getAllLocations(path); + if (ns2Locations.isEmpty()) { + throw new NoLocationException(path, subclusterResolver.getClass()); + } + + final List locations = new ArrayList<>(); + // remove the redundancy remoteLocation order by destination. + ns2Locations.forEach((k, v) -> { + List sortedList = v.stream().sorted().collect(Collectors.toList()); + int size = sortedList.size(); + for (int i = size - 1; i > -1; i--) { + RemoteLocation currentLocation = sortedList.get(i); + if (i == 0) { + locations.add(currentLocation); + } else { + RemoteLocation preLocation = sortedList.get(i - 1); + if (!currentLocation.getDest().startsWith(preLocation.getDest() + Path.SEPARATOR)) { + locations.add(currentLocation); + } else { + LOG.debug("Ignore redundant location {}, because there is an ancestor location {}", + currentLocation, preLocation); + } + } + } + }); + + return locations; + } + @Override public ContentSummary getContentSummary(String path) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); // Get the summaries from regular files final Collection summaries = new ArrayList<>(); - final List locations = - rpcServer.getLocationsForPath(path, false, false); + final List locations = getLocationsForContentSummary(path); final RemoteMethod method = new RemoteMethod("getContentSummary", new Class[] {String.class}, new RemoteParam()); final List> results = @@ -1278,24 +1358,6 @@ public ContentSummary getContentSummary(String path) throws IOException { } } - // Add mount points at this level in the tree - final List children = subclusterResolver.getMountPoints(path); - if (children != null) { - for (String child : children) { - Path childPath = new Path(path, child); - try { - ContentSummary mountSummary = getContentSummary( - childPath.toString()); - if (mountSummary != null) { - summaries.add(mountSummary); - } - } catch (Exception e) { - LOG.error("Cannot get content summary for mount {}: {}", - childPath, e.getMessage()); - } - } - } - // Throw original exception if no original nor mount points if (summaries.isEmpty() && notFoundException != null) { throw notFoundException; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java index bbf56fe06b..57d4c69db6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hdfs.server.federation.router; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; @@ -25,6 +29,7 @@ import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; @@ -40,10 +45,13 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Test a router end-to-end including the MountTable without default nameservice. @@ -53,6 +61,8 @@ public class TestRouterMountTableWithoutDefaultNS { private static RouterContext routerContext; private static MountTableResolver mountTable; private static ClientProtocol routerProtocol; + private static FileSystem nnFs0; + private static FileSystem nnFs1; @BeforeClass public static void globalSetUp() throws Exception { @@ -71,6 +81,8 @@ public static void globalSetUp() throws Exception { cluster.waitClusterUp(); // Get the end points + nnFs0 = cluster.getNamenode("ns0", null).getFileSystem(); + nnFs1 = cluster.getNamenode("ns1", null).getFileSystem(); routerContext = cluster.getRandomRouter(); Router router = routerContext.getRouter(); routerProtocol = routerContext.getClient().getNamenode(); @@ -144,4 +156,113 @@ public void testGetFileInfoWithoutSubMountPoint() throws Exception { LambdaTestUtils.intercept(RouterResolveException.class, () -> routerContext.getRouter().getRpcServer().getFileInfo("/testdir2")); } + + /** + * Verify that RBF that disable default nameservice should support + * get information about ancestor mount points. + */ + @Test + public void testGetContentSummaryWithSubMountPoint() throws IOException { + MountTable addEntry = MountTable.newInstance("/testdir/1/2", + Collections.singletonMap("ns0", "/testdir/1/2")); + assertTrue(addMountTable(addEntry)); + + try { + writeData(nnFs0, new Path("/testdir/1/2/3"), 10 * 1024 * 1024); + + RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer(); + ContentSummary summaryFromRBF = routerRpcServer.getContentSummary("/testdir"); + assertNotNull(summaryFromRBF); + assertEquals(1, summaryFromRBF.getFileCount()); + assertEquals(10 * 1024 * 1024, summaryFromRBF.getLength()); + } finally { + nnFs0.delete(new Path("/testdir"), true); + } + } + + @Test + public void testGetAllLocations() throws IOException { + // Add mount table entry. + MountTable addEntry = MountTable.newInstance("/testA", + Collections.singletonMap("ns0", "/testA")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB", + Collections.singletonMap("ns1", "/testA/testB")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB/testC", + Collections.singletonMap("ns2", "/testA/testB/testC")); + assertTrue(addMountTable(addEntry)); + + RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule(); + Map> locations = protocol.getAllLocations("/testA"); + assertEquals(3, locations.size()); + } + + @Test + public void testGetLocationsForContentSummary() throws Exception { + // Add mount table entry. + MountTable addEntry = MountTable.newInstance("/testA/testB", + Collections.singletonMap("ns0", "/testA/testB")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB/testC", + Collections.singletonMap("ns1", "/testA/testB/testC")); + assertTrue(addMountTable(addEntry)); + + RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule(); + List locations = protocol.getLocationsForContentSummary("/testA"); + assertEquals(2, locations.size()); + + for (RemoteLocation location : locations) { + String nsId = location.getNameserviceId(); + if ("ns0".equals(nsId)) { + assertEquals("/testA/testB", location.getDest()); + } else if ("ns1".equals(nsId)) { + assertEquals("/testA/testB/testC", location.getDest()); + } else { + fail("Unexpected NS " + nsId); + } + } + + LambdaTestUtils.intercept(NoLocationException.class, + () -> protocol.getLocationsForContentSummary("/testB")); + } + + @Test + public void testGetContentSummary() throws Exception { + try { + // Add mount table entry. + MountTable addEntry = MountTable.newInstance("/testA", + Collections.singletonMap("ns0", "/testA")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB", + Collections.singletonMap("ns0", "/testA/testB")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB/testC", + Collections.singletonMap("ns1", "/testA/testB/testC")); + assertTrue(addMountTable(addEntry)); + + writeData(nnFs0, new Path("/testA/testB/file1"), 1024 * 1024); + writeData(nnFs1, new Path("/testA/testB/testC/file2"), 1024 * 1024); + writeData(nnFs1, new Path("/testA/testB/testC/file3"), 1024 * 1024); + + RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer(); + ContentSummary summary = routerRpcServer.getContentSummary("/testA"); + assertEquals(3, summary.getFileCount()); + assertEquals(1024 * 1024 * 3, summary.getLength()); + + LambdaTestUtils.intercept(NoLocationException.class, + () -> routerRpcServer.getContentSummary("/testB")); + } finally { + nnFs0.delete(new Path("/testA"), true); + nnFs1.delete(new Path("/testA"), true); + } + } + + void writeData(FileSystem fs, Path path, int fileLength) throws IOException { + try (FSDataOutputStream outputStream = fs.create(path)) { + for (int writeSize = 0; writeSize < fileLength; writeSize++) { + outputStream.write(writeSize); + } + } + } }