HDFS-16734. RBF: fix some bugs when handling getContentSummary RPC (#4763)

This commit is contained in:
ZanderXu 2022-08-27 07:04:33 +08:00 committed by GitHub
parent f8b9dd911c
commit 5567154f71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 203 additions and 20 deletions

View File

@ -115,6 +115,7 @@
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** /**
* Module that implements all the RPC calls in {@link ClientProtocol} in the * Module that implements all the RPC calls in {@link ClientProtocol} in the
@ -1251,14 +1252,93 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
rpcClient.invokeConcurrent(nss, method, true, false); rpcClient.invokeConcurrent(nss, method, true, false);
} }
/**
* Recursively get all the locations for the path.
* For example, there are some mount points:
* /a -> ns0 -> /a
* /a/b -> ns1 -> /a/b
* /a/b/c -> ns2 -> /a/b/c
* When the path is '/a', the result of locations should be
* {ns0 -> [RemoteLocation(/a)], ns1 -> [RemoteLocation(/a/b)], ns2 -> [RemoteLocation(/a/b/c)]}
* @param path the path to get the locations.
* @return a map to store all the locations and key is namespace id.
* @throws IOException
*/
@VisibleForTesting
Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOException {
Map<String, List<RemoteLocation>> locations = new HashMap<>();
try {
List<RemoteLocation> parentLocations = rpcServer.getLocationsForPath(path, false, false);
parentLocations.forEach(
l -> locations.computeIfAbsent(l.getNameserviceId(), k -> new ArrayList<>()).add(l));
} catch (NoLocationException | RouterResolveException e) {
LOG.debug("Cannot find locations for {}.", path);
}
final List<String> children = subclusterResolver.getMountPoints(path);
if (children != null) {
for (String child : children) {
String childPath = new Path(path, child).toUri().getPath();
Map<String, List<RemoteLocation>> childLocations = getAllLocations(childPath);
childLocations.forEach(
(k, v) -> locations.computeIfAbsent(k, l -> new ArrayList<>()).addAll(v));
}
}
return locations;
}
/**
* Get all the locations of the path for {@link this#getContentSummary(String)}.
* For example, there are some mount points:
* /a -> ns0 -> /a
* /a/b -> ns0 -> /a/b
* /a/b/c -> ns1 -> /a/b/c
* When the path is '/a', the result of locations should be
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
* When the path is '/b', will throw NoLocationException.
* @param path the path to get content summary
* @return one list contains all the remote location
* @throws IOException
*/
@VisibleForTesting
List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
// Try to get all the locations of the path.
final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path);
if (ns2Locations.isEmpty()) {
throw new NoLocationException(path, subclusterResolver.getClass());
}
final List<RemoteLocation> locations = new ArrayList<>();
// remove the redundancy remoteLocation order by destination.
ns2Locations.forEach((k, v) -> {
List<RemoteLocation> sortedList = v.stream().sorted().collect(Collectors.toList());
int size = sortedList.size();
for (int i = size - 1; i > -1; i--) {
RemoteLocation currentLocation = sortedList.get(i);
if (i == 0) {
locations.add(currentLocation);
} else {
RemoteLocation preLocation = sortedList.get(i - 1);
if (!currentLocation.getDest().startsWith(preLocation.getDest() + Path.SEPARATOR)) {
locations.add(currentLocation);
} else {
LOG.debug("Ignore redundant location {}, because there is an ancestor location {}",
currentLocation, preLocation);
}
}
}
});
return locations;
}
@Override @Override
public ContentSummary getContentSummary(String path) throws IOException { public ContentSummary getContentSummary(String path) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ); rpcServer.checkOperation(NameNode.OperationCategory.READ);
// Get the summaries from regular files // Get the summaries from regular files
final Collection<ContentSummary> summaries = new ArrayList<>(); final Collection<ContentSummary> summaries = new ArrayList<>();
final List<RemoteLocation> locations = final List<RemoteLocation> locations = getLocationsForContentSummary(path);
rpcServer.getLocationsForPath(path, false, false);
final RemoteMethod method = new RemoteMethod("getContentSummary", final RemoteMethod method = new RemoteMethod("getContentSummary",
new Class<?>[] {String.class}, new RemoteParam()); new Class<?>[] {String.class}, new RemoteParam());
final List<RemoteResult<RemoteLocation, ContentSummary>> results = final List<RemoteResult<RemoteLocation, ContentSummary>> results =
@ -1278,24 +1358,6 @@ public ContentSummary getContentSummary(String path) throws IOException {
} }
} }
// Add mount points at this level in the tree
final List<String> children = subclusterResolver.getMountPoints(path);
if (children != null) {
for (String child : children) {
Path childPath = new Path(path, child);
try {
ContentSummary mountSummary = getContentSummary(
childPath.toString());
if (mountSummary != null) {
summaries.add(mountSummary);
}
} catch (Exception e) {
LOG.error("Cannot get content summary for mount {}: {}",
childPath, e.getMessage());
}
}
}
// Throw original exception if no original nor mount points // Throw original exception if no original nor mount points
if (summaries.isEmpty() && notFoundException != null) { if (summaries.isEmpty() && notFoundException != null) {
throw notFoundException; throw notFoundException;

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
@ -25,6 +29,7 @@
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
@ -40,10 +45,13 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* Test a router end-to-end including the MountTable without default nameservice. * Test a router end-to-end including the MountTable without default nameservice.
@ -53,6 +61,8 @@ public class TestRouterMountTableWithoutDefaultNS {
private static RouterContext routerContext; private static RouterContext routerContext;
private static MountTableResolver mountTable; private static MountTableResolver mountTable;
private static ClientProtocol routerProtocol; private static ClientProtocol routerProtocol;
private static FileSystem nnFs0;
private static FileSystem nnFs1;
@BeforeClass @BeforeClass
public static void globalSetUp() throws Exception { public static void globalSetUp() throws Exception {
@ -71,6 +81,8 @@ public static void globalSetUp() throws Exception {
cluster.waitClusterUp(); cluster.waitClusterUp();
// Get the end points // Get the end points
nnFs0 = cluster.getNamenode("ns0", null).getFileSystem();
nnFs1 = cluster.getNamenode("ns1", null).getFileSystem();
routerContext = cluster.getRandomRouter(); routerContext = cluster.getRandomRouter();
Router router = routerContext.getRouter(); Router router = routerContext.getRouter();
routerProtocol = routerContext.getClient().getNamenode(); routerProtocol = routerContext.getClient().getNamenode();
@ -144,4 +156,113 @@ public void testGetFileInfoWithoutSubMountPoint() throws Exception {
LambdaTestUtils.intercept(RouterResolveException.class, LambdaTestUtils.intercept(RouterResolveException.class,
() -> routerContext.getRouter().getRpcServer().getFileInfo("/testdir2")); () -> routerContext.getRouter().getRpcServer().getFileInfo("/testdir2"));
} }
/**
* Verify that RBF that disable default nameservice should support
* get information about ancestor mount points.
*/
@Test
public void testGetContentSummaryWithSubMountPoint() throws IOException {
MountTable addEntry = MountTable.newInstance("/testdir/1/2",
Collections.singletonMap("ns0", "/testdir/1/2"));
assertTrue(addMountTable(addEntry));
try {
writeData(nnFs0, new Path("/testdir/1/2/3"), 10 * 1024 * 1024);
RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer();
ContentSummary summaryFromRBF = routerRpcServer.getContentSummary("/testdir");
assertNotNull(summaryFromRBF);
assertEquals(1, summaryFromRBF.getFileCount());
assertEquals(10 * 1024 * 1024, summaryFromRBF.getLength());
} finally {
nnFs0.delete(new Path("/testdir"), true);
}
}
@Test
public void testGetAllLocations() throws IOException {
// Add mount table entry.
MountTable addEntry = MountTable.newInstance("/testA",
Collections.singletonMap("ns0", "/testA"));
assertTrue(addMountTable(addEntry));
addEntry = MountTable.newInstance("/testA/testB",
Collections.singletonMap("ns1", "/testA/testB"));
assertTrue(addMountTable(addEntry));
addEntry = MountTable.newInstance("/testA/testB/testC",
Collections.singletonMap("ns2", "/testA/testB/testC"));
assertTrue(addMountTable(addEntry));
RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule();
Map<String, List<RemoteLocation>> locations = protocol.getAllLocations("/testA");
assertEquals(3, locations.size());
}
@Test
public void testGetLocationsForContentSummary() throws Exception {
// Add mount table entry.
MountTable addEntry = MountTable.newInstance("/testA/testB",
Collections.singletonMap("ns0", "/testA/testB"));
assertTrue(addMountTable(addEntry));
addEntry = MountTable.newInstance("/testA/testB/testC",
Collections.singletonMap("ns1", "/testA/testB/testC"));
assertTrue(addMountTable(addEntry));
RouterClientProtocol protocol = routerContext.getRouterRpcServer().getClientProtocolModule();
List<RemoteLocation> locations = protocol.getLocationsForContentSummary("/testA");
assertEquals(2, locations.size());
for (RemoteLocation location : locations) {
String nsId = location.getNameserviceId();
if ("ns0".equals(nsId)) {
assertEquals("/testA/testB", location.getDest());
} else if ("ns1".equals(nsId)) {
assertEquals("/testA/testB/testC", location.getDest());
} else {
fail("Unexpected NS " + nsId);
}
}
LambdaTestUtils.intercept(NoLocationException.class,
() -> protocol.getLocationsForContentSummary("/testB"));
}
@Test
public void testGetContentSummary() throws Exception {
try {
// Add mount table entry.
MountTable addEntry = MountTable.newInstance("/testA",
Collections.singletonMap("ns0", "/testA"));
assertTrue(addMountTable(addEntry));
addEntry = MountTable.newInstance("/testA/testB",
Collections.singletonMap("ns0", "/testA/testB"));
assertTrue(addMountTable(addEntry));
addEntry = MountTable.newInstance("/testA/testB/testC",
Collections.singletonMap("ns1", "/testA/testB/testC"));
assertTrue(addMountTable(addEntry));
writeData(nnFs0, new Path("/testA/testB/file1"), 1024 * 1024);
writeData(nnFs1, new Path("/testA/testB/testC/file2"), 1024 * 1024);
writeData(nnFs1, new Path("/testA/testB/testC/file3"), 1024 * 1024);
RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer();
ContentSummary summary = routerRpcServer.getContentSummary("/testA");
assertEquals(3, summary.getFileCount());
assertEquals(1024 * 1024 * 3, summary.getLength());
LambdaTestUtils.intercept(NoLocationException.class,
() -> routerRpcServer.getContentSummary("/testB"));
} finally {
nnFs0.delete(new Path("/testA"), true);
nnFs1.delete(new Path("/testA"), true);
}
}
void writeData(FileSystem fs, Path path, int fileLength) throws IOException {
try (FSDataOutputStream outputStream = fs.create(path)) {
for (int writeSize = 0; writeSize < fileLength; writeSize++) {
outputStream.write(writeSize);
}
}
}
} }