HADOOP-17022. Tune S3AFileSystem.listFiles() API.
Contributed by Mukund Thakur. Change-Id: I17f5cfdcd25670ce3ddb62c13378c7e2dc06ba52
This commit is contained in:
parent
cac2fc1f58
commit
8b601ad7e6
@ -4206,79 +4206,125 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||
Path path = qualify(f);
|
||||
LOG.debug("listFiles({}, {})", path, recursive);
|
||||
try {
|
||||
// if a status was given, that is used, otherwise
|
||||
// call getFileStatus, which triggers an existence check
|
||||
final S3AFileStatus fileStatus = status != null
|
||||
? status
|
||||
: (S3AFileStatus) getFileStatus(path);
|
||||
if (fileStatus.isFile()) {
|
||||
// if a status was given and it is a file.
|
||||
if (status != null && status.isFile()) {
|
||||
// simple case: File
|
||||
LOG.debug("Path is a file");
|
||||
LOG.debug("Path is a file: {}", path);
|
||||
return new Listing.SingleStatusRemoteIterator(
|
||||
toLocatedFileStatus(fileStatus));
|
||||
} else {
|
||||
// directory: do a bulk operation
|
||||
String key = maybeAddTrailingSlash(pathToKey(path));
|
||||
String delimiter = recursive ? null : "/";
|
||||
LOG.debug("Requesting all entries under {} with delimiter '{}'",
|
||||
key, delimiter);
|
||||
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
|
||||
final Set<Path> tombstones;
|
||||
boolean allowAuthoritative = allowAuthoritative(f);
|
||||
if (recursive) {
|
||||
final PathMetadata pm = metadataStore.get(path, true);
|
||||
// shouldn't need to check pm.isDeleted() because that will have
|
||||
// been caught by getFileStatus above.
|
||||
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
|
||||
new MetadataStoreListFilesIterator(metadataStore, pm,
|
||||
allowAuthoritative);
|
||||
tombstones = metadataStoreListFilesIterator.listTombstones();
|
||||
// if all of the below is true
|
||||
// - authoritative access is allowed for this metadatastore for this directory,
|
||||
// - all the directory listings are authoritative on the client
|
||||
// - the caller does not force non-authoritative access
|
||||
// return the listing without any further s3 access
|
||||
if (!forceNonAuthoritativeMS &&
|
||||
allowAuthoritative &&
|
||||
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
|
||||
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
|
||||
metadataStoreListFilesIterator, tombstones);
|
||||
cachedFilesIterator = listing.createProvidedFileStatusIterator(
|
||||
statuses, ACCEPT_ALL, acceptor);
|
||||
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
cachedFilesIterator = metadataStoreListFilesIterator;
|
||||
} else {
|
||||
DirListingMetadata meta =
|
||||
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
|
||||
allowAuthoritative);
|
||||
if (meta != null) {
|
||||
tombstones = meta.listTombstones();
|
||||
} else {
|
||||
tombstones = null;
|
||||
}
|
||||
cachedFilesIterator = listing.createProvidedFileStatusIterator(
|
||||
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
|
||||
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
|
||||
// metadata listing is authoritative, so return it directly
|
||||
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
}
|
||||
return listing.createTombstoneReconcilingIterator(
|
||||
listing.createLocatedFileStatusIterator(
|
||||
listing.createFileStatusListingIterator(path,
|
||||
createListObjectsRequest(key, delimiter),
|
||||
ACCEPT_ALL,
|
||||
acceptor,
|
||||
cachedFilesIterator)),
|
||||
collectTombstones ? tombstones : null);
|
||||
toLocatedFileStatus(status));
|
||||
}
|
||||
// Assuming the path to be a directory
|
||||
// do a bulk operation.
|
||||
RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
|
||||
getListFilesAssumingDir(path,
|
||||
recursive,
|
||||
acceptor,
|
||||
collectTombstones,
|
||||
forceNonAuthoritativeMS);
|
||||
// If there are no list entries present, we
|
||||
// fallback to file existence check as the path
|
||||
// can be a file or empty directory.
|
||||
if (!listFilesAssumingDir.hasNext()) {
|
||||
// If file status was already passed, reuse it.
|
||||
final S3AFileStatus fileStatus = status != null
|
||||
? status
|
||||
: (S3AFileStatus) getFileStatus(path);
|
||||
if (fileStatus.isFile()) {
|
||||
return new Listing.SingleStatusRemoteIterator(
|
||||
toLocatedFileStatus(fileStatus));
|
||||
}
|
||||
}
|
||||
// If we have reached here, it means either there are files
|
||||
// in this directory or it is empty.
|
||||
return listFilesAssumingDir;
|
||||
} catch (AmazonClientException e) {
|
||||
// TODO S3Guard: retry on file not found exception
|
||||
throw translateException("listFiles", path, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List files under a path assuming the path to be a directory.
|
||||
* @param path input path.
|
||||
* @param recursive recursive listing?
|
||||
* @param acceptor file status filter
|
||||
* @param collectTombstones should tombstones be collected from S3Guard?
|
||||
* @param forceNonAuthoritativeMS forces metadata store to act like non
|
||||
* authoritative. This is useful when
|
||||
* listFiles output is used by import tool.
|
||||
* @return an iterator over listing.
|
||||
* @throws IOException any exception.
|
||||
*/
|
||||
private RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
|
||||
Path path,
|
||||
boolean recursive, Listing.FileStatusAcceptor acceptor,
|
||||
boolean collectTombstones,
|
||||
boolean forceNonAuthoritativeMS) throws IOException {
|
||||
|
||||
String key = maybeAddTrailingSlash(pathToKey(path));
|
||||
String delimiter = recursive ? null : "/";
|
||||
LOG.debug("Requesting all entries under {} with delimiter '{}'",
|
||||
key, delimiter);
|
||||
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
|
||||
final Set<Path> tombstones;
|
||||
boolean allowAuthoritative = allowAuthoritative(path);
|
||||
if (recursive) {
|
||||
final PathMetadata pm = metadataStore.get(path, true);
|
||||
if (pm != null) {
|
||||
if (pm.isDeleted()) {
|
||||
OffsetDateTime deletedAt = OffsetDateTime
|
||||
.ofInstant(Instant.ofEpochMilli(
|
||||
pm.getFileStatus().getModificationTime()),
|
||||
ZoneOffset.UTC);
|
||||
throw new FileNotFoundException("Path " + path + " is recorded as " +
|
||||
"deleted by S3Guard at " + deletedAt);
|
||||
}
|
||||
}
|
||||
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
|
||||
new MetadataStoreListFilesIterator(metadataStore, pm,
|
||||
allowAuthoritative);
|
||||
tombstones = metadataStoreListFilesIterator.listTombstones();
|
||||
// if all of the below is true
|
||||
// - authoritative access is allowed for this metadatastore
|
||||
// for this directory,
|
||||
// - all the directory listings are authoritative on the client
|
||||
// - the caller does not force non-authoritative access
|
||||
// return the listing without any further s3 access
|
||||
if (!forceNonAuthoritativeMS &&
|
||||
allowAuthoritative &&
|
||||
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
|
||||
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
|
||||
metadataStoreListFilesIterator, tombstones);
|
||||
cachedFilesIterator = listing.createProvidedFileStatusIterator(
|
||||
statuses, ACCEPT_ALL, acceptor);
|
||||
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
cachedFilesIterator = metadataStoreListFilesIterator;
|
||||
} else {
|
||||
DirListingMetadata meta =
|
||||
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
|
||||
allowAuthoritative);
|
||||
if (meta != null) {
|
||||
tombstones = meta.listTombstones();
|
||||
} else {
|
||||
tombstones = null;
|
||||
}
|
||||
cachedFilesIterator = listing.createProvidedFileStatusIterator(
|
||||
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
|
||||
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
|
||||
// metadata listing is authoritative, so return it directly
|
||||
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
}
|
||||
return listing.createTombstoneReconcilingIterator(
|
||||
listing.createLocatedFileStatusIterator(
|
||||
listing.createFileStatusListingIterator(path,
|
||||
createListObjectsRequest(key, delimiter),
|
||||
ACCEPT_ALL,
|
||||
acceptor,
|
||||
cachedFilesIterator)),
|
||||
collectTombstones ? tombstones : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override superclass so as to add statistic collection.
|
||||
* {@inheritDoc}
|
||||
|
@ -168,6 +168,76 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCostOfListFilesOnFile() throws Throwable {
|
||||
describe("Performing listFiles() on a file");
|
||||
Path file = path(getMethodName() + ".txt");
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
touch(fs, file);
|
||||
resetMetricDiffs();
|
||||
fs.listFiles(file, true);
|
||||
if (!fs.hasMetadataStore()) {
|
||||
metadataRequests.assertDiffEquals(1);
|
||||
} else {
|
||||
if (fs.allowAuthoritative(file)) {
|
||||
listRequests.assertDiffEquals(0);
|
||||
} else {
|
||||
listRequests.assertDiffEquals(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCostOfListFilesOnEmptyDir() throws Throwable {
|
||||
describe("Performing listFiles() on an empty dir");
|
||||
Path dir = path(getMethodName());
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
fs.mkdirs(dir);
|
||||
resetMetricDiffs();
|
||||
fs.listFiles(dir, true);
|
||||
if (!fs.hasMetadataStore()) {
|
||||
verifyOperationCount(2, 1);
|
||||
} else {
|
||||
if (fs.allowAuthoritative(dir)) {
|
||||
verifyOperationCount(0, 0);
|
||||
} else {
|
||||
verifyOperationCount(0, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCostOfListFilesOnNonEmptyDir() throws Throwable {
|
||||
describe("Performing listFiles() on a non empty dir");
|
||||
Path dir = path(getMethodName());
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
fs.mkdirs(dir);
|
||||
Path file = new Path(dir, "file.txt");
|
||||
touch(fs, file);
|
||||
resetMetricDiffs();
|
||||
fs.listFiles(dir, true);
|
||||
if (!fs.hasMetadataStore()) {
|
||||
verifyOperationCount(0, 1);
|
||||
} else {
|
||||
if (fs.allowAuthoritative(dir)) {
|
||||
verifyOperationCount(0, 0);
|
||||
} else {
|
||||
verifyOperationCount(0, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCostOfListFilesOnNonExistingDir() throws Throwable {
|
||||
describe("Performing listFiles() on a non existing dir");
|
||||
Path dir = path(getMethodName());
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
resetMetricDiffs();
|
||||
intercept(FileNotFoundException.class,
|
||||
() -> fs.listFiles(dir, true));
|
||||
verifyOperationCount(2, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCostOfGetFileStatusOnFile() throws Throwable {
|
||||
describe("performing getFileStatus on a file");
|
||||
|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
@ -271,7 +272,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
|
||||
assertTrue(fs.delete(testDirs[1], false));
|
||||
assertTrue(fs.delete(testDirs[2], false));
|
||||
|
||||
fs.rename(path("a"), path("a3"));
|
||||
ContractTestUtils.rename(fs, path("a"), path("a3"));
|
||||
ContractTestUtils.assertPathsDoNotExist(fs,
|
||||
"Source paths shouldn't exist post rename operation",
|
||||
testDirs[0], testDirs[1], testDirs[2]);
|
||||
FileStatus[] paths = fs.listStatus(path("a3/b"));
|
||||
List<Path> list = new ArrayList<>();
|
||||
for (FileStatus fileState : paths) {
|
||||
|
@ -397,6 +397,10 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
|
||||
}
|
||||
forbidden("",
|
||||
() -> fs.listStatus(ROOT));
|
||||
forbidden("",
|
||||
() -> fs.listFiles(ROOT, true));
|
||||
forbidden("",
|
||||
() -> fs.listLocatedStatus(ROOT));
|
||||
forbidden("",
|
||||
() -> fs.mkdirs(path("testAssumeRoleFS")));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user