diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0de1071339..99272474f9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -2229,7 +2229,9 @@ private void fetchMore() throws IOException {
     @Override
     @SuppressWarnings("unchecked")
     public T next() throws IOException {
-      Preconditions.checkState(hasNext(), "No more items in iterator");
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more items in iterator");
+      }
       if (i == entries.getEntries().length) {
         fetchMore();
       }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 089af06211..284a964f6e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -294,6 +294,24 @@ any optimizations.
 The atomicity and consistency constraints are as for
 `listStatus(Path, PathFilter)`.
 
+### `RemoteIterator<FileStatus> listStatusIterator(Path p)`
+
+Return an iterator enumerating the `FileStatus` entries under
+a path. This is similar to `listStatus(Path)`, except that an
+iterator is returned rather than an entire list.
+The result is exactly the same as `listStatus(Path)`, provided no other
+caller updates the directory during the listing. However, atomicity is
+not guaranteed if other callers are adding/deleting files
+inside the directory while the listing is being performed. Different
+filesystems may provide a more efficient implementation; for example, S3A
+does the listing in pages and fetches the next page asynchronously while
+the current page is being processed.
+
+Note that since the initial listing is now asynchronous, bucket/path
+existence exceptions may only surface later, during a `next()` call.
+
+Callers should prefer `listStatusIterator` over `listStatus`, as it
+is incremental in nature.
 
 ### `FileStatus[] listStatus(Path[] paths)`
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
index f63314d392..c0d9733bbb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.UUID;
 
+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
@@ -148,6 +150,7 @@ public void testListLocatedStatusEmptyDirectory() throws IOException {
   public void testComplexDirActions() throws Throwable {
     TreeScanResults tree = createTestTree();
     checkListStatusStatusComplexDir(tree);
+    checkListStatusIteratorComplexDir(tree);
     checkListLocatedStatusStatusComplexDir(tree);
     checkListFilesComplexDirNonRecursive(tree);
     checkListFilesComplexDirRecursive(tree);
@@ -169,6 +172,34 @@ protected void checkListStatusStatusComplexDir(TreeScanResults tree)
     listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
   }
 
+  /**
+   * Test {@link FileSystem#listStatusIterator(Path)} on a complex
+   * directory tree.
+   * @param tree directory tree to list.
+   * @throws Throwable on any failure.
+   */
+  protected void checkListStatusIteratorComplexDir(TreeScanResults tree)
+      throws Throwable {
+    describe("Expect listStatusIterator to list all entries in top dir only");
+
+    FileSystem fs = getFileSystem();
+    TreeScanResults listing = new TreeScanResults(
+        fs.listStatusIterator(tree.getBasePath()));
+    listing.assertSizeEquals("listStatusIterator()", TREE_FILES, TREE_WIDTH, 0);
+
+    List<FileStatus> resWithoutCheckingHasNext =
+        iteratorToListThroughNextCallsAlone(fs
+            .listStatusIterator(tree.getBasePath()));
+
+    List<FileStatus> resWithCheckingHasNext = iteratorToList(fs
+        .listStatusIterator(tree.getBasePath()));
+    Assertions.assertThat(resWithCheckingHasNext)
+        .describedAs("listStatusIterator() should return correct " +
+            "results even if hasNext() calls are not made.")
+        .hasSameElementsAs(resWithoutCheckingHasNext);
+
+  }
+
   /**
    * Test {@link FileSystem#listLocatedStatus(Path)} on a complex
    * directory tree.
@@ -322,6 +353,45 @@ public void testListStatusFile() throws Throwable {
     verifyStatusArrayMatchesFile(f, getFileSystem().listStatus(f));
   }
 
+  @Test
+  public void testListStatusIteratorFile() throws Throwable {
+    describe("test the listStatusIterator(path) on a file");
+    Path f = touchf("listStItrFile");
+
+    List<FileStatus> statusList = iteratorToList(
+        getFileSystem().listStatusIterator(f));
+    validateListingForFile(f, statusList, false);
+
+    List<FileStatus> statusList2 =
+        iteratorToListThroughNextCallsAlone(
+            getFileSystem().listStatusIterator(f));
+    validateListingForFile(f, statusList2, true);
+  }
+
+  /**
+   * Validate the listing result for an input path which is a file.
+   * @param f file.
+   * @param statusList list status of a file.
+   * @param nextCallAlone whether the listing was generated using
+   *                      next() calls alone.
+   */
+  private void validateListingForFile(Path f,
+      List<FileStatus> statusList,
+      boolean nextCallAlone) {
+    String msg = String.format("size of file list returned using %s should " +
+        "be 1", nextCallAlone ?
+        "next() calls alone" : "hasNext() and next() calls");
+    Assertions.assertThat(statusList)
+        .describedAs(msg)
+        .hasSize(1);
+    Assertions.assertThat(statusList.get(0).getPath())
+        .describedAs("path returned should match with the input path")
+        .isEqualTo(f);
+    Assertions.assertThat(statusList.get(0).isFile())
+        .describedAs("path returned should be a file")
+        .isTrue();
+  }
+
   @Test
   public void testListFilesFile() throws Throwable {
     describe("test the listStatus(path) on a file");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
index 27c6933ae1..6eaa56bab7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
@@ -22,13 +22,16 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
+import org.assertj.core.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -39,6 +42,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.iteratorToList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@@ -242,6 +246,13 @@ public void testSimpleRootListing() throws IOException {
             + "listStatus = " + listStatusResult
             + "listFiles = " + listFilesResult,
         fileList.size() <= statuses.length);
+    List<FileStatus> statusList = iteratorToList(
+        fs.listStatusIterator(root));
+    Assertions.assertThat(statusList)
+        .describedAs("Result of listStatus(/) and listStatusIterator(/)"
+            + " must match")
+        .hasSameElementsAs(Arrays.stream(statuses)
+            .collect(Collectors.toList()));
   }
 
   @Test
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 4789630f95..39a41d01c4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1453,6 +1453,52 @@ public static List toList(
     return list;
   }
 
+  /**
+   * Convert a remote iterator over file status results into a list.
+   * The utility equivalents in commons collection and guava cannot be
+   * used here, as this is a different interface, one whose operators
+   * can throw IOEs.
+   * @param iterator input iterator
+   * @return the file status entries as a list.
+   * @throws IOException IO problems during the iteration.
+   */
+  public static <T extends FileStatus> List<T> iteratorToList(
+      RemoteIterator<T> iterator) throws IOException {
+    List<T> list = new ArrayList<>();
+    while (iterator.hasNext()) {
+      list.add(iterator.next());
+    }
+    return list;
+  }
+
+
+  /**
+   * Convert a remote iterator over file status results into a list.
+   * This uses {@link RemoteIterator#next()} calls only, expecting
+   * a raised {@link NoSuchElementException} exception to indicate that
+   * the end of the listing has been reached. This iteration strategy is
+   * designed to verify that the implementation of the remote iterator
+   * generates results and terminates consistently with the {@code hasNext/next}
+   * iteration. More succinctly: it verifies that the {@code next()} operator
+   * isn't relying on {@code hasNext()} to always be called during an iteration.
+   * @param iterator input iterator
+   * @return the status entries as a list.
+   * @throws IOException IO problems
+   */
+  @SuppressWarnings("InfiniteLoopStatement")
+  public static <T extends FileStatus> List<T> iteratorToListThroughNextCallsAlone(
+      RemoteIterator<T> iterator) throws IOException {
+    List<T> list = new ArrayList<>();
+    try {
+      while (true) {
+        list.add(iterator.next());
+      }
+    } catch (NoSuchElementException expected) {
+      // expected: this is how the end of the listing is signalled
+    }
+    return list;
+  }
+
   /**
    * Convert a remote iterator over file status results into a list.
    * This uses {@link RemoteIterator#next()} calls only, expecting
@@ -1602,7 +1648,7 @@ public TreeScanResults(Path basePath) {
      * @param results results of the listFiles/listStatus call.
      * @throws IOException IO problems during the iteration.
      */
-    public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
+    public TreeScanResults(RemoteIterator<? extends FileStatus> results)
         throws IOException {
       while (results.hasNext()) {
         add(results.next());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 86f2a889a9..d7b576af8b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -2643,6 +2643,30 @@ void maybeCreateFakeParentDirectory(Path path)
     }
   }
 
+  /**
+   * Override the superclass implementation so that callers benefit from
+   * the asynchronous listing done in {@code S3AFileSystem}.
+   * See {@code Listing#ObjectListingIterator}.
+   * {@inheritDoc}
+   */
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(Path p)
+      throws FileNotFoundException, IOException {
+    RemoteIterator<? extends FileStatus> listStatusItr = once("listStatus",
+        p.toString(), () -> innerListStatus(p));
+    return new RemoteIterator<FileStatus>() {
+      @Override
+      public boolean hasNext() throws IOException {
+        return listStatusItr.hasNext();
+      }
+
+      @Override
+      public FileStatus next() throws IOException {
+        return listStatusItr.next();
+      }
+    };
+  }
+
   /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
index a3cca75c50..44e3a8abcd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
@@ -231,6 +231,24 @@ public int read() throws IOException {
                   "match with original list of files")
           .hasSameElementsAs(originalListOfFiles)
           .hasSize(numOfPutRequests);
+      // Validate listing using listStatusIterator().
+      NanoTimer timeUsingListStatusItr = new NanoTimer();
+      RemoteIterator<FileStatus> lsItr = fs.listStatusIterator(dir);
+      List<String> listUsingListStatusItr = new ArrayList<>();
+      while (lsItr.hasNext()) {
+        listUsingListStatusItr.add(lsItr.next().getPath().toString());
+        Thread.sleep(eachFileProcessingTime);
+      }
+      timeUsingListStatusItr.end("listing %d files using " +
+              "listStatusIterator() api with batch size of %d " +
+              "including %dms of processing time for each file",
+          numOfPutRequests, batchSize, eachFileProcessingTime);
+      Assertions.assertThat(listUsingListStatusItr)
+          .describedAs("Listing results using listStatusIterator() must " +
+              "match with original list of files")
+          .hasSameElementsAs(originalListOfFiles)
+          .hasSize(numOfPutRequests);
+
     } finally {
       executorService.shutdown();
     }