From 0840c0c1f3136873c4d774d00c354ede010f971a Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 25 Aug 2020 15:59:43 +0530 Subject: [PATCH] HADOOP-17074. S3A Listing to be fully asynchronous. (#2207) Contributed by Mukund Thakur. Change-Id: I1b0574a0c9ebc0805f285dd5280a00e5add081f1 --- .../org/apache/hadoop/fs/s3a/Listing.java | 56 ++++++--- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 11 +- .../s3a/impl/ListingOperationCallbacks.java | 7 +- .../s3a/impl/TestPartialDeleteFailures.java | 6 +- .../scale/ITestS3ADirectoryPerformance.java | 107 +++++++++++++++++- 5 files changed, 164 insertions(+), 23 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 34129e0bf1..16413a7620 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -55,7 +55,9 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX; import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus; @@ -739,6 +741,16 @@ class ObjectListingIterator implements RemoteIterator { */ private int maxKeys; + /** + * Future to store current batch listing result. + */ + private CompletableFuture s3ListResultFuture; + + /** + * Result of previous batch. + */ + private S3ListResult objectsPrev; + /** * Constructor -calls `listObjects()` on the request to populate the * initial set of results/fail if there was a problem talking to the bucket. @@ -752,8 +764,10 @@ class ObjectListingIterator implements RemoteIterator { S3ListRequest request) throws IOException { this.listPath = listPath; this.maxKeys = listingOperationCallbacks.getMaxKeys(); - this.objects = listingOperationCallbacks.listObjects(request); + this.s3ListResultFuture = listingOperationCallbacks + .listObjectsAsync(request); this.request = request; + this.objectsPrev = null; } /** @@ -764,7 +778,8 @@ class ObjectListingIterator implements RemoteIterator { */ @Override public boolean hasNext() throws IOException { - return firstListing || objects.isTruncated(); + return firstListing || + (objectsPrev != null && objectsPrev.isTruncated()); } /** @@ -780,29 +795,44 @@ public boolean hasNext() throws IOException { @Retries.RetryTranslated public S3ListResult next() throws IOException { if (firstListing) { - // on the first listing, don't request more data. - // Instead just clear the firstListing flag so that it future calls - // will request new data. + // clear the firstListing flag for future calls. firstListing = false; + // Calculating the result of last async list call. + objects = awaitFuture(s3ListResultFuture); + fetchNextBatchAsyncIfPresent(); } else { try { - if (!objects.isTruncated()) { + if (objectsPrev!= null && !objectsPrev.isTruncated()) { // nothing more to request: fail. throw new NoSuchElementException("No more results in listing of " - + listPath); + + listPath); } - // need to request a new set of objects. - LOG.debug("[{}], Requesting next {} objects under {}", - listingCount, maxKeys, listPath); - objects = listingOperationCallbacks - .continueListObjects(request, objects); + // Calculating the result of last async list call. + objects = awaitFuture(s3ListResultFuture); + // Requesting next batch of results. + fetchNextBatchAsyncIfPresent(); listingCount++; LOG.debug("New listing status: {}", this); } catch (AmazonClientException e) { throw translateException("listObjects()", listPath, e); } } - return objects; + // Storing the current result to be used by hasNext() call. + objectsPrev = objects; + return objectsPrev; + } + + /** + * If there are more listings present, call for next batch async. + * @throws IOException + */ + private void fetchNextBatchAsyncIfPresent() throws IOException { + if (objects.isTruncated()) { + LOG.debug("[{}], Requesting next {} objects under {}", + listingCount, maxKeys, listPath); + s3ListResultFuture = listingOperationCallbacks + .continueListObjectsAsync(request, objects); + } } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index ac9904a867..63c80bdd06 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1649,19 +1649,21 @@ protected class ListingOperationCallbacksImpl implements @Override @Retries.RetryRaw - public S3ListResult listObjects( + public CompletableFuture listObjectsAsync( S3ListRequest request) throws IOException { - return S3AFileSystem.this.listObjects(request); + return submit(unboundedThreadPool, + () -> listObjects(request)); } @Override @Retries.RetryRaw - public S3ListResult continueListObjects( + public CompletableFuture continueListObjectsAsync( S3ListRequest request, S3ListResult prevResult) throws IOException { - return S3AFileSystem.this.continueListObjects(request, prevResult); + return submit(unboundedThreadPool, + () -> continueListObjects(request, prevResult)); } @Override @@ -2279,6 +2281,7 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest) { * not be saved to the metadata store and * fs.s3a.metadatastore.fail.on.write.error=true */ + @VisibleForTesting @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated") PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest) throws AmazonClientException, MetadataPersistenceException { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java index d89cada846..3f35bae65c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.impl; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -38,7 +39,7 @@ public interface ListingOperationCallbacks { /** - * Initiate a {@code listObjects} operation, incrementing metrics + * Initiate a {@code listObjectsAsync} operation, incrementing metrics * in the process. * * Retry policy: retry untranslated. @@ -47,7 +48,7 @@ public interface ListingOperationCallbacks { * @throws IOException if the retry invocation raises one (it shouldn't). */ @Retries.RetryRaw - S3ListResult listObjects( + CompletableFuture listObjectsAsync( S3ListRequest request) throws IOException; @@ -60,7 +61,7 @@ S3ListResult listObjects( * @throws IOException none, just there for retryUntranslated. */ @Retries.RetryRaw - S3ListResult continueListObjects( + CompletableFuture continueListObjectsAsync( S3ListRequest request, S3ListResult prevResult) throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java index 7fa03a16cd..0729f2ac28 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -243,13 +244,14 @@ private StoreContext createMockStoreContext(boolean multiDelete, private static class MinimalListingOperationCallbacks implements ListingOperationCallbacks { @Override - public S3ListResult listObjects(S3ListRequest request) + public CompletableFuture listObjectsAsync( + S3ListRequest request) throws IOException { return null; } @Override - public S3ListResult continueListObjects( + public CompletableFuture continueListObjectsAsync( S3ListRequest request, S3ListResult prevResult) throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java index 03f1e220b5..d2898f08fa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java @@ -18,14 +18,33 @@ package org.apache.hadoop.fs.s3a.scale; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.Statistic; + import org.junit.Test; +import org.assertj.core.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; @@ -137,6 +156,93 @@ public void testListOperations() throws Throwable { } } + @Test + public void testMultiPagesListingPerformanceAndCorrectness() + throws Throwable { + describe("Check performance and correctness for multi page listing " + + "using different listing api"); + final Path dir = methodPath(); + final int batchSize = 10; + final int numOfPutRequests = 1000; + final int eachFileProcessingTime = 10; + final int numOfPutThreads = 50; + final Configuration conf = + getConfigurationWithConfiguredBatchSize(batchSize); + final InputStream im = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; + final List originalListOfFiles = new ArrayList<>(); + List> putObjectRequests = new ArrayList<>(); + ExecutorService executorService = Executors + .newFixedThreadPool(numOfPutThreads); + + NanoTimer uploadTimer = new NanoTimer(); + try(S3AFileSystem fs = (S3AFileSystem) FileSystem.get(dir.toUri(), conf)) { + fs.create(dir); + assume("Test is only for raw fs", !fs.hasMetadataStore()); + for (int i=0; i + fs.getWriteOperationHelper().putObject(put)); + } + executorService.invokeAll(putObjectRequests); + uploadTimer.end("uploading %d files with a parallelism of %d", + numOfPutRequests, numOfPutThreads); + + RemoteIterator resIterator = fs.listFiles(dir, true); + List listUsingListFiles = new ArrayList<>(); + NanoTimer timeUsingListFiles = new NanoTimer(); + while(resIterator.hasNext()) { + listUsingListFiles.add(resIterator.next().getPath().toString()); + Thread.sleep(eachFileProcessingTime); + } + timeUsingListFiles.end("listing %d files using listFiles() api with " + + "batch size of %d including %dms of processing time" + + " for each file", + numOfPutRequests, batchSize, eachFileProcessingTime); + + Assertions.assertThat(listUsingListFiles) + .describedAs("Listing results using listFiles() must" + + "match with original list of files") + .hasSameElementsAs(originalListOfFiles) + .hasSize(numOfPutRequests); + List listUsingListStatus = new ArrayList<>(); + NanoTimer timeUsingListStatus = new NanoTimer(); + FileStatus[] fileStatuses = fs.listStatus(dir); + for(FileStatus fileStatus : fileStatuses) { + listUsingListStatus.add(fileStatus.getPath().toString()); + Thread.sleep(eachFileProcessingTime); + } + timeUsingListStatus.end("listing %d files using listStatus() api with " + + "batch size of %d including %dms of processing time" + + " for each file", + numOfPutRequests, batchSize, eachFileProcessingTime); + Assertions.assertThat(listUsingListStatus) + .describedAs("Listing results using listStatus() must" + + "match with original list of files") + .hasSameElementsAs(originalListOfFiles) + .hasSize(numOfPutRequests); + } finally { + executorService.shutdown(); + } + } + + private Configuration getConfigurationWithConfiguredBatchSize(int batchSize) { + Configuration conf = new Configuration(getFileSystem().getConf()); + S3ATestUtils.disableFilesystemCaching(conf); + conf.setInt(Constants.MAX_PAGING_KEYS, batchSize); + return conf; + } + @Test public void testTimeToStatEmptyDirectory() throws Throwable { describe("Time to stat an empty directory"); @@ -188,5 +294,4 @@ private void timeToStatPath(Path path) throws IOException { LOG.info("listObjects: {}", listRequests); LOG.info("listObjects: per operation {}", listRequests.diff() / attempts); } - }