From 8fd4f5490f59a2e9e561b6438b30b3a7453c808b Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 4 Aug 2020 20:30:02 +0530 Subject: [PATCH] HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148) Contributed by Mukund Thakur. --- .../org/apache/hadoop/fs/s3a/Listing.java | 191 ++++++++++++++++-- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 158 +++++++-------- .../s3a/impl/ListingOperationCallbacks.java | 119 +++++++++++ .../hadoop/fs/s3a/impl/StoreContext.java | 4 + .../s3a/s3guard/DumpS3GuardDynamoTable.java | 4 +- .../org/apache/hadoop/fs/s3a/TestListing.java | 2 +- .../s3a/impl/TestPartialDeleteFailures.java | 149 +++++++++++++- 7 files changed, 521 insertions(+), 106 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 9c2f67dd88..fcb492857e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -30,11 +30,22 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation; +import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; +import org.apache.hadoop.fs.s3a.impl.StoreContext; +import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; +import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator; +import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; +import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import com.google.common.base.Preconditions; import org.slf4j.Logger; +import java.io.FileNotFoundException; import java.io.IOException; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -46,25 +57,31 @@ import java.util.Set; import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX; +import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus; +import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash; import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory; import static org.apache.hadoop.fs.s3a.S3AUtils.stringify; import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; +import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey; /** * Place for the S3A listing classes; keeps all the small classes under control. 
  */
 @InterfaceAudience.Private
-public class Listing {
+public class Listing extends AbstractStoreOperation {
 
-  private final S3AFileSystem owner;
   private static final Logger LOG = S3AFileSystem.LOG;
 
   static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N = new AcceptAllButS3nDirs();
 
-  public Listing(S3AFileSystem owner) {
-    this.owner = owner;
+  private final ListingOperationCallbacks listingOperationCallbacks;
+
+  public Listing(ListingOperationCallbacks listingOperationCallbacks,
+      StoreContext storeContext) {
+    super(storeContext);
+    this.listingOperationCallbacks = listingOperationCallbacks;
   }
 
   /**
@@ -156,6 +173,145 @@ TombstoneReconcilingIterator createTombstoneReconcilingIterator(
     return new TombstoneReconcilingIterator(iterator, tombstones);
   }
 
+
+  /**
+   * List files under a path, assuming the path to be a directory.
+   * @param path input path.
+   * @param recursive recursive listing?
+   * @param acceptor file status filter
+   * @param collectTombstones should tombstones be collected from S3Guard?
+   * @param forceNonAuthoritativeMS forces the metadata store to act as
+   *                                non-authoritative. This is useful when
+   *                                the listFiles output is used by the
+   *                                import tool.
+   * @return an iterator over the listing.
+   * @throws IOException any exception.
+   */
+  public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
+      Path path,
+      boolean recursive, Listing.FileStatusAcceptor acceptor,
+      boolean collectTombstones,
+      boolean forceNonAuthoritativeMS) throws IOException {
+
+    String key = maybeAddTrailingSlash(pathToKey(path));
+    String delimiter = recursive ? null : "/";
+    LOG.debug("Requesting all entries under {} with delimiter '{}'",
+        key, delimiter);
+    final RemoteIterator<S3AFileStatus> cachedFilesIterator;
+    final Set<Path> tombstones;
+    boolean allowAuthoritative = listingOperationCallbacks
+        .allowAuthoritative(path);
+    if (recursive) {
+      final PathMetadata pm = getStoreContext()
+          .getMetadataStore()
+          .get(path, true);
+      if (pm != null) {
+        if (pm.isDeleted()) {
+          OffsetDateTime deletedAt = OffsetDateTime
+              .ofInstant(Instant.ofEpochMilli(
+                  pm.getFileStatus().getModificationTime()),
+                  ZoneOffset.UTC);
+          throw new FileNotFoundException("Path " + path + " is recorded as " +
+              "deleted by S3Guard at " + deletedAt);
+        }
+      }
+      MetadataStoreListFilesIterator metadataStoreListFilesIterator =
+          new MetadataStoreListFilesIterator(
+              getStoreContext().getMetadataStore(),
+              pm,
+              allowAuthoritative);
+      tombstones = metadataStoreListFilesIterator.listTombstones();
+      // if all of the below are true:
+      // - authoritative access is allowed for this metadatastore
+      //   for this directory,
+      // - all the directory listings are authoritative on the client,
+      // - the caller does not force non-authoritative access,
+      // return the listing without any further S3 access
+      if (!forceNonAuthoritativeMS &&
+          allowAuthoritative &&
+          metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
+        S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
+            metadataStoreListFilesIterator, tombstones);
+        cachedFilesIterator = createProvidedFileStatusIterator(
+            statuses, ACCEPT_ALL, acceptor);
+        return createLocatedFileStatusIterator(cachedFilesIterator);
+      }
+      cachedFilesIterator = metadataStoreListFilesIterator;
+    } else {
+      DirListingMetadata meta =
+          S3Guard.listChildrenWithTtl(
+              getStoreContext().getMetadataStore(),
+              path,
+              listingOperationCallbacks.getUpdatedTtlTimeProvider(),
+              allowAuthoritative);
+      if (meta != null) {
+        tombstones = meta.listTombstones();
+      } else {
+        tombstones = null;
+      }
+      cachedFilesIterator = createProvidedFileStatusIterator(
+          S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
+      if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
+        // metadata listing is authoritative, so return it directly
+        return createLocatedFileStatusIterator(cachedFilesIterator);
+      }
+    }
+    return createTombstoneReconcilingIterator(
+        createLocatedFileStatusIterator(
+            createFileStatusListingIterator(path,
+                listingOperationCallbacks
+                    .createListObjectsRequest(key, delimiter),
+                ACCEPT_ALL,
+                acceptor,
+                cachedFilesIterator)),
+        collectTombstones ? tombstones : null);
+  }
+
+  /**
+   * Generate a located file status iterator for a directory,
+   * also performing tombstone reconciliation for guarded directories.
+   * @param dir directory to check.
+   * @param filter a path filter.
+   * @return an iterator that traverses statuses of the given dir.
+   * @throws IOException in case of failure.
+   */
+  public RemoteIterator<S3ALocatedFileStatus>
+      getLocatedFileStatusIteratorForDir(
+          Path dir, PathFilter filter) throws IOException {
+    final String key = maybeAddTrailingSlash(pathToKey(dir));
+    final Listing.FileStatusAcceptor acceptor =
+        new Listing.AcceptAllButSelfAndS3nDirs(dir);
+    boolean allowAuthoritative = listingOperationCallbacks
+        .allowAuthoritative(dir);
+    DirListingMetadata meta =
+        S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
+            dir,
+            listingOperationCallbacks
+                .getUpdatedTtlTimeProvider(),
+            allowAuthoritative);
+    Set<Path> tombstones = meta != null
+        ? meta.listTombstones()
+        : null;
+    final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
+        createProvidedFileStatusIterator(
+            S3Guard.dirMetaToStatuses(meta), filter, acceptor);
+    return (allowAuthoritative && meta != null
+        && meta.isAuthoritative())
+        ? createLocatedFileStatusIterator(
+            cachedFileStatusIterator)
+        : createTombstoneReconcilingIterator(
+            createLocatedFileStatusIterator(
+                createFileStatusListingIterator(dir,
+                    listingOperationCallbacks
+                        .createListObjectsRequest(key, "/"),
+                    filter,
+                    acceptor,
+                    cachedFileStatusIterator)),
+            tombstones);
+  }
+
+  public S3ListRequest createListObjectsRequest(String key, String delimiter) {
+    return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
+  }
+
   /**
    * Interface to implement by the logic deciding whether to accept a summary
    * entry or path as a valid file or directory.
@@ -193,9 +349,9 @@ interface FileStatusAcceptor {
    * value.
    *
    * If the status value is null, the iterator declares that it has no data.
-   * This iterator is used to handle {@link S3AFileSystem#listStatus} calls
-   * where the path handed in refers to a file, not a directory: this is the
-   * iterator returned.
+   * This iterator is used to handle {@link S3AFileSystem#listStatus(Path)}
+   * calls where the path handed in refers to a file, not a directory:
+   * this is the iterator returned.
   */
  static final class SingleStatusRemoteIterator
      implements RemoteIterator<S3ALocatedFileStatus> {
@@ -465,14 +621,15 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
       // objects
       for (S3ObjectSummary summary : objects.getObjectSummaries()) {
         String key = summary.getKey();
-        Path keyPath = owner.keyToQualifiedPath(key);
+        Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: {}", keyPath, stringify(summary));
         }
         // Skip over keys that are ourselves and old S3N _$folder$ files
         if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
           S3AFileStatus status = createFileStatus(keyPath, summary,
-              owner.getDefaultBlockSize(keyPath), owner.getUsername(),
+              listingOperationCallbacks.getDefaultBlockSize(keyPath),
+              getStoreContext().getUsername(),
               summary.getETag(), null);
           LOG.debug("Adding: {}", status);
           stats.add(status);
@@ -485,10 +642,12 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
 
       // prefixes: always directories
       for (String prefix : objects.getCommonPrefixes()) {
-        Path keyPath = owner.keyToQualifiedPath(prefix);
+        Path keyPath = getStoreContext()
+            .getContextAccessors()
+            .keyToPath(prefix);
         if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
           S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
-              owner.getUsername());
+              getStoreContext().getUsername());
           LOG.debug("Adding directory: {}", status);
           added++;
           stats.add(status);
@@ -573,8 +732,8 @@ class ObjectListingIterator implements RemoteIterator<S3ListResult> {
         Path listPath,
         S3ListRequest request) throws IOException {
       this.listPath = listPath;
-      this.maxKeys = owner.getMaxKeys();
-      this.objects = owner.listObjects(request);
+      this.maxKeys = listingOperationCallbacks.getMaxKeys();
+      this.objects = listingOperationCallbacks.listObjects(request);
       this.request = request;
     }
 
@@ -616,7 +775,8 @@ public S3ListResult next() throws IOException {
           // need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}", listingCount, maxKeys, listPath); - objects = owner.continueListObjects(request, objects); + objects = listingOperationCallbacks + .continueListObjects(request, objects); listingCount++; LOG.debug("New listing status: {}", this); } catch (AmazonClientException e) { @@ -716,7 +876,8 @@ public boolean hasNext() throws IOException { @Override public S3ALocatedFileStatus next() throws IOException { - return owner.toLocatedFileStatus(statusIterator.next()); + return listingOperationCallbacks + .toLocatedFileStatus(statusIterator.next()); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 286df44939..2cd23255c4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -105,6 +105,7 @@ import org.apache.hadoop.fs.s3a.impl.CopyOutcome; import org.apache.hadoop.fs.s3a.impl.DeleteOperation; import org.apache.hadoop.fs.s3a.impl.InternalConstants; +import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport; import org.apache.hadoop.fs.s3a.impl.OperationCallbacks; import org.apache.hadoop.fs.s3a.impl.RenameOperation; @@ -148,7 +149,6 @@ import org.apache.hadoop.fs.s3a.select.SelectBinding; import org.apache.hadoop.fs.s3a.select.SelectConstants; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; -import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; import org.apache.hadoop.fs.s3a.s3guard.S3Guard; @@ -293,6 +293,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private final S3AFileSystem.OperationCallbacksImpl operationCallbacks = new OperationCallbacksImpl(); + private final ListingOperationCallbacks listingOperationCallbacks = + new ListingOperationCallbacksImpl(); + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -362,7 +365,6 @@ public void initialize(URI name, Configuration originalConf) FAIL_ON_METADATA_WRITE_ERROR_DEFAULT); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); - listing = new Listing(this); partSize = getMultipartSizeProperty(conf, MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); multiPartThreshold = getMultipartSizeProperty(conf, @@ -455,6 +457,7 @@ public void initialize(URI name, Configuration originalConf) pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, BULK_DELETE_PAGE_SIZE_DEFAULT, 0); + listing = new Listing(listingOperationCallbacks, createStoreContext()); } catch (AmazonClientException e) { // amazon client exception: stop all services then throw the translation stopAllServices(); @@ -589,6 +592,14 @@ public S3AInstrumentation getInstrumentation() { return instrumentation; } + /** + * Get current listing instance. + * @return this instance's listing. + */ + public Listing getListing() { + return listing; + } + /** * Set up the client bindings. 
  * If delegation tokens are enabled, the FS first looks for a DT
@@ -1599,6 +1610,61 @@ public RemoteIterator<S3AFileStatus> listObjects(
     }
   }
 
+  protected class ListingOperationCallbacksImpl implements
+      ListingOperationCallbacks {
+
+    @Override
+    @Retries.RetryRaw
+    public S3ListResult listObjects(
+        S3ListRequest request)
+        throws IOException {
+      return S3AFileSystem.this.listObjects(request);
+    }
+
+    @Override
+    @Retries.RetryRaw
+    public S3ListResult continueListObjects(
+        S3ListRequest request,
+        S3ListResult prevResult)
+        throws IOException {
+      return S3AFileSystem.this.continueListObjects(request, prevResult);
+    }
+
+    @Override
+    public S3ALocatedFileStatus toLocatedFileStatus(
+        S3AFileStatus status)
+        throws IOException {
+      return S3AFileSystem.this.toLocatedFileStatus(status);
+    }
+
+    @Override
+    public S3ListRequest createListObjectsRequest(
+        String key,
+        String delimiter) {
+      return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
+    }
+
+    @Override
+    public long getDefaultBlockSize(Path path) {
+      return S3AFileSystem.this.getDefaultBlockSize(path);
+    }
+
+    @Override
+    public int getMaxKeys() {
+      return S3AFileSystem.this.getMaxKeys();
+    }
+
+    @Override
+    public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+      return S3AFileSystem.this.ttlTimeProvider;
+    }
+
+    @Override
+    public boolean allowAuthoritative(final Path p) {
+      return S3AFileSystem.this.allowAuthoritative(p);
+    }
+  }
+
   /**
    * Low-level call to get at the object metadata.
    * @param path path to the object
@@ -4216,7 +4282,7 @@ private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
       // Assuming the path to be a directory
       // do a bulk operation.
       RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
-          getListFilesAssumingDir(path,
+          listing.getListFilesAssumingDir(path,
               recursive,
               acceptor,
               collectTombstones,
@@ -4242,89 +4308,6 @@ private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
     }
   }
 
-  /**
-   * List files under a path assuming the path to be a directory.
-   * @param path input path.
-   * @param recursive recursive listing?
-   * @param acceptor file status filter
-   * @param collectTombstones should tombstones be collected from S3Guard?
-   * @param forceNonAuthoritativeMS forces metadata store to act like non
-   *                                authoritative. This is useful when
-   *                                listFiles output is used by import tool.
-   * @return an iterator over listing.
-   * @throws IOException any exception.
-   */
-  private RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
-      Path path,
-      boolean recursive, Listing.FileStatusAcceptor acceptor,
-      boolean collectTombstones,
-      boolean forceNonAuthoritativeMS) throws IOException {
-
-    String key = maybeAddTrailingSlash(pathToKey(path));
-    String delimiter = recursive ? null : "/";
null : "/"; - LOG.debug("Requesting all entries under {} with delimiter '{}'", - key, delimiter); - final RemoteIterator cachedFilesIterator; - final Set tombstones; - boolean allowAuthoritative = allowAuthoritative(path); - if (recursive) { - final PathMetadata pm = metadataStore.get(path, true); - if (pm != null) { - if (pm.isDeleted()) { - OffsetDateTime deletedAt = OffsetDateTime - .ofInstant(Instant.ofEpochMilli( - pm.getFileStatus().getModificationTime()), - ZoneOffset.UTC); - throw new FileNotFoundException("Path " + path + " is recorded as " + - "deleted by S3Guard at " + deletedAt); - } - } - MetadataStoreListFilesIterator metadataStoreListFilesIterator = - new MetadataStoreListFilesIterator(metadataStore, pm, - allowAuthoritative); - tombstones = metadataStoreListFilesIterator.listTombstones(); - // if all of the below is true - // - authoritative access is allowed for this metadatastore - // for this directory, - // - all the directory listings are authoritative on the client - // - the caller does not force non-authoritative access - // return the listing without any further s3 access - if (!forceNonAuthoritativeMS && - allowAuthoritative && - metadataStoreListFilesIterator.isRecursivelyAuthoritative()) { - S3AFileStatus[] statuses = S3Guard.iteratorToStatuses( - metadataStoreListFilesIterator, tombstones); - cachedFilesIterator = listing.createProvidedFileStatusIterator( - statuses, ACCEPT_ALL, acceptor); - return listing.createLocatedFileStatusIterator(cachedFilesIterator); - } - cachedFilesIterator = metadataStoreListFilesIterator; - } else { - DirListingMetadata meta = - S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider, - allowAuthoritative); - if (meta != null) { - tombstones = meta.listTombstones(); - } else { - tombstones = null; - } - cachedFilesIterator = listing.createProvidedFileStatusIterator( - S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor); - if (allowAuthoritative && meta != null && meta.isAuthoritative()) { - // metadata listing is authoritative, so return it directly - return listing.createLocatedFileStatusIterator(cachedFilesIterator); - } - } - return listing.createTombstoneReconcilingIterator( - listing.createLocatedFileStatusIterator( - listing.createFileStatusListingIterator(path, - createListObjectsRequest(key, delimiter), - ACCEPT_ALL, - acceptor, - cachedFilesIterator)), - collectTombstones ? tombstones : null); - } - /** * Override superclass so as to add statistic collection. * {@inheritDoc} @@ -4363,7 +4346,7 @@ public RemoteIterator listLocatedStatus(final Path f, // trigger a list call directly. final RemoteIterator locatedFileStatusIteratorForDir = - getLocatedFileStatusIteratorForDir(path, filter); + listing.getLocatedFileStatusIteratorForDir(path, filter); // If no listing is present then path might be a file. 
       if (!locatedFileStatusIteratorForDir.hasNext()) {
@@ -4847,5 +4830,6 @@ public String getBucketLocation() throws IOException {
     public Path makeQualified(final Path path) {
       return S3AFileSystem.this.makeQualified(path);
     }
+
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java
new file mode 100644
index 0000000000..d89cada846
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3ListRequest;
+import org.apache.hadoop.fs.s3a.S3ListResult;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+
+/**
+ * These are all the callbacks which
+ * {@link org.apache.hadoop.fs.s3a.Listing} operations
+ * need, derived from the corresponding S3AFileSystem
+ * methods.
+ */
+public interface ListingOperationCallbacks {
+
+  /**
+   * Initiate a {@code listObjects} operation, incrementing metrics
+   * in the process.
+   *
+   * Retry policy: retry untranslated.
+   * @param request request to initiate
+   * @return the results
+   * @throws IOException if the retry invocation raises one (it shouldn't).
+   */
+  @Retries.RetryRaw
+  S3ListResult listObjects(
+      S3ListRequest request)
+      throws IOException;
+
+  /**
+   * List the next set of objects.
+   * Retry policy: retry untranslated.
+   * @param request last list objects request to continue
+   * @param prevResult last paged result to continue from
+   * @return the next result object
+   * @throws IOException none, just there for retryUntranslated.
+   */
+  @Retries.RetryRaw
+  S3ListResult continueListObjects(
+      S3ListRequest request,
+      S3ListResult prevResult)
+      throws IOException;
+
+  /**
+   * Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
+   * @param status file status
+   * @return a located status with block locations set up from this FS.
+   * @throws IOException IO Problems.
+   */
+  S3ALocatedFileStatus toLocatedFileStatus(
+      S3AFileStatus status)
+      throws IOException;
+
+  /**
+   * Create a {@code ListObjectsRequest} request against this bucket,
+   * with the maximum keys returned in a query set by
+   * {@link #getMaxKeys()}.
+   * @param key key for request
+   * @param delimiter any delimiter
+   * @return the request
+   */
+  S3ListRequest createListObjectsRequest(
+      String key,
+      String delimiter);
+
+  /**
+   * Return the number of bytes that large input files should optimally
+   * be split into to minimize I/O time.  The given path will be used to
+   * locate the actual filesystem.  The full path does not have to exist.
+   * @param path path of file
+   * @return the default block size for the path's filesystem
+   */
+  long getDefaultBlockSize(Path path);
+
+  /**
+   * Get the maximum key count.
+   * @return a value, valid after initialization
+   */
+  int getMaxKeys();
+
+  /**
+   * Get the updated time provider for the current fs instance.
+   * @return implementation of {@link ITtlTimeProvider}
+   */
+  ITtlTimeProvider getUpdatedTtlTimeProvider();
+
+  /**
+   * Is the path for this instance considered authoritative on the client,
+   * that is: will listing/status operations only be handled by the metastore,
+   * with no fallback to S3.
+   * @param p path
+   * @return true iff the path is authoritative on the client.
+   */
+  boolean allowAuthoritative(Path p);
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
index e307c8db9b..cafa22fdec 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
@@ -210,6 +210,10 @@ public boolean isUseListV1() {
     return useListV1;
   }
 
+  public ContextAccessors getContextAccessors() {
+    return contextAccessors;
+  }
+
   /**
    * Convert a key to a fully qualified path.
    * @param key input key
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
index 536481ac23..b5f29cc7a3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
@@ -348,8 +348,8 @@ protected long dumpRawS3ObjectStore(
       final CsvFile csv) throws IOException {
     S3AFileSystem fs = getFilesystem();
     Path rootPath = fs.qualify(new Path("/"));
-    Listing listing = new Listing(fs);
-    S3ListRequest request = fs.createListObjectsRequest("", null);
+    Listing listing = fs.getListing();
+    S3ListRequest request = listing.createListObjectsRequest("", null);
     long count = 0;
     RemoteIterator<S3AFileStatus> st =
         listing.createFileStatusListingIterator(rootPath, request,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
index 1a533bfe64..3472674183 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
@@ -68,7 +68,7 @@ public void testTombstoneReconcilingIterator() throws Exception {
     Path[] allFiles = {parent, liveChild, deletedChild};
     Path[] liveFiles = {parent, liveChild};
 
-    Listing listing = new Listing(fs);
+    Listing listing = fs.getListing();
     Collection<FileStatus> statuses = new ArrayList<>();
     statuses.add(blankFileStatus(parent));
     statuses.add(blankFileStatus(liveChild));
diff --git
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java index c9d872e591..7fa03a16cd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java @@ -32,8 +32,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.MultiObjectDeleteException; +import com.amazonaws.services.s3.transfer.model.CopyResult; import com.google.common.collect.Lists; import org.assertj.core.api.Assertions; import org.junit.Before; @@ -42,14 +45,21 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.S3ListRequest; +import org.apache.hadoop.fs.s3a.S3ListResult; +import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider; @@ -230,6 +240,142 @@ private StoreContext createMockStoreContext(boolean multiDelete, .build(); } + private static class MinimalListingOperationCallbacks + implements ListingOperationCallbacks { + @Override + public S3ListResult listObjects(S3ListRequest request) + throws IOException { + return null; + } + + @Override + public S3ListResult continueListObjects( + S3ListRequest request, + S3ListResult prevResult) + throws IOException { + return null; + } + + @Override + public S3ALocatedFileStatus toLocatedFileStatus( + S3AFileStatus status) throws IOException { + return null; + } + + @Override + public S3ListRequest createListObjectsRequest( + String key, + String delimiter) { + return null; + } + + @Override + public long getDefaultBlockSize(Path path) { + return 0; + } + + @Override + public int getMaxKeys() { + return 0; + } + + @Override + public ITtlTimeProvider getUpdatedTtlTimeProvider() { + return null; + } + + @Override + public boolean allowAuthoritative(Path p) { + return false; + } + } + private static class MinimalOperationCallbacks + implements OperationCallbacks { + @Override + public S3ObjectAttributes createObjectAttributes( + Path path, + String eTag, + String versionId, + long len) { + return null; + } + + @Override + public S3ObjectAttributes createObjectAttributes( + S3AFileStatus fileStatus) { + return null; + } + + @Override + public S3AReadOpContext createReadContext( + FileStatus fileStatus) { + return null; + } + + @Override + public void finishRename( + Path sourceRenamed, + Path destCreated) + throws IOException { + + } + + 
@Override
+    public void deleteObjectAtPath(
+        Path path,
+        String key,
+        boolean isFile,
+        BulkOperationState operationState)
+        throws IOException {
+
+    }
+
+    @Override
+    public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+        Path path,
+        S3AFileStatus status,
+        boolean collectTombstones,
+        boolean includeSelf)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public CopyResult copyFile(
+        String srcKey,
+        String destKey,
+        S3ObjectAttributes srcAttributes,
+        S3AReadOpContext readContext)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public DeleteObjectsResult removeKeys(
+        List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+        boolean deleteFakeDir,
+        List<Path> undeletedObjectsOnFailure,
+        BulkOperationState operationState,
+        boolean quiet)
+        throws MultiObjectDeleteException, AmazonClientException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public boolean allowAuthoritative(Path p) {
+      return false;
+    }
+
+    @Override
+    public RemoteIterator<S3AFileStatus> listObjects(
+        Path path,
+        String key)
+        throws IOException {
+      return null;
+    }
+  }
+
   private static class MinimalContextAccessor implements ContextAccessors {
 
     @Override
@@ -333,7 +479,8 @@ public void delete(final Path path,
 
     @Override
     public void deletePaths(final Collection<Path> paths,
-        @Nullable final BulkOperationState operationState) throws IOException {
+        @Nullable final BulkOperationState operationState)
+        throws IOException {
       deleted.addAll(paths);
     }
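
Design note: the core of this refactor is dependency inversion. Listing no
longer holds an S3AFileSystem reference; it receives only the narrow
ListingOperationCallbacks interface plus a StoreContext, which is why
TestPartialDeleteFailures above can drive it with do-nothing stubs. The
following self-contained sketch illustrates that pattern; the names
(ListCallbacks, SimpleListing, CallbackIsolationDemo) are illustrative only,
not the real Hadoop types.

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /** The narrow surface the listing logic needs, instead of a whole FS. */
    interface ListCallbacks {
      List<String> listObjects(String prefix) throws IOException;
    }

    /** The listing logic sees only the interface, so tests can stub it. */
    class SimpleListing {
      private final ListCallbacks callbacks;

      SimpleListing(ListCallbacks callbacks) {
        this.callbacks = callbacks;
      }

      Iterator<String> entriesUnder(String prefix) throws IOException {
        return callbacks.listObjects(prefix).iterator();
      }
    }

    public class CallbackIsolationDemo {
      public static void main(String[] args) throws IOException {
        // A one-line stub stands in for the filesystem, just as the
        // Minimal* classes in the test file above stand in for S3AFileSystem.
        ListCallbacks stub = prefix ->
            Arrays.asList(prefix + "a.txt", prefix + "b.txt");
        Iterator<String> it = new SimpleListing(stub).entriesUnder("dir/");
        while (it.hasNext()) {
          System.out.println(it.next());
        }
      }
    }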
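
The recursive branch of getListFilesAssumingDir consults S3Guard first and
skips the S3 LIST call entirely only when three conditions hold together.
Restated as a standalone predicate (the method name is illustrative):

    // The S3 listing is skipped only when every condition holds: the caller
    // did not force non-authoritative access, the path is configured as
    // authoritative, and the metadata store's recursive listing is fully
    // authoritative on the client.
    static boolean canServeFromMetadataStoreOnly(
        boolean forceNonAuthoritativeMS,
        boolean allowAuthoritative,
        boolean recursivelyAuthoritative) {
      return !forceNonAuthoritativeMS
          && allowAuthoritative
          && recursivelyAuthoritative;
    }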
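
Both new Listing methods finish by wrapping the merged listing in a
TombstoneReconcilingIterator, which drops every entry whose path S3Guard has
recorded as deleted. A simplified, self-contained version of that filter,
using plain strings instead of Hadoop Path objects (the real iterator works
lazily over RemoteIterator, not over an in-memory list):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class TombstoneFilterDemo {
      /** Drop every listing entry whose path is recorded as deleted. */
      static List<String> reconcile(List<String> listing,
          Set<String> tombstones) {
        return listing.stream()
            .filter(path -> !tombstones.contains(path))
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<String> listing = Arrays.asList("dir/live", "dir/deleted");
        Set<String> tombstones =
            new HashSet<>(Arrays.asList("dir/deleted"));
        System.out.println(reconcile(listing, tombstones)); // [dir/live]
      }
    }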
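
From a client's point of view nothing changes: the refactored code is still
reached through the public FileSystem API. A minimal usage sketch (the bucket
name is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class ListFilesExample {
      public static void main(String[] args) throws Exception {
        Path root = new Path("s3a://example-bucket/data/");
        FileSystem fs = root.getFileSystem(new Configuration());
        // A recursive listFiles() call is what ends up in
        // Listing.getListFilesAssumingDir via S3AFileSystem.innerListFiles.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true);
        while (it.hasNext()) {
          System.out.println(it.next().getPath());
        }
      }
    }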