diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index b5056d1d23..7ffc2adb46 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1502,12 +1502,10 @@
 <property>
-  <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
-  <value>3600000</value>
+  <name>fs.s3a.metadatastore.metadata.ttl</name>
+  <value>15m</value>
   <description>
-    This value sets how long a directory listing in the MS is considered as
-    authoritative. The value is in milliseconds.
-    MetadataStore should be authoritative to use this configuration knob.
+    This value sets how long an entry in a MetadataStore is valid.
   </description>
 </property>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index a8dc161e5e..7334506367 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -353,10 +353,14 @@ private Constants() { /** * How long a directory listing in the MS is considered as authoritative. */ - public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL = - "fs.s3a.metadatastore.authoritative.dir.ttl"; - public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL = - TimeUnit.MINUTES.toMillis(60); + public static final String METADATASTORE_METADATA_TTL = + "fs.s3a.metadatastore.metadata.ttl"; + + /** + * Default TTL in milliseconds: 15 minutes. + */ + public static final long DEFAULT_METADATASTORE_METADATA_TTL = + TimeUnit.MINUTES.toMillis(15); /** read ahead buffer size to prevent connection re-establishments. */ public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 0f8c52be18..b62c4569b6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -464,7 +464,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) { if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) { S3AFileStatus status = createFileStatus(keyPath, summary, owner.getDefaultBlockSize(keyPath), owner.getUsername(), - null, null); + summary.getETag(), null); LOG.debug("Adding: {}", status); stats.add(status); added++;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index e6850e9e7c..4bd58d5136 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -126,6 +126,7 @@ import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; import org.apache.hadoop.fs.s3a.s3guard.S3Guard; +import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.fs.store.EtagChecksum; @@ -244,7 +245,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private AWSCredentialProviderList credentials; - private S3Guard.ITtlTimeProvider ttlTimeProvider; +
private ITtlTimeProvider ttlTimeProvider; /** Add any deprecated keys. */ @SuppressWarnings("deprecation") @@ -388,9 +389,11 @@ public void initialize(URI name, Configuration originalConf) getMetadataStore(), allowAuthoritative); } initMultipartUploads(conf); - long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL, - DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL); - ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl); + if (hasMetadataStore()) { + long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL, + DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS); + ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl); + } } catch (AmazonClientException e) { throw translateException("initializing ", new Path(name), e); } @@ -1341,7 +1344,7 @@ childDst, length, getDefaultBlockSize(childDst), username, } } - metadataStore.move(srcPaths, dstMetas); + metadataStore.move(srcPaths, dstMetas, ttlTimeProvider); if (!src.getParent().equals(dst.getParent())) { LOG.debug("source & dest parents are different; fix up dir markers"); @@ -1722,7 +1725,7 @@ void deleteObjectAtPath(Path f, String key, boolean isFile) instrumentation.directoryDeleted(); } deleteObject(key); - metadataStore.delete(f); + metadataStore.delete(f, ttlTimeProvider); } /** @@ -2143,7 +2146,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive) } } } - metadataStore.deleteSubtree(f); + metadataStore.deleteSubtree(f, ttlTimeProvider); } else { LOG.debug("delete: Path is a file"); deleteObjectAtPath(f, key, true); @@ -2466,7 +2469,10 @@ S3AFileStatus innerGetFileStatus(final Path f, LOG.debug("Getting path status for {} ({})", path, key); // Check MetadataStore, if any. - PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag); + PathMetadata pm = null; + if (hasMetadataStore()) { + pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider); + } Set tombstones = Collections.emptySet(); if (pm != null) { if (pm.isDeleted()) { @@ -2501,7 +2507,7 @@ S3AFileStatus innerGetFileStatus(final Path f, LOG.debug("S3Guard metadata for {} is outdated, updating it", path); return S3Guard.putAndReturn(metadataStore, s3AFileStatus, - instrumentation); + instrumentation, ttlTimeProvider); } } } @@ -2534,12 +2540,14 @@ S3AFileStatus innerGetFileStatus(final Path f, null, null); } // entry was found, save in S3Guard - return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation); + return S3Guard.putAndReturn(metadataStore, s3FileStatus, + instrumentation, ttlTimeProvider); } else { // there was no entry in S3Guard // retrieve the data and update the metadata store in the process. 
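Taken together, the `getWithTtl` call added to `innerGetFileStatus` above and the `putAndReturn` overloads form the TTL-aware read path: an expired entry is treated exactly like a missing one, so the status is re-read from S3 and written back with a fresh `last_updated`. A condensed sketch of that flow (variable names mirror the `S3AFileSystem` fields used in this patch; this is an illustration, not the literal method body):

```java
// Sketch of the TTL-aware getFileStatus() read path.
PathMetadata pm = null;
if (hasMetadataStore()) {
  // getWithTtl() returns null both for "no entry" and for
  // "entry older than ttlTimeProvider.getMetadataTtl()".
  pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
}
if (pm == null) {
  // Fall through to S3 itself, then refresh the store; putWithTtl()
  // inside putAndReturn() stamps the entry with ttlTimeProvider.getNow().
  S3AFileStatus s3Status = s3GetFileStatus(path, key, tombstones);
  return S3Guard.putAndReturn(metadataStore, s3Status, instrumentation,
      ttlTimeProvider);
}
```

The change below is exactly that refresh: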
return S3Guard.putAndReturn(metadataStore, - s3GetFileStatus(path, key, tombstones), instrumentation); + s3GetFileStatus(path, key, tombstones), instrumentation, + ttlTimeProvider); } } @@ -3191,11 +3199,12 @@ void finishedWrite(String key, long length, String eTag, String versionId) // See note about failure semantics in S3Guard documentation try { if (hasMetadataStore()) { - S3Guard.addAncestors(metadataStore, p, username); + S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider); S3AFileStatus status = createUploadFileStatus(p, S3AUtils.objectRepresentsDirectory(key, length), length, getDefaultBlockSize(p), username, eTag, versionId); - S3Guard.putAndReturn(metadataStore, status, instrumentation); + S3Guard.putAndReturn(metadataStore, status, instrumentation, + ttlTimeProvider); } } catch (IOException e) { if (failOnMetadataWriteError) { @@ -3860,12 +3869,12 @@ public AWSCredentialProviderList shareCredentials(final String purpose) { } @VisibleForTesting - protected S3Guard.ITtlTimeProvider getTtlTimeProvider() { + public ITtlTimeProvider getTtlTimeProvider() { return ttlTimeProvider; } @VisibleForTesting - protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) { + protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { this.ttlTimeProvider = ttlTimeProvider; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index fa1a203fc7..f668c6affd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -189,8 +189,10 @@ * directory helps prevent unnecessary queries during traversal of an entire * sub-tree. * - * Some mutating operations, notably {@link #deleteSubtree(Path)} and - * {@link #move(Collection, Collection)}, are less efficient with this schema. + * Some mutating operations, notably + * {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and + * {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)}, + * are less efficient with this schema. * They require mutating multiple items in the DynamoDB table. * * By default, DynamoDB access is performed within the same AWS region as @@ -471,14 +473,15 @@ private void initDataAccessRetries(Configuration config) { @Override @Retries.RetryTranslated - public void delete(Path path) throws IOException { - innerDelete(path, true); + public void delete(Path path, ITtlTimeProvider ttlTimeProvider) + throws IOException { + innerDelete(path, true, ttlTimeProvider); } @Override @Retries.RetryTranslated public void forgetMetadata(Path path) throws IOException { - innerDelete(path, false); + innerDelete(path, false, null); } /** @@ -487,10 +490,13 @@ public void forgetMetadata(Path path) throws IOException { * There is no check as to whether the entry exists in the table first. * @param path path to delete * @param tombstone flag to create a tombstone marker + * @param ttlTimeProvider The time provider to set last_updated. Must not + * be null if tombstone is true. * @throws IOException I/O error. 
*/ @Retries.RetryTranslated - private void innerDelete(final Path path, boolean tombstone) + private void innerDelete(final Path path, boolean tombstone, + ITtlTimeProvider ttlTimeProvider) throws IOException { checkPath(path); LOG.debug("Deleting from table {} in region {}: {}", @@ -505,8 +511,13 @@ private void innerDelete(final Path path, boolean tombstone) // on that of S3A itself boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT; if (tombstone) { + Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider " + + "must not be null"); + final PathMetadata pmTombstone = PathMetadata.tombstone(path); + // update the last updated field of record when putting a tombstone + pmTombstone.setLastUpdated(ttlTimeProvider.getNow()); Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem( - new DDBPathMetadata(PathMetadata.tombstone(path))); + new DDBPathMetadata(pmTombstone)); writeOp.retry( "Put tombstone", path.toString(), @@ -524,7 +535,8 @@ private void innerDelete(final Path path, boolean tombstone) @Override @Retries.RetryTranslated - public void deleteSubtree(Path path) throws IOException { + public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider) + throws IOException { checkPath(path); LOG.debug("Deleting subtree from table {} in region {}: {}", tableName, region, path); @@ -537,7 +549,7 @@ public void deleteSubtree(Path path) throws IOException { for (DescendantsIterator desc = new DescendantsIterator(this, meta); desc.hasNext();) { - innerDelete(desc.next().getPath(), true); + innerDelete(desc.next().getPath(), true, ttlTimeProvider); } } @@ -731,7 +743,8 @@ Collection completeAncestry( @Override @Retries.RetryTranslated public void move(Collection pathsToDelete, - Collection pathsToCreate) throws IOException { + Collection pathsToCreate, ITtlTimeProvider ttlTimeProvider) + throws IOException { if (pathsToDelete == null && pathsToCreate == null) { return; } @@ -754,7 +767,11 @@ public void move(Collection pathsToDelete, } if (pathsToDelete != null) { for (Path meta : pathsToDelete) { - newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta))); + Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider" + + " must not be null"); + final PathMetadata pmTombstone = PathMetadata.tombstone(meta); + pmTombstone.setLastUpdated(ttlTimeProvider.getNow()); + newItems.add(new DDBPathMetadata(pmTombstone)); } } @@ -1024,14 +1041,37 @@ public void destroy() throws IOException { } @Retries.RetryTranslated - private ItemCollection expiredFiles(long modTime, - String keyPrefix) throws IOException { - String filterExpression = - "mod_time < :mod_time and begins_with(parent, :parent)"; - String projectionExpression = "parent,child"; - ValueMap map = new ValueMap() - .withLong(":mod_time", modTime) - .withString(":parent", keyPrefix); + private ItemCollection expiredFiles(PruneMode pruneMode, + long cutoff, String keyPrefix) throws IOException { + + String filterExpression; + String projectionExpression; + ValueMap map; + + switch (pruneMode) { + case ALL_BY_MODTIME: + filterExpression = + "mod_time < :mod_time and begins_with(parent, :parent)"; + projectionExpression = "parent,child"; + map = new ValueMap() + .withLong(":mod_time", cutoff) + .withString(":parent", keyPrefix); + break; + case TOMBSTONES_BY_LASTUPDATED: + filterExpression = + "last_updated < :last_updated and begins_with(parent, :parent) " + + "and is_deleted = :is_deleted"; + projectionExpression = "parent,child"; + map = new ValueMap() + .withLong(":last_updated", cutoff) 
+ .withString(":parent", keyPrefix) + .withBoolean(":is_deleted", true); + break; + default: + throw new UnsupportedOperationException("Unsupported prune mode: " + + pruneMode); + } + return readOp.retry( "scan", keyPrefix, @@ -1041,20 +1081,31 @@ private ItemCollection expiredFiles(long modTime, @Override @Retries.RetryTranslated - public void prune(long modTime) throws IOException { - prune(modTime, "/"); + public void prune(PruneMode pruneMode, long cutoff) throws IOException { + prune(pruneMode, cutoff, "/"); } /** * Prune files, in batches. There's a sleep between each batch. - * @param modTime Oldest modification time to allow + * + * @param pruneMode The mode of operation for the prune For details see + * {@link MetadataStore#prune(PruneMode, long)} + * @param cutoff Oldest modification time to allow * @param keyPrefix The prefix for the keys that should be removed * @throws IOException Any IO/DDB failure. * @throws InterruptedIOException if the prune was interrupted */ @Override @Retries.RetryTranslated - public void prune(long modTime, String keyPrefix) throws IOException { + public void prune(PruneMode pruneMode, long cutoff, String keyPrefix) + throws IOException { + final ItemCollection items = + expiredFiles(pruneMode, cutoff, keyPrefix); + innerPrune(items); + } + + private void innerPrune(ItemCollection items) + throws IOException { int itemCount = 0; try { Collection deletionBatch = @@ -1064,7 +1115,7 @@ public void prune(long modTime, String keyPrefix) throws IOException { S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT, TimeUnit.MILLISECONDS); Set parentPathSet = new HashSet<>(); - for (Item item : expiredFiles(modTime, keyPrefix)) { + for (Item item : items) { DDBPathMetadata md = PathMetadataDynamoDBTranslation .itemToPathMetadata(item, username); Path path = md.getFileStatus().getPath(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java new file mode 100644 index 0000000000..daee6211b4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +/** + * This interface is defined for handling TTL expiry of metadata in S3Guard. + * + * TTL can be tested by implementing this interface and setting is as + * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any + * value preferred and flaky tests could be avoided. By default getNow() + * returns the EPOCH in runtime. 
+ * + * Time is measured in milliseconds, + */ +public interface ITtlTimeProvider { + long getNow(); + long getMetadataTtl(); +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java index 9276388679..6c13cd151d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java @@ -112,32 +112,34 @@ public String toString() { } @Override - public void delete(Path p) throws IOException { - doDelete(p, false, true); + public void delete(Path p, ITtlTimeProvider ttlTimeProvider) + throws IOException { + doDelete(p, false, true, ttlTimeProvider); } @Override public void forgetMetadata(Path p) throws IOException { - doDelete(p, false, false); + doDelete(p, false, false, null); } @Override - public void deleteSubtree(Path path) throws IOException { - doDelete(path, true, true); + public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider) + throws IOException { + doDelete(path, true, true, ttlTimeProvider); } - private synchronized void doDelete(Path p, boolean recursive, boolean - tombstone) { + private synchronized void doDelete(Path p, boolean recursive, + boolean tombstone, ITtlTimeProvider ttlTimeProvider) { Path path = standardize(p); // Delete entry from file cache, then from cached parent directory, if any - deleteCacheEntries(path, tombstone); + deleteCacheEntries(path, tombstone, ttlTimeProvider); if (recursive) { // Remove all entries that have this dir as path prefix. - deleteEntryByAncestor(path, localCache, tombstone); + deleteEntryByAncestor(path, localCache, tombstone, ttlTimeProvider); } } @@ -191,7 +193,8 @@ public synchronized DirListingMetadata listChildren(Path p) throws @Override public void move(Collection pathsToDelete, - Collection pathsToCreate) throws IOException { + Collection pathsToCreate, + ITtlTimeProvider ttlTimeProvider) throws IOException { LOG.info("Move {} to {}", pathsToDelete, pathsToCreate); Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null"); @@ -205,7 +208,7 @@ public void move(Collection pathsToDelete, // 1. Delete pathsToDelete for (Path meta : pathsToDelete) { LOG.debug("move: deleting metadata {}", meta); - delete(meta); + delete(meta, ttlTimeProvider); } // 2. 
Create new destination path metadata @@ -332,18 +335,19 @@ public void destroy() throws IOException { } @Override - public void prune(long modTime) throws IOException{ - prune(modTime, ""); + public void prune(PruneMode pruneMode, long cutoff) throws IOException{ + prune(pruneMode, cutoff, ""); } @Override - public synchronized void prune(long modTime, String keyPrefix) { + public synchronized void prune(PruneMode pruneMode, long cutoff, + String keyPrefix) { // prune files // filter path_metadata (files), filter expired, remove expired localCache.asMap().entrySet().stream() .filter(entry -> entry.getValue().hasPathMeta()) - .filter(entry -> expired( - entry.getValue().getFileMeta().getFileStatus(), modTime, keyPrefix)) + .filter(entry -> expired(pruneMode, + entry.getValue().getFileMeta(), cutoff, keyPrefix)) .forEach(entry -> localCache.invalidate(entry.getKey())); @@ -358,28 +362,37 @@ public synchronized void prune(long modTime, String keyPrefix) { Collection newChildren = new LinkedList<>(); for (PathMetadata child : oldChildren) { - FileStatus status = child.getFileStatus(); - if (!expired(status, modTime, keyPrefix)) { + if (!expired(pruneMode, child, cutoff, keyPrefix)) { newChildren.add(child); } } - if (newChildren.size() != oldChildren.size()) { - DirListingMetadata dlm = - new DirListingMetadata(path, newChildren, false); - localCache.put(path, new LocalMetadataEntry(dlm)); - if (!path.isRoot()) { - DirListingMetadata parent = getDirListingMeta(path.getParent()); - if (parent != null) { - parent.setAuthoritative(false); - } - } - } + removeAuthoritativeFromParent(path, oldChildren, newChildren); }); } - private boolean expired(FileStatus status, long expiry, String keyPrefix) { + private void removeAuthoritativeFromParent(Path path, + Collection oldChildren, + Collection newChildren) { + if (newChildren.size() != oldChildren.size()) { + DirListingMetadata dlm = + new DirListingMetadata(path, newChildren, false); + localCache.put(path, new LocalMetadataEntry(dlm)); + if (!path.isRoot()) { + DirListingMetadata parent = getDirListingMeta(path.getParent()); + if (parent != null) { + parent.setAuthoritative(false); + } + } + } + } + + private boolean expired(PruneMode pruneMode, PathMetadata metadata, + long cutoff, String keyPrefix) { + final S3AFileStatus status = metadata.getFileStatus(); + final URI statusUri = status.getPath().toUri(); + // remove the protocol from path string to be able to compare - String bucket = status.getPath().toUri().getHost(); + String bucket = statusUri.getHost(); String statusTranslatedPath = ""; if(bucket != null && !bucket.isEmpty()){ // if there's a bucket, (well defined host in Uri) the pathToParentKey @@ -389,18 +402,33 @@ private boolean expired(FileStatus status, long expiry, String keyPrefix) { } else { // if there's no bucket in the path the pathToParentKey will fail, so // this is the fallback to get the path from status - statusTranslatedPath = status.getPath().toUri().getPath(); + statusTranslatedPath = statusUri.getPath(); } - // Note: S3 doesn't track modification time on directories, so for - // consistency with the DynamoDB implementation we ignore that here - return status.getModificationTime() < expiry && !status.isDirectory() - && statusTranslatedPath.startsWith(keyPrefix); + boolean expired; + switch (pruneMode) { + case ALL_BY_MODTIME: + // Note: S3 doesn't track modification time on directories, so for + // consistency with the DynamoDB implementation we ignore that here + expired = status.getModificationTime() < cutoff && 
!status.isDirectory() + && statusTranslatedPath.startsWith(keyPrefix); + break; + case TOMBSTONES_BY_LASTUPDATED: + expired = metadata.getLastUpdated() < cutoff && metadata.isDeleted() + && statusTranslatedPath.startsWith(keyPrefix); + break; + default: + throw new UnsupportedOperationException("Unsupported prune mode: " + + pruneMode); + } + + return expired; } @VisibleForTesting static void deleteEntryByAncestor(Path ancestor, - Cache cache, boolean tombstone) { + Cache cache, boolean tombstone, + ITtlTimeProvider ttlTimeProvider) { cache.asMap().entrySet().stream() .filter(entry -> isAncestorOf(ancestor, entry.getKey())) @@ -410,7 +438,9 @@ static void deleteEntryByAncestor(Path ancestor, if(meta.hasDirMeta()){ cache.invalidate(path); } else if(tombstone && meta.hasPathMeta()){ - meta.setPathMetadata(PathMetadata.tombstone(path)); + final PathMetadata pmTombstone = PathMetadata.tombstone(path); + pmTombstone.setLastUpdated(ttlTimeProvider.getNow()); + meta.setPathMetadata(pmTombstone); } else { cache.invalidate(path); } @@ -434,7 +464,8 @@ private static boolean isAncestorOf(Path ancestor, Path f) { * Update fileCache and dirCache to reflect deletion of file 'f'. Call with * lock held. */ - private void deleteCacheEntries(Path path, boolean tombstone) { + private void deleteCacheEntries(Path path, boolean tombstone, + ITtlTimeProvider ttlTimeProvider) { LocalMetadataEntry entry = localCache.getIfPresent(path); // If there's no entry, delete should silently succeed // (based on MetadataStoreTestBase#testDeleteNonExisting) @@ -448,6 +479,7 @@ private void deleteCacheEntries(Path path, boolean tombstone) { if(entry.hasPathMeta()){ if (tombstone) { PathMetadata pmd = PathMetadata.tombstone(path); + pmd.setLastUpdated(ttlTimeProvider.getNow()); entry.setPathMetadata(pmd); } else { entry.setPathMetadata(null); @@ -474,6 +506,7 @@ private void deleteCacheEntries(Path path, boolean tombstone) { LOG.debug("removing parent's entry for {} ", path); if (tombstone) { dir.markDeleted(path); + dir.setLastUpdated(ttlTimeProvider.getNow()); } else { dir.remove(path); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java index 746fd82950..7875d43d1e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java @@ -63,16 +63,23 @@ public interface MetadataStore extends Closeable { * Deletes exactly one path, leaving a tombstone to prevent lingering, * inconsistent copies of it from being listed. * + * Deleting an entry with a tombstone needs a + * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because + * the lastUpdated field of the record has to be updated to
<pre>now</pre>.
+ * * @param path the path to delete + * @param ttlTimeProvider the time provider to set last_updated. Must not + * be null. * @throws IOException if there is an error */ - void delete(Path path) throws IOException; + void delete(Path path, ITtlTimeProvider ttlTimeProvider) + throws IOException; /** * Removes the record of exactly one path. Does not leave a tombstone (see - * {@link MetadataStore#delete(Path)}. It is currently intended for testing - * only, and a need to use it as part of normal FileSystem usage is not - * anticipated. + * {@link MetadataStore#delete(Path, ITtlTimeProvider)}). It is currently + * intended for testing only, and a need to use it as part of normal + * FileSystem usage is not anticipated. * * @param path the path to delete * @throws IOException if there is an error @@ -88,10 +95,17 @@ public interface MetadataStore extends Closeable { * implementations must also update any stored {@code DirListingMetadata} * objects which track the parent of this file. * + * Deleting a subtree with a tombstone needs a + * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because + * the lastUpdated field of all records has to be updated to
<pre>now</pre>
. + * * @param path the root of the sub-tree to delete + * @param ttlTimeProvider the time provider to set last_updated. Must not + * be null. * @throws IOException if there is an error */ - void deleteSubtree(Path path) throws IOException; + void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider) + throws IOException; /** * Gets metadata for a path. @@ -151,10 +165,13 @@ PathMetadata get(Path path, boolean wantEmptyDirectoryFlag) * @param pathsToCreate Collection of all PathMetadata for the new paths * that were created at the destination of the rename * (). + * @param ttlTimeProvider the time provider to set last_updated. Must not + * be null. * @throws IOException if there is an error */ void move(Collection pathsToDelete, - Collection pathsToCreate) throws IOException; + Collection pathsToCreate, + ITtlTimeProvider ttlTimeProvider) throws IOException; /** * Saves metadata for exactly one path. @@ -212,29 +229,54 @@ void move(Collection pathsToDelete, void destroy() throws IOException; /** - * Clear any metadata older than a specified time from the repository. - * Implementations MUST clear file metadata, and MAY clear directory metadata - * (s3a itself does not track modification time for directories). - * Implementations may also choose to throw UnsupportedOperationException - * istead. Note that modification times should be in UTC, as returned by - * System.currentTimeMillis at the time of modification. + * Prune method with two modes of operation: + *
+ * <ul>
+ *   <li>
+ *    {@link PruneMode#ALL_BY_MODTIME}
+ *    Clear any metadata older than a specified mod_time from the store.
+ *    Note that this modification time is the S3 modification time from the
+ *    object's metadata - from the object store.
+ *    Implementations MUST clear file metadata, and MAY clear directory
+ *    metadata (s3a itself does not track modification time for directories).
+ *    Implementations may also choose to throw UnsupportedOperationException
+ *    instead. Note that modification times must be in UTC, as returned by
+ *    System.currentTimeMillis at the time of modification.
+ *   </li>
+ * </ul>
 *
- * @param modTime Oldest modification time to allow
+ * <ul>
+ *   <li>
+ *    {@link PruneMode#TOMBSTONES_BY_LASTUPDATED}
+ *    Clear any tombstone updated earlier than a specified time from the
+ *    store. Note that this last_updated is the time when the metadata
+ *    entry was last updated and maintained by the metadata store.
+ *    Implementations MUST clear file metadata, and MAY clear directory
+ *    metadata (s3a itself does not track modification time for directories).
+ *    Implementations may also choose to throw UnsupportedOperationException
+ *    instead. Note that last_updated must be in UTC, as returned by
+ *    System.currentTimeMillis at the time of modification.
+ *   </li>
+ * </ul>
+ * + * @param pruneMode + * @param cutoff Oldest time to allow (UTC) * @throws IOException if there is an error * @throws UnsupportedOperationException if not implemented */ - void prune(long modTime) throws IOException, UnsupportedOperationException; + void prune(PruneMode pruneMode, long cutoff) throws IOException, + UnsupportedOperationException; /** - * Same as {@link MetadataStore#prune(long)}, but with an additional - * keyPrefix parameter to filter the pruned keys with a prefix. + * Same as {@link MetadataStore#prune(PruneMode, long)}, but with an + * additional keyPrefix parameter to filter the pruned keys with a prefix. * - * @param modTime Oldest modification time to allow + * @param pruneMode + * @param cutoff Oldest time to allow (UTC) * @param keyPrefix The prefix for the keys that should be removed * @throws IOException if there is an error * @throws UnsupportedOperationException if not implemented */ - void prune(long modTime, String keyPrefix) + void prune(PruneMode pruneMode, long cutoff, String keyPrefix) throws IOException, UnsupportedOperationException; /** @@ -252,4 +294,13 @@ void prune(long modTime, String keyPrefix) * @throws IOException if there is an error */ void updateParameters(Map parameters) throws IOException; + + /** + * Modes of operation for prune. + * For details see {@link MetadataStore#prune(PruneMode, long)} + */ + enum PruneMode { + ALL_BY_MODTIME, + TOMBSTONES_BY_LASTUPDATED + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java index 04704e7ea7..1472ef1a22 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java @@ -47,7 +47,8 @@ public void close() throws IOException { } @Override - public void delete(Path path) throws IOException { + public void delete(Path path, ITtlTimeProvider ttlTimeProvider) + throws IOException { } @Override @@ -55,7 +56,8 @@ public void forgetMetadata(Path path) throws IOException { } @Override - public void deleteSubtree(Path path) throws IOException { + public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider) + throws IOException { } @Override @@ -76,7 +78,8 @@ public DirListingMetadata listChildren(Path path) throws IOException { @Override public void move(Collection pathsToDelete, - Collection pathsToCreate) throws IOException { + Collection pathsToCreate, + ITtlTimeProvider ttlTimeProvider) throws IOException { } @Override @@ -96,11 +99,11 @@ public void destroy() throws IOException { } @Override - public void prune(long modTime) { + public void prune(PruneMode pruneMode, long cutoff) { } @Override - public void prune(long modTime, String keyPrefix) { + public void prune(PruneMode pruneMode, long cutoff, String keyPrefix) { } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java index 26c75e8213..933a01ced5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java @@ -25,9 +25,13 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; 
import java.util.stream.Collectors; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -46,6 +50,8 @@ import org.apache.hadoop.fs.s3a.Tristate; import org.apache.hadoop.util.ReflectionUtils; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL; +import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL; import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY; import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST; @@ -142,15 +148,17 @@ static Class getMetadataStoreClass( * @param ms MetadataStore to {@code put()} into. * @param status status to store * @param instrumentation instrumentation of the s3a file system + * @param timeProvider Time provider to use when writing entries * @return The same status as passed in * @throws IOException if metadata store update failed */ @RetryTranslated public static S3AFileStatus putAndReturn(MetadataStore ms, S3AFileStatus status, - S3AInstrumentation instrumentation) throws IOException { + S3AInstrumentation instrumentation, + ITtlTimeProvider timeProvider) throws IOException { long startTimeNano = System.nanoTime(); - ms.put(new PathMetadata(status)); + S3Guard.putWithTtl(ms, new PathMetadata(status), timeProvider); instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY, (System.nanoTime() - startTimeNano)); instrumentation.incrementCounter(S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1); @@ -196,7 +204,7 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) { * @param backingStatuses Directory listing from the backing store. * @param dirMeta Directory listing from MetadataStore. May be null. * @param isAuthoritative State of authoritative mode - * @param timeProvider Time provider for testing. + * @param timeProvider Time provider to use when updating entries * @return Final result of directory listing. * @throws IOException if metadata store update failed */ @@ -242,7 +250,7 @@ public static FileStatus[] dirListingUnion(MetadataStore ms, Path path, if (status != null && s.getModificationTime() > status.getModificationTime()) { LOG.debug("Update ms with newer metadata of: {}", status); - ms.put(new PathMetadata(s)); + S3Guard.putWithTtl(ms, new PathMetadata(s), timeProvider); } } @@ -357,7 +365,7 @@ public static void makeDirsOrdered(MetadataStore ms, List dirs, } // Batched put - ms.put(pathMetas); + S3Guard.putWithTtl(ms, pathMetas, timeProvider); } catch (IOException ioe) { LOG.error("MetadataStore#put() failure:", ioe); } @@ -462,7 +470,8 @@ public static void addMoveAncestors(MetadataStore ms, } public static void addAncestors(MetadataStore metadataStore, - Path qualifiedPath, String username) throws IOException { + Path qualifiedPath, String username, ITtlTimeProvider timeProvider) + throws IOException { Collection newDirs = new ArrayList<>(); Path parent = qualifiedPath.getParent(); while (!parent.isRoot()) { @@ -476,7 +485,7 @@ public static void addAncestors(MetadataStore metadataStore, } parent = parent.getParent(); } - metadataStore.put(newDirs); + S3Guard.putWithTtl(metadataStore, newDirs, timeProvider); } private static void addMoveStatus(Collection srcPaths, @@ -513,17 +522,6 @@ public static void assertQualified(Path...paths) { } } - /** - * This interface is defined for testing purposes. 
- * TTL can be tested by implementing this interface and setting is as - * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any - * value preferred and flaky tests could be avoided. - */ - public interface ITtlTimeProvider { - long getNow(); - long getAuthoritativeDirTtl(); - } - /** * Runtime implementation for TTL Time Provider interface. */ @@ -534,34 +532,127 @@ public TtlTimeProvider(long authoritativeDirTtl) { this.authoritativeDirTtl = authoritativeDirTtl; } + public TtlTimeProvider(Configuration conf) { + this.authoritativeDirTtl = + conf.getTimeDuration(METADATASTORE_METADATA_TTL, + DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS); + } + @Override public long getNow() { return System.currentTimeMillis(); } - @Override public long getAuthoritativeDirTtl() { + @Override public long getMetadataTtl() { return authoritativeDirTtl; } + + @Override + public boolean equals(final Object o) { + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { return false; } + final TtlTimeProvider that = (TtlTimeProvider) o; + return authoritativeDirTtl == that.authoritativeDirTtl; + } + + @Override + public int hashCode() { + return Objects.hash(authoritativeDirTtl); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "TtlTimeProvider{"); + sb.append("authoritativeDirTtl=").append(authoritativeDirTtl); + sb.append(" millis}"); + return sb.toString(); + } } public static void putWithTtl(MetadataStore ms, DirListingMetadata dirMeta, ITtlTimeProvider timeProvider) throws IOException { dirMeta.setLastUpdated(timeProvider.getNow()); + dirMeta.getListing() + .forEach(pm -> pm.setLastUpdated(timeProvider.getNow())); ms.put(dirMeta); } - public static DirListingMetadata listChildrenWithTtl(MetadataStore ms, - Path path, ITtlTimeProvider timeProvider) - throws IOException { - long ttl = timeProvider.getAuthoritativeDirTtl(); + public static void putWithTtl(MetadataStore ms, PathMetadata fileMeta, + @Nullable ITtlTimeProvider timeProvider) throws IOException { + if (timeProvider != null) { + fileMeta.setLastUpdated(timeProvider.getNow()); + } else { + LOG.debug("timeProvider is null, put {} without setting last_updated", + fileMeta); + } + ms.put(fileMeta); + } + public static void putWithTtl(MetadataStore ms, + Collection fileMetas, + @Nullable ITtlTimeProvider timeProvider) + throws IOException { + if (timeProvider != null) { + final long now = timeProvider.getNow(); + fileMetas.forEach(fileMeta -> fileMeta.setLastUpdated(now)); + } else { + LOG.debug("timeProvider is null, put {} without setting last_updated", + fileMetas); + } + ms.put(fileMetas); + } + + public static PathMetadata getWithTtl(MetadataStore ms, Path path, + @Nullable ITtlTimeProvider timeProvider) throws IOException { + final PathMetadata pathMetadata = ms.get(path); + // if timeProvider is null let's return with what the ms has + if (timeProvider == null) { + LOG.debug("timeProvider is null, returning pathMetadata as is"); + return pathMetadata; + } + + long ttl = timeProvider.getMetadataTtl(); + + if (pathMetadata != null) { + // Special case: the pathmetadata's last updated is 0. This can happen + // eg. 
with an old db using this implementation + if (pathMetadata.getLastUpdated() == 0) { + LOG.debug("PathMetadata TTL for {} is 0, so it will be returned as " + + "not expired.", path); + return pathMetadata; + } + + if (!pathMetadata.isExpired(ttl, timeProvider.getNow())) { + return pathMetadata; + } else { + LOG.debug("PathMetadata TTL for {} is expired in metadata store.", + path); + return null; + } + } + + return null; + } + + public static DirListingMetadata listChildrenWithTtl(MetadataStore ms, + Path path, @Nullable ITtlTimeProvider timeProvider) + throws IOException { DirListingMetadata dlm = ms.listChildren(path); - if(dlm != null && dlm.isAuthoritative() + if (timeProvider == null) { + LOG.debug("timeProvider is null, returning DirListingMetadata as is"); + return dlm; + } + + long ttl = timeProvider.getMetadataTtl(); + + if (dlm != null && dlm.isAuthoritative() && dlm.isExpired(ttl, timeProvider.getNow())) { dlm.setAuthoritative(false); } return dlm; } + }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index 397a9cba67..dedb84931a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -707,7 +707,8 @@ private void putParentsIfNotPresent(FileStatus f) throws IOException { } S3AFileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent, f.getOwner()); - getStore().put(new PathMetadata(dir)); + S3Guard.putWithTtl(getStore(), new PathMetadata(dir), + getFilesystem().getTtlTimeProvider()); dirCache.add(parent); parent = parent.getParent(); } @@ -741,7 +742,8 @@ private long importDir(FileStatus status) throws IOException { located.getVersionId()); } putParentsIfNotPresent(child); - getStore().put(new PathMetadata(child)); + S3Guard.putWithTtl(getStore(), new PathMetadata(child), + getFilesystem().getTtlTimeProvider()); items++; } return items; @@ -1073,7 +1075,8 @@ public int run(String[] args, PrintStream out) throws } try { - getStore().prune(divide, keyPrefix); + getStore().prune(MetadataStore.PruneMode.ALL_BY_MODTIME, divide, + keyPrefix); } catch (UnsupportedOperationException e){ errorln("Prune operation not supported in metadata store."); }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md index 94dc89b70d..337fc95b6c 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md @@ -181,8 +181,8 @@ removed on `S3AFileSystem` level.
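Because the new key is read with `Configuration.getTimeDuration`, the value accepts time-unit suffixes (`15m`, `900s`, or a plain number of milliseconds). A small demonstration of how the constants above resolve (the class and `main` are illustrative scaffolding only; the key and default come from this patch):

```java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

public class TtlConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.metadatastore.metadata.ttl", "15m");
    // Resolves the suffixed value against the requested unit.
    long ttlMillis = conf.getTimeDuration(
        "fs.s3a.metadatastore.metadata.ttl",
        TimeUnit.MINUTES.toMillis(15),  // DEFAULT_METADATASTORE_METADATA_TTL
        TimeUnit.MILLISECONDS);
    System.out.println(ttlMillis);      // 900000
  }
}
```

The documented example changes to match: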
```xml
<property>
-    <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
-    <value>3600000</value>
+    <name>fs.s3a.metadatastore.metadata.ttl</name>
+    <value>15m</value>
</property>
```
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java index 6dbe6f91d4..2af9a0ab73 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
@@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.junit.Assume;
@@ -37,20 +38,32 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; +import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; +import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL; +import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.test.LambdaTestUtils.eventually; import static org.junit.Assume.assumeTrue; +import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE; import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** *
@@ -115,7 +128,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase { * Test array for parameterized test runs. * @return a list of parameter tuples.
*/ - @Parameterized.Parameters + @Parameterized.Parameters(name="auth={0}") public static Collection params() { return Arrays.asList(new Object[][]{ {true}, {false} }); }
@@ -190,8 +203,11 @@ private S3AFileSystem createGuardedFS(boolean authoritativeMode) URI uri = testFS.getUri(); removeBaseAndBucketOverrides(uri.getHost(), config, - METADATASTORE_AUTHORITATIVE); + METADATASTORE_AUTHORITATIVE, + METADATASTORE_METADATA_TTL); config.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMode); + config.setLong(METADATASTORE_METADATA_TTL, + DEFAULT_METADATASTORE_METADATA_TTL); final S3AFileSystem gFs = createFS(uri, config); // set back the same metadata store instance gFs.setMetadataStore(realMs);
@@ -271,6 +287,292 @@ public void testListingDelete() throws Exception { deleteFileInListing(); } + /** + * Tests that tombstone expiry is implemented, so if a file is created raw + * while a tombstone with the same name still exists in the MetadataStore, + * S3Guard will check S3 for the file once that tombstone has expired. + * + * Seq: create guarded; delete guarded; create raw (same path); read guarded. + * This will fail if no tombstone expiry is implemented. + * + * @throws Exception failure + */ + @Test + public void testTombstoneExpiryGuardedDeleteRawCreate() throws Exception { + boolean allowAuthoritative = authoritative; + Path testFilePath = path("TEGDRC-" + UUID.randomUUID() + "/file"); + LOG.info("Allow authoritative param: {}", allowAuthoritative); + String originalText = "some test"; + String newText = "the new originalText for test"; + + final ITtlTimeProvider originalTimeProvider = + guardedFs.getTtlTimeProvider(); + try { + final AtomicLong now = new AtomicLong(1); + final AtomicLong metadataTtl = new AtomicLong(1); + + // SET TTL TIME PROVIDER FOR TESTING + ITtlTimeProvider testTimeProvider = + new ITtlTimeProvider() { + @Override public long getNow() { + return now.get(); + } + + @Override public long getMetadataTtl() { + return metadataTtl.get(); + } + }; + guardedFs.setTtlTimeProvider(testTimeProvider); + + // CREATE GUARDED + createAndAwaitFs(guardedFs, testFilePath, originalText); + + // DELETE GUARDED + deleteGuardedTombstoned(guardedFs, testFilePath, now); + + // CREATE RAW + createAndAwaitFs(rawFS, testFilePath, newText); + + // CHECK LISTING - THE FILE SHOULD NOT BE THERE, EVEN IF IT'S CREATED RAW + checkListingDoesNotContainPath(guardedFs, testFilePath); + + // CHANGE TTL SO ENTRY (& TOMBSTONE METADATA) WILL EXPIRE + long willExpire = now.get() + metadataTtl.get() + 1L; + now.set(willExpire); + LOG.info("willExpire: {}, ttlNow: {}; ttlTTL: {}", willExpire, + testTimeProvider.getNow(), testTimeProvider.getMetadataTtl()); + + // READ GUARDED + String newRead = readBytesToString(guardedFs, testFilePath, + newText.length()); + + // CHECK LISTING - THE FILE SHOULD BE THERE, TOMBSTONE EXPIRED + checkListingContainsPath(guardedFs, testFilePath); + + // assert that the read text is the new one, which was created raw + LOG.info("Old: {}, New: {}, Read: {}", originalText, newText, newRead); + assertEquals("The text should be modified with the new content.", + newText, newRead); + } finally { + guardedFs.delete(testFilePath, true); + guardedFs.setTtlTimeProvider(originalTimeProvider); + } + } + + private void createAndAwaitFs(S3AFileSystem fs, Path testFilePath, + String text) throws Exception { + writeTextFile(fs, testFilePath, text, true); + final FileStatus newStatus = awaitFileStatus(fs, testFilePath); + assertNotNull("Newly created file status should not be null.", newStatus); + } + + private void deleteGuardedTombstoned(S3AFileSystem guarded,
Path testFilePath, AtomicLong now) throws Exception { + guarded.delete(testFilePath, true); + + final PathMetadata metadata = + guarded.getMetadataStore().get(testFilePath); + assertNotNull("Created file metadata should not be null in ms", + metadata); + assertEquals("Created file metadata last_updated should equal with " + + "mocked now", now.get(), metadata.getLastUpdated()); + + intercept(FileNotFoundException.class, testFilePath.toString(), + "This file should throw FNFE when reading through " + + "the guarded fs, and the metadatastore tombstoned the file.", + () -> guarded.getFileStatus(testFilePath)); + } + + /** + * createNonRecursive must fail if the parent directory has been deleted, + * and succeed if the tombstone has expired and the directory has been + * created out of band. + */ + @Test + public void testCreateNonRecursiveFailsIfParentDeleted() throws Exception { + LOG.info("Authoritative mode: {}", authoritative); + + String dirToDelete = methodName + UUID.randomUUID().toString(); + String fileToTry = dirToDelete + "/theFileToTry"; + + final Path dirPath = path(dirToDelete); + final Path filePath = path(fileToTry); + + // Create a directory with + ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class); + ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider(); + + try { + guardedFs.setTtlTimeProvider(mockTimeProvider); + when(mockTimeProvider.getNow()).thenReturn(100L); + when(mockTimeProvider.getMetadataTtl()).thenReturn(5L); + + // CREATE DIRECTORY + guardedFs.mkdirs(dirPath); + + // DELETE DIRECTORY + guardedFs.delete(dirPath, true); + + // WRITE TO DELETED DIRECTORY - FAIL + intercept(FileNotFoundException.class, + dirToDelete, + "createNonRecursive must fail if the parent directory has been deleted.", + () -> createNonRecursive(guardedFs, filePath)); + + // CREATE THE DIRECTORY RAW + rawFS.mkdirs(dirPath); + awaitFileStatus(rawFS, dirPath); + + // SET TIME SO METADATA EXPIRES + when(mockTimeProvider.getNow()).thenReturn(110L); + + // WRITE TO DELETED DIRECTORY - SUCCESS + createNonRecursive(guardedFs, filePath); + + } finally { + guardedFs.delete(filePath, true); + guardedFs.delete(dirPath, true); + guardedFs.setTtlTimeProvider(originalTimeProvider); + } + } + + /** + * When lastUpdated = 0 the entry should not expire. This is a special case + * eg. 
for old metadata entries + */ + @Test + public void testLastUpdatedZeroWontExpire() throws Exception { + LOG.info("Authoritative mode: {}", authoritative); + + String testFile = methodName + UUID.randomUUID().toString() + + "/theFileToTry"; + + long ttl = 10L; + final Path filePath = path(testFile); + + ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class); + ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider(); + + try { + guardedFs.setTtlTimeProvider(mockTimeProvider); + when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl); + + // create a file while the NOW is 0, so it will set 0 as the last_updated + when(mockTimeProvider.getNow()).thenReturn(0L); + touch(guardedFs, filePath); + deleteFile(guardedFs, filePath); + + final PathMetadata pathMetadata = + guardedFs.getMetadataStore().get(filePath); + assertNotNull("pathMetadata should not be null after deleting with " + + "tombstones", pathMetadata); + assertEquals("pathMetadata lastUpdated field should be 0", 0, + pathMetadata.getLastUpdated()); + + // set the time, so the metadata would expire + when(mockTimeProvider.getNow()).thenReturn(2*ttl); + intercept(FileNotFoundException.class, filePath.toString(), + "This file should throw FNFE when reading through " + + "the guarded fs, and the metadatastore tombstoned the file. " + + "The tombstone won't expire if lastUpdated is set to 0.", + () -> guardedFs.getFileStatus(filePath)); + + } finally { + guardedFs.delete(filePath, true); + guardedFs.setTtlTimeProvider(originalTimeProvider); + } + } + + /** + * 1. File is deleted in the guarded fs. + * 2. File is replaced in the raw fs. + * 3. File is deleted in the guarded FS after the expiry time. + * 4. File MUST NOT exist in raw FS. + */ + @Test + public void deleteAfterTombstoneExpiryOobCreate() throws Exception { + LOG.info("Authoritative mode: {}", authoritative); + + String testFile = methodName + UUID.randomUUID().toString() + + "/theFileToTry"; + + long ttl = 10L; + final Path filePath = path(testFile); + + ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class); + ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider(); + + try { + guardedFs.setTtlTimeProvider(mockTimeProvider); + when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl); + + // CREATE AND DELETE WITH GUARDED FS + when(mockTimeProvider.getNow()).thenReturn(100L); + touch(guardedFs, filePath); + deleteFile(guardedFs, filePath); + + final PathMetadata pathMetadata = + guardedFs.getMetadataStore().get(filePath); + assertNotNull("pathMetadata should not be null after deleting with " + + "tombstones", pathMetadata); + + // REPLACE WITH RAW FS + touch(rawFS, filePath); + awaitFileStatus(rawFS, filePath); + + // SET EXPIRY TIME, SO THE TOMBSTONE IS EXPIRED + when(mockTimeProvider.getNow()).thenReturn(100L + 2 * ttl); + + // DELETE IN GUARDED FS + guardedFs.delete(filePath, true); + + // FILE MUST NOT EXIST IN RAW + intercept(FileNotFoundException.class, filePath.toString(), + "This file should throw FNFE when reading through " + + "the raw fs, and the guarded fs deleted the file.", + () -> rawFS.getFileStatus(filePath)); + + } finally { + guardedFs.delete(filePath, true); + guardedFs.setTtlTimeProvider(originalTimeProvider); + } + } + + private void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath) + throws IOException { + final RemoteIterator listIter = + fs.listFiles(filePath.getParent(), false); + while (listIter.hasNext()) { + final LocatedFileStatus lfs = listIter.next(); + assertNotEquals("The tombstone 
has not been expired, so must not be" + + " listed.", filePath, lfs.getPath()); + } + LOG.info("{}; file omitted from listFiles listing as expected.", filePath); + + final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent()); + for (FileStatus fileStatus : fileStatuses) { + assertNotEquals("The tombstone has not been expired, so must not be" + + " listed.", filePath, fileStatus.getPath()); + } + LOG.info("{}; file omitted from listStatus as expected.", filePath); + } + + private void checkListingContainsPath(S3AFileSystem fs, Path filePath) + throws IOException { + final RemoteIterator listIter = + fs.listFiles(filePath.getParent(), false); + + while (listIter.hasNext()) { + final LocatedFileStatus lfs = listIter.next(); + assertEquals(filePath, lfs.getPath()); + } + + final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent()); + for (FileStatus fileStatus : fileStatuses) + assertEquals("The file should be listed in fs.listStatus", + filePath, fileStatus.getPath()); + } + /** * Perform an out-of-band delete. * @param testFilePath filename @@ -384,12 +686,18 @@ private void overwriteFileInListing(String firstText, String secondText) // Create initial statusIterator with guarded ms writeTextFile(guardedFs, testFilePath, firstText, true); // and cache the value for later - final FileStatus origStatus = awaitFileStatus(rawFS, testFilePath); + final S3AFileStatus origStatus = awaitFileStatus(rawFS, testFilePath); + assertNotNull("No etag in raw status " + origStatus, + origStatus.getETag()); // Do a listing to cache the lists. Should be authoritative if it's set. - final FileStatus[] origList = guardedFs.listStatus(testDirPath); + final S3AFileStatus[] origList = (S3AFileStatus[]) guardedFs.listStatus( + testDirPath); assertArraySize("Added one file to the new dir, so the number of " + "files in the dir should be one.", 1, origList); + S3AFileStatus origGuardedFileStatus = origList[0]; + assertNotNull("No etag in origGuardedFileStatus" + origGuardedFileStatus, + origGuardedFileStatus.getETag()); final DirListingMetadata dirListingMetadata = realMs.listChildren(guardedFs.qualify(testDirPath)); assertListingAuthority(allowAuthoritative, dirListingMetadata); @@ -406,7 +714,8 @@ private void overwriteFileInListing(String firstText, String secondText) final FileStatus rawFileStatus = awaitFileStatus(rawFS, testFilePath); // check listing in guarded store. - final FileStatus[] modList = guardedFs.listStatus(testDirPath); + final S3AFileStatus[] modList = (S3AFileStatus[]) guardedFs.listStatus( + testDirPath); assertArraySize("Added one file to the new dir then modified it, " + "so the number of files in the dir should be one.", 1, modList); @@ -479,6 +788,24 @@ private void verifyFileStatusAsExpected(final String firstText, expectedLength, guardedLength); } } + // check etag. This relies on first and second text being different. 
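The reliance is sound because, absent KMS-managed encryption, S3 reports the ETag of a single-part PUT as the hex MD5 of the object bytes, so distinct contents yield distinct etags (multipart uploads would not give that guarantee). A hedged illustration of the invariant these assertions lean on:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

final class EtagIllustration {
  /** Hex MD5 digest: what S3 reports as the ETag of a single-part PUT. */
  static String md5Hex(String body) throws Exception {
    byte[] digest = MessageDigest.getInstance("MD5")
        .digest(body.getBytes(StandardCharsets.UTF_8));
    StringBuilder sb = new StringBuilder();
    for (byte b : digest) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  public static void main(String[] args) throws Exception {
    // Distinct bodies produce distinct MD5s, hence distinct etags.
    System.out.println(md5Hex("first text"));
    System.out.println(md5Hex("second text"));
  }
}
```

The assertions compare the raw, guarded and original statuses accordingly: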
@@ -479,6 +788,24 @@ private void verifyFileStatusAsExpected(final String firstText,
           expectedLength, guardedLength);
       }
     }
+    // check etag. This relies on the first and second text being different.
+    final S3AFileStatus rawS3AFileStatus = (S3AFileStatus) rawFileStatus;
+    final S3AFileStatus guardedS3AFileStatus = (S3AFileStatus)
+        guardedFileStatus;
+    final S3AFileStatus origS3AFileStatus = (S3AFileStatus) origStatus;
+    assertNotEquals(
+        "raw status still not up to date with changes " + stats,
+        origS3AFileStatus.getETag(), rawS3AFileStatus.getETag());
+    if (allowAuthoritative) {
+      // expect the etag to be out of sync
+      assertNotEquals(
+          "etag in authoritative table with " + stats,
+          rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
+    } else {
+      assertEquals(
+          "etag in non-authoritative table with " + stats,
+          rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
+    }
 
     // Next: modification time.
     long rawModTime = rawFileStatus.getModificationTime();
     long guardedModTime = guardedFileStatus.getModificationTime();
@@ -631,12 +958,18 @@ private void awaitDeletedFileDisappearance(final S3AFileSystem fs,
    * @return the file status.
    * @throws Exception failure
    */
-  private FileStatus awaitFileStatus(S3AFileSystem fs,
+  private S3AFileStatus awaitFileStatus(S3AFileSystem fs,
       final Path testFilePath)
       throws Exception {
-    return eventually(
+    return (S3AFileStatus) eventually(
         STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
         () -> fs.getFileStatus(testFilePath));
   }
 
+  private FSDataOutputStream createNonRecursive(FileSystem fs, Path path)
+      throws Exception {
+    return fs
+        .createNonRecursive(path, false, 4096, (short) 3, (short) 4096, null);
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
index d24009cea2..962232239a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
@@ -18,13 +18,22 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+
 import org.junit.Assume;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
@@ -36,8 +45,37 @@
 /**
  * These tests are testing the S3Guard TTL (time to live) features.
  */
+@RunWith(Parameterized.class)
 public class ITestS3GuardTtl extends AbstractS3ATestBase {
 
+  private final boolean authoritative;
+
+  /**
+   * Test array for parameterized test runs.
+   * @return a list of parameter tuples.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true}, {false}
+    });
+  }
+
+  /**
+   * By changing the method name, the thread name is changed and
+   * so you can see in the logs which mode is being tested.
+   * @return a string to use for the thread namer.
+   */
+  @Override
+  protected String getMethodName() {
+    return super.getMethodName() +
+        (authoritative ? "-auth" : "-nonauth");
+  }
"-auth" : "-nonauth"); + } + + public ITestS3GuardTtl(boolean authoritative) { + this.authoritative = authoritative; + } + /** * Patch the configuration - this test needs disabled filesystem caching. * These tests modify the fs instance that would cause flaky tests. @@ -47,11 +85,15 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase { protected Configuration createConfiguration() { Configuration configuration = super.createConfiguration(); S3ATestUtils.disableFilesystemCaching(configuration); - return S3ATestUtils.prepareTestConfiguration(configuration); + configuration = + S3ATestUtils.prepareTestConfiguration(configuration); + configuration.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative); + return configuration; } @Test public void testDirectoryListingAuthoritativeTtl() throws Exception { + LOG.info("Authoritative mode: {}", authoritative); final S3AFileSystem fs = getFileSystem(); Assume.assumeTrue(fs.hasMetadataStore()); @@ -64,12 +106,12 @@ public void testDirectoryListingAuthoritativeTtl() throws Exception { Assume.assumeTrue("MetadataStore should be authoritative for this test", isMetadataStoreAuthoritative(getFileSystem().getConf())); - S3Guard.ITtlTimeProvider mockTimeProvider = - mock(S3Guard.ITtlTimeProvider.class); - S3Guard.ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider(); + ITtlTimeProvider mockTimeProvider = + mock(ITtlTimeProvider.class); + ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider(); fs.setTtlTimeProvider(mockTimeProvider); when(mockTimeProvider.getNow()).thenReturn(100L); - when(mockTimeProvider.getAuthoritativeDirTtl()).thenReturn(1L); + when(mockTimeProvider.getMetadataTtl()).thenReturn(1L); Path dir = path("ttl/"); Path file = path("ttl/afile"); @@ -102,4 +144,146 @@ public void testDirectoryListingAuthoritativeTtl() throws Exception { fs.setTtlTimeProvider(restoreTimeProvider); } } + + @Test + public void testFileMetadataExpiresTtl() throws Exception { + LOG.info("Authoritative mode: {}", authoritative); + + Path fileExpire1 = path("expirettl-" + UUID.randomUUID()); + Path fileExpire2 = path("expirettl-" + UUID.randomUUID()); + Path fileRetain = path("expirettl-" + UUID.randomUUID()); + + final S3AFileSystem fs = getFileSystem(); + Assume.assumeTrue(fs.hasMetadataStore()); + final MetadataStore ms = fs.getMetadataStore(); + + ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class); + ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider(); + + try { + fs.setTtlTimeProvider(mockTimeProvider); + when(mockTimeProvider.getMetadataTtl()).thenReturn(5L); + + // set the time, so the fileExpire1 will expire + when(mockTimeProvider.getNow()).thenReturn(100L); + touch(fs, fileExpire1); + // set the time, so fileExpire2 will expire + when(mockTimeProvider.getNow()).thenReturn(101L); + touch(fs, fileExpire2); + // set the time, so fileRetain won't expire + when(mockTimeProvider.getNow()).thenReturn(109L); + touch(fs, fileRetain); + final FileStatus origFileRetainStatus = fs.getFileStatus(fileRetain); + // change time, so the first two file metadata is expired + when(mockTimeProvider.getNow()).thenReturn(110L); + + // metadata is expired so this should refresh the metadata with + // last_updated to the getNow() + final FileStatus fileExpire1Status = fs.getFileStatus(fileExpire1); + assertNotNull(fileExpire1Status); + assertEquals(110L, ms.get(fileExpire1).getLastUpdated()); + + // metadata is expired so this should refresh the metadata with + // last_updated to the getNow() + final FileStatus fileExpire2Status = 
+
+  @Test
+  public void testFileMetadataExpiresTtl() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+
+    Path fileExpire1 = path("expirettl-" + UUID.randomUUID());
+    Path fileExpire2 = path("expirettl-" + UUID.randomUUID());
+    Path fileRetain = path("expirettl-" + UUID.randomUUID());
+
+    final S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasMetadataStore());
+    final MetadataStore ms = fs.getMetadataStore();
+
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+    try {
+      fs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+      // set the time, so fileExpire1 will expire
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      touch(fs, fileExpire1);
+      // set the time, so fileExpire2 will expire
+      when(mockTimeProvider.getNow()).thenReturn(101L);
+      touch(fs, fileExpire2);
+      // set the time, so fileRetain won't expire
+      when(mockTimeProvider.getNow()).thenReturn(109L);
+      touch(fs, fileRetain);
+      final FileStatus origFileRetainStatus = fs.getFileStatus(fileRetain);
+      // change the time, so the metadata of the first two files expires
+      when(mockTimeProvider.getNow()).thenReturn(110L);
+
+      // metadata is expired, so this should refresh the metadata with
+      // last_updated set to getNow()
+      final FileStatus fileExpire1Status = fs.getFileStatus(fileExpire1);
+      assertNotNull(fileExpire1Status);
+      assertEquals(110L, ms.get(fileExpire1).getLastUpdated());
+
+      // metadata is expired, so this should refresh the metadata with
+      // last_updated set to getNow()
+      final FileStatus fileExpire2Status = fs.getFileStatus(fileExpire2);
+      assertNotNull(fileExpire2Status);
+      assertEquals(110L, ms.get(fileExpire2).getLastUpdated());
+
+      final FileStatus fileRetainStatus = fs.getFileStatus(fileRetain);
+      assertEquals("Modification time of these files should be equal.",
+          origFileRetainStatus.getModificationTime(),
+          fileRetainStatus.getModificationTime());
+      assertNotNull(fileRetainStatus);
+      assertEquals(109L, ms.get(fileRetain).getLastUpdated());
+    } finally {
+      fs.delete(fileExpire1, true);
+      fs.delete(fileExpire2, true);
+      fs.delete(fileRetain, true);
+      fs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  /**
+   * create(tombstoned file) must succeed irrespective of the overwrite flag.
+   */
+  @Test
+  public void testCreateOnTombstonedFileSucceeds() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+    final S3AFileSystem fs = getFileSystem();
+
+    String fileToTry = methodName + UUID.randomUUID().toString();
+
+    final Path filePath = path(fileToTry);
+
+    // Set up a mock time provider for the test.
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+    try {
+      fs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+      // CREATE A FILE
+      touch(fs, filePath);
+
+      // DELETE THE FILE - TOMBSTONE
+      fs.delete(filePath, true);
+
+      // CREATE THE SAME FILE WITHOUT ERROR DESPITE THE TOMBSTONE
+      touch(fs, filePath);
+
+    } finally {
+      fs.delete(filePath, true);
+      fs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
+  /**
+   * create("parent has tombstone") must always succeed (we don't check the
+   * parent), but after the file has been written, all entries up the tree
+   * must be valid. That is: the putAncestor code will correct everything.
+   */
+  @Test
+  public void testCreateParentHasTombstone() throws Exception {
+    LOG.info("Authoritative mode: {}", authoritative);
+    final S3AFileSystem fs = getFileSystem();
+
+    String dirToDelete = methodName + UUID.randomUUID().toString();
+    String fileToTry = dirToDelete + "/theFileToTry";
+
+    final Path dirPath = path(dirToDelete);
+    final Path filePath = path(fileToTry);
+
+    // Set up a mock time provider for the test.
+    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+    try {
+      fs.setTtlTimeProvider(mockTimeProvider);
+      when(mockTimeProvider.getNow()).thenReturn(100L);
+      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+      // CREATE DIRECTORY
+      fs.mkdirs(dirPath);
+
+      // DELETE DIRECTORY
+      fs.delete(dirPath, true);
+
+      // WRITE TO DELETED DIRECTORY - SUCCESS
+      touch(fs, filePath);
+
+      // SET TIME SO METADATA EXPIRES
+      when(mockTimeProvider.getNow()).thenReturn(110L);
+
+      // WRITE TO DELETED DIRECTORY - SUCCESS
+      touch(fs, filePath);
+
+    } finally {
+      fs.delete(filePath, true);
+      fs.delete(dirPath, true);
+      fs.setTtlTimeProvider(originalTimeProvider);
+    }
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
index 9241686090..f616190040 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -280,7 +280,7 @@ private void testPruneCommand(Configuration cmdConf, Path parent,
           "This child should have been kept (prefix restriction).", 1);
     } finally {
       getFileSystem().delete(parent, true);
-      ms.prune(Long.MAX_VALUE);
+      ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, Long.MAX_VALUE);
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
index 149d1f3606..5241dd481d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
@@ -230,6 +230,12 @@ public void tearDown() throws Exception {
     IOUtils.cleanupWithLogger(LOG, fileSystem);
   }
 
+  @Override protected String getPathStringForPrune(String path)
+      throws Exception {
+    String b = getTestBucketName(getContract().getFileSystem().getConf());
+    return "/" + b + "/dir2";
+  }
+
   /**
    * Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
    */
@@ -437,7 +443,7 @@ private void doTestBatchWrite(int numDelete, int numPut,
     }
 
     // move the old paths to new paths and verify
-    ms.move(pathsToDelete, newMetas);
+    ms.move(pathsToDelete, newMetas, getTtlTimeProvider());
     assertEquals(0, ms.listChildren(oldDir).withoutTombstones().numEntries());
     if (newMetas != null) {
       assertTrue(CollectionUtils
@@ -650,7 +656,7 @@ public void testMovePopulatesAncestors() throws IOException {
             1024, false))
     );
-    ddbms.move(fullSourcePaths, pathsToCreate);
+    ddbms.move(fullSourcePaths, pathsToCreate, getTtlTimeProvider());
 
     // assert that all the ancestors should have been populated automatically
     assertCached(testRoot + "/c");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
index 301ba16aec..95c607aa66 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
@@ -243,7 +243,8 @@ public void test_030_BatchedWrite() throws Exception {
           if (pruneItems == BATCH_SIZE) {
             describe("pruning files");
-            ddbms.prune(Long.MAX_VALUE /* all files */);
+            ddbms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME,
+                Long.MAX_VALUE /* all files */);
             pruneItems = 0;
           }
           if (tracker.probe()) {
@@ -305,7 +306,7 @@ public void test_050_getVersionMarkerItem() throws Throwable {
   private void retryingDelete(final Path path) {
     try {
       ddbms.getInvoker().retry("Delete ", path.toString(), true,
-          () -> ddbms.delete(path));
+          () -> ddbms.delete(path, new S3Guard.TtlTimeProvider(getConf())));
     } catch (IOException e) {
       LOG.warn("Failed to delete {}: ", path, e);
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index 55f4707fe4..754da0db79 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -23,6 +23,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -68,6 +69,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
   static final FsPermission PERMISSION = null;
   static final String GROUP = null;
   private final long accessTime = 0;
+  private static ITtlTimeProvider ttlTimeProvider;
 
   /**
    * Each test should override this. Will use a new Configuration instance.
@@ -123,6 +125,8 @@ public void setUp() throws Exception {
     assertNotNull("null MetadataStore", ms);
     assertNotNull("null FileSystem", contract.getFileSystem());
     ms.initialize(contract.getFileSystem());
+    ttlTimeProvider =
+        new S3Guard.TtlTimeProvider(contract.getFileSystem().getConf());
   }
 
   @After
@@ -310,7 +314,7 @@ public void testRootDirPutNew() throws Exception {
   public void testDelete() throws Exception {
     setUpDeleteTest();
 
-    ms.delete(strToPath("/ADirectory1/db1/file2"));
+    ms.delete(strToPath("/ADirectory1/db1/file2"), ttlTimeProvider);
 
     /* Ensure delete happened. */
    assertDirectorySize("/ADirectory1/db1", 1);
@@ -338,7 +342,7 @@ private void deleteSubtreeHelper(String pathPrefix) throws Exception {
     if (!allowMissing()) {
       assertCached(p + "/ADirectory1/db1");
     }
-    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"));
+    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"), ttlTimeProvider);
 
     assertEmptyDirectory(p + "/ADirectory1");
     assertDeleted(p + "/ADirectory1/db1");
@@ -358,7 +362,7 @@
   public void testDeleteRecursiveRoot() throws Exception {
     setUpDeleteTest();
 
-    ms.deleteSubtree(strToPath("/"));
+    ms.deleteSubtree(strToPath("/"), ttlTimeProvider);
     assertDeleted("/ADirectory1");
     assertDeleted("/ADirectory2");
     assertDeleted("/ADirectory2/db1");
@@ -369,10 +373,10 @@
   @Test
   public void testDeleteNonExisting() throws Exception {
     // Path doesn't exist, but should silently succeed
-    ms.delete(strToPath("/bobs/your/uncle"));
+    ms.delete(strToPath("/bobs/your/uncle"), ttlTimeProvider);
 
     // Ditto.
-    ms.deleteSubtree(strToPath("/internets"));
+    ms.deleteSubtree(strToPath("/internets"), ttlTimeProvider);
   }
 
@@ -408,7 +412,7 @@ public void testGet() throws Exception {
     }
 
     if (!(ms instanceof NullMetadataStore)) {
-      ms.delete(strToPath(filePath));
+      ms.delete(strToPath(filePath), ttlTimeProvider);
       meta = ms.get(strToPath(filePath));
       assertTrue("Tombstone not left for deleted file", meta.isDeleted());
     }
@@ -586,7 +590,7 @@ public void testMove() throws Exception {
     destMetas.add(new PathMetadata(makeDirStatus("/b1")));
     destMetas.add(new PathMetadata(makeFileStatus("/b1/file1", 100)));
     destMetas.add(new PathMetadata(makeFileStatus("/b1/file2", 100)));
-    ms.move(srcPaths, destMetas);
+    ms.move(srcPaths, destMetas, ttlTimeProvider);
 
     // Assert src is no longer there
     dirMeta = ms.listChildren(strToPath("/a1"));
@@ -636,11 +640,11 @@ public void testMultiBucketPaths() throws Exception {
 
     // Make sure delete is correct as well
     if (!allowMissing()) {
-      ms.delete(new Path(p2));
+      ms.delete(new Path(p2), ttlTimeProvider);
       meta = ms.get(new Path(p1));
       assertNotNull("Path should not have been deleted", meta);
     }
-    ms.delete(new Path(p1));
+    ms.delete(new Path(p1), ttlTimeProvider);
   }
 
   @Test
@@ -668,7 +672,7 @@ public void testPruneFiles() throws Exception {
       assertListingsEqual(ls.getListing(), "/pruneFiles/new",
           "/pruneFiles/old");
     }
-    ms.prune(cutoff);
+    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);
     ls = ms.listChildren(strToPath("/pruneFiles"));
     if (allowMissing()) {
       assertDeleted("/pruneFiles/old");
@@ -698,7 +702,7 @@ public void testPruneDirs() throws Exception {
     Thread.sleep(1);
     long cutoff = getTime();
 
-    ms.prune(cutoff);
+    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);
 
     assertDeleted("/pruneDirs/dir/file");
   }
@@ -728,7 +732,7 @@ public void testPruneUnsetsAuthoritative() throws Exception {
       ms.put(parentDirMd);
     }
 
-    ms.prune(time);
+    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
     DirListingMetadata listing;
     for (String directory : directories) {
       Path path = strToPath(directory);
@@ -765,7 +769,7 @@ public void testPrunePreservesAuthoritative() throws Exception {
     ms.put(parentDirMd);
 
     // prune the ms
-    ms.prune(time);
+    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
 
     // get the directory listings
     DirListingMetadata rootDirMd = ms.listChildren(strToPath(rootDir));
@@ -823,6 +827,89 @@ public void testPutRetainsIsDeletedInParentListing() throws Exception {
     }
   }
 
+  @Test
+  public void testPruneExpiredTombstones() throws Exception {
+    List<String> keepFilenames = new ArrayList<>(
+        Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
+    List<String> removeFilenames = new ArrayList<>(
+        Arrays.asList("/dir1/fileR1", "/dir1/fileR2", "/dir1/fileR3"));
+
+    long cutoff = 9001;
+
+    for (String fN : keepFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9002L);
+      ms.put(pathMetadata);
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9000L);
+      // tombstones are the deleted files!
+      pathMetadata.setIsDeleted(true);
+      ms.put(pathMetadata);
+    }
+
+    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff);
+
+    if (!allowMissing()) {
+      for (String fN : keepFilenames) {
+        final PathMetadata pathMetadata = ms.get(strToPath(fN));
+        assertNotNull("Kept files should be in the metastore after prune",
+            pathMetadata);
+      }
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = ms.get(strToPath(fN));
+      assertNull("Expired tombstones should be removed from metastore after "
+          + "the prune.", pathMetadata);
+    }
+  }
+
+  @Test
+  public void testPruneExpiredTombstonesSpecifiedPath() throws Exception {
+    List<String> keepFilenames = new ArrayList<>(
+        Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
+    List<String> removeFilenames = new ArrayList<>(
+        Arrays.asList("/dir2/fileR1", "/dir2/fileR2", "/dir2/fileR3"));
+
+    long cutoff = 9001;
+
+    // Only the tombstones under the specified path (/dir2) should be pruned.
+    for (String fN : keepFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9002L);
+      ms.put(pathMetadata);
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+      pathMetadata.setLastUpdated(9000L);
+      // tombstones are the deleted files!
+      pathMetadata.setIsDeleted(true);
+      ms.put(pathMetadata);
+    }
+
+    final String prunePath = getPathStringForPrune("/dir2");
+    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff,
+        prunePath);
+
+    if (!allowMissing()) {
+      for (String fN : keepFilenames) {
+        final PathMetadata pathMetadata = ms.get(strToPath(fN));
+        assertNotNull("Kept files should be in the metastore after prune",
+            pathMetadata);
+      }
+    }
+
+    for (String fN : removeFilenames) {
+      final PathMetadata pathMetadata = ms.get(strToPath(fN));
+      assertNull("Expired tombstones should be removed from metastore after "
+          + "the prune.", pathMetadata);
+    }
+  }
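The two tests above exercise the new two-argument and three-argument prune overloads. The PruneMode enum decides which timestamp is compared against the cutoff and which entries are eligible; the dispatch below is illustrative only (the concrete stores implement this internally, and pruneSketch is not patch code).

    // Illustrative dispatch for the two prune modes used in these tests.
    void pruneSketch(MetadataStore.PruneMode pruneMode, long cutoff) {
      switch (pruneMode) {
      case ALL_BY_MODTIME:
        // drop file entries whose modification time is older than cutoff
        break;
      case TOMBSTONES_BY_LASTUPDATED:
        // drop only deleted markers whose last_updated is older than cutoff
        break;
      default:
        throw new UnsupportedOperationException(
            "Unsupported prune mode: " + pruneMode);
      }
    }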
+
   /*
    * Helper functions.
    */
@@ -837,6 +924,16 @@ private String[] buildPathStrings(String parent, String... paths)
     return paths;
   }
 
+
+  /**
+   * The prune operation needs the path with the bucket name as a string in
+   * {@link DynamoDBMetadataStore}, but not for {@link LocalMetadataStore}.
+   * This is an implementation detail of the ms, so this should be
+   * implemented in the subclasses.
+   */
+  protected abstract String getPathStringForPrune(String path)
+      throws Exception;
+
   private void commonTestPutListStatus(final String parent) throws IOException {
     putListStatusFiles(parent, true, buildPathStrings(parent, "file1",
         "file2", "file3"));
@@ -1012,4 +1109,8 @@ protected static long getTime() {
     return System.currentTimeMillis();
   }
 
+  protected static ITtlTimeProvider getTtlTimeProvider() {
+    return ttlTimeProvider;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index d0156f13e8..ee7b584ca1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -75,6 +75,11 @@ public AbstractMSContract createContract(Configuration conf) throws
     return new LocalMSContract(conf);
   }
 
+  @Override protected String getPathStringForPrune(String path)
+      throws Exception {
+    return path;
+  }
+
   @Test
   public void testClearByAncestor() throws Exception {
     Cache<Path, LocalMetadataEntry> cache = CacheBuilder.newBuilder().build();
@@ -184,7 +189,7 @@ private static void assertClearResult(Cache<Path, LocalMetadataEntry> cache,
       String prefixStr, String pathStr, int leftoverSize) throws IOException {
     populateMap(cache, prefixStr);
     LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
-        cache, true);
+        cache, true, getTtlTimeProvider());
     assertEquals(String.format("Cache should have %d entries", leftoverSize),
         leftoverSize, sizeOfMap(cache));
     cache.invalidateAll();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
index c0541ea98e..2e0bc4b7e4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
@@ -46,6 +46,11 @@ public boolean allowMissing() {
     return true;
   }
 
+  @Override protected String getPathStringForPrune(String path)
+      throws Exception {
+    return path;
+  }
+
   @Override
   public AbstractMSContract createContract() {
     return new NullMSContract();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
index b246da2d50..bdb256cba3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
@@ -18,18 +18,28 @@
 
 package org.apache.hadoop.fs.s3a.s3guard;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.Tristate;
 
-import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link S3Guard} utility class.
@@ -58,8 +68,8 @@ public void testDirListingUnion() throws Exception {
         makeFileStatus("s3a://bucket/dir/s3-file4", false)
     );
 
-    S3Guard.ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
-        DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
+    ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
+        DEFAULT_METADATASTORE_METADATA_TTL);
     FileStatus[] result = S3Guard.dirListingUnion(ms, dirPath, s3Listing,
         dirMeta, false, timeProvider);
 
@@ -70,6 +80,185 @@
     assertContainsPath(result, "s3a://bucket/dir/s3-file4");
   }
 
+  @Test
+  public void testPutWithTtlDirListingMeta() throws Exception {
+    // arrange
+    DirListingMetadata dlm = new DirListingMetadata(new Path("/"), null,
+        false);
+    MetadataStore ms = spy(MetadataStore.class);
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(100L);
+
+    // act
+    S3Guard.putWithTtl(ms, dlm, timeProvider);
+
+    // assert
+    assertEquals("last update in " + dlm, 100L, dlm.getLastUpdated());
+    verify(timeProvider, times(1)).getNow();
+    verify(ms, times(1)).put(dlm);
+  }
+
+  @Test
+  public void testPutWithTtlFileMeta() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    when(fileStatus.getPath()).thenReturn(new Path("/"));
+    PathMetadata pm = new PathMetadata(fileStatus);
+    MetadataStore ms = spy(MetadataStore.class);
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(100L);
+
+    // act
+    S3Guard.putWithTtl(ms, pm, timeProvider);
+
+    // assert
+    assertEquals("last update in " + pm, 100L, pm.getLastUpdated());
+    verify(timeProvider, times(1)).getNow();
+    verify(ms, times(1)).put(pm);
+  }
+
+  @Test
+  public void testPutWithTtlCollection() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    when(fileStatus.getPath()).thenReturn(new Path("/"));
+    Collection<PathMetadata> pmCollection = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      pmCollection.add(new PathMetadata(fileStatus));
+    }
+    MetadataStore ms = spy(MetadataStore.class);
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(100L);
+
+    // act
+    S3Guard.putWithTtl(ms, pmCollection, timeProvider);
+
+    // assert
+    pmCollection.forEach(
+        pm -> assertEquals(100L, pm.getLastUpdated())
+    );
+    verify(timeProvider, times(1)).getNow();
+    verify(ms, times(1)).put(pmCollection);
+  }
+
+  @Test
+  public void testGetWithTtlExpired() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    Path path = new Path("/file");
+    when(fileStatus.getPath()).thenReturn(path);
+    PathMetadata pm = new PathMetadata(fileStatus);
+    pm.setLastUpdated(100L);
+
+    MetadataStore ms = mock(MetadataStore.class);
+    when(ms.get(path)).thenReturn(pm);
+
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(101L);
+    when(timeProvider.getMetadataTtl()).thenReturn(1L);
+
+    // act
+    final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider);
+
+    // assert
+    assertNull(pmExpired);
+  }
+
+  @Test
+  public void testGetWithTtlNotExpired() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    Path path = new Path("/file");
+    when(fileStatus.getPath()).thenReturn(path);
+    PathMetadata pm = new PathMetadata(fileStatus);
+    pm.setLastUpdated(100L);
+
+    MetadataStore ms = mock(MetadataStore.class);
+    when(ms.get(path)).thenReturn(pm);
+
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(101L);
+    when(timeProvider.getMetadataTtl()).thenReturn(2L);
+
+    // act
+    final PathMetadata pmNotExpired =
+        S3Guard.getWithTtl(ms, path, timeProvider);
+
+    // assert
+    assertNotNull(pmNotExpired);
+  }
+
+  @Test
+  public void testGetWithZeroLastUpdatedNotExpired() throws Exception {
+    // arrange
+    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+    Path path = new Path("/file");
+    when(fileStatus.getPath()).thenReturn(path);
+    PathMetadata pm = new PathMetadata(fileStatus);
+    // we set 0 this time as the last updated: can happen e.g. when we use an
+    // old dynamo table
+    pm.setLastUpdated(0L);
+
+    MetadataStore ms = mock(MetadataStore.class);
+    when(ms.get(path)).thenReturn(pm);
+
+    ITtlTimeProvider timeProvider =
+        mock(ITtlTimeProvider.class);
+    when(timeProvider.getNow()).thenReturn(101L);
+    when(timeProvider.getMetadataTtl()).thenReturn(2L);
+
+    // act
+    final PathMetadata pmNotExpired =
+        S3Guard.getWithTtl(ms, path, timeProvider);
+
+    // assert
+    assertNotNull(pmNotExpired);
+  }
+
+
+  /**
+   * Makes sure that all uses of TTL timeouts use a consistent time unit.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testTTLConstruction() throws Throwable {
+    // first one
+    ITtlTimeProvider timeProviderExplicit = new S3Guard.TtlTimeProvider(
+        DEFAULT_METADATASTORE_METADATA_TTL);
+
+    // mirror the FS construction,
+    // from a config guaranteed to be empty (i.e. the code defval)
+    Configuration conf = new Configuration(false);
+    long millitime = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+        DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+    assertEquals(15 * 60_000, millitime);
+    S3Guard.TtlTimeProvider fsConstruction = new S3Guard.TtlTimeProvider(
+        millitime);
+    assertEquals("explicit vs fs construction", timeProviderExplicit,
+        fsConstruction);
+    assertEquals("first and second constructor", timeProviderExplicit,
+        new S3Guard.TtlTimeProvider(conf));
+    // set the conf to a time without unit
+    conf.setLong(METADATASTORE_METADATA_TTL,
+        DEFAULT_METADATASTORE_METADATA_TTL);
+    assertEquals("first and second time set through long",
+        timeProviderExplicit, new S3Guard.TtlTimeProvider(conf));
+    double timeInSeconds = DEFAULT_METADATASTORE_METADATA_TTL / 1000;
+    double timeInMinutes = timeInSeconds / 60;
+    String timeStr = String.format("%dm", (int) timeInMinutes);
+    assertEquals("wrong time in minutes from " + timeInMinutes,
+        "15m", timeStr);
+    conf.set(METADATASTORE_METADATA_TTL, timeStr);
+    assertEquals("Time in millis as string from "
+            + conf.get(METADATASTORE_METADATA_TTL),
+        timeProviderExplicit,
+        new S3Guard.TtlTimeProvider(conf));
+  }
+
   void assertContainsPath(FileStatus[] statuses, String pathStr) {
     assertTrue("listing doesn't contain " + pathStr,
         containsPath(statuses, pathStr));
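testTTLConstruction above pins down the unit handling: the TTL option may be set with a time suffix ("15m") or as a bare millisecond count, and both resolve through Hadoop's standard Configuration.getTimeDuration. A minimal usage sketch with the constants from this patch:

    // Sketch: resolving the TTL to milliseconds the same way the filesystem
    // construction path (verified by testTTLConstruction) does.
    static long resolveTtlMillis(Configuration conf) {
      return conf.getTimeDuration(METADATASTORE_METADATA_TTL,
          DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
    }
    // e.g. after conf.set(METADATASTORE_METADATA_TTL, "15m"),
    // resolveTtlMillis(conf) returns 900000 (15 * 60_000)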
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
index b843392ebf..1bffc3b1b7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.fs.s3a.scale;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 
+import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
@@ -54,6 +58,12 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
   static final long ACCESS_TIME = System.currentTimeMillis();
 
   static final Path BUCKET_ROOT = new Path("s3a://fake-bucket/");
+  private ITtlTimeProvider ttlTimeProvider;
+
+  @Before
+  public void initialize() {
+    ttlTimeProvider = new S3Guard.TtlTimeProvider(new Configuration());
+  }
 
   /**
    * Subclasses should override this to provide the MetadataStore they wish
@@ -129,7 +139,7 @@ public void test_020_Moves() throws Throwable {
           toDelete = movedPaths;
           toCreate = origMetas;
         }
-        ms.move(toDelete, toCreate);
+        ms.move(toDelete, toCreate, ttlTimeProvider);
       }
       moveTimer.end();
       printTiming(LOG, "move", moveTimer, operations);
@@ -194,7 +204,7 @@ protected void clearMetadataStore(MetadataStore ms, long count)
       throws IOException {
     describe("Recursive deletion");
     NanoTimer deleteTimer = new NanoTimer();
-    ms.deleteSubtree(BUCKET_ROOT);
+    ms.deleteSubtree(BUCKET_ROOT, ttlTimeProvider);
     deleteTimer.end();
     printTiming(LOG, "delete", deleteTimer, count);
  }