From d7c4f8ab21c56a52afcfbd0a56d9120e61376d0c Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 15 Jan 2020 17:22:47 -0800 Subject: [PATCH] HDFS-13616. Batch listing of multiple directories (#1725) --- .../java/org/apache/hadoop/fs/FileSystem.java | 27 ++ .../org/apache/hadoop/fs/PartialListing.java | 91 ++++ .../hadoop/fs/TestFilterFileSystem.java | 5 + .../apache/hadoop/fs/TestHarFileSystem.java | 4 + .../dev-support/findbugsExcludeFile.xml | 1 + .../org/apache/hadoop/hdfs/DFSClient.java | 21 +- .../hadoop/hdfs/DistributedFileSystem.java | 111 +++++ .../protocol/BatchedDirectoryListing.java | 62 +++ .../hadoop/hdfs/protocol/ClientProtocol.java | 18 + .../hdfs/protocol/HdfsPartialListing.java | 82 ++++ .../ClientNamenodeProtocolTranslatorPB.java | 50 +++ .../hdfs/protocolPB/PBHelperClient.java | 12 + .../main/proto/ClientNamenodeProtocol.proto | 13 + .../src/main/proto/hdfs.proto | 18 + .../hadoop/hdfs/protocol/TestReadOnly.java | 1 + .../router/RouterClientProtocol.java | 7 + .../federation/router/RouterRpcServer.java | 8 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + ...amenodeProtocolServerSideTranslatorPB.java | 62 ++- .../hdfs/server/namenode/FSDirectory.java | 4 + .../hdfs/server/namenode/FSNamesystem.java | 177 +++++++- .../server/namenode/NameNodeRpcServer.java | 23 + .../src/main/resources/hdfs-default.xml | 10 + .../apache/hadoop/hdfs/ListingBenchmark.java | 35 ++ .../hdfs/TestBatchedListDirectories.java | 410 ++++++++++++++++++ 25 files changed, 1249 insertions(+), 5 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartialListing.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchedDirectoryListing.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsPartialListing.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ListingBenchmark.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchedListDirectories.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index d2fddf8d8e..46f885f27f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2207,6 +2207,33 @@ public RemoteIterator listStatusIterator(final Path p) return new DirListingIterator<>(p); } + /** + * Batched listing API that returns {@link PartialListing}s for the + * passed Paths. + * + * @param paths List of paths to list. + * @return RemoteIterator that returns corresponding PartialListings. + * @throws IOException + */ + public RemoteIterator> batchedListStatusIterator( + final List paths) throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Batched listing API that returns {@link PartialListing}s for the passed + * Paths. The PartialListing will contain {@link LocatedFileStatus} entries + * with locations. + * + * @param paths List of paths to list. + * @return RemoteIterator that returns corresponding PartialListings. + * @throws IOException + */ + public RemoteIterator> batchedListLocatedStatusIterator( + final List paths) throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } + /** * List the statuses and block locations of the files in the given path. * Does not guarantee to return the iterator that traverses statuses diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartialListing.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartialListing.java new file mode 100644 index 0000000000..58b6612490 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartialListing.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.RemoteException; + +import java.io.IOException; +import java.util.List; + +/** + * A partial listing of the children of a parent directory. Since it is a + * partial listing, multiple PartialListing may need to be combined to obtain + * the full listing of a parent directory. + *

+ * ListingBatch behaves similar to a Future, in that getting the result via + * {@link #get()} will throw an Exception if there was a failure. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class PartialListing { + private final Path listedPath; + private final List partialListing; + private final RemoteException exception; + + public PartialListing(Path listedPath, List partialListing) { + this(listedPath, partialListing, null); + } + + public PartialListing(Path listedPath, RemoteException exception) { + this(listedPath, null, exception); + } + + private PartialListing(Path listedPath, List partialListing, + RemoteException exception) { + Preconditions.checkArgument(partialListing == null ^ exception == null); + this.partialListing = partialListing; + this.listedPath = listedPath; + this.exception = exception; + } + + /** + * Partial listing of the path being listed. In the case where the path is + * a file. The list will be a singleton with the file itself. + * + * @return Partial listing of the path being listed. + * @throws IOException if there was an exception getting the listing. + */ + public List get() throws IOException { + if (exception != null) { + throw exception.unwrapRemoteException(); + } + return partialListing; + } + + /** + * Path being listed. + * + * @return the path being listed. + */ + public Path getListedPath() { + return listedPath; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("listedPath", listedPath) + .append("partialListing", partialListing) + .append("exception", exception) + .toString(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index f0057a6c6d..9161be3148 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -27,6 +27,7 @@ import java.net.URI; import java.util.EnumSet; import java.util.Iterator; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; @@ -105,6 +106,10 @@ public FSDataOutputStream create(Path f, FsPermission permission, public FileStatus[] listStatusBatch(Path f, byte[] token); public FileStatus[] listStatus(Path[] files); public FileStatus[] listStatus(Path[] files, PathFilter filter); + public RemoteIterator> batchedListLocatedStatusIterator( + final List paths) throws IOException; + public RemoteIterator> batchedListStatusIterator( + final List paths) throws IOException; public FileStatus[] globStatus(Path pathPattern); public FileStatus[] globStatus(Path pathPattern, PathFilter filter); public Iterator listFiles(Path path, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index 3b923e05bd..6fc777b984 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -124,6 +124,10 @@ public FSDataOutputStream create(Path f, FsPermission permission, public FileStatus[] listStatusBatch(Path f, byte[] token); public FileStatus[] listStatus(Path[] files); public FileStatus[] listStatus(Path[] files, PathFilter filter); + public RemoteIterator> batchedListLocatedStatusIterator( + final List paths) throws IOException; + public RemoteIterator> batchedListStatusIterator( + final List paths) throws IOException; public FileStatus[] globStatus(Path pathPattern); public FileStatus[] globStatus(Path pathPattern, PathFilter filter); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml index fa9654b16a..3d85a1336d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml @@ -6,6 +6,7 @@ + diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index f8d85d0b21..08d708b740 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -1675,6 +1676,24 @@ public DirectoryListing listPaths(String src, byte[] startAfter, } } + /** + * Get a batched listing for the indicated directories + * + * @see ClientProtocol#getBatchedListing(String[], byte[], boolean) + */ + public BatchedDirectoryListing batchedListPaths( + String[] srcs, byte[] startAfter, boolean needLocation) + throws IOException { + checkOpen(); + try { + return namenode.getBatchedListing(srcs, startAfter, needLocation); + } catch(RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } + /** * Get the file info for a specific file or directory. * @param src The string representation of the path to the file @@ -1694,7 +1713,7 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { } } - /** + /** * Get the file info for a specific file or directory. * @param src The string representation of the path to the file * @param needBlockToken Include block tokens in {@link LocatedBlocks}. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 5a05ffef3b..af3025c021 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.collections.list.TreeList; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; @@ -49,6 +50,7 @@ import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; import org.apache.hadoop.fs.InvalidPathHandleException; +import org.apache.hadoop.fs.PartialListing; import org.apache.hadoop.fs.PathHandle; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options; @@ -73,6 +75,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -81,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsPartialListing; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; @@ -108,6 +112,8 @@ import org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.util.ChunkedArrayList; import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.io.FileNotFoundException; @@ -120,6 +126,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -1292,6 +1299,110 @@ public T next() throws IOException { } } + @Override + public RemoteIterator> batchedListStatusIterator( + final List paths) throws IOException { + List absPaths = Lists.newArrayListWithCapacity(paths.size()); + for (Path p : paths) { + absPaths.add(fixRelativePart(p)); + } + return new PartialListingIterator<>(absPaths, false); + } + + @Override + public RemoteIterator> batchedListLocatedStatusIterator( + final List paths) throws IOException { + List absPaths = Lists.newArrayListWithCapacity(paths.size()); + for (Path p : paths) { + absPaths.add(fixRelativePart(p)); + } + return new PartialListingIterator<>(absPaths, true); + } + + private static final Logger LBI_LOG = + LoggerFactory.getLogger(PartialListingIterator.class); + + private class PartialListingIterator + implements RemoteIterator> { + + private List paths; + private String[] srcs; + private boolean needLocation; + private BatchedDirectoryListing batchedListing; + private int listingIdx = 0; + + PartialListingIterator(List paths, boolean needLocation) + throws IOException { + this.paths = paths; + this.srcs = new String[paths.size()]; + for (int i = 0; i < paths.size(); i++) { + this.srcs[i] = getPathName(paths.get(i)); + } + this.needLocation = needLocation; + + // Do the first listing + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS); + batchedListing = dfs.batchedListPaths( + srcs, HdfsFileStatus.EMPTY_NAME, needLocation); + LBI_LOG.trace("Got batchedListing: {}", batchedListing); + if (batchedListing == null) { // the directory does not exist + throw new FileNotFoundException("One or more paths do not exist."); + } + } + + @Override + public boolean hasNext() throws IOException { + if (batchedListing == null) { + return false; + } + // If we're done with the current batch, try to get the next batch + if (listingIdx >= batchedListing.getListings().length) { + if (!batchedListing.hasMore()) { + LBI_LOG.trace("No more elements"); + return false; + } + batchedListing = dfs.batchedListPaths( + srcs, batchedListing.getStartAfter(), needLocation); + LBI_LOG.trace("Got batchedListing: {}", batchedListing); + listingIdx = 0; + } + return listingIdx < batchedListing.getListings().length; + } + + @Override + @SuppressWarnings("unchecked") + public PartialListing next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more entries"); + } + HdfsPartialListing listing = batchedListing.getListings()[listingIdx]; + listingIdx++; + + Path parent = paths.get(listing.getParentIdx()); + + if (listing.getException() != null) { + return new PartialListing<>(parent, listing.getException()); + } + + // Qualify paths for the client. + List statuses = listing.getPartialListing(); + List qualifiedStatuses = + Lists.newArrayListWithCapacity(statuses.size()); + + for (HdfsFileStatus status : statuses) { + if (needLocation) { + qualifiedStatuses.add((T)((HdfsLocatedFileStatus) status) + .makeQualifiedLocated(getUri(), parent)); + } else { + qualifiedStatuses.add((T)status.makeQualified(getUri(), parent)); + } + } + + return new PartialListing<>(parent, qualifiedStatuses); + } + } + /** * Create a directory, only when the parent directories exist. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchedDirectoryListing.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchedDirectoryListing.java new file mode 100644 index 0000000000..b56d4428b2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchedDirectoryListing.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A struct-like class for holding partial listings returned by the batched + * listing API. This class is used internally by the HDFS client and namenode + * and is not meant for public consumption. + */ +@InterfaceAudience.Private +public class BatchedDirectoryListing { + + private final HdfsPartialListing[] listings; + private final boolean hasMore; + private final byte[] startAfter; + + public BatchedDirectoryListing(HdfsPartialListing[] listings, + boolean hasMore, byte[] startAfter) { + this.listings = listings; + this.hasMore = hasMore; + this.startAfter = startAfter; + } + + public HdfsPartialListing[] getListings() { + return listings; + } + + public boolean hasMore() { + return hasMore; + } + + public byte[] getStartAfter() { + return startAfter; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("listings", listings) + .append("hasMore", hasMore) + .append("startAfter", startAfter) + .toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 35969a384e..58d033991b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -697,6 +697,24 @@ boolean mkdirs(String src, FsPermission masked, boolean createParent) DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException; + /** + * Get a partial listing of the input directories + * + * @param srcs the input directories + * @param startAfter the name to start listing after encoded in Java UTF8 + * @param needLocation if the FileStatus should contain block locations + * + * @return a partial listing starting after startAfter. null if the input is + * empty + * @throws IOException if an I/O error occurred + */ + @Idempotent + @ReadOnly(isCoordinated = true) + BatchedDirectoryListing getBatchedListing( + String[] srcs, + byte[] startAfter, + boolean needLocation) throws IOException; + /** * Get the list of snapshottable directories that are owned * by the current user. Return all the snapshottable directories if the diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsPartialListing.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsPartialListing.java new file mode 100644 index 0000000000..c3e0ac585e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsPartialListing.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ipc.RemoteException; + +import java.util.List; + +/** + * A partial listing returned by the batched listing API. This is used + * internally by the HDFS client and namenode and is not meant for public + * consumption. + */ +@InterfaceAudience.Private +public class HdfsPartialListing { + + private final List partialListing; + private final int parentIdx; + private final RemoteException exception; + + public HdfsPartialListing( + int parentIdx, + List partialListing) { + this(parentIdx, partialListing, null); + } + + public HdfsPartialListing( + int parentIdx, + RemoteException exception) { + this(parentIdx, null, exception); + } + + private HdfsPartialListing( + int parentIdx, + List partialListing, + RemoteException exception) { + Preconditions.checkArgument(partialListing == null ^ exception == null); + this.parentIdx = parentIdx; + this.partialListing = partialListing; + this.exception = exception; + } + + public int getParentIdx() { + return parentIdx; + } + + public List getPartialListing() { + return partialListing; + } + + public RemoteException getException() { + return exception; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("partialListing", partialListing) + .append("parentIdx", parentIdx) + .append("exception", exception) + .toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index a23ae48de3..eb1d6886ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsPartialListing; import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -114,6 +116,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; @@ -216,6 +220,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CodecProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedDirectoryListingProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; @@ -233,6 +238,7 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; @@ -686,6 +692,50 @@ public DirectoryListing getListing(String src, byte[] startAfter, } } + @Override + public BatchedDirectoryListing getBatchedListing( + String[] srcs, byte[] startAfter, boolean needLocation) + throws IOException { + GetBatchedListingRequestProto req = GetBatchedListingRequestProto + .newBuilder() + .addAllPaths(Arrays.asList(srcs)) + .setStartAfter(ByteString.copyFrom(startAfter)) + .setNeedLocation(needLocation).build(); + try { + GetBatchedListingResponseProto result = + rpcProxy.getBatchedListing(null, req); + + if (result.getListingsCount() > 0) { + HdfsPartialListing[] listingArray = + new HdfsPartialListing[result.getListingsCount()]; + int listingIdx = 0; + for (BatchedDirectoryListingProto proto : result.getListingsList()) { + HdfsPartialListing listing; + if (proto.hasException()) { + HdfsProtos.RemoteExceptionProto reProto = proto.getException(); + RemoteException ex = new RemoteException( + reProto.getClassName(), reProto.getMessage()); + listing = new HdfsPartialListing(proto.getParentIdx(), ex); + } else { + List statuses = + PBHelperClient.convertHdfsFileStatus( + proto.getPartialListingList()); + listing = new HdfsPartialListing(proto.getParentIdx(), statuses); + } + listingArray[listingIdx++] = listing; + } + BatchedDirectoryListing batchedListing = + new BatchedDirectoryListing(listingArray, result.getHasMore(), + result.getStartAfter().toByteArray()); + return batchedListing; + } + return null; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override public void renewLease(String clientName) throws IOException { RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 691ac54ff2..57b2f92a2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -2063,6 +2063,18 @@ public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) { return result; } + public static List convertHdfsFileStatus( + List fs) { + if (fs == null) { + return null; + } + List result = Lists.newArrayListWithCapacity(fs.size()); + for (HdfsFileStatusProto proto : fs) { + result.add(convert(proto)); + } + return result; + } + // The creatFlag field in PB is a bitmask whose values are the same a the // emum values of CreateFlag public static int convertCreateFlag(EnumSetWritable flag) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index ec229797b3..bb94b0c854 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -285,6 +285,18 @@ message GetListingResponseProto { optional DirectoryListingProto dirList = 1; } +message GetBatchedListingRequestProto { + repeated string paths = 1; + required bytes startAfter = 2; + required bool needLocation = 3; +} + +message GetBatchedListingResponseProto { + repeated BatchedDirectoryListingProto listings = 1; + required bool hasMore = 2; + required bytes startAfter = 3; +} + message GetSnapshottableDirListingRequestProto { // no input parameters } message GetSnapshottableDirListingResponseProto { @@ -887,6 +899,7 @@ service ClientNamenodeProtocol { rpc delete(DeleteRequestProto) returns(DeleteResponseProto); rpc mkdirs(MkdirsRequestProto) returns(MkdirsResponseProto); rpc getListing(GetListingRequestProto) returns(GetListingResponseProto); + rpc getBatchedListing (GetBatchedListingRequestProto) returns (GetBatchedListingResponseProto); rpc renewLease(RenewLeaseRequestProto) returns(RenewLeaseResponseProto); rpc recoverLease(RecoverLeaseRequestProto) returns(RecoverLeaseResponseProto); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 58a3d598f4..a72328b571 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -274,6 +274,12 @@ message LocatedBlockProto { repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token } +message BatchedListingKeyProto { + required bytes checksum = 1; + required uint32 pathIndex = 2; + required bytes startAfter = 3; +} + message DataEncryptionKeyProto { required uint32 keyId = 1; required string blockPoolId = 2; @@ -526,6 +532,18 @@ message DirectoryListingProto { required uint32 remainingEntries = 2; } +message RemoteExceptionProto { + required string className = 1; + optional string message = 2; +} + +// Directory listing result for a batched listing call. +message BatchedDirectoryListingProto { + repeated HdfsFileStatusProto partialListing = 1; + required uint32 parentIdx = 2; + optional RemoteExceptionProto exception = 3; +} + /** * Status of a snapshottable directory: besides the normal information for * a directory status, also include snapshot quota, number of snapshots, and diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java index e0432f5e7e..393f9f2c1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java @@ -39,6 +39,7 @@ public class TestReadOnly { "getStoragePolicies", "getStoragePolicy", "getListing", + "getBatchedListing", "getSnapshottableDirListing", "getPreferredBlockSize", "listCorruptFileBlocks", diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 82871aed15..aabf2821ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -855,6 +856,12 @@ public DirectoryListing getListing(String src, byte[] startAfter, return new DirectoryListing(combinedData, remainingEntries); } + @Override + public BatchedDirectoryListing getBatchedListing(String[] srcs, + byte[] startAfter, boolean needLocation) throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } + @Override public HdfsFileStatus getFileInfo(String src) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c1fd0ca83e..1f2298ddc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -827,6 +828,13 @@ public DirectoryListing getListing(String src, byte[] startAfter, return clientProto.getListing(src, startAfter, needLocation); } + @Override + public BatchedDirectoryListing getBatchedListing( + String[] srcs, byte[] startAfter, boolean needLocation) + throws IOException { + throw new UnsupportedOperationException(); + } + @Override // ClientProtocol public HdfsFileStatus getFileInfo(String src) throws IOException { return clientProto.getFileInfo(src); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f489570c36..c8f031e8fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -385,6 +385,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final int DFS_LIST_LIMIT_DEFAULT = 1000; + public static final String DFS_NAMENODE_BATCHED_LISTING_LIMIT = "dfs.batched.ls.limit"; + public static final int DFS_NAMENODE_BATCHED_LISTING_LIMIT_DEFAULT = 100; public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit"; public static final int DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 5000; public static final String DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_KEY = "dfs.content-summary.sleep-microsec"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 8c7d08278f..a4770150ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.stream.Collectors; +import com.google.protobuf.ByteString; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; @@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.QuotaUsage; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -47,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsPartialListing; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; @@ -111,6 +114,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder; @@ -262,11 +267,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.*; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedDirectoryListingProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteExceptionProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; @@ -279,6 +286,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; @@ -343,6 +351,13 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements private static final GetListingResponseProto VOID_GETLISTING_RESPONSE = GetListingResponseProto.newBuilder().build(); + private static final GetBatchedListingResponseProto + VOID_GETBATCHEDLISTING_RESPONSE = + GetBatchedListingResponseProto.newBuilder() + .setStartAfter(ByteString.copyFromUtf8("")) + .setHasMore(false) + .build(); + private static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE = RenewLeaseResponseProto.newBuilder().build(); @@ -742,7 +757,50 @@ public GetListingResponseProto getListing(RpcController controller, throw new ServiceException(e); } } - + + @Override + public GetBatchedListingResponseProto getBatchedListing( + RpcController controller, + GetBatchedListingRequestProto request) throws ServiceException { + try { + BatchedDirectoryListing result = server.getBatchedListing( + request.getPathsList().toArray(new String[] {}), + request.getStartAfter().toByteArray(), + request.getNeedLocation()); + if (result != null) { + GetBatchedListingResponseProto.Builder builder = + GetBatchedListingResponseProto.newBuilder(); + for (HdfsPartialListing partialListing : result.getListings()) { + BatchedDirectoryListingProto.Builder listingBuilder = + BatchedDirectoryListingProto.newBuilder(); + if (partialListing.getException() != null) { + RemoteException ex = partialListing.getException(); + RemoteExceptionProto.Builder rexBuilder = + RemoteExceptionProto.newBuilder(); + rexBuilder.setClassName(ex.getClassName()); + if (ex.getMessage() != null) { + rexBuilder.setMessage(ex.getMessage()); + } + listingBuilder.setException(rexBuilder.build()); + } else { + for (HdfsFileStatus f : partialListing.getPartialListing()) { + listingBuilder.addPartialListing(PBHelperClient.convert(f)); + } + } + listingBuilder.setParentIdx(partialListing.getParentIdx()); + builder.addListings(listingBuilder); + } + builder.setHasMore(result.hasMore()); + builder.setStartAfter(ByteString.copyFrom(result.getStartAfter())); + return builder.build(); + } else { + return VOID_GETBATCHEDLISTING_RESPONSE; + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + @Override public RenewLeaseResponseProto renewLease(RpcController controller, RenewLeaseRequestProto req) throws ServiceException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index aee96b7714..f1dc6c1904 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -248,6 +248,10 @@ public int getWriteHoldCount() { return this.dirLock.getWriteHoldCount(); } + public int getListLimit() { + return lsLimit; + } + @VisibleForTesting public final EncryptionZoneManager ezManager; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 69f0799cf7..7ab7fd68b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -98,6 +98,11 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.OBSERVER; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; + +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsPartialListing; import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; @@ -108,6 +113,7 @@ import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier; import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.util.Time; import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.monotonicNow; @@ -127,6 +133,9 @@ import java.net.InetSocketAddress; import java.net.URI; import java.nio.file.Files; +import java.security.GeneralSecurityException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -136,6 +145,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -225,6 +235,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedListingKeyProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; @@ -289,7 +300,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.CallerContext; -import org.apache.hadoop.ipc.ObserverRetryOnActiveException; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetryCache; import org.apache.hadoop.ipc.Server; @@ -531,6 +542,10 @@ private void logAuditEvent(boolean succeeded, private final long minBlockSize; // minimum block size final long maxBlocksPerFile; // maximum # of blocks per file + + // Maximum number of paths that can be listed per batched call. + private final int batchedListingLimit; + private final int numCommittedAllowed; /** Lock to protect FSNamesystem. */ @@ -599,6 +614,8 @@ private void logAuditEvent(boolean succeeded, */ private final Object metaSaveLock = new Object(); + private final MessageDigest digest; + /** * Notify that loading of this FSDirectory is complete, and * it is imageLoaded for use @@ -822,6 +839,12 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr); } + try { + digest = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new IOException("Algorithm 'MD5' not found"); + } + this.serverDefaults = new FsServerDefaults( conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT), conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT), @@ -844,6 +867,13 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT); this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT); + this.batchedListingLimit = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT, + DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT_DEFAULT); + Preconditions.checkArgument( + batchedListingLimit > 0, + DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT + + " must be greater than zero"); this.numCommittedAllowed = conf.getInt( DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT); @@ -897,7 +927,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { alwaysUseDelegationTokensForTests = conf.getBoolean( DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT); - + this.dtSecretManager = createDelegationTokenSecretManager(conf); this.dir = new FSDirectory(this, conf); this.snapshotManager = new SnapshotManager(conf, dir); @@ -3926,6 +3956,149 @@ DirectoryListing getListing(String src, byte[] startAfter, return dl; } + public byte[] getSrcPathsHash(String[] srcs) { + synchronized (digest) { + for (String src : srcs) { + digest.update(src.getBytes(Charsets.UTF_8)); + } + byte[] result = digest.digest(); + digest.reset(); + return result; + } + } + + BatchedDirectoryListing getBatchedListing(String[] srcs, byte[] startAfter, + boolean needLocation) throws IOException { + + if (srcs.length > this.batchedListingLimit) { + String msg = String.format("Too many source paths (%d > %d)", + srcs.length, batchedListingLimit); + throw new IllegalArgumentException(msg); + } + + // Parse the startAfter key if present + int srcsIndex = 0; + byte[] indexStartAfter = new byte[0]; + + if (startAfter.length > 0) { + BatchedListingKeyProto startAfterProto = + BatchedListingKeyProto.parseFrom(startAfter); + // Validate that the passed paths match the checksum from key + Preconditions.checkArgument( + Arrays.equals( + startAfterProto.getChecksum().toByteArray(), + getSrcPathsHash(srcs))); + srcsIndex = startAfterProto.getPathIndex(); + indexStartAfter = startAfterProto.getStartAfter().toByteArray(); + // Special case: if the indexStartAfter key is an empty array, it + // means the last element we listed was a file, not a directory. + // Skip it so we don't list it twice. + if (indexStartAfter.length == 0) { + srcsIndex++; + } + } + final int startSrcsIndex = srcsIndex; + final String operationName = "listStatus"; + final FSPermissionChecker pc = getPermissionChecker(); + + BatchedDirectoryListing bdl; + + checkOperation(OperationCategory.READ); + readLock(); + try { + checkOperation(NameNode.OperationCategory.READ); + + // List all directories from the starting index until we've reached + // ls limit OR finished listing all srcs. + LinkedHashMap listings = + Maps.newLinkedHashMap(); + DirectoryListing lastListing = null; + int numEntries = 0; + for (; srcsIndex < srcs.length; srcsIndex++) { + String src = srcs[srcsIndex]; + HdfsPartialListing listing; + try { + DirectoryListing dirListing = + getListingInt(dir, pc, src, indexStartAfter, needLocation); + if (dirListing == null) { + throw new FileNotFoundException("Path " + src + " does not exist"); + } + listing = new HdfsPartialListing( + srcsIndex, Lists.newArrayList(dirListing.getPartialListing())); + numEntries += listing.getPartialListing().size(); + lastListing = dirListing; + } catch (Exception e) { + if (e instanceof AccessControlException) { + logAuditEvent(false, operationName, src); + } + listing = new HdfsPartialListing( + srcsIndex, + new RemoteException( + e.getClass().getCanonicalName(), + e.getMessage())); + lastListing = null; + LOG.info("Exception listing src {}", src, e); + } + + listings.put(srcsIndex, listing); + // Null out the indexStartAfter after the first time. + // If we get a partial result, we're done iterating because we're also + // over the list limit. + if (indexStartAfter.length != 0) { + indexStartAfter = new byte[0]; + } + // Terminate if we've reached the maximum listing size + if (numEntries >= dir.getListLimit()) { + break; + } + } + + HdfsPartialListing[] partialListingArray = + listings.values().toArray(new HdfsPartialListing[] {}); + + // Check whether there are more dirs/files to be listed, and if so setting + // up the index to start within the first dir to be listed next time. + if (srcsIndex >= srcs.length) { + // If the loop finished normally, there are no more srcs and we're done. + bdl = new BatchedDirectoryListing( + partialListingArray, + false, + new byte[0]); + } else if (srcsIndex == srcs.length-1 && + lastListing != null && + !lastListing.hasMore()) { + // If we're on the last srcsIndex, then we might be done exactly on an + // lsLimit boundary. + bdl = new BatchedDirectoryListing( + partialListingArray, + false, + new byte[0] + ); + } else { + byte[] lastName = lastListing != null && lastListing.getLastName() != + null ? lastListing.getLastName() : new byte[0]; + BatchedListingKeyProto proto = BatchedListingKeyProto.newBuilder() + .setChecksum(ByteString.copyFrom(getSrcPathsHash(srcs))) + .setPathIndex(srcsIndex) + .setStartAfter(ByteString.copyFrom(lastName)) + .build(); + byte[] returnedStartAfter = proto.toByteArray(); + + // Set the startAfter key if the last listing has more entries + bdl = new BatchedDirectoryListing( + partialListingArray, + true, + returnedStartAfter); + } + } finally { + readUnlock(operationName); + } + for (int i = startSrcsIndex; i < srcsIndex; i++) { + logAuditEvent(true, operationName, srcs[i]); + } + return bdl; + } + ///////////////////////////////////////////////////////// // // These methods are called by datanodes diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e4839612e7..7099974996 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -107,6 +108,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSLimitException; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsPartialListing; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; @@ -1179,6 +1181,27 @@ public DirectoryListing getListing(String src, byte[] startAfter, return files; } + @Override // ClientProtocol + public BatchedDirectoryListing getBatchedListing( + String[] srcs, + byte[] startAfter, + boolean needLocation) throws IOException { + checkNNStartup(); + BatchedDirectoryListing batchedListing = + namesystem.getBatchedListing(srcs, startAfter, needLocation); + if (batchedListing != null) { + metrics.incrGetListingOps(); + int numEntries = 0; + for (HdfsPartialListing partial : batchedListing.getListings()) { + if (partial.getPartialListing() != null) { + numEntries += partial.getPartialListing().size(); + } + } + metrics.incrFilesInGetListingOps(numEntries); + } + return batchedListing; + } + @Override // ClientProtocol public HdfsFileStatus getFileInfo(String src) throws IOException { checkNNStartup(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 2158d805ca..c291c892f9 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4556,6 +4556,16 @@ + + dfs.batched.ls.limit + 100 + + Limit the number of paths that can be listed in a single batched + listing call. printed by ls. If less or equal to + zero, at most DFS_LIST_LIMIT_DEFAULT (= 1000) will be printed. + + + dfs.ls.limit 1000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ListingBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ListingBenchmark.java new file mode 100644 index 0000000000..67deaf3c3e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ListingBenchmark.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import java.io.IOException; + +public class ListingBenchmark { + + public static void main(String[] args) throws IOException { + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .format(true) + .build(); + NameNode nn = cluster.getNameNode(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchedListDirectories.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchedListDirectories.java new file mode 100644 index 0000000000..f0b62bb45e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchedListDirectories.java @@ -0,0 +1,410 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.PartialListing; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.hamcrest.core.StringContains; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the batched listing API. + */ +public class TestBatchedListDirectories { + + private static MiniDFSCluster cluster; + private static Configuration conf; + private static DistributedFileSystem dfs; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private static final List SUBDIR_PATHS = Lists.newArrayList(); + private static final List FILE_PATHS = Lists.newArrayList(); + private static final int FIRST_LEVEL_DIRS = 2; + private static final int SECOND_LEVEL_DIRS = 3; + private static final int FILES_PER_DIR = 5; + private static final Path EMPTY_DIR_PATH = new Path("/emptydir"); + private static final Path DATA_FILE_PATH = new Path("/datafile"); + private static final Path INACCESSIBLE_DIR_PATH = new Path("/noperms"); + private static final Path INACCESSIBLE_FILE_PATH = + new Path(INACCESSIBLE_DIR_PATH, "nopermsfile"); + + private static Path getSubDirName(int i, int j) { + return new Path(String.format("/dir%d/subdir%d", i, j)); + } + + private static Path getFileName(int i, int j, int k) { + Path dirPath = getSubDirName(i, j); + return new Path(dirPath, "file" + k); + } + + private static void assertSubDirEquals(int i, int j, Path p) { + assertTrue(p.toString().startsWith("hdfs://")); + Path expected = getSubDirName(i, j); + assertEquals("Unexpected subdir name", + expected.toString(), p.toUri().getPath()); + } + + private static void assertFileEquals(int i, int j, int k, Path p) { + assertTrue(p.toString().startsWith("hdfs://")); + Path expected = getFileName(i, j, k); + assertEquals("Unexpected file name", + expected.toString(), p.toUri().getPath()); + } + + private static void loadData() throws Exception { + for (int i = 0; i < FIRST_LEVEL_DIRS; i++) { + for (int j = 0; j < SECOND_LEVEL_DIRS; j++) { + Path dirPath = getSubDirName(i, j); + dfs.mkdirs(dirPath); + SUBDIR_PATHS.add(dirPath); + for (int k = 0; k < FILES_PER_DIR; k++) { + Path filePath = getFileName(i, j, k); + dfs.create(filePath, (short)1).close(); + FILE_PATHS.add(filePath); + } + } + } + dfs.mkdirs(EMPTY_DIR_PATH); + FSDataOutputStream fsout = dfs.create(DATA_FILE_PATH, (short)1); + fsout.write(123); + fsout.close(); + + dfs.mkdirs(INACCESSIBLE_DIR_PATH); + dfs.create(INACCESSIBLE_FILE_PATH, (short)1).close(); + dfs.setPermission(INACCESSIBLE_DIR_PATH, new FsPermission(0000)); + } + + @BeforeClass + public static void beforeClass() throws Exception { + conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 7); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT, + FIRST_LEVEL_DIRS * SECOND_LEVEL_DIRS * FILES_PER_DIR); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .build(); + dfs = cluster.getFileSystem(); + loadData(); + } + + @AfterClass + public static void afterClass() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private static List> getListings(List paths) + throws IOException { + List> returned = Lists.newArrayList(); + RemoteIterator> it = + dfs.batchedListStatusIterator(paths); + while (it.hasNext()) { + returned.add(it.next()); + } + return returned; + } + + private static List listingsToStatuses( + List> listings) throws IOException { + List returned = Lists.newArrayList(); + for (PartialListing listing : listings) { + returned.addAll(listing.get()); + } + return returned; + } + + private static List getStatuses(List paths) + throws IOException { + List> listings = getListings(paths); + return listingsToStatuses(listings); + } + + @Test + public void testEmptyPath() throws Exception { + thrown.expect(FileNotFoundException.class); + List paths = Lists.newArrayList(); + getStatuses(paths); + } + + @Test + public void testEmptyDir() throws Exception { + List paths = Lists.newArrayList(EMPTY_DIR_PATH); + List> listings = getListings(paths); + assertEquals(1, listings.size()); + PartialListing listing = listings.get(0); + assertEquals(EMPTY_DIR_PATH, listing.getListedPath()); + assertEquals(0, listing.get().size()); + } + @Test + public void listOneFile() throws Exception { + List paths = Lists.newArrayList(); + paths.add(FILE_PATHS.get(0)); + List statuses = getStatuses(paths); + assertEquals(1, statuses.size()); + assertFileEquals(0, 0, 0, statuses.get(0).getPath()); + } + + @Test + public void listDoesNotExist() throws Exception { + thrown.expect(FileNotFoundException.class); + List paths = Lists.newArrayList(); + paths.add(new Path("/does/not/exist")); + getStatuses(paths); + } + + @Test + public void listSomeDoNotExist() throws Exception { + List paths = Lists.newArrayList(); + paths.add(new Path("/does/not/exist")); + paths.addAll(SUBDIR_PATHS.subList(0, FIRST_LEVEL_DIRS)); + paths.add(new Path("/does/not/exist")); + paths.addAll(SUBDIR_PATHS.subList(0, FIRST_LEVEL_DIRS)); + paths.add(new Path("/does/not/exist")); + List> listings = getListings(paths); + for (int i = 0; i < listings.size(); i++) { + PartialListing partial = listings.get(i); + if (partial.getListedPath().toString().equals("/does/not/exist")) { + try { + partial.get(); + fail("Expected exception"); + } catch (FileNotFoundException e) { + assertTrue(e.getMessage().contains("/does/not/exist")); + } + } else { + partial.get(); + } + } + try { + listings.get(listings.size()-1).get(); + fail("Expected exception"); + } catch (FileNotFoundException e) { + assertTrue(e.getMessage().contains("/does/not/exist")); + } + } + + @Test + public void listDirRelative() throws Exception { + dfs.setWorkingDirectory(new Path("/dir0")); + List paths = Lists.newArrayList(new Path(".")); + List statuses = getStatuses(paths); + assertEquals("Wrong number of items", + SECOND_LEVEL_DIRS, statuses.size()); + for (int i = 0; i < SECOND_LEVEL_DIRS; i++) { + FileStatus stat = statuses.get(i); + assertSubDirEquals(0, i, stat.getPath()); + } + } + + @Test + public void listFilesRelative() throws Exception { + dfs.setWorkingDirectory(new Path("/dir0")); + List paths = Lists.newArrayList(new Path("subdir0")); + List statuses = getStatuses(paths); + assertEquals("Wrong number of items", + FILES_PER_DIR, statuses.size()); + for (int i = 0; i < FILES_PER_DIR; i++) { + FileStatus stat = statuses.get(i); + assertFileEquals(0, 0, i, stat.getPath()); + } + } + + private void listFilesInternal(int numFiles) throws Exception { + List paths = FILE_PATHS.subList(0, numFiles); + List statuses = getStatuses(paths); + assertEquals(paths.size(), statuses.size()); + for (int i = 0; i < paths.size(); i++) { + Path p = paths.get(i); + FileStatus stat = statuses.get(i); + assertEquals(p.toUri().getPath(), stat.getPath().toUri().getPath()); + } + } + + @Test + public void listOneFiles() throws Exception { + listFilesInternal(1); + } + + @Test + public void listSomeFiles() throws Exception { + listFilesInternal(FILE_PATHS.size() / 2); + } + + @Test + public void listAllFiles() throws Exception { + listFilesInternal(FILE_PATHS.size()); + } + + private void listDirectoriesInternal(int numDirs) throws Exception { + List paths = SUBDIR_PATHS.subList(0, numDirs); + List> listings = getListings(paths); + + LinkedHashMap> listing = new LinkedHashMap<>(); + for (PartialListing partialListing : listings) { + Path parent = partialListing.getListedPath(); + if (!listing.containsKey(parent)) { + listing.put(parent, Lists.newArrayList()); + } + listing.get(parent).addAll(partialListing.get()); + } + + assertEquals(paths.size(), listing.size()); + int pathIdx = 0; + for (Map.Entry> entry : listing.entrySet()) { + Path expected = paths.get(pathIdx++); + Path parent = entry.getKey(); + List children = entry.getValue(); + assertEquals(expected, parent); + assertEquals(FILES_PER_DIR, children.size()); + } + } + + @Test + public void listOneDirectory() throws Exception { + listDirectoriesInternal(1); + } + + @Test + public void listSomeDirectories() throws Exception { + listDirectoriesInternal(SUBDIR_PATHS.size() / 2); + } + + @Test + public void listAllDirectories() throws Exception { + listDirectoriesInternal(SUBDIR_PATHS.size()); + } + + @Test + public void listTooManyDirectories() throws Exception { + thrown.expect(RemoteException.class); + thrown.expectMessage( + StringContains.containsString("Too many source paths")); + List paths = Lists.newArrayList(FILE_PATHS); + paths.add(SUBDIR_PATHS.get(0)); + getStatuses(paths); + } + + @Test + public void listDirsAndEmpty() throws Exception { + List paths = Lists.newArrayList(); + paths.add(EMPTY_DIR_PATH); + paths.add(FILE_PATHS.get(0)); + paths.add(EMPTY_DIR_PATH); + List> listings = getListings(paths); + assertEquals(3, listings.size()); + assertEquals(0, listings.get(0).get().size()); + assertEquals(1, listings.get(1).get().size()); + assertEquals(FILE_PATHS.get(0).toString(), + listings.get(1).get().get(0).getPath().toUri().getPath()); + assertEquals(0, listings.get(2).get().size()); + } + + @Test + public void listSamePaths() throws Exception { + List paths = Lists.newArrayList(); + paths.add(SUBDIR_PATHS.get(0)); + paths.add(SUBDIR_PATHS.get(0)); + paths.add(FILE_PATHS.get(0)); + paths.add(FILE_PATHS.get(0)); + List statuses = getStatuses(paths); + assertEquals(FILES_PER_DIR*2 + 2, statuses.size()); + List slice = statuses.subList(0, FILES_PER_DIR); + for (int i = 0; i < FILES_PER_DIR; i++) { + assertFileEquals(0, 0, i, slice.get(i).getPath()); + } + slice = statuses.subList(FILES_PER_DIR, FILES_PER_DIR*2); + for (int i = 0; i < FILES_PER_DIR; i++) { + assertFileEquals(0, 0, i, slice.get(i).getPath()); + } + assertFileEquals(0, 0, 0, statuses.get(FILES_PER_DIR*2).getPath()); + assertFileEquals(0, 0, 0, statuses.get(FILES_PER_DIR*2+1).getPath()); + } + + @Test + public void listLocatedStatus() throws Exception { + List paths = Lists.newArrayList(); + paths.add(DATA_FILE_PATH); + RemoteIterator> it = + dfs.batchedListLocatedStatusIterator(paths); + PartialListing listing = it.next(); + List statuses = listing.get(); + assertEquals(1, statuses.size()); + assertTrue(statuses.get(0).getBlockLocations().length > 0); + } + + private void listAsNormalUser(List paths) + throws IOException, InterruptedException { + final UserGroupInformation ugi = UserGroupInformation + .createRemoteUser("tiffany"); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + // try renew with long name + FileSystem fs = FileSystem.get(cluster.getURI(), conf); + RemoteIterator> it = + fs.batchedListStatusIterator(paths); + PartialListing listing = it.next(); + listing.get(); + return null; + } + }); + } + + @Test + public void listInaccessibleDir() throws Exception { + thrown.expect(AccessControlException.class); + List paths = Lists.newArrayList(INACCESSIBLE_DIR_PATH); + listAsNormalUser(paths); + } + + @Test + public void listInaccessibleFile() throws Exception { + thrown.expect(AccessControlException.class); + List paths = Lists.newArrayList(INACCESSIBLE_FILE_PATH); + listAsNormalUser(paths); + } +}