HDFS-15196. RBF: RouterRpcServer getListing cannot list large dirs correctly. Contributed by Fengnan Li.
(cherry picked from commit 80b877a72f52ef0f4acafe15db55b8ed61fbe6d2)
This commit is contained in:
parent
57d2fff84b
commit
327488d688
@ -103,6 +103,7 @@ import java.io.IOException;
|
|||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
@ -771,13 +772,14 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||||||
|
|
||||||
List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
|
List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
|
||||||
getListingInt(src, startAfter, needLocation);
|
getListingInt(src, startAfter, needLocation);
|
||||||
Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
|
TreeMap<String, HdfsFileStatus> nnListing = new TreeMap<>();
|
||||||
int totalRemainingEntries = 0;
|
int totalRemainingEntries = 0;
|
||||||
int remainingEntries = 0;
|
int remainingEntries = 0;
|
||||||
boolean namenodeListingExists = false;
|
boolean namenodeListingExists = false;
|
||||||
|
// Check the subcluster listing with the smallest name to make sure
|
||||||
|
// no file is skipped across subclusters
|
||||||
|
String lastName = null;
|
||||||
if (listings != null) {
|
if (listings != null) {
|
||||||
// Check the subcluster listing with the smallest name
|
|
||||||
String lastName = null;
|
|
||||||
for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
|
for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
|
||||||
if (result.hasException()) {
|
if (result.hasException()) {
|
||||||
IOException ioe = result.getException();
|
IOException ioe = result.getException();
|
||||||
@ -824,6 +826,10 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||||||
|
|
||||||
// Add mount points at this level in the tree
|
// Add mount points at this level in the tree
|
||||||
final List<String> children = subclusterResolver.getMountPoints(src);
|
final List<String> children = subclusterResolver.getMountPoints(src);
|
||||||
|
// Sort the list as the entries from subcluster are also sorted
|
||||||
|
if (children != null) {
|
||||||
|
Collections.sort(children);
|
||||||
|
}
|
||||||
if (children != null) {
|
if (children != null) {
|
||||||
// Get the dates for each mount point
|
// Get the dates for each mount point
|
||||||
Map<String, Long> dates = getMountPointDates(src);
|
Map<String, Long> dates = getMountPointDates(src);
|
||||||
@ -838,9 +844,27 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||||||
HdfsFileStatus dirStatus =
|
HdfsFileStatus dirStatus =
|
||||||
getMountPointStatus(childPath.toString(), 0, date);
|
getMountPointStatus(childPath.toString(), 0, date);
|
||||||
|
|
||||||
// This may overwrite existing listing entries with the mount point
|
// if there is no subcluster path, always add mount point
|
||||||
// TODO don't add if already there?
|
if (lastName == null) {
|
||||||
nnListing.put(child, dirStatus);
|
nnListing.put(child, dirStatus);
|
||||||
|
} else {
|
||||||
|
if (shouldAddMountPoint(child,
|
||||||
|
lastName, startAfter, remainingEntries)) {
|
||||||
|
// This may overwrite existing listing entries with the mount point
|
||||||
|
// TODO don't add if already there?
|
||||||
|
nnListing.put(child, dirStatus);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Update the remaining count to include left mount points
|
||||||
|
if (nnListing.size() > 0) {
|
||||||
|
String lastListing = nnListing.lastKey();
|
||||||
|
for (int i = 0; i < children.size(); i++) {
|
||||||
|
if (children.get(i).compareTo(lastListing) > 0) {
|
||||||
|
remainingEntries += (children.size() - i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2108,6 +2132,36 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if we should add the mount point into the total listing.
|
||||||
|
* This should be done under either of the two cases:
|
||||||
|
* 1) current mount point is between startAfter and cutoff lastEntry.
|
||||||
|
* 2) there are no remaining entries from subclusters and this mount
|
||||||
|
* point is bigger than all files from subclusters
|
||||||
|
* This is to make sure that the following batch of
|
||||||
|
* getListing call will use the correct startAfter, which is lastEntry from
|
||||||
|
* subcluster.
|
||||||
|
*
|
||||||
|
* @param mountPoint to be added mount point inside router
|
||||||
|
* @param lastEntry biggest listing from subcluster
|
||||||
|
* @param startAfter starting listing from client, used to define listing
|
||||||
|
* start boundary
|
||||||
|
* @param remainingEntries how many entries left from subcluster
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private static boolean shouldAddMountPoint(
|
||||||
|
String mountPoint, String lastEntry, byte[] startAfter,
|
||||||
|
int remainingEntries) {
|
||||||
|
if (mountPoint.compareTo(DFSUtil.bytes2String(startAfter)) > 0 &&
|
||||||
|
mountPoint.compareTo(lastEntry) <= 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (remainingEntries == 0 && mountPoint.compareTo(lastEntry) >= 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if the path is a directory and is supposed to be present in all
|
* Checks if the path is a directory and is supposed to be present in all
|
||||||
* subclusters.
|
* subclusters.
|
||||||
|
@ -327,16 +327,30 @@ public class MockResolver
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> getMountPoints(String path) throws IOException {
|
public List<String> getMountPoints(String path) throws IOException {
|
||||||
// Mounts only supported under root level
|
|
||||||
if (!path.equals("/")) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
List<String> mounts = new ArrayList<>();
|
List<String> mounts = new ArrayList<>();
|
||||||
for (String mount : this.locations.keySet()) {
|
// for root path search, returning all downstream root level mapping
|
||||||
if (mount.length() > 1) {
|
if (path.equals("/")) {
|
||||||
// Remove leading slash, this is the behavior of the mount tree,
|
// Mounts only supported under root level
|
||||||
// return only names.
|
for (String mount : this.locations.keySet()) {
|
||||||
mounts.add(mount.replace("/", ""));
|
if (mount.length() > 1) {
|
||||||
|
// Remove leading slash, this is the behavior of the mount tree,
|
||||||
|
// return only names.
|
||||||
|
mounts.add(mount.replace("/", ""));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// a simplified version of MountTableResolver implementation
|
||||||
|
for (String key : this.locations.keySet()) {
|
||||||
|
if (key.startsWith(path)) {
|
||||||
|
String child = key.substring(path.length());
|
||||||
|
if (child.length() > 0) {
|
||||||
|
// only take children so remove parent path and /
|
||||||
|
mounts.add(key.substring(path.length()+1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (mounts.size() == 0) {
|
||||||
|
mounts = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return mounts;
|
return mounts;
|
||||||
|
@ -203,6 +203,9 @@ public class TestRouterRpc {
|
|||||||
cluster.addNamenodeOverrides(namenodeConf);
|
cluster.addNamenodeOverrides(namenodeConf);
|
||||||
cluster.setIndependentDNs();
|
cluster.setIndependentDNs();
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
|
||||||
|
cluster.addNamenodeOverrides(conf);
|
||||||
// Start NNs and DNs and wait until ready
|
// Start NNs and DNs and wait until ready
|
||||||
cluster.startCluster();
|
cluster.startCluster();
|
||||||
|
|
||||||
@ -436,6 +439,62 @@ public class TestRouterRpc {
|
|||||||
new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
|
new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProxyListFilesLargeDir() throws IOException {
|
||||||
|
// Call listStatus against a dir with many files
|
||||||
|
// Create a parent point as well as a subfolder mount
|
||||||
|
// /parent
|
||||||
|
// ns0 -> /parent
|
||||||
|
// /parent/file-7
|
||||||
|
// ns0 -> /parent/file-7
|
||||||
|
// /parent/file-0
|
||||||
|
// ns0 -> /parent/file-0
|
||||||
|
for (RouterContext rc : cluster.getRouters()) {
|
||||||
|
MockResolver resolver =
|
||||||
|
(MockResolver) rc.getRouter().getSubclusterResolver();
|
||||||
|
resolver.addLocation("/parent", ns, "/parent");
|
||||||
|
// file-0 is only in mount table
|
||||||
|
resolver.addLocation("/parent/file-0", ns, "/parent/file-0");
|
||||||
|
// file-7 is both in mount table and in file system
|
||||||
|
resolver.addLocation("/parent/file-7", ns, "/parent/file-7");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the case when there is no subcluster path and only mount point
|
||||||
|
FileStatus[] result = routerFS.listStatus(new Path("/parent"));
|
||||||
|
assertEquals(2, result.length);
|
||||||
|
// this makes sure file[0-8] is added in order
|
||||||
|
assertEquals("file-0", result[0].getPath().getName());
|
||||||
|
assertEquals("file-7", result[1].getPath().getName());
|
||||||
|
|
||||||
|
// Create files and test full listing in order
|
||||||
|
NamenodeContext nn = cluster.getNamenode(ns, null);
|
||||||
|
FileSystem nnFileSystem = nn.getFileSystem();
|
||||||
|
for (int i = 1; i < 9; i++) {
|
||||||
|
createFile(nnFileSystem, "/parent/file-"+i, 32);
|
||||||
|
}
|
||||||
|
|
||||||
|
result = routerFS.listStatus(new Path("/parent"));
|
||||||
|
assertEquals(9, result.length);
|
||||||
|
// this makes sure file[0-8] is added in order
|
||||||
|
for (int i = 0; i < 9; i++) {
|
||||||
|
assertEquals("file-"+i, result[i].getPath().getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add file-9 and now this listing will be added from mount point
|
||||||
|
for (RouterContext rc : cluster.getRouters()) {
|
||||||
|
MockResolver resolver =
|
||||||
|
(MockResolver) rc.getRouter().getSubclusterResolver();
|
||||||
|
resolver.addLocation("/parent/file-9", ns, "/parent/file-9");
|
||||||
|
}
|
||||||
|
assertFalse(verifyFileExists(nnFileSystem, "/parent/file-9"));
|
||||||
|
result = routerFS.listStatus(new Path("/parent"));
|
||||||
|
// file-9 will be added by mount point
|
||||||
|
assertEquals(10, result.length);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
assertEquals("file-"+i, result[i].getPath().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProxyListFilesWithConflict()
|
public void testProxyListFilesWithConflict()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user