HDFS-17134. RBF: Fix duplicate results of getListing through Router. (#5900). Contributed by Shuyan Zhang.

Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
zhangshuyan 2023-08-01 17:52:54 +08:00 committed by GitHub
parent 030811bf85
commit 5aee0e0c0a
2 changed files with 77 additions and 20 deletions
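
Background on the fix: the Router builds one sorted listing out of the partial listings returned by each subcluster's NameNode. Before this change the merge keyed a TreeMap on the entry name as a Java String, while NameNodes order directory entries by raw name bytes (DFSUtilClient.compareBytes). For names outside ASCII the two orderings can disagree, so the Router could hand back entries in a different order than the NameNodes use when honouring startAfter, and a client paging through a directory could see duplicates. The standalone sketch below illustrates the disagreement for the two names exercised by the new test; the class name ListingOrderSketch and the local compareBytes helper are illustrative only, and signed lexicographic byte comparison is an assumption about what DFSUtilClient.compareBytes does.

import java.nio.charset.StandardCharsets;

/**
 * Minimal sketch (not Hadoop code): String order vs. byte order for
 * directory entry names. Assumes NameNodes sort by signed lexicographic
 * byte comparison, which DFSUtilClient.compareBytes appears to implement.
 */
public class ListingOrderSketch {

  /** Signed, lexicographic byte-array comparison (assumed NameNode order). */
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      if (a[i] != b[i]) {
        return a[i] - b[i];           // signed byte difference
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    String s1 = "ßtestGetListingOrder";  // 'ß' = U+00DF, UTF-8 bytes C3 9F
    String s2 = "%testGetListingOrder";  // '%' = U+0025, UTF-8 byte 25

    // String (UTF-16 code unit) order puts "%..." first: 0x25 < 0xDF.
    System.out.println("String order: " + Integer.signum(s1.compareTo(s2)));     // 1

    // Signed byte order puts "ß..." first: (byte) 0xC3 is negative.
    byte[] b1 = s1.getBytes(StandardCharsets.UTF_8);
    byte[] b2 = s2.getBytes(StandardCharsets.UTF_8);
    System.out.println("Byte order:   " + Integer.signum(compareBytes(b1, b2))); // -1
  }
}

Keying the merged map and every cut-off comparison on byte[] with a single comparator, as the diff below does, removes that mismatch.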

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
@@ -101,10 +102,11 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -819,6 +821,20 @@ public void renewLease(String clientName, List<String> namespaces)
}
}
/**
* For {@link #getListing(String,byte[],boolean) GetListing} to sort results.
*/
private static class GetListingComparator
implements Comparator<byte[]>, Serializable {
@Override
public int compare(byte[] o1, byte[] o2) {
return DFSUtilClient.compareBytes(o1, o2);
}
}
private static GetListingComparator comparator =
new GetListingComparator();
@Override
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws IOException {
@@ -826,13 +842,13 @@ public DirectoryListing getListing(String src, byte[] startAfter,
List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
getListingInt(src, startAfter, needLocation);
TreeMap<String, HdfsFileStatus> nnListing = new TreeMap<>();
TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
int totalRemainingEntries = 0;
int remainingEntries = 0;
boolean namenodeListingExists = false;
// Check the subcluster listing with the smallest name to make sure
// no file is skipped across subclusters
String lastName = null;
byte[] lastName = null;
if (listings != null) {
for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
if (result.hasException()) {
@@ -850,8 +866,9 @@ public DirectoryListing getListing(String src, byte[] startAfter,
int length = partialListing.length;
if (length > 0) {
HdfsFileStatus lastLocalEntry = partialListing[length-1];
String lastLocalName = lastLocalEntry.getLocalName();
if (lastName == null || lastName.compareTo(lastLocalName) > 0) {
byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
if (lastName == null ||
comparator.compare(lastName, lastLocalName) > 0) {
lastName = lastLocalName;
}
}
@@ -864,9 +881,9 @@ public DirectoryListing getListing(String src, byte[] startAfter,
if (listing != null) {
namenodeListingExists = true;
for (HdfsFileStatus file : listing.getPartialListing()) {
String filename = file.getLocalName();
byte[] filename = file.getLocalNameInBytes();
if (totalRemainingEntries > 0 &&
filename.compareTo(lastName) > 0) {
comparator.compare(filename, lastName) > 0) {
// Discarding entries further than the lastName
remainingEntries++;
} else {
@@ -880,10 +897,6 @@ public DirectoryListing getListing(String src, byte[] startAfter,
// Add mount points at this level in the tree
final List<String> children = subclusterResolver.getMountPoints(src);
// Sort the list as the entries from subcluster are also sorted
if (children != null) {
Collections.sort(children);
}
if (children != null) {
// Get the dates for each mount point
Map<String, Long> dates = getMountPointDates(src);
@@ -899,22 +912,24 @@ public DirectoryListing getListing(String src, byte[] startAfter,
getMountPointStatus(childPath.toString(), 0, date);
// if there is no subcluster path, always add mount point
byte[] bChild = DFSUtil.string2Bytes(child);
if (lastName == null) {
nnListing.put(child, dirStatus);
nnListing.put(bChild, dirStatus);
} else {
if (shouldAddMountPoint(child,
if (shouldAddMountPoint(bChild,
lastName, startAfter, remainingEntries)) {
// This may overwrite existing listing entries with the mount point
// TODO don't add if already there?
nnListing.put(child, dirStatus);
nnListing.put(bChild, dirStatus);
}
}
}
// Update the remaining count to include left mount points
if (nnListing.size() > 0) {
String lastListing = nnListing.lastKey();
byte[] lastListing = nnListing.lastKey();
for (int i = 0; i < children.size(); i++) {
if (children.get(i).compareTo(lastListing) > 0) {
byte[] bChild = DFSUtil.string2Bytes(children.get(i));
if (comparator.compare(bChild, lastListing) > 0) {
remainingEntries += (children.size() - i);
break;
}
@@ -2320,13 +2335,14 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
* @return
*/
private static boolean shouldAddMountPoint(
String mountPoint, String lastEntry, byte[] startAfter,
byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
int remainingEntries) {
if (mountPoint.compareTo(DFSUtil.bytes2String(startAfter)) > 0 &&
mountPoint.compareTo(lastEntry) <= 0) {
if (comparator.compare(mountPoint, startAfter) > 0 &&
comparator.compare(mountPoint, lastEntry) <= 0) {
return true;
}
if (remainingEntries == 0 && mountPoint.compareTo(lastEntry) >= 0) {
if (remainingEntries == 0 &&
comparator.compare(mountPoint, lastEntry) >= 0) {
return true;
}
return false;
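
A side note on the new GetListingComparator, an observation rather than anything stated in the commit: byte[] has no natural ordering, so a TreeMap keyed on byte[] needs an explicit Comparator or put() fails with a ClassCastException, and making the comparator Serializable keeps the map itself serializable, as the TreeMap javadoc recommends. Below is a minimal JDK-only sketch; ByteKeyTreeMapSketch is a hypothetical name and Arrays.compare merely stands in for DFSUtilClient.compareBytes.

import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

public class ByteKeyTreeMapSketch {
  public static void main(String[] args) {
    // Without a Comparator, TreeMap casts the key to Comparable and fails,
    // because arrays do not implement Comparable.
    TreeMap<byte[], String> broken = new TreeMap<>();
    try {
      broken.put(new byte[] {1}, "a");
      broken.put(new byte[] {2}, "b");
    } catch (ClassCastException expected) {
      System.out.println("byte[] is not Comparable: " + expected);
    }

    // With an explicit comparator the map works; Arrays.compare (Java 9+)
    // is a signed element-wise comparison, close to what the commit's
    // comparator delegates to.
    Comparator<byte[]> cmp = Arrays::compare;
    TreeMap<byte[], String> ok = new TreeMap<>(cmp);
    ok.put(new byte[] {2}, "b");
    ok.put(new byte[] {1}, "a");
    System.out.println(ok.firstKey()[0]);  // prints 1
  }
}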

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -145,6 +145,8 @@
import org.slf4j.LoggerFactory;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
/**
@@ -2278,4 +2280,43 @@ public void testDisableNodeUsageInRBFMetrics() throws JSONException {
long proxyOpAfterWithReEnable = federationRPCMetrics.getProxyOps();
assertEquals(proxyOpAfterWithDisable + 2, proxyOpAfterWithReEnable);
}
@Test
public void testGetListingOrder() throws Exception {
String ns1 = getCluster().getNameservices().get(1);
String destBasePath = cluster.getNamenodeTestDirectoryForNS(ns1);
final String testPath1 = destBasePath + "/ßtestGetListingOrder";
final String testPath2 = destBasePath + "/%testGetListingOrder";
final FileSystem fileSystem1 = getCluster().
getNamenode(ns1, null).getFileSystem();
try {
// Create the test file in ns1.
createFile(fileSystem1, testPath1, 32);
createFile(fileSystem1, testPath2, 32);
NamenodeContext nn = cluster.getNamenode(ns1, null);
FileStatus[] fileStatuses =
nn.getFileSystem().listStatus(new Path(destBasePath));
List<String> requiredPaths = Arrays.stream(fileStatuses)
.map(fileStatus -> fileStatus.getPath().getName())
.collect(Collectors.toList());
Iterator<String> requiredPathsIterator = requiredPaths.iterator();
// Fetch listing.
DirectoryListing listing =
routerProtocol.getListing(cluster.getFederatedTestDirectoryForNS(ns1),
HdfsFileStatus.EMPTY_NAME, false);
assertEquals(requiredPaths.size(), listing.getPartialListing().length);
// Match each path returned and verify order returned.
for (HdfsFileStatus f : listing.getPartialListing()) {
String fileName = requiredPathsIterator.next();
String currentFile = f.getFullPath(new Path("/")).getName();
assertEquals(currentFile, fileName);
}
} finally {
fileSystem1.delete(new Path(testPath1), true);
fileSystem1.delete(new Path(testPath2), true);
}
}
}
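
For context on where duplicates show up, the sketch below is a rough paging helper built on the real ClientProtocol.getListing API; listAll and ListingPagerSketch are hypothetical names, not Hadoop code. Clients fetch a large directory page by page, passing the last name of each page back as startAfter, so if the Router's merged order disagrees with the order the NameNodes apply to startAfter, the next page can begin before names the client has already received.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

public final class ListingPagerSketch {

  /** Hypothetical helper: drain a directory listing page by page. */
  static List<HdfsFileStatus> listAll(ClientProtocol cp, String src)
      throws IOException {
    List<HdfsFileStatus> all = new ArrayList<>();
    byte[] startAfter = HdfsFileStatus.EMPTY_NAME;
    while (true) {
      DirectoryListing page = cp.getListing(src, startAfter, false);
      if (page == null) {
        break;
      }
      Collections.addAll(all, page.getPartialListing());
      if (!page.hasMore()) {
        break;
      }
      // The next page starts strictly after this page's last name; a mismatch
      // between the Router's order and the NameNodes' byte order can repeat
      // or skip entries across pages.
      startAfter = page.getLastName();
    }
    return all;
  }
}

The new testGetListingOrder above checks the precondition this pattern relies on: the Router returns entries in the same order as the NameNode that owns them.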