HDFS-15196. RBF: RouterRpcServer getListing cannot list large dirs correctly. Contributed by Fengnan Li.

This commit is contained in:
Inigo Goiri 2020-03-30 12:29:21 -07:00
parent 960c9ebaea
commit 80b877a72f
3 changed files with 142 additions and 15 deletions

View File

@ -103,6 +103,7 @@
import java.net.ConnectException; import java.net.ConnectException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -771,13 +772,14 @@ public DirectoryListing getListing(String src, byte[] startAfter,
List<RemoteResult<RemoteLocation, DirectoryListing>> listings = List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
getListingInt(src, startAfter, needLocation); getListingInt(src, startAfter, needLocation);
Map<String, HdfsFileStatus> nnListing = new TreeMap<>(); TreeMap<String, HdfsFileStatus> nnListing = new TreeMap<>();
int totalRemainingEntries = 0; int totalRemainingEntries = 0;
int remainingEntries = 0; int remainingEntries = 0;
boolean namenodeListingExists = false; boolean namenodeListingExists = false;
if (listings != null) { // Check the subcluster listing with the smallest name to make sure
// Check the subcluster listing with the smallest name // no file is skipped across subclusters
String lastName = null; String lastName = null;
if (listings != null) {
for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) { for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
if (result.hasException()) { if (result.hasException()) {
IOException ioe = result.getException(); IOException ioe = result.getException();
@ -824,6 +826,10 @@ public DirectoryListing getListing(String src, byte[] startAfter,
// Add mount points at this level in the tree // Add mount points at this level in the tree
final List<String> children = subclusterResolver.getMountPoints(src); final List<String> children = subclusterResolver.getMountPoints(src);
// Sort the list as the entries from subcluster are also sorted
if (children != null) {
Collections.sort(children);
}
if (children != null) { if (children != null) {
// Get the dates for each mount point // Get the dates for each mount point
Map<String, Long> dates = getMountPointDates(src); Map<String, Long> dates = getMountPointDates(src);
@ -838,11 +844,29 @@ public DirectoryListing getListing(String src, byte[] startAfter,
HdfsFileStatus dirStatus = HdfsFileStatus dirStatus =
getMountPointStatus(childPath.toString(), 0, date); getMountPointStatus(childPath.toString(), 0, date);
// if there is no subcluster path, always add mount point
if (lastName == null) {
nnListing.put(child, dirStatus);
} else {
if (shouldAddMountPoint(child,
lastName, startAfter, remainingEntries)) {
// This may overwrite existing listing entries with the mount point // This may overwrite existing listing entries with the mount point
// TODO don't add if already there? // TODO don't add if already there?
nnListing.put(child, dirStatus); nnListing.put(child, dirStatus);
} }
} }
}
// Update the remaining count to include left mount points
if (nnListing.size() > 0) {
String lastListing = nnListing.lastKey();
for (int i = 0; i < children.size(); i++) {
if (children.get(i).compareTo(lastListing) > 0) {
remainingEntries += (children.size() - i);
break;
}
}
}
}
if (!namenodeListingExists && nnListing.size() == 0) { if (!namenodeListingExists && nnListing.size() == 0) {
// NN returns a null object if the directory cannot be found and has no // NN returns a null object if the directory cannot be found and has no
@ -2107,6 +2131,36 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
} }
} }
/**
* Check if we should add the mount point into the total listing.
* This should be done under either of the two cases:
* 1) current mount point is between startAfter and cutoff lastEntry.
* 2) there are no remaining entries from subclusters and this mount
* point is bigger than all files from subclusters
* This is to make sure that the following batch of
* getListing call will use the correct startAfter, which is lastEntry from
* subcluster.
*
* @param mountPoint to be added mount point inside router
* @param lastEntry biggest listing from subcluster
* @param startAfter starting listing from client, used to define listing
* start boundary
* @param remainingEntries how many entries left from subcluster
* @return
*/
private static boolean shouldAddMountPoint(
String mountPoint, String lastEntry, byte[] startAfter,
int remainingEntries) {
if (mountPoint.compareTo(DFSUtil.bytes2String(startAfter)) > 0 &&
mountPoint.compareTo(lastEntry) <= 0) {
return true;
}
if (remainingEntries == 0 && mountPoint.compareTo(lastEntry) >= 0) {
return true;
}
return false;
}
/** /**
* Checks if the path is a directory and is supposed to be present in all * Checks if the path is a directory and is supposed to be present in all
* subclusters. * subclusters.

View File

@ -327,11 +327,10 @@ public PathLocation getDestinationForPath(String path) throws IOException {
@Override @Override
public List<String> getMountPoints(String path) throws IOException { public List<String> getMountPoints(String path) throws IOException {
// Mounts only supported under root level
if (!path.equals("/")) {
return null;
}
List<String> mounts = new ArrayList<>(); List<String> mounts = new ArrayList<>();
// for root path search, returning all downstream root level mapping
if (path.equals("/")) {
// Mounts only supported under root level
for (String mount : this.locations.keySet()) { for (String mount : this.locations.keySet()) {
if (mount.length() > 1) { if (mount.length() > 1) {
// Remove leading slash, this is the behavior of the mount tree, // Remove leading slash, this is the behavior of the mount tree,
@ -339,6 +338,21 @@ public List<String> getMountPoints(String path) throws IOException {
mounts.add(mount.replace("/", "")); mounts.add(mount.replace("/", ""));
} }
} }
} else {
// a simplified version of MountTableResolver implementation
for (String key : this.locations.keySet()) {
if (key.startsWith(path)) {
String child = key.substring(path.length());
if (child.length() > 0) {
// only take children so remove parent path and /
mounts.add(key.substring(path.length()+1));
}
}
}
if (mounts.size() == 0) {
mounts = null;
}
}
return mounts; return mounts;
} }

View File

@ -196,6 +196,9 @@ public static void globalSetUp() throws Exception {
cluster.setNumDatanodesPerNameservice(NUM_DNS); cluster.setNumDatanodesPerNameservice(NUM_DNS);
cluster.setIndependentDNs(); cluster.setIndependentDNs();
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
cluster.addNamenodeOverrides(conf);
// Start NNs and DNs and wait until ready // Start NNs and DNs and wait until ready
cluster.startCluster(); cluster.startCluster();
@ -429,6 +432,62 @@ public void testProxyListFiles() throws IOException, InterruptedException,
new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
} }
@Test
public void testProxyListFilesLargeDir() throws IOException {
// Call listStatus against a dir with many files
// Create a parent point as well as a subfolder mount
// /parent
// ns0 -> /parent
// /parent/file-7
// ns0 -> /parent/file-7
// /parent/file-0
// ns0 -> /parent/file-0
for (RouterContext rc : cluster.getRouters()) {
MockResolver resolver =
(MockResolver) rc.getRouter().getSubclusterResolver();
resolver.addLocation("/parent", ns, "/parent");
// file-0 is only in mount table
resolver.addLocation("/parent/file-0", ns, "/parent/file-0");
// file-7 is both in mount table and in file system
resolver.addLocation("/parent/file-7", ns, "/parent/file-7");
}
// Test the case when there is no subcluster path and only mount point
FileStatus[] result = routerFS.listStatus(new Path("/parent"));
assertEquals(2, result.length);
// this makes sure file[0-8] is added in order
assertEquals("file-0", result[0].getPath().getName());
assertEquals("file-7", result[1].getPath().getName());
// Create files and test full listing in order
NamenodeContext nn = cluster.getNamenode(ns, null);
FileSystem nnFileSystem = nn.getFileSystem();
for (int i = 1; i < 9; i++) {
createFile(nnFileSystem, "/parent/file-"+i, 32);
}
result = routerFS.listStatus(new Path("/parent"));
assertEquals(9, result.length);
// this makes sure file[0-8] is added in order
for (int i = 0; i < 9; i++) {
assertEquals("file-"+i, result[i].getPath().getName());
}
// Add file-9 and now this listing will be added from mount point
for (RouterContext rc : cluster.getRouters()) {
MockResolver resolver =
(MockResolver) rc.getRouter().getSubclusterResolver();
resolver.addLocation("/parent/file-9", ns, "/parent/file-9");
}
assertFalse(verifyFileExists(nnFileSystem, "/parent/file-9"));
result = routerFS.listStatus(new Path("/parent"));
// file-9 will be added by mount point
assertEquals(10, result.length);
for (int i = 0; i < 10; i++) {
assertEquals("file-"+i, result[i].getPath().getName());
}
}
@Test @Test
public void testProxyListFilesWithConflict() public void testProxyListFilesWithConflict()
throws IOException, InterruptedException { throws IOException, InterruptedException {