HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)
Contributed by Mukund Thakur. Change-Id: I79160b236a92fdd67565a4b4974f1862e600c210
This commit is contained in:
parent
b5d712251c
commit
251d2d1fa5
@ -30,11 +30,22 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
|
||||
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
|
||||
import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
@ -46,25 +57,31 @@
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
|
||||
import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey;
|
||||
|
||||
/**
|
||||
* Place for the S3A listing classes; keeps all the small classes under control.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class Listing {
|
||||
public class Listing extends AbstractStoreOperation {
|
||||
|
||||
private final S3AFileSystem owner;
|
||||
private static final Logger LOG = S3AFileSystem.LOG;
|
||||
|
||||
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
|
||||
new AcceptAllButS3nDirs();
|
||||
|
||||
public Listing(S3AFileSystem owner) {
|
||||
this.owner = owner;
|
||||
private final ListingOperationCallbacks listingOperationCallbacks;
|
||||
|
||||
public Listing(ListingOperationCallbacks listingOperationCallbacks,
|
||||
StoreContext storeContext) {
|
||||
super(storeContext);
|
||||
this.listingOperationCallbacks = listingOperationCallbacks;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -156,6 +173,145 @@ TombstoneReconcilingIterator createTombstoneReconcilingIterator(
|
||||
return new TombstoneReconcilingIterator(iterator, tombstones);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* List files under a path assuming the path to be a directory.
|
||||
* @param path input path.
|
||||
* @param recursive recursive listing?
|
||||
* @param acceptor file status filter
|
||||
* @param collectTombstones should tombstones be collected from S3Guard?
|
||||
* @param forceNonAuthoritativeMS forces metadata store to act like non
|
||||
* authoritative. This is useful when
|
||||
* listFiles output is used by import tool.
|
||||
* @return an iterator over listing.
|
||||
* @throws IOException any exception.
|
||||
*/
|
||||
public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
|
||||
Path path,
|
||||
boolean recursive, Listing.FileStatusAcceptor acceptor,
|
||||
boolean collectTombstones,
|
||||
boolean forceNonAuthoritativeMS) throws IOException {
|
||||
|
||||
String key = maybeAddTrailingSlash(pathToKey(path));
|
||||
String delimiter = recursive ? null : "/";
|
||||
LOG.debug("Requesting all entries under {} with delimiter '{}'",
|
||||
key, delimiter);
|
||||
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
|
||||
final Set<Path> tombstones;
|
||||
boolean allowAuthoritative = listingOperationCallbacks
|
||||
.allowAuthoritative(path);
|
||||
if (recursive) {
|
||||
final PathMetadata pm = getStoreContext()
|
||||
.getMetadataStore()
|
||||
.get(path, true);
|
||||
if (pm != null) {
|
||||
if (pm.isDeleted()) {
|
||||
OffsetDateTime deletedAt = OffsetDateTime
|
||||
.ofInstant(Instant.ofEpochMilli(
|
||||
pm.getFileStatus().getModificationTime()),
|
||||
ZoneOffset.UTC);
|
||||
throw new FileNotFoundException("Path " + path + " is recorded as " +
|
||||
"deleted by S3Guard at " + deletedAt);
|
||||
}
|
||||
}
|
||||
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
|
||||
new MetadataStoreListFilesIterator(
|
||||
getStoreContext().getMetadataStore(),
|
||||
pm,
|
||||
allowAuthoritative);
|
||||
tombstones = metadataStoreListFilesIterator.listTombstones();
|
||||
// if all of the below is true
|
||||
// - authoritative access is allowed for this metadatastore
|
||||
// for this directory,
|
||||
// - all the directory listings are authoritative on the client
|
||||
// - the caller does not force non-authoritative access
|
||||
// return the listing without any further s3 access
|
||||
if (!forceNonAuthoritativeMS &&
|
||||
allowAuthoritative &&
|
||||
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
|
||||
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
|
||||
metadataStoreListFilesIterator, tombstones);
|
||||
cachedFilesIterator = createProvidedFileStatusIterator(
|
||||
statuses, ACCEPT_ALL, acceptor);
|
||||
return createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
cachedFilesIterator = metadataStoreListFilesIterator;
|
||||
} else {
|
||||
DirListingMetadata meta =
|
||||
S3Guard.listChildrenWithTtl(
|
||||
getStoreContext().getMetadataStore(),
|
||||
path,
|
||||
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
|
||||
allowAuthoritative);
|
||||
if (meta != null) {
|
||||
tombstones = meta.listTombstones();
|
||||
} else {
|
||||
tombstones = null;
|
||||
}
|
||||
cachedFilesIterator = createProvidedFileStatusIterator(
|
||||
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
|
||||
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
|
||||
// metadata listing is authoritative, so return it directly
|
||||
return createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
}
|
||||
return createTombstoneReconcilingIterator(
|
||||
createLocatedFileStatusIterator(
|
||||
createFileStatusListingIterator(path,
|
||||
listingOperationCallbacks
|
||||
.createListObjectsRequest(key, delimiter),
|
||||
ACCEPT_ALL,
|
||||
acceptor,
|
||||
cachedFilesIterator)),
|
||||
collectTombstones ? tombstones : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate list located status for a directory.
|
||||
* Also performing tombstone reconciliation for guarded directories.
|
||||
* @param dir directory to check.
|
||||
* @param filter a path filter.
|
||||
* @return an iterator that traverses statuses of the given dir.
|
||||
* @throws IOException in case of failure.
|
||||
*/
|
||||
public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
|
||||
Path dir, PathFilter filter) throws IOException {
|
||||
final String key = maybeAddTrailingSlash(pathToKey(dir));
|
||||
final Listing.FileStatusAcceptor acceptor =
|
||||
new Listing.AcceptAllButSelfAndS3nDirs(dir);
|
||||
boolean allowAuthoritative = listingOperationCallbacks
|
||||
.allowAuthoritative(dir);
|
||||
DirListingMetadata meta =
|
||||
S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
|
||||
dir,
|
||||
listingOperationCallbacks
|
||||
.getUpdatedTtlTimeProvider(),
|
||||
allowAuthoritative);
|
||||
Set<Path> tombstones = meta != null
|
||||
? meta.listTombstones()
|
||||
: null;
|
||||
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
|
||||
createProvidedFileStatusIterator(
|
||||
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
|
||||
return (allowAuthoritative && meta != null
|
||||
&& meta.isAuthoritative())
|
||||
? createLocatedFileStatusIterator(
|
||||
cachedFileStatusIterator)
|
||||
: createTombstoneReconcilingIterator(
|
||||
createLocatedFileStatusIterator(
|
||||
createFileStatusListingIterator(dir,
|
||||
listingOperationCallbacks
|
||||
.createListObjectsRequest(key, "/"),
|
||||
filter,
|
||||
acceptor,
|
||||
cachedFileStatusIterator)),
|
||||
tombstones);
|
||||
}
|
||||
|
||||
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
|
||||
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface to implement by the logic deciding whether to accept a summary
|
||||
* entry or path as a valid file or directory.
|
||||
@ -193,9 +349,9 @@ interface FileStatusAcceptor {
|
||||
* value.
|
||||
*
|
||||
* If the status value is null, the iterator declares that it has no data.
|
||||
* This iterator is used to handle {@link S3AFileSystem#listStatus} calls
|
||||
* where the path handed in refers to a file, not a directory: this is the
|
||||
* iterator returned.
|
||||
* This iterator is used to handle {@link S3AFileSystem#listStatus(Path)}
|
||||
* calls where the path handed in refers to a file, not a directory:
|
||||
* this is the iterator returned.
|
||||
*/
|
||||
static final class SingleStatusRemoteIterator
|
||||
implements RemoteIterator<S3ALocatedFileStatus> {
|
||||
@ -465,14 +621,15 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
|
||||
// objects
|
||||
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
||||
String key = summary.getKey();
|
||||
Path keyPath = owner.keyToQualifiedPath(key);
|
||||
Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("{}: {}", keyPath, stringify(summary));
|
||||
}
|
||||
// Skip over keys that are ourselves and old S3N _$folder$ files
|
||||
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
|
||||
S3AFileStatus status = createFileStatus(keyPath, summary,
|
||||
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
|
||||
listingOperationCallbacks.getDefaultBlockSize(keyPath),
|
||||
getStoreContext().getUsername(),
|
||||
summary.getETag(), null);
|
||||
LOG.debug("Adding: {}", status);
|
||||
stats.add(status);
|
||||
@ -485,10 +642,12 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
|
||||
|
||||
// prefixes: always directories
|
||||
for (String prefix : objects.getCommonPrefixes()) {
|
||||
Path keyPath = owner.keyToQualifiedPath(prefix);
|
||||
Path keyPath = getStoreContext()
|
||||
.getContextAccessors()
|
||||
.keyToPath(prefix);
|
||||
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
|
||||
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
|
||||
owner.getUsername());
|
||||
getStoreContext().getUsername());
|
||||
LOG.debug("Adding directory: {}", status);
|
||||
added++;
|
||||
stats.add(status);
|
||||
@ -573,8 +732,8 @@ class ObjectListingIterator implements RemoteIterator<S3ListResult> {
|
||||
Path listPath,
|
||||
S3ListRequest request) throws IOException {
|
||||
this.listPath = listPath;
|
||||
this.maxKeys = owner.getMaxKeys();
|
||||
this.objects = owner.listObjects(request);
|
||||
this.maxKeys = listingOperationCallbacks.getMaxKeys();
|
||||
this.objects = listingOperationCallbacks.listObjects(request);
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
@ -616,7 +775,8 @@ public S3ListResult next() throws IOException {
|
||||
// need to request a new set of objects.
|
||||
LOG.debug("[{}], Requesting next {} objects under {}",
|
||||
listingCount, maxKeys, listPath);
|
||||
objects = owner.continueListObjects(request, objects);
|
||||
objects = listingOperationCallbacks
|
||||
.continueListObjects(request, objects);
|
||||
listingCount++;
|
||||
LOG.debug("New listing status: {}", this);
|
||||
} catch (AmazonClientException e) {
|
||||
@ -716,7 +876,8 @@ public boolean hasNext() throws IOException {
|
||||
|
||||
@Override
|
||||
public S3ALocatedFileStatus next() throws IOException {
|
||||
return owner.toLocatedFileStatus(statusIterator.next());
|
||||
return listingOperationCallbacks
|
||||
.toLocatedFileStatus(statusIterator.next());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,6 +105,7 @@
|
||||
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
|
||||
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
|
||||
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
|
||||
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
|
||||
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
|
||||
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
|
||||
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
|
||||
@ -148,7 +149,6 @@
|
||||
import org.apache.hadoop.fs.s3a.select.SelectBinding;
|
||||
import org.apache.hadoop.fs.s3a.select.SelectConstants;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
||||
@ -293,6 +293,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||
private final S3AFileSystem.OperationCallbacksImpl
|
||||
operationCallbacks = new OperationCallbacksImpl();
|
||||
|
||||
private final ListingOperationCallbacks listingOperationCallbacks =
|
||||
new ListingOperationCallbacksImpl();
|
||||
|
||||
/** Add any deprecated keys. */
|
||||
@SuppressWarnings("deprecation")
|
||||
private static void addDeprecatedKeys() {
|
||||
@ -362,7 +365,6 @@ public void initialize(URI name, Configuration originalConf)
|
||||
FAIL_ON_METADATA_WRITE_ERROR_DEFAULT);
|
||||
|
||||
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
||||
listing = new Listing(this);
|
||||
partSize = getMultipartSizeProperty(conf,
|
||||
MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
||||
multiPartThreshold = getMultipartSizeProperty(conf,
|
||||
@ -455,6 +457,7 @@ public void initialize(URI name, Configuration originalConf)
|
||||
|
||||
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
|
||||
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
|
||||
listing = new Listing(listingOperationCallbacks, createStoreContext());
|
||||
} catch (AmazonClientException e) {
|
||||
// amazon client exception: stop all services then throw the translation
|
||||
stopAllServices();
|
||||
@ -589,6 +592,14 @@ public S3AInstrumentation getInstrumentation() {
|
||||
return instrumentation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current listing instance.
|
||||
* @return this instance's listing.
|
||||
*/
|
||||
public Listing getListing() {
|
||||
return listing;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up the client bindings.
|
||||
* If delegation tokens are enabled, the FS first looks for a DT
|
||||
@ -1599,6 +1610,61 @@ public RemoteIterator<S3AFileStatus> listObjects(
|
||||
}
|
||||
}
|
||||
|
||||
protected class ListingOperationCallbacksImpl implements
|
||||
ListingOperationCallbacks {
|
||||
|
||||
@Override
|
||||
@Retries.RetryRaw
|
||||
public S3ListResult listObjects(
|
||||
S3ListRequest request)
|
||||
throws IOException {
|
||||
return S3AFileSystem.this.listObjects(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Retries.RetryRaw
|
||||
public S3ListResult continueListObjects(
|
||||
S3ListRequest request,
|
||||
S3ListResult prevResult)
|
||||
throws IOException {
|
||||
return S3AFileSystem.this.continueListObjects(request, prevResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3ALocatedFileStatus toLocatedFileStatus(
|
||||
S3AFileStatus status)
|
||||
throws IOException {
|
||||
return S3AFileSystem.this.toLocatedFileStatus(status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3ListRequest createListObjectsRequest(
|
||||
String key,
|
||||
String delimiter) {
|
||||
return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDefaultBlockSize(Path path) {
|
||||
return S3AFileSystem.this.getDefaultBlockSize(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxKeys() {
|
||||
return S3AFileSystem.this.getMaxKeys();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
|
||||
return S3AFileSystem.this.ttlTimeProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allowAuthoritative(final Path p) {
|
||||
return S3AFileSystem.this.allowAuthoritative(p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Low-level call to get at the object metadata.
|
||||
* @param path path to the object
|
||||
@ -4216,7 +4282,7 @@ private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
|
||||
// Assuming the path to be a directory
|
||||
// do a bulk operation.
|
||||
RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
|
||||
getListFilesAssumingDir(path,
|
||||
listing.getListFilesAssumingDir(path,
|
||||
recursive,
|
||||
acceptor,
|
||||
collectTombstones,
|
||||
@ -4242,89 +4308,6 @@ private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List files under a path assuming the path to be a directory.
|
||||
* @param path input path.
|
||||
* @param recursive recursive listing?
|
||||
* @param acceptor file status filter
|
||||
* @param collectTombstones should tombstones be collected from S3Guard?
|
||||
* @param forceNonAuthoritativeMS forces metadata store to act like non
|
||||
* authoritative. This is useful when
|
||||
* listFiles output is used by import tool.
|
||||
* @return an iterator over listing.
|
||||
* @throws IOException any exception.
|
||||
*/
|
||||
private RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
|
||||
Path path,
|
||||
boolean recursive, Listing.FileStatusAcceptor acceptor,
|
||||
boolean collectTombstones,
|
||||
boolean forceNonAuthoritativeMS) throws IOException {
|
||||
|
||||
String key = maybeAddTrailingSlash(pathToKey(path));
|
||||
String delimiter = recursive ? null : "/";
|
||||
LOG.debug("Requesting all entries under {} with delimiter '{}'",
|
||||
key, delimiter);
|
||||
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
|
||||
final Set<Path> tombstones;
|
||||
boolean allowAuthoritative = allowAuthoritative(path);
|
||||
if (recursive) {
|
||||
final PathMetadata pm = metadataStore.get(path, true);
|
||||
if (pm != null) {
|
||||
if (pm.isDeleted()) {
|
||||
OffsetDateTime deletedAt = OffsetDateTime
|
||||
.ofInstant(Instant.ofEpochMilli(
|
||||
pm.getFileStatus().getModificationTime()),
|
||||
ZoneOffset.UTC);
|
||||
throw new FileNotFoundException("Path " + path + " is recorded as " +
|
||||
"deleted by S3Guard at " + deletedAt);
|
||||
}
|
||||
}
|
||||
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
|
||||
new MetadataStoreListFilesIterator(metadataStore, pm,
|
||||
allowAuthoritative);
|
||||
tombstones = metadataStoreListFilesIterator.listTombstones();
|
||||
// if all of the below is true
|
||||
// - authoritative access is allowed for this metadatastore
|
||||
// for this directory,
|
||||
// - all the directory listings are authoritative on the client
|
||||
// - the caller does not force non-authoritative access
|
||||
// return the listing without any further s3 access
|
||||
if (!forceNonAuthoritativeMS &&
|
||||
allowAuthoritative &&
|
||||
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
|
||||
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
|
||||
metadataStoreListFilesIterator, tombstones);
|
||||
cachedFilesIterator = listing.createProvidedFileStatusIterator(
|
||||
statuses, ACCEPT_ALL, acceptor);
|
||||
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
cachedFilesIterator = metadataStoreListFilesIterator;
|
||||
} else {
|
||||
DirListingMetadata meta =
|
||||
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
|
||||
allowAuthoritative);
|
||||
if (meta != null) {
|
||||
tombstones = meta.listTombstones();
|
||||
} else {
|
||||
tombstones = null;
|
||||
}
|
||||
cachedFilesIterator = listing.createProvidedFileStatusIterator(
|
||||
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
|
||||
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
|
||||
// metadata listing is authoritative, so return it directly
|
||||
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
}
|
||||
return listing.createTombstoneReconcilingIterator(
|
||||
listing.createLocatedFileStatusIterator(
|
||||
listing.createFileStatusListingIterator(path,
|
||||
createListObjectsRequest(key, delimiter),
|
||||
ACCEPT_ALL,
|
||||
acceptor,
|
||||
cachedFilesIterator)),
|
||||
collectTombstones ? tombstones : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override superclass so as to add statistic collection.
|
||||
* {@inheritDoc}
|
||||
@ -4363,7 +4346,7 @@ public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
|
||||
// trigger a list call directly.
|
||||
final RemoteIterator<S3ALocatedFileStatus>
|
||||
locatedFileStatusIteratorForDir =
|
||||
getLocatedFileStatusIteratorForDir(path, filter);
|
||||
listing.getLocatedFileStatusIteratorForDir(path, filter);
|
||||
|
||||
// If no listing is present then path might be a file.
|
||||
if (!locatedFileStatusIteratorForDir.hasNext()) {
|
||||
@ -4847,5 +4830,6 @@ public String getBucketLocation() throws IOException {
|
||||
public Path makeQualified(final Path path) {
|
||||
return S3AFileSystem.this.makeQualified(path);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,119 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.Retries;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
|
||||
import org.apache.hadoop.fs.s3a.S3ListRequest;
|
||||
import org.apache.hadoop.fs.s3a.S3ListResult;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
|
||||
|
||||
/**
|
||||
* These are all the callbacks which
|
||||
* {@link org.apache.hadoop.fs.s3a.Listing} operations
|
||||
* need, derived from the actual appropriate S3AFileSystem
|
||||
* methods.
|
||||
*/
|
||||
public interface ListingOperationCallbacks {
|
||||
|
||||
/**
|
||||
* Initiate a {@code listObjects} operation, incrementing metrics
|
||||
* in the process.
|
||||
*
|
||||
* Retry policy: retry untranslated.
|
||||
* @param request request to initiate
|
||||
* @return the results
|
||||
* @throws IOException if the retry invocation raises one (it shouldn't).
|
||||
*/
|
||||
@Retries.RetryRaw
|
||||
S3ListResult listObjects(
|
||||
S3ListRequest request)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* List the next set of objects.
|
||||
* Retry policy: retry untranslated.
|
||||
* @param request last list objects request to continue
|
||||
* @param prevResult last paged result to continue from
|
||||
* @return the next result object
|
||||
* @throws IOException none, just there for retryUntranslated.
|
||||
*/
|
||||
@Retries.RetryRaw
|
||||
S3ListResult continueListObjects(
|
||||
S3ListRequest request,
|
||||
S3ListResult prevResult)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
|
||||
* @param status file status
|
||||
* @return a located status with block locations set up from this FS.
|
||||
* @throws IOException IO Problems.
|
||||
*/
|
||||
S3ALocatedFileStatus toLocatedFileStatus(
|
||||
S3AFileStatus status)
|
||||
throws IOException;
|
||||
/**
|
||||
* Create a {@code ListObjectsRequest} request against this bucket,
|
||||
* with the maximum keys returned in a query set by
|
||||
* {@link this.getMaxKeys()}.
|
||||
* @param key key for request
|
||||
* @param delimiter any delimiter
|
||||
* @return the request
|
||||
*/
|
||||
S3ListRequest createListObjectsRequest(
|
||||
String key,
|
||||
String delimiter);
|
||||
|
||||
|
||||
/**
|
||||
* Return the number of bytes that large input files should be optimally
|
||||
* be split into to minimize I/O time. The given path will be used to
|
||||
* locate the actual filesystem. The full path does not have to exist.
|
||||
* @param path path of file
|
||||
* @return the default block size for the path's filesystem
|
||||
*/
|
||||
long getDefaultBlockSize(Path path);
|
||||
|
||||
/**
|
||||
* Get the maximum key count.
|
||||
* @return a value, valid after initialization
|
||||
*/
|
||||
int getMaxKeys();
|
||||
|
||||
/**
|
||||
* Get the updated time provider for the current fs instance.
|
||||
* @return implementation of {@link ITtlTimeProvider}
|
||||
*/
|
||||
ITtlTimeProvider getUpdatedTtlTimeProvider();
|
||||
|
||||
/**
|
||||
* Is the path for this instance considered authoritative on the client,
|
||||
* that is: will listing/status operations only be handled by the metastore,
|
||||
* with no fallback to S3.
|
||||
* @param p path
|
||||
* @return true iff the path is authoritative on the client.
|
||||
*/
|
||||
boolean allowAuthoritative(Path p);
|
||||
}
|
@ -210,6 +210,10 @@ public boolean isUseListV1() {
|
||||
return useListV1;
|
||||
}
|
||||
|
||||
public ContextAccessors getContextAccessors() {
|
||||
return contextAccessors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a key to a fully qualified path.
|
||||
* @param key input key
|
||||
|
@ -348,8 +348,8 @@ protected long dumpRawS3ObjectStore(
|
||||
final CsvFile csv) throws IOException {
|
||||
S3AFileSystem fs = getFilesystem();
|
||||
Path rootPath = fs.qualify(new Path("/"));
|
||||
Listing listing = new Listing(fs);
|
||||
S3ListRequest request = fs.createListObjectsRequest("", null);
|
||||
Listing listing = fs.getListing();
|
||||
S3ListRequest request = listing.createListObjectsRequest("", null);
|
||||
long count = 0;
|
||||
RemoteIterator<S3AFileStatus> st =
|
||||
listing.createFileStatusListingIterator(rootPath, request,
|
||||
|
@ -68,7 +68,7 @@ public void testTombstoneReconcilingIterator() throws Exception {
|
||||
Path[] allFiles = {parent, liveChild, deletedChild};
|
||||
Path[] liveFiles = {parent, liveChild};
|
||||
|
||||
Listing listing = new Listing(fs);
|
||||
Listing listing = fs.getListing();
|
||||
Collection<FileStatus> statuses = new ArrayList<>();
|
||||
statuses.add(blankFileStatus(parent));
|
||||
statuses.add(blankFileStatus(liveChild));
|
||||
|
@ -32,8 +32,11 @@
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
||||
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
|
||||
import com.amazonaws.services.s3.transfer.model.CopyResult;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Before;
|
||||
@ -42,14 +45,21 @@
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.s3a.Constants;
|
||||
import org.apache.hadoop.fs.s3a.Invoker;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
|
||||
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
|
||||
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
|
||||
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
|
||||
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
|
||||
import org.apache.hadoop.fs.s3a.S3ListRequest;
|
||||
import org.apache.hadoop.fs.s3a.S3ListResult;
|
||||
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
|
||||
@ -230,6 +240,142 @@ private StoreContext createMockStoreContext(boolean multiDelete,
|
||||
.build();
|
||||
}
|
||||
|
||||
private static class MinimalListingOperationCallbacks
|
||||
implements ListingOperationCallbacks {
|
||||
@Override
|
||||
public S3ListResult listObjects(S3ListRequest request)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3ListResult continueListObjects(
|
||||
S3ListRequest request,
|
||||
S3ListResult prevResult)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3ALocatedFileStatus toLocatedFileStatus(
|
||||
S3AFileStatus status) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3ListRequest createListObjectsRequest(
|
||||
String key,
|
||||
String delimiter) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDefaultBlockSize(Path path) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxKeys() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allowAuthoritative(Path p) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
private static class MinimalOperationCallbacks
|
||||
implements OperationCallbacks {
|
||||
@Override
|
||||
public S3ObjectAttributes createObjectAttributes(
|
||||
Path path,
|
||||
String eTag,
|
||||
String versionId,
|
||||
long len) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3ObjectAttributes createObjectAttributes(
|
||||
S3AFileStatus fileStatus) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3AReadOpContext createReadContext(
|
||||
FileStatus fileStatus) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finishRename(
|
||||
Path sourceRenamed,
|
||||
Path destCreated)
|
||||
throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteObjectAtPath(
|
||||
Path path,
|
||||
String key,
|
||||
boolean isFile,
|
||||
BulkOperationState operationState)
|
||||
throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
|
||||
Path path,
|
||||
S3AFileStatus status,
|
||||
boolean collectTombstones,
|
||||
boolean includeSelf)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CopyResult copyFile(
|
||||
String srcKey,
|
||||
String destKey,
|
||||
S3ObjectAttributes srcAttributes,
|
||||
S3AReadOpContext readContext)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteObjectsResult removeKeys(
|
||||
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
|
||||
boolean deleteFakeDir,
|
||||
List<Path> undeletedObjectsOnFailure,
|
||||
BulkOperationState operationState,
|
||||
boolean quiet)
|
||||
throws MultiObjectDeleteException, AmazonClientException,
|
||||
IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allowAuthoritative(Path p) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteIterator<S3AFileStatus> listObjects(
|
||||
Path path,
|
||||
String key)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MinimalContextAccessor implements ContextAccessors {
|
||||
|
||||
@Override
|
||||
@ -333,7 +479,8 @@ public void delete(final Path path,
|
||||
|
||||
@Override
|
||||
public void deletePaths(final Collection<Path> paths,
|
||||
@Nullable final BulkOperationState operationState) throws IOException {
|
||||
@Nullable final BulkOperationState operationState)
|
||||
throws IOException {
|
||||
deleted.addAll(paths);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user