diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 20945cf3de..ba0abc11e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
@@ -101,10 +102,11 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -819,6 +821,20 @@ public void renewLease(String clientName, List<String> namespaces)
     }
   }
 
+  /**
+   * For {@link #getListing(String,byte[],boolean) GetListing} to sort results.
+   */
+  private static class GetListingComparator
+      implements Comparator<byte[]>, Serializable {
+    @Override
+    public int compare(byte[] o1, byte[] o2) {
+      return DFSUtilClient.compareBytes(o1, o2);
+    }
+  }
+
+  private static GetListingComparator comparator =
+      new GetListingComparator();
+
   @Override
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException {
@@ -826,13 +842,13 @@ public DirectoryListing getListing(String src, byte[] startAfter,
 
     List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
         getListingInt(src, startAfter, needLocation);
-    TreeMap<String, HdfsFileStatus> nnListing = new TreeMap<>();
+    TreeMap<byte[], HdfsFileStatus> nnListing = new TreeMap<>(comparator);
     int totalRemainingEntries = 0;
     int remainingEntries = 0;
     boolean namenodeListingExists = false;
     // Check the subcluster listing with the smallest name to make sure
     // no file is skipped across subclusters
-    String lastName = null;
+    byte[] lastName = null;
     if (listings != null) {
       for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
         if (result.hasException()) {
@@ -850,8 +866,9 @@ public DirectoryListing getListing(String src, byte[] startAfter,
         int length = partialListing.length;
         if (length > 0) {
           HdfsFileStatus lastLocalEntry = partialListing[length-1];
-          String lastLocalName = lastLocalEntry.getLocalName();
-          if (lastName == null || lastName.compareTo(lastLocalName) > 0) {
+          byte[] lastLocalName = lastLocalEntry.getLocalNameInBytes();
+          if (lastName == null ||
+              comparator.compare(lastName, lastLocalName) > 0) {
             lastName = lastLocalName;
           }
         }
@@ -864,9 +881,9 @@ public DirectoryListing getListing(String src, byte[] startAfter,
       if (listing != null) {
         namenodeListingExists = true;
         for (HdfsFileStatus file : listing.getPartialListing()) {
-          String filename = file.getLocalName();
+          byte[] filename = file.getLocalNameInBytes();
           if (totalRemainingEntries > 0 &&
-              filename.compareTo(lastName) > 0) {
+              comparator.compare(filename, lastName) > 0) {
             // Discarding entries further than the lastName
             remainingEntries++;
           } else {
@@ -880,10 +897,6 @@ public DirectoryListing getListing(String src, byte[] startAfter,
 
     // Add mount points at this level in the tree
    final List<String> children = subclusterResolver.getMountPoints(src);
-    // Sort the list as the entries from subcluster are also sorted
-    if (children != null) {
-      Collections.sort(children);
-    }
     if (children != null) {
       // Get the dates for each mount point
       Map<String, Long> dates = getMountPointDates(src);
@@ -899,22 +912,24 @@ public DirectoryListing getListing(String src, byte[] startAfter,
             getMountPointStatus(childPath.toString(), 0, date);
         // if there is no subcluster path, always add mount point
+        byte[] bChild = DFSUtil.string2Bytes(child);
         if (lastName == null) {
-          nnListing.put(child, dirStatus);
+          nnListing.put(bChild, dirStatus);
         } else {
-          if (shouldAddMountPoint(child,
+          if (shouldAddMountPoint(bChild,
               lastName, startAfter, remainingEntries)) {
             // This may overwrite existing listing entries with the mount point
             // TODO don't add if already there?
-            nnListing.put(child, dirStatus);
+            nnListing.put(bChild, dirStatus);
           }
         }
       }
 
       // Update the remaining count to include left mount points
       if (nnListing.size() > 0) {
-        String lastListing = nnListing.lastKey();
+        byte[] lastListing = nnListing.lastKey();
         for (int i = 0; i < children.size(); i++) {
-          if (children.get(i).compareTo(lastListing) > 0) {
+          byte[] bChild = DFSUtil.string2Bytes(children.get(i));
+          if (comparator.compare(bChild, lastListing) > 0) {
             remainingEntries += (children.size() - i);
             break;
           }
@@ -2320,13 +2335,14 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
    * @return
    */
   private static boolean shouldAddMountPoint(
-      String mountPoint, String lastEntry, byte[] startAfter,
+      byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
       int remainingEntries) {
-    if (mountPoint.compareTo(DFSUtil.bytes2String(startAfter)) > 0 &&
-        mountPoint.compareTo(lastEntry) <= 0) {
+    if (comparator.compare(mountPoint, startAfter) > 0 &&
+        comparator.compare(mountPoint, lastEntry) <= 0) {
       return true;
     }
-    if (remainingEntries == 0 && mountPoint.compareTo(lastEntry) >= 0) {
+    if (remainingEntries == 0 &&
+        comparator.compare(mountPoint, lastEntry) >= 0) {
       return true;
     }
     return false;
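The RouterClientProtocol change above swaps the router's merge structure from a String-keyed TreeMap (plus an explicit Collections.sort of the mount points) to a byte[]-keyed TreeMap ordered by DFSUtilClient.compareBytes, and compares entry names via HdfsFileStatus#getLocalNameInBytes instead of decoding them to Strings. The merged listing therefore follows the same byte-wise order in which the NameNodes themselves return entries. GetListingComparator implements Serializable because TreeMap serializes its comparator along with the map; a non-serializable comparator would break serialization of the map. The sketch below is not the Hadoop class: it is a minimal, self-contained illustration in which ByteArrayComparator is a hypothetical stand-in for DFSUtilClient.compareBytes, assumed here to compare signed bytes the way Guava's SignedBytes comparator does.

// Minimal sketch, not the Hadoop implementation. ByteArrayComparator is a
// hypothetical stand-in for DFSUtilClient.compareBytes.
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.TreeMap;

public class ByteOrderedListingSketch {

  static class ByteArrayComparator implements Comparator<byte[]>, Serializable {
    @Override
    public int compare(byte[] a, byte[] b) {
      int n = Math.min(a.length, b.length);
      for (int i = 0; i < n; i++) {
        int cmp = Byte.compare(a[i], b[i]); // signed, like Guava's SignedBytes
        if (cmp != 0) {
          return cmp;
        }
      }
      return Integer.compare(a.length, b.length);
    }
  }

  public static void main(String[] args) {
    // byte[] is not Comparable and has identity equals(), so a TreeMap keyed
    // by byte[] needs an explicit comparator; without one, the first put()
    // throws ClassCastException.
    TreeMap<byte[], String> listing = new TreeMap<>(new ByteArrayComparator());
    for (String name : new String[] {"ßfile", "%file", "zfile"}) {
      listing.put(name.getBytes(StandardCharsets.UTF_8), name);
    }
    // Signed byte order prints: ßfile, %file, zfile ('ß' starts with byte
    // 0xC3, which is negative). A String-keyed TreeMap would instead print:
    // %file, zfile, ßfile.
    listing.values().forEach(System.out::println);
  }
}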
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index d3d3421619..d44b40b052 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -145,6 +145,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 
 /**
@@ -2278,4 +2280,43 @@ public void testDisableNodeUsageInRBFMetrics() throws JSONException {
     long proxyOpAfterWithReEnable = federationRPCMetrics.getProxyOps();
     assertEquals(proxyOpAfterWithDisable + 2, proxyOpAfterWithReEnable);
   }
+
+  @Test
+  public void testGetListingOrder() throws Exception {
+    String ns1 = getCluster().getNameservices().get(1);
+    String destBasePath = cluster.getNamenodeTestDirectoryForNS(ns1);
+    final String testPath1 = destBasePath + "/ßtestGetListingOrder";
+    final String testPath2 = destBasePath + "/%testGetListingOrder";
+    final FileSystem fileSystem1 = getCluster().
+        getNamenode(ns1, null).getFileSystem();
+
+    try {
+      // Create the test files in ns1.
+      createFile(fileSystem1, testPath1, 32);
+      createFile(fileSystem1, testPath2, 32);
+
+      NamenodeContext nn = cluster.getNamenode(ns1, null);
+      FileStatus[] fileStatuses =
+          nn.getFileSystem().listStatus(new Path(destBasePath));
+      List<String> requiredPaths = Arrays.stream(fileStatuses)
+          .map(fileStatus -> fileStatus.getPath().getName())
+          .collect(Collectors.toList());
+      Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+
+      // Fetch listing.
+      DirectoryListing listing =
+          routerProtocol.getListing(cluster.getFederatedTestDirectoryForNS(ns1),
+              HdfsFileStatus.EMPTY_NAME, false);
+      assertEquals(requiredPaths.size(), listing.getPartialListing().length);
+      // Match each path returned and verify order returned.
+      for (HdfsFileStatus f : listing.getPartialListing()) {
+        String fileName = requiredPathsIterator.next();
+        String currentFile = f.getFullPath(new Path("/")).getName();
+        assertEquals(currentFile, fileName);
+      }
+    } finally {
+      fileSystem1.delete(new Path(testPath1), true);
+      fileSystem1.delete(new Path(testPath2), true);
+    }
+  }
 }
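The test names "ßtestGetListingOrder" and "%testGetListingOrder" are chosen so that String order and byte order disagree: as Java Strings, '%' (U+0025) sorts before 'ß' (U+00DF), but 'ß' encodes to UTF-8 as 0xC3 0x9F, and 0xC3 is negative when read as a signed byte, so a signed byte-wise comparison puts the "ß" entry first. The old String-sorted merge could therefore return these entries in a different order than the backing NameNode; the test asserts that the router's listing now matches the NameNode's exactly. The standalone check below is a hypothetical illustration of that assumption: it presumes hadoop-hdfs-client is on the classpath, that string2Bytes encodes UTF-8, and that DFSUtilClient.compareBytes performs a signed byte-wise comparison.

// Hypothetical standalone check; assumes hadoop-hdfs-client on the classpath.
import org.apache.hadoop.hdfs.DFSUtilClient;

public class ListingOrderCheck {
  public static void main(String[] args) {
    // string2Bytes encodes the name as UTF-8, as the router does for mount points.
    byte[] eszett = DFSUtilClient.string2Bytes("ßtestGetListingOrder");
    byte[] percent = DFSUtilClient.string2Bytes("%testGetListingOrder");
    // String order: '%' (U+0025) sorts before 'ß' (U+00DF).
    System.out.println(
        "%testGetListingOrder".compareTo("ßtestGetListingOrder") < 0); // true
    // Byte order: 'ß' starts with 0xC3, negative as a signed byte, so the
    // "ß" name sorts before the "%" name (0x25) under signed comparison.
    System.out.println(DFSUtilClient.compareBytes(eszett, percent) < 0); // true
  }
}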