HADOOP-16279. S3Guard: Implement time-based (TTL) expiry for entries (and tombstones).
Contributed by Gabor Bota. Change-Id: I73a2d2861901dedfe7a0e783b310fbb95e7c1af9
Parent: e70aeb4d7e
Commit: f9cc9e1621
@@ -1502,12 +1502,10 @@
 </property>

 <property>
-  <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
-  <value>3600000</value>
+  <name>fs.s3a.metadatastore.metadata.ttl</name>
+  <value>15m</value>
   <description>
-    This value sets how long a directory listing in the MS is considered as
-    authoritative. The value is in milliseconds.
-    MetadataStore should be authoritative to use this configuration knob.
+    This value sets how long an entry in a MetadataStore is valid.
   </description>
 </property>
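The renamed key now takes a time-duration string rather than raw milliseconds. A minimal sketch of how such a value resolves, assuming only stock Hadoop `Configuration` behavior (the demo class and `main` are illustrative, not part of the patch):

```java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

// Illustrative only: shows how getTimeDuration() turns "15m" into millis.
// Suffixes ns/us/ms/s/m/h/d are recognized; a bare number is read in the
// unit passed as the last argument.
public class TtlParseDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.metadatastore.metadata.ttl", "15m");
    long ttl = conf.getTimeDuration("fs.s3a.metadatastore.metadata.ttl",
        TimeUnit.MINUTES.toMillis(15), TimeUnit.MILLISECONDS);
    System.out.println(ttl); // 900000
  }
}
```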
@@ -353,10 +353,14 @@ private Constants() {
   /**
    * How long a directory listing in the MS is considered as authoritative.
    */
-  public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL =
-      "fs.s3a.metadatastore.authoritative.dir.ttl";
-  public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL =
-      TimeUnit.MINUTES.toMillis(60);
+  public static final String METADATASTORE_METADATA_TTL =
+      "fs.s3a.metadatastore.metadata.ttl";
+
+  /**
+   * Default TTL in milliseconds: 15 minutes.
+   */
+  public static final long DEFAULT_METADATASTORE_METADATA_TTL =
+      TimeUnit.MINUTES.toMillis(15);

   /** read ahead buffer size to prevent connection re-establishments. */
   public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
@@ -464,7 +464,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
       if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
         S3AFileStatus status = createFileStatus(keyPath, summary,
             owner.getDefaultBlockSize(keyPath), owner.getUsername(),
-            null, null);
+            summary.getETag(), null);
         LOG.debug("Adding: {}", status);
         stats.add(status);
         added++;
@@ -126,6 +126,7 @@
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.fs.store.EtagChecksum;
@@ -244,7 +245,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

   private AWSCredentialProviderList credentials;

-  private S3Guard.ITtlTimeProvider ttlTimeProvider;
+  private ITtlTimeProvider ttlTimeProvider;

   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
@@ -388,9 +389,11 @@ public void initialize(URI name, Configuration originalConf)
             getMetadataStore(), allowAuthoritative);
       }
       initMultipartUploads(conf);
-      long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL,
-          DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
-      ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+      if (hasMetadataStore()) {
+        long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+            DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+        ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+      }
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
@@ -1341,7 +1344,7 @@ childDst, length, getDefaultBlockSize(childDst), username,
       }
     }

-    metadataStore.move(srcPaths, dstMetas);
+    metadataStore.move(srcPaths, dstMetas, ttlTimeProvider);

     if (!src.getParent().equals(dst.getParent())) {
       LOG.debug("source & dest parents are different; fix up dir markers");
@@ -1722,7 +1725,7 @@ void deleteObjectAtPath(Path f, String key, boolean isFile)
       instrumentation.directoryDeleted();
     }
     deleteObject(key);
-    metadataStore.delete(f);
+    metadataStore.delete(f, ttlTimeProvider);
   }

   /**
@@ -2143,7 +2146,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive)
          }
        }
      }
-      metadataStore.deleteSubtree(f);
+      metadataStore.deleteSubtree(f, ttlTimeProvider);
     } else {
       LOG.debug("delete: Path is a file");
       deleteObjectAtPath(f, key, true);
@@ -2466,7 +2469,10 @@ S3AFileStatus innerGetFileStatus(final Path f,
     LOG.debug("Getting path status for {} ({})", path, key);

     // Check MetadataStore, if any.
-    PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+    PathMetadata pm = null;
+    if (hasMetadataStore()) {
+      pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
+    }
     Set<Path> tombstones = Collections.emptySet();
     if (pm != null) {
       if (pm.isDeleted()) {
@@ -2501,7 +2507,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
           LOG.debug("S3Guard metadata for {} is outdated, updating it",
               path);
           return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
-              instrumentation);
+              instrumentation, ttlTimeProvider);
         }
       }
     }
@@ -2534,12 +2540,14 @@ S3AFileStatus innerGetFileStatus(final Path f,
             null, null);
       }
       // entry was found, save in S3Guard
-      return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
+      return S3Guard.putAndReturn(metadataStore, s3FileStatus,
+          instrumentation, ttlTimeProvider);
     } else {
       // there was no entry in S3Guard
       // retrieve the data and update the metadata store in the process.
       return S3Guard.putAndReturn(metadataStore,
-          s3GetFileStatus(path, key, tombstones), instrumentation);
+          s3GetFileStatus(path, key, tombstones), instrumentation,
+          ttlTimeProvider);
     }
   }

@@ -3191,11 +3199,12 @@ void finishedWrite(String key, long length, String eTag, String versionId)
     // See note about failure semantics in S3Guard documentation
     try {
       if (hasMetadataStore()) {
-        S3Guard.addAncestors(metadataStore, p, username);
+        S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider);
         S3AFileStatus status = createUploadFileStatus(p,
             S3AUtils.objectRepresentsDirectory(key, length), length,
             getDefaultBlockSize(p), username, eTag, versionId);
-        S3Guard.putAndReturn(metadataStore, status, instrumentation);
+        S3Guard.putAndReturn(metadataStore, status, instrumentation,
+            ttlTimeProvider);
       }
     } catch (IOException e) {
       if (failOnMetadataWriteError) {
@@ -3860,12 +3869,12 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
   }

   @VisibleForTesting
-  protected S3Guard.ITtlTimeProvider getTtlTimeProvider() {
+  public ITtlTimeProvider getTtlTimeProvider() {
     return ttlTimeProvider;
   }

   @VisibleForTesting
-  protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) {
+  protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
     this.ttlTimeProvider = ttlTimeProvider;
   }
@@ -189,8 +189,10 @@
  * directory helps prevent unnecessary queries during traversal of an entire
  * sub-tree.
  *
- * Some mutating operations, notably {@link #deleteSubtree(Path)} and
- * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * Some mutating operations, notably
+ * {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
+ * {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)},
+ * are less efficient with this schema.
  * They require mutating multiple items in the DynamoDB table.
  *
  * By default, DynamoDB access is performed within the same AWS region as
@@ -471,14 +473,15 @@ private void initDataAccessRetries(Configuration config) {

   @Override
   @Retries.RetryTranslated
-  public void delete(Path path) throws IOException {
-    innerDelete(path, true);
+  public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
+    innerDelete(path, true, ttlTimeProvider);
   }

   @Override
   @Retries.RetryTranslated
   public void forgetMetadata(Path path) throws IOException {
-    innerDelete(path, false);
+    innerDelete(path, false, null);
   }

   /**
@@ -487,10 +490,13 @@ public void forgetMetadata(Path path) throws IOException {
   * There is no check as to whether the entry exists in the table first.
   * @param path path to delete
   * @param tombstone flag to create a tombstone marker
+   * @param ttlTimeProvider The time provider to set last_updated. Must not
+   *    be null if tombstone is true.
   * @throws IOException I/O error.
   */
   @Retries.RetryTranslated
-  private void innerDelete(final Path path, boolean tombstone)
+  private void innerDelete(final Path path, boolean tombstone,
+      ITtlTimeProvider ttlTimeProvider)
       throws IOException {
     checkPath(path);
     LOG.debug("Deleting from table {} in region {}: {}",
@@ -505,8 +511,13 @@ private void innerDelete(final Path path, boolean tombstone)
     // on that of S3A itself
     boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
     if (tombstone) {
+      Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider "
+          + "must not be null");
+      final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+      // update the last updated field of record when putting a tombstone
+      pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
       Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
-          new DDBPathMetadata(PathMetadata.tombstone(path)));
+          new DDBPathMetadata(pmTombstone));
       writeOp.retry(
           "Put tombstone",
           path.toString(),
@@ -524,7 +535,8 @@ private void innerDelete(final Path path, boolean tombstone)

   @Override
   @Retries.RetryTranslated
-  public void deleteSubtree(Path path) throws IOException {
+  public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
     checkPath(path);
     LOG.debug("Deleting subtree from table {} in region {}: {}",
         tableName, region, path);
@@ -537,7 +549,7 @@ public void deleteSubtree(Path path) throws IOException {

     for (DescendantsIterator desc = new DescendantsIterator(this, meta);
         desc.hasNext();) {
-      innerDelete(desc.next().getPath(), true);
+      innerDelete(desc.next().getPath(), true, ttlTimeProvider);
     }
   }

@@ -731,7 +743,8 @@ Collection<DDBPathMetadata> completeAncestry(
   @Override
   @Retries.RetryTranslated
   public void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException {
+      Collection<PathMetadata> pathsToCreate, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
     if (pathsToDelete == null && pathsToCreate == null) {
       return;
     }
@@ -754,7 +767,11 @@ public void move(Collection<Path> pathsToDelete,
     }
     if (pathsToDelete != null) {
       for (Path meta : pathsToDelete) {
-        newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta)));
+        Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider"
+            + " must not be null");
+        final PathMetadata pmTombstone = PathMetadata.tombstone(meta);
+        pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+        newItems.add(new DDBPathMetadata(pmTombstone));
       }
     }

@@ -1024,14 +1041,37 @@ public void destroy() throws IOException {
   }

   @Retries.RetryTranslated
-  private ItemCollection<ScanOutcome> expiredFiles(long modTime,
-      String keyPrefix) throws IOException {
-    String filterExpression =
-        "mod_time < :mod_time and begins_with(parent, :parent)";
-    String projectionExpression = "parent,child";
-    ValueMap map = new ValueMap()
-        .withLong(":mod_time", modTime)
-        .withString(":parent", keyPrefix);
+  private ItemCollection<ScanOutcome> expiredFiles(PruneMode pruneMode,
+      long cutoff, String keyPrefix) throws IOException {
+
+    String filterExpression;
+    String projectionExpression;
+    ValueMap map;
+
+    switch (pruneMode) {
+    case ALL_BY_MODTIME:
+      filterExpression =
+          "mod_time < :mod_time and begins_with(parent, :parent)";
+      projectionExpression = "parent,child";
+      map = new ValueMap()
+          .withLong(":mod_time", cutoff)
+          .withString(":parent", keyPrefix);
+      break;
+    case TOMBSTONES_BY_LASTUPDATED:
+      filterExpression =
+          "last_updated < :last_updated and begins_with(parent, :parent) "
+              + "and is_deleted = :is_deleted";
+      projectionExpression = "parent,child";
+      map = new ValueMap()
+          .withLong(":last_updated", cutoff)
+          .withString(":parent", keyPrefix)
+          .withBoolean(":is_deleted", true);
+      break;
+    default:
+      throw new UnsupportedOperationException("Unsupported prune mode: "
+          + pruneMode);
+    }
+
     return readOp.retry(
         "scan",
         keyPrefix,
@@ -1041,20 +1081,31 @@ private ItemCollection<ScanOutcome> expiredFiles(long modTime,

   @Override
   @Retries.RetryTranslated
-  public void prune(long modTime) throws IOException {
-    prune(modTime, "/");
+  public void prune(PruneMode pruneMode, long cutoff) throws IOException {
+    prune(pruneMode, cutoff, "/");
   }

   /**
   * Prune files, in batches. There's a sleep between each batch.
-   * @param modTime Oldest modification time to allow
+   *
+   * @param pruneMode The mode of operation for the prune. For details see
+   *    {@link MetadataStore#prune(PruneMode, long)}
+   * @param cutoff Oldest modification time to allow
   * @param keyPrefix The prefix for the keys that should be removed
   * @throws IOException Any IO/DDB failure.
   * @throws InterruptedIOException if the prune was interrupted
   */
   @Override
   @Retries.RetryTranslated
-  public void prune(long modTime, String keyPrefix) throws IOException {
+  public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
+      throws IOException {
+    final ItemCollection<ScanOutcome> items =
+        expiredFiles(pruneMode, cutoff, keyPrefix);
+    innerPrune(items);
+  }
+
+  private void innerPrune(ItemCollection<ScanOutcome> items)
+      throws IOException {
     int itemCount = 0;
     try {
       Collection<Path> deletionBatch =
@@ -1064,7 +1115,7 @@ public void prune(long modTime, String keyPrefix) throws IOException {
           S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT,
           TimeUnit.MILLISECONDS);
       Set<Path> parentPathSet = new HashSet<>();
-      for (Item item : expiredFiles(modTime, keyPrefix)) {
+      for (Item item : items) {
         DDBPathMetadata md = PathMetadataDynamoDBTranslation
             .itemToPathMetadata(item, username);
         Path path = md.getFileStatus().getPath();
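The two prune scans above differ only in their filter expression: `ALL_BY_MODTIME` compares the S3 `mod_time`, while `TOMBSTONES_BY_LASTUPDATED` compares the store-maintained `last_updated` and restricts the scan to `is_deleted` items. A hedged sketch of the tombstone scan, assuming the AWS SDK v1 document API this class already uses (the `table` handle is assumed and the `readOp.retry` wrapping is omitted):

```java
import com.amazonaws.services.dynamodbv2.document.ItemCollection;
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;

// Illustrative only: scan for expired tombstones under a key prefix.
class TombstoneScanSketch {
  static ItemCollection<ScanOutcome> scanExpiredTombstones(Table table,
      long cutoff, String keyPrefix) {
    ValueMap values = new ValueMap()
        .withLong(":last_updated", cutoff)
        .withString(":parent", keyPrefix)
        .withBoolean(":is_deleted", true);
    return table.scan(
        "last_updated < :last_updated and begins_with(parent, :parent) "
            + "and is_deleted = :is_deleted",  // filter expression
        "parent,child",                        // projection expression
        null,                                  // no attribute-name map needed
        values);
  }
}
```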
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+/**
+ * This interface is defined for handling TTL expiry of metadata in S3Guard.
+ *
+ * TTL can be tested by implementing this interface and setting it as
+ * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
+ * value preferred and flaky tests could be avoided. By default getNow()
+ * returns the current time since the epoch at runtime.
+ *
+ * Time is measured in milliseconds.
+ */
+public interface ITtlTimeProvider {
+  long getNow();
+  long getMetadataTtl();
+}
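Since this provider is the only clock S3Guard consults, a deterministic implementation makes expiry deterministic in tests; the patch's own tests build exactly this with Mockito mocks or an anonymous class. A named sketch of the same idea (illustrative only, not part of the patch):

```java
package org.apache.hadoop.fs.s3a.s3guard;

/** Illustrative fake clock: tests advance time explicitly to force expiry. */
public class FakeTtlTimeProvider implements ITtlTimeProvider {
  private long now;
  private final long ttlMillis;

  public FakeTtlTimeProvider(long startMillis, long ttlMillis) {
    this.now = startMillis;
    this.ttlMillis = ttlMillis;
  }

  /** Move the fake clock forward, e.g. past the TTL. */
  public void tick(long millis) {
    now += millis;
  }

  @Override
  public long getNow() {
    return now;
  }

  @Override
  public long getMetadataTtl() {
    return ttlMillis;
  }
}
```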
@@ -112,32 +112,34 @@ public String toString() {
   }

   @Override
-  public void delete(Path p) throws IOException {
-    doDelete(p, false, true);
+  public void delete(Path p, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
+    doDelete(p, false, true, ttlTimeProvider);
   }

   @Override
   public void forgetMetadata(Path p) throws IOException {
-    doDelete(p, false, false);
+    doDelete(p, false, false, null);
   }

   @Override
-  public void deleteSubtree(Path path) throws IOException {
-    doDelete(path, true, true);
+  public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
+    doDelete(path, true, true, ttlTimeProvider);
   }

-  private synchronized void doDelete(Path p, boolean recursive, boolean
-      tombstone) {
+  private synchronized void doDelete(Path p, boolean recursive,
+      boolean tombstone, ITtlTimeProvider ttlTimeProvider) {

     Path path = standardize(p);

     // Delete entry from file cache, then from cached parent directory, if any

-    deleteCacheEntries(path, tombstone);
+    deleteCacheEntries(path, tombstone, ttlTimeProvider);

     if (recursive) {
       // Remove all entries that have this dir as path prefix.
-      deleteEntryByAncestor(path, localCache, tombstone);
+      deleteEntryByAncestor(path, localCache, tombstone, ttlTimeProvider);
     }
   }

@@ -191,7 +193,8 @@ public synchronized DirListingMetadata listChildren(Path p) throws

   @Override
   public void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException {
+      Collection<PathMetadata> pathsToCreate,
+      ITtlTimeProvider ttlTimeProvider) throws IOException {
     LOG.info("Move {} to {}", pathsToDelete, pathsToCreate);

     Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
@@ -205,7 +208,7 @@ public void move(Collection<Path> pathsToDelete,
     // 1. Delete pathsToDelete
     for (Path meta : pathsToDelete) {
       LOG.debug("move: deleting metadata {}", meta);
-      delete(meta);
+      delete(meta, ttlTimeProvider);
     }

     // 2. Create new destination path metadata
@@ -332,18 +335,19 @@ public void destroy() throws IOException {
   }

   @Override
-  public void prune(long modTime) throws IOException{
-    prune(modTime, "");
+  public void prune(PruneMode pruneMode, long cutoff) throws IOException{
+    prune(pruneMode, cutoff, "");
   }

   @Override
-  public synchronized void prune(long modTime, String keyPrefix) {
+  public synchronized void prune(PruneMode pruneMode, long cutoff,
+      String keyPrefix) {
     // prune files
     // filter path_metadata (files), filter expired, remove expired
     localCache.asMap().entrySet().stream()
         .filter(entry -> entry.getValue().hasPathMeta())
-        .filter(entry -> expired(
-            entry.getValue().getFileMeta().getFileStatus(), modTime, keyPrefix))
+        .filter(entry -> expired(pruneMode,
+            entry.getValue().getFileMeta(), cutoff, keyPrefix))
         .forEach(entry -> localCache.invalidate(entry.getKey()));

@@ -358,28 +362,37 @@ public synchronized void prune(long modTime, String keyPrefix) {
         Collection<PathMetadata> newChildren = new LinkedList<>();

         for (PathMetadata child : oldChildren) {
-          FileStatus status = child.getFileStatus();
-          if (!expired(status, modTime, keyPrefix)) {
+          if (!expired(pruneMode, child, cutoff, keyPrefix)) {
             newChildren.add(child);
           }
         }
-        if (newChildren.size() != oldChildren.size()) {
-          DirListingMetadata dlm =
-              new DirListingMetadata(path, newChildren, false);
-          localCache.put(path, new LocalMetadataEntry(dlm));
-          if (!path.isRoot()) {
-            DirListingMetadata parent = getDirListingMeta(path.getParent());
-            if (parent != null) {
-              parent.setAuthoritative(false);
-            }
-          }
-        }
+        removeAuthoritativeFromParent(path, oldChildren, newChildren);
       });
   }

-  private boolean expired(FileStatus status, long expiry, String keyPrefix) {
+  private void removeAuthoritativeFromParent(Path path,
+      Collection<PathMetadata> oldChildren,
+      Collection<PathMetadata> newChildren) {
+    if (newChildren.size() != oldChildren.size()) {
+      DirListingMetadata dlm =
+          new DirListingMetadata(path, newChildren, false);
+      localCache.put(path, new LocalMetadataEntry(dlm));
+      if (!path.isRoot()) {
+        DirListingMetadata parent = getDirListingMeta(path.getParent());
+        if (parent != null) {
+          parent.setAuthoritative(false);
+        }
+      }
+    }
+  }
+
+  private boolean expired(PruneMode pruneMode, PathMetadata metadata,
+      long cutoff, String keyPrefix) {
+    final S3AFileStatus status = metadata.getFileStatus();
+    final URI statusUri = status.getPath().toUri();
+
     // remove the protocol from path string to be able to compare
-    String bucket = status.getPath().toUri().getHost();
+    String bucket = statusUri.getHost();
     String statusTranslatedPath = "";
     if(bucket != null && !bucket.isEmpty()){
       // if there's a bucket, (well defined host in Uri) the pathToParentKey
@@ -389,18 +402,33 @@ private boolean expired(FileStatus status, long expiry, String keyPrefix) {
     } else {
       // if there's no bucket in the path the pathToParentKey will fail, so
       // this is the fallback to get the path from status
-      statusTranslatedPath = status.getPath().toUri().getPath();
+      statusTranslatedPath = statusUri.getPath();
     }

-    // Note: S3 doesn't track modification time on directories, so for
-    // consistency with the DynamoDB implementation we ignore that here
-    return status.getModificationTime() < expiry && !status.isDirectory()
-        && statusTranslatedPath.startsWith(keyPrefix);
+    boolean expired;
+    switch (pruneMode) {
+    case ALL_BY_MODTIME:
+      // Note: S3 doesn't track modification time on directories, so for
+      // consistency with the DynamoDB implementation we ignore that here
+      expired = status.getModificationTime() < cutoff && !status.isDirectory()
+          && statusTranslatedPath.startsWith(keyPrefix);
+      break;
+    case TOMBSTONES_BY_LASTUPDATED:
+      expired = metadata.getLastUpdated() < cutoff && metadata.isDeleted()
+          && statusTranslatedPath.startsWith(keyPrefix);
+      break;
+    default:
+      throw new UnsupportedOperationException("Unsupported prune mode: "
+          + pruneMode);
+    }
+
+    return expired;
   }

   @VisibleForTesting
   static void deleteEntryByAncestor(Path ancestor,
-      Cache<Path, LocalMetadataEntry> cache, boolean tombstone) {
+      Cache<Path, LocalMetadataEntry> cache, boolean tombstone,
+      ITtlTimeProvider ttlTimeProvider) {
+
     cache.asMap().entrySet().stream()
         .filter(entry -> isAncestorOf(ancestor, entry.getKey()))
@@ -410,7 +438,9 @@ static void deleteEntryByAncestor(Path ancestor,
         if(meta.hasDirMeta()){
           cache.invalidate(path);
         } else if(tombstone && meta.hasPathMeta()){
-          meta.setPathMetadata(PathMetadata.tombstone(path));
+          final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+          pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+          meta.setPathMetadata(pmTombstone);
         } else {
           cache.invalidate(path);
         }
@@ -434,7 +464,8 @@ private static boolean isAncestorOf(Path ancestor, Path f) {
   * Update fileCache and dirCache to reflect deletion of file 'f'. Call with
   * lock held.
   */
-  private void deleteCacheEntries(Path path, boolean tombstone) {
+  private void deleteCacheEntries(Path path, boolean tombstone,
+      ITtlTimeProvider ttlTimeProvider) {
     LocalMetadataEntry entry = localCache.getIfPresent(path);
     // If there's no entry, delete should silently succeed
     // (based on MetadataStoreTestBase#testDeleteNonExisting)
@@ -448,6 +479,7 @@ private void deleteCacheEntries(Path path, boolean tombstone) {
     if(entry.hasPathMeta()){
       if (tombstone) {
         PathMetadata pmd = PathMetadata.tombstone(path);
+        pmd.setLastUpdated(ttlTimeProvider.getNow());
         entry.setPathMetadata(pmd);
       } else {
         entry.setPathMetadata(null);
@@ -474,6 +506,7 @@ private void deleteCacheEntries(Path path, boolean tombstone) {
       LOG.debug("removing parent's entry for {} ", path);
       if (tombstone) {
         dir.markDeleted(path);
+        dir.setLastUpdated(ttlTimeProvider.getNow());
       } else {
         dir.remove(path);
       }
@@ -63,16 +63,23 @@ public interface MetadataStore extends Closeable {
   * Deletes exactly one path, leaving a tombstone to prevent lingering,
   * inconsistent copies of it from being listed.
   *
+   * Deleting an entry with a tombstone needs a
+   * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+   * the lastUpdated field of the record has to be updated to <pre>now</pre>.
+   *
   * @param path the path to delete
+   * @param ttlTimeProvider the time provider to set last_updated. Must not
+   *    be null.
   * @throws IOException if there is an error
   */
-  void delete(Path path) throws IOException;
+  void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException;

   /**
   * Removes the record of exactly one path. Does not leave a tombstone (see
-   * {@link MetadataStore#delete(Path)}. It is currently intended for testing
-   * only, and a need to use it as part of normal FileSystem usage is not
-   * anticipated.
+   * {@link MetadataStore#delete(Path, ITtlTimeProvider)}. It is currently
+   * intended for testing only, and a need to use it as part of normal
+   * FileSystem usage is not anticipated.
   *
   * @param path the path to delete
   * @throws IOException if there is an error
@@ -88,10 +95,17 @@ public interface MetadataStore extends Closeable {
   * implementations must also update any stored {@code DirListingMetadata}
   * objects which track the parent of this file.
   *
+   * Deleting a subtree with a tombstone needs a
+   * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+   * the lastUpdated field of all records has to be updated to <pre>now</pre>.
+   *
   * @param path the root of the sub-tree to delete
+   * @param ttlTimeProvider the time provider to set last_updated. Must not
+   *    be null.
   * @throws IOException if there is an error
   */
-  void deleteSubtree(Path path) throws IOException;
+  void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException;

   /**
   * Gets metadata for a path.
@@ -151,10 +165,13 @@ PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
   * @param pathsToCreate Collection of all PathMetadata for the new paths
   *                      that were created at the destination of the rename
   *                      ().
+   * @param ttlTimeProvider the time provider to set last_updated. Must not
+   *    be null.
   * @throws IOException if there is an error
   */
   void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException;
+      Collection<PathMetadata> pathsToCreate,
+      ITtlTimeProvider ttlTimeProvider) throws IOException;

   /**
   * Saves metadata for exactly one path.
@@ -212,29 +229,54 @@ void move(Collection<Path> pathsToDelete,
   void destroy() throws IOException;

   /**
-   * Clear any metadata older than a specified time from the repository.
-   * Implementations MUST clear file metadata, and MAY clear directory metadata
-   * (s3a itself does not track modification time for directories).
-   * Implementations may also choose to throw UnsupportedOperationException
-   * istead. Note that modification times should be in UTC, as returned by
-   * System.currentTimeMillis at the time of modification.
+   * Prune method with two modes of operation:
+   * <ul>
+   * <li>
+   *    {@link PruneMode#ALL_BY_MODTIME}
+   *    Clear any metadata older than a specified mod_time from the store.
+   *    Note that this modification time is the S3 modification time from the
+   *    object's metadata - from the object store.
+   *    Implementations MUST clear file metadata, and MAY clear directory
+   *    metadata (s3a itself does not track modification time for directories).
+   *    Implementations may also choose to throw UnsupportedOperationException
+   *    instead. Note that modification times must be in UTC, as returned by
+   *    System.currentTimeMillis at the time of modification.
+   * </li>
+   * </ul>
   *
-   * @param modTime Oldest modification time to allow
+   * <ul>
+   * <li>
+   *    {@link PruneMode#TOMBSTONES_BY_LASTUPDATED}
+   *    Clear any tombstone updated earlier than a specified time from the
+   *    store. Note that this last_updated is the time when the metadata
+   *    entry was last updated and maintained by the metadata store.
+   *    Implementations MUST clear file metadata, and MAY clear directory
+   *    metadata (s3a itself does not track modification time for directories).
+   *    Implementations may also choose to throw UnsupportedOperationException
+   *    instead. Note that last_updated must be in UTC, as returned by
+   *    System.currentTimeMillis at the time of modification.
+   * </li>
+   * </ul>
+   *
+   * @param pruneMode Prune mode of operation
+   * @param cutoff Oldest time to allow (UTC)
   * @throws IOException if there is an error
   * @throws UnsupportedOperationException if not implemented
   */
-  void prune(long modTime) throws IOException, UnsupportedOperationException;
+  void prune(PruneMode pruneMode, long cutoff) throws IOException,
+      UnsupportedOperationException;

   /**
-   * Same as {@link MetadataStore#prune(long)}, but with an additional
-   * keyPrefix parameter to filter the pruned keys with a prefix.
+   * Same as {@link MetadataStore#prune(PruneMode, long)}, but with an
+   * additional keyPrefix parameter to filter the pruned keys with a prefix.
   *
-   * @param modTime Oldest modification time to allow
+   * @param pruneMode Prune mode of operation
+   * @param cutoff Oldest time to allow (UTC)
   * @param keyPrefix The prefix for the keys that should be removed
   * @throws IOException if there is an error
   * @throws UnsupportedOperationException if not implemented
   */
-  void prune(long modTime, String keyPrefix)
+  void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
       throws IOException, UnsupportedOperationException;

   /**
@@ -252,4 +294,13 @@ void prune(long modTime, String keyPrefix)
   * @throws IOException if there is an error
   */
   void updateParameters(Map<String, String> parameters) throws IOException;
+
+  /**
+   * Modes of operation for prune.
+   * For details see {@link MetadataStore#prune(PruneMode, long)}
+   */
+  enum PruneMode {
+    ALL_BY_MODTIME,
+    TOMBSTONES_BY_LASTUPDATED
+  }
 }
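For illustration, a caller-side sketch of the two modes; `store`, `timeProvider`, the one-day window, and the `/data/` prefix are all assumptions of the example, not part of the patch:

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;

class PruneDemo {
  static void pruneBoth(MetadataStore store, ITtlTimeProvider timeProvider)
      throws IOException {
    long now = System.currentTimeMillis();

    // Drop file entries whose S3 mod_time is more than a day old.
    store.prune(MetadataStore.PruneMode.ALL_BY_MODTIME,
        now - TimeUnit.DAYS.toMillis(1));

    // Drop tombstones whose last_updated has fallen out of the TTL window,
    // restricted to one key prefix.
    store.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED,
        now - timeProvider.getMetadataTtl(), "/data/");
  }
}
```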
@@ -47,7 +47,8 @@ public void close() throws IOException {
   }

   @Override
-  public void delete(Path path) throws IOException {
+  public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
   }

   @Override
@@ -55,7 +56,8 @@ public void forgetMetadata(Path path) throws IOException {
   }

   @Override
-  public void deleteSubtree(Path path) throws IOException {
+  public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+      throws IOException {
   }

   @Override
@@ -76,7 +78,8 @@ public DirListingMetadata listChildren(Path path) throws IOException {

   @Override
   public void move(Collection<Path> pathsToDelete,
-      Collection<PathMetadata> pathsToCreate) throws IOException {
+      Collection<PathMetadata> pathsToCreate,
+      ITtlTimeProvider ttlTimeProvider) throws IOException {
   }

   @Override
@@ -96,11 +99,11 @@ public void destroy() throws IOException {
   }

   @Override
-  public void prune(long modTime) {
+  public void prune(PruneMode pruneMode, long cutoff) {
   }

   @Override
-  public void prune(long modTime, String keyPrefix) {
+  public void prune(PruneMode pruneMode, long cutoff, String keyPrefix) {
   }

   @Override
@@ -25,9 +25,13 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

+import javax.annotation.Nullable;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -46,6 +50,8 @@
 import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.util.ReflectionUtils;

+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
 import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY;
 import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST;
@@ -142,15 +148,17 @@ static Class<? extends MetadataStore> getMetadataStoreClass(
   * @param ms MetadataStore to {@code put()} into.
   * @param status status to store
   * @param instrumentation instrumentation of the s3a file system
+   * @param timeProvider Time provider to use when writing entries
   * @return The same status as passed in
   * @throws IOException if metadata store update failed
   */
   @RetryTranslated
   public static S3AFileStatus putAndReturn(MetadataStore ms,
       S3AFileStatus status,
-      S3AInstrumentation instrumentation) throws IOException {
+      S3AInstrumentation instrumentation,
+      ITtlTimeProvider timeProvider) throws IOException {
     long startTimeNano = System.nanoTime();
-    ms.put(new PathMetadata(status));
+    S3Guard.putWithTtl(ms, new PathMetadata(status), timeProvider);
     instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
         (System.nanoTime() - startTimeNano));
     instrumentation.incrementCounter(S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1);
@@ -196,7 +204,7 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
   * @param backingStatuses Directory listing from the backing store.
   * @param dirMeta Directory listing from MetadataStore. May be null.
   * @param isAuthoritative State of authoritative mode
-   * @param timeProvider Time provider for testing.
+   * @param timeProvider Time provider to use when updating entries
   * @return Final result of directory listing.
   * @throws IOException if metadata store update failed
   */
@@ -242,7 +250,7 @@ public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
       if (status != null
           && s.getModificationTime() > status.getModificationTime()) {
         LOG.debug("Update ms with newer metadata of: {}", status);
-        ms.put(new PathMetadata(s));
+        S3Guard.putWithTtl(ms, new PathMetadata(s), timeProvider);
       }
     }

@@ -357,7 +365,7 @@ public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs,
       }

       // Batched put
-      ms.put(pathMetas);
+      S3Guard.putWithTtl(ms, pathMetas, timeProvider);
     } catch (IOException ioe) {
       LOG.error("MetadataStore#put() failure:", ioe);
     }
@@ -462,7 +470,8 @@ public static void addMoveAncestors(MetadataStore ms,
   }

   public static void addAncestors(MetadataStore metadataStore,
-      Path qualifiedPath, String username) throws IOException {
+      Path qualifiedPath, String username, ITtlTimeProvider timeProvider)
+      throws IOException {
     Collection<PathMetadata> newDirs = new ArrayList<>();
     Path parent = qualifiedPath.getParent();
     while (!parent.isRoot()) {
@@ -476,7 +485,7 @@ public static void addAncestors(MetadataStore metadataStore,
       }
       parent = parent.getParent();
     }
-    metadataStore.put(newDirs);
+    S3Guard.putWithTtl(metadataStore, newDirs, timeProvider);
   }

   private static void addMoveStatus(Collection<Path> srcPaths,
@@ -513,17 +522,6 @@ public static void assertQualified(Path...paths) {
     }
   }

-  /**
-   * This interface is defined for testing purposes.
-   * TTL can be tested by implementing this interface and setting is as
-   * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
-   * value preferred and flaky tests could be avoided.
-   */
-  public interface ITtlTimeProvider {
-    long getNow();
-    long getAuthoritativeDirTtl();
-  }
-
   /**
   * Runtime implementation for TTL Time Provider interface.
   */
@@ -534,34 +532,127 @@ public TtlTimeProvider(long authoritativeDirTtl) {
     this.authoritativeDirTtl = authoritativeDirTtl;
   }

+  public TtlTimeProvider(Configuration conf) {
+    this.authoritativeDirTtl =
+        conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+            DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+  }
+
   @Override
   public long getNow() {
     return System.currentTimeMillis();
   }

-  @Override public long getAuthoritativeDirTtl() {
+  @Override public long getMetadataTtl() {
     return authoritativeDirTtl;
   }

+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) { return true; }
+    if (o == null || getClass() != o.getClass()) { return false; }
+    final TtlTimeProvider that = (TtlTimeProvider) o;
+    return authoritativeDirTtl == that.authoritativeDirTtl;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(authoritativeDirTtl);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "TtlTimeProvider{");
+    sb.append("authoritativeDirTtl=").append(authoritativeDirTtl);
+    sb.append(" millis}");
+    return sb.toString();
+  }
 }

   public static void putWithTtl(MetadataStore ms, DirListingMetadata dirMeta,
       ITtlTimeProvider timeProvider)
       throws IOException {
     dirMeta.setLastUpdated(timeProvider.getNow());
+    dirMeta.getListing()
+        .forEach(pm -> pm.setLastUpdated(timeProvider.getNow()));
     ms.put(dirMeta);
   }

-  public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
-      Path path, ITtlTimeProvider timeProvider)
-      throws IOException {
-    long ttl = timeProvider.getAuthoritativeDirTtl();
+  public static void putWithTtl(MetadataStore ms, PathMetadata fileMeta,
+      @Nullable ITtlTimeProvider timeProvider) throws IOException {
+    if (timeProvider != null) {
+      fileMeta.setLastUpdated(timeProvider.getNow());
+    } else {
+      LOG.debug("timeProvider is null, put {} without setting last_updated",
+          fileMeta);
+    }
+    ms.put(fileMeta);
+  }
+
+  public static void putWithTtl(MetadataStore ms,
+      Collection<PathMetadata> fileMetas,
+      @Nullable ITtlTimeProvider timeProvider)
+      throws IOException {
+    if (timeProvider != null) {
+      final long now = timeProvider.getNow();
+      fileMetas.forEach(fileMeta -> fileMeta.setLastUpdated(now));
+    } else {
+      LOG.debug("timeProvider is null, put {} without setting last_updated",
+          fileMetas);
+    }
+    ms.put(fileMetas);
+  }
+
+  public static PathMetadata getWithTtl(MetadataStore ms, Path path,
+      @Nullable ITtlTimeProvider timeProvider) throws IOException {
+    final PathMetadata pathMetadata = ms.get(path);
+    // if timeProvider is null let's return with what the ms has
+    if (timeProvider == null) {
+      LOG.debug("timeProvider is null, returning pathMetadata as is");
+      return pathMetadata;
+    }
+
+    long ttl = timeProvider.getMetadataTtl();
+
+    if (pathMetadata != null) {
+      // Special case: the pathmetadata's last updated is 0. This can happen
+      // eg. with an old db using this implementation
+      if (pathMetadata.getLastUpdated() == 0) {
+        LOG.debug("PathMetadata TTL for {} is 0, so it will be returned as "
+            + "not expired.", path);
+        return pathMetadata;
+      }
+
+      if (!pathMetadata.isExpired(ttl, timeProvider.getNow())) {
+        return pathMetadata;
+      } else {
+        LOG.debug("PathMetadata TTL for {} is expired in metadata store.",
+            path);
+        return null;
+      }
+    }
+
+    return null;
+  }
+
+  public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
+      Path path, @Nullable ITtlTimeProvider timeProvider)
+      throws IOException {
     DirListingMetadata dlm = ms.listChildren(path);

-    if(dlm != null && dlm.isAuthoritative()
+    if (timeProvider == null) {
+      LOG.debug("timeProvider is null, returning DirListingMetadata as is");
+      return dlm;
+    }
+
+    long ttl = timeProvider.getMetadataTtl();
+
+    if (dlm != null && dlm.isAuthoritative()
         && dlm.isExpired(ttl, timeProvider.getNow())) {
       dlm.setAuthoritative(false);
     }
     return dlm;
   }

 }
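A short usage sketch of the put/get pair above (`ms`, `status`, `path`, and `timeProvider` are assumed): `putWithTtl` stamps `last_updated` from the provider's clock, and `getWithTtl` returns null once the entry has outlived the TTL, so callers treat an expired entry (or tombstone) as absent and consult S3; `last_updated == 0` is treated as never expiring.

```java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;

class TtlRoundTripDemo {
  static void demo(MetadataStore ms, S3AFileStatus status, Path path,
      ITtlTimeProvider timeProvider) throws IOException {
    // Write: last_updated is set to timeProvider.getNow().
    S3Guard.putWithTtl(ms, new PathMetadata(status), timeProvider);

    // Read: non-null while now - last_updated <= TTL, null afterwards.
    PathMetadata maybeFresh = S3Guard.getWithTtl(ms, path, timeProvider);
    if (maybeFresh == null) {
      // Entry expired or absent: fall back to querying S3 directly.
    }
  }
}
```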
@@ -707,7 +707,8 @@ private void putParentsIfNotPresent(FileStatus f) throws IOException {
       }
       S3AFileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
           f.getOwner());
-      getStore().put(new PathMetadata(dir));
+      S3Guard.putWithTtl(getStore(), new PathMetadata(dir),
+          getFilesystem().getTtlTimeProvider());
       dirCache.add(parent);
       parent = parent.getParent();
     }
@@ -741,7 +742,8 @@ private long importDir(FileStatus status) throws IOException {
             located.getVersionId());
       }
       putParentsIfNotPresent(child);
-      getStore().put(new PathMetadata(child));
+      S3Guard.putWithTtl(getStore(), new PathMetadata(child),
+          getFilesystem().getTtlTimeProvider());
       items++;
     }
     return items;
@@ -1073,7 +1075,8 @@ public int run(String[] args, PrintStream out) throws
     }

     try {
-      getStore().prune(divide, keyPrefix);
+      getStore().prune(MetadataStore.PruneMode.ALL_BY_MODTIME, divide,
+          keyPrefix);
     } catch (UnsupportedOperationException e){
       errorln("Prune operation not supported in metadata store.");
     }
@@ -181,8 +181,8 @@ removed on `S3AFileSystem` level.

 ```xml
 <property>
-    <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
-    <value>3600000</value>
+    <name>fs.s3a.metadatastore.metadata.ttl</name>
+    <value>15m</value>
 </property>
 ```
@ -24,6 +24,7 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.junit.Assume;
|
||||
@ -37,20 +38,32 @@
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
||||
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -115,7 +128,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
||||
* Test array for parameterized test runs.
|
||||
* @return a list of parameter tuples.
|
||||
*/
|
||||
@Parameterized.Parameters
|
||||
@Parameterized.Parameters(name="auth={0}")
|
||||
public static Collection<Object[]> params() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{true}, {false}
|
||||
@ -190,8 +203,11 @@ private S3AFileSystem createGuardedFS(boolean authoritativeMode)
|
||||
URI uri = testFS.getUri();
|
||||
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
METADATASTORE_AUTHORITATIVE);
|
||||
METADATASTORE_AUTHORITATIVE,
|
||||
METADATASTORE_METADATA_TTL);
|
||||
config.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMode);
|
||||
config.setLong(METADATASTORE_METADATA_TTL,
|
||||
DEFAULT_METADATASTORE_METADATA_TTL);
|
||||
final S3AFileSystem gFs = createFS(uri, config);
|
||||
// set back the same metadata store instance
|
||||
gFs.setMetadataStore(realMs);
|
||||
@ -271,6 +287,292 @@ public void testListingDelete() throws Exception {
|
||||
deleteFileInListing();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that tombstone expiry is implemented, so if a file is created raw
|
||||
* while the tombstone exist in ms for with the same name then S3Guard will
|
||||
* check S3 for the file.
|
||||
*
|
||||
* Seq: create guarded; delete guarded; create raw (same path); read guarded;
|
||||
* This will fail if no tombstone expiry is set
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testTombstoneExpiryGuardedDeleteRawCreate() throws Exception {
|
||||
boolean allowAuthoritative = authoritative;
|
||||
Path testFilePath = path("TEGDRC-" + UUID.randomUUID() + "/file");
|
||||
LOG.info("Allow authoritative param: {}", allowAuthoritative);
|
||||
String originalText = "some test";
|
||||
String newText = "the new originalText for test";
|
||||
|
||||
final ITtlTimeProvider originalTimeProvider =
|
||||
guardedFs.getTtlTimeProvider();
|
||||
try {
|
||||
final AtomicLong now = new AtomicLong(1);
|
||||
final AtomicLong metadataTtl = new AtomicLong(1);
|
||||
|
||||
// SET TTL TIME PROVIDER FOR TESTING
|
||||
ITtlTimeProvider testTimeProvider =
|
||||
new ITtlTimeProvider() {
|
||||
@Override public long getNow() {
|
||||
return now.get();
|
||||
}
|
||||
|
||||
@Override public long getMetadataTtl() {
|
||||
return metadataTtl.get();
|
||||
}
|
||||
};
|
||||
guardedFs.setTtlTimeProvider(testTimeProvider);
|
||||
|
||||
// CREATE GUARDED
|
||||
createAndAwaitFs(guardedFs, testFilePath, originalText);
|
||||
|
||||
// DELETE GUARDED
|
||||
deleteGuardedTombstoned(guardedFs, testFilePath, now);
|
||||
|
||||
// CREATE RAW
|
||||
createAndAwaitFs(rawFS, testFilePath, newText);
|
||||
|
||||
// CHECK LISTING - THE FILE SHOULD NOT BE THERE, EVEN IF IT'S CREATED RAW
|
||||
checkListingDoesNotContainPath(guardedFs, testFilePath);
|
||||
|
||||
// CHANGE TTL SO ENTRY (& TOMBSTONE METADATA) WILL EXPIRE
|
||||
long willExpire = now.get() + metadataTtl.get() + 1L;
|
||||
now.set(willExpire);
|
||||
LOG.info("willExpire: {}, ttlNow: {}; ttlTTL: {}", willExpire,
|
||||
testTimeProvider.getNow(), testTimeProvider.getMetadataTtl());
|
||||
|
||||
// READ GUARDED
|
||||
String newRead = readBytesToString(guardedFs, testFilePath,
|
||||
newText.length());
|
||||
|
||||
// CHECK LISTING - THE FILE SHOULD BE THERE, TOMBSTONE EXPIRED
|
||||
checkListingContainsPath(guardedFs, testFilePath);
|
||||
|
||||
// we can assert that the originalText is the new one, which created raw
|
||||
LOG.info("Old: {}, New: {}, Read: {}", originalText, newText, newRead);
|
||||
assertEquals("The text should be modified with a new.", newText,
|
||||
newRead);
|
||||
} finally {
|
||||
guardedFs.delete(testFilePath, true);
|
||||
guardedFs.setTtlTimeProvider(originalTimeProvider);
|
||||
}
|
||||
}
|
||||
|
||||
private void createAndAwaitFs(S3AFileSystem fs, Path testFilePath,
|
||||
String text) throws Exception {
|
||||
writeTextFile(fs, testFilePath, text, true);
|
||||
final FileStatus newStatus = awaitFileStatus(fs, testFilePath);
|
||||
assertNotNull("Newly created file status should not be null.", newStatus);
|
||||
}
|
||||
|
||||
private void deleteGuardedTombstoned(S3AFileSystem guarded,
|
||||
Path testFilePath, AtomicLong now) throws Exception {
|
||||
guarded.delete(testFilePath, true);
|
||||
|
||||
final PathMetadata metadata =
|
||||
guarded.getMetadataStore().get(testFilePath);
|
||||
assertNotNull("Created file metadata should not be null in ms",
|
||||
metadata);
|
||||
assertEquals("Created file metadata last_updated should equal with "
|
||||
+ "mocked now", now.get(), metadata.getLastUpdated());
|
||||
|
||||
intercept(FileNotFoundException.class, testFilePath.toString(),
|
||||
"This file should throw FNFE when reading through "
|
||||
+ "the guarded fs, and the metadatastore tombstoned the file.",
|
||||
() -> guarded.getFileStatus(testFilePath));
|
||||
}
|
||||
|
||||
/**
|
||||
* createNonRecursive must fail if the parent directory has been deleted,
|
||||
* and succeed if the tombstone has expired and the directory has been
|
||||
* created out of band.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateNonRecursiveFailsIfParentDeleted() throws Exception {
|
||||
LOG.info("Authoritative mode: {}", authoritative);
|
||||
|
||||
String dirToDelete = methodName + UUID.randomUUID().toString();
|
||||
String fileToTry = dirToDelete + "/theFileToTry";
|
||||
|
||||
final Path dirPath = path(dirToDelete);
|
||||
final Path filePath = path(fileToTry);
|
||||
|
||||
// Create a directory with
|
||||
ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
|
||||
ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
|
||||
|
||||
try {
|
||||
guardedFs.setTtlTimeProvider(mockTimeProvider);
|
||||
when(mockTimeProvider.getNow()).thenReturn(100L);
|
||||
when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
|
||||
|
||||
// CREATE DIRECTORY
|
||||
guardedFs.mkdirs(dirPath);
|
||||
|
||||
// DELETE DIRECTORY
|
||||
guardedFs.delete(dirPath, true);
|
||||
|
||||
// WRITE TO DELETED DIRECTORY - FAIL
|
||||
intercept(FileNotFoundException.class,
|
||||
dirToDelete,
|
||||
"createNonRecursive must fail if the parent directory has been deleted.",
|
||||
() -> createNonRecursive(guardedFs, filePath));
|
||||
|
||||
// CREATE THE DIRECTORY RAW
|
||||
rawFS.mkdirs(dirPath);
|
||||
awaitFileStatus(rawFS, dirPath);
|
||||
|
||||
// SET TIME SO METADATA EXPIRES
|
||||
when(mockTimeProvider.getNow()).thenReturn(110L);
|
||||
|
||||
// WRITE TO DELETED DIRECTORY - SUCCESS
|
||||
createNonRecursive(guardedFs, filePath);
|
||||
|
||||
} finally {
|
||||
guardedFs.delete(filePath, true);
|
||||
guardedFs.delete(dirPath, true);
|
||||
guardedFs.setTtlTimeProvider(originalTimeProvider);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When lastUpdated = 0 the entry should not expire. This is a special case
|
||||
* eg. for old metadata entries
|
||||
*/
|
||||
@Test
|
||||
public void testLastUpdatedZeroWontExpire() throws Exception {
|
||||
LOG.info("Authoritative mode: {}", authoritative);
|
||||
|
||||
String testFile = methodName + UUID.randomUUID().toString() +
|
||||
"/theFileToTry";
|
||||
|
||||
long ttl = 10L;
|
||||
final Path filePath = path(testFile);
|
||||
|
||||
ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
|
||||
ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
|
||||
|
||||
try {
|
||||
guardedFs.setTtlTimeProvider(mockTimeProvider);
|
||||
when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
|
||||
|
||||
// create a file while the NOW is 0, so it will set 0 as the last_updated
|
||||
when(mockTimeProvider.getNow()).thenReturn(0L);
|
||||
touch(guardedFs, filePath);
|
||||
deleteFile(guardedFs, filePath);
|
||||
|
||||
final PathMetadata pathMetadata =
|
||||
guardedFs.getMetadataStore().get(filePath);
|
||||
assertNotNull("pathMetadata should not be null after deleting with "
|
||||
+ "tombstones", pathMetadata);
|
||||
assertEquals("pathMetadata lastUpdated field should be 0", 0,
|
||||
pathMetadata.getLastUpdated());
|
||||
|
||||
// set the time, so the metadata would expire
|
||||
when(mockTimeProvider.getNow()).thenReturn(2*ttl);
|
||||
intercept(FileNotFoundException.class, filePath.toString(),
|
||||
"This file should throw FNFE when reading through "
|
||||
+ "the guarded fs, and the metadatastore tombstoned the file. "
|
||||
+ "The tombstone won't expire if lastUpdated is set to 0.",
|
||||
() -> guardedFs.getFileStatus(filePath));
|
||||
|
||||
} finally {
|
||||
guardedFs.delete(filePath, true);
|
||||
guardedFs.setTtlTimeProvider(originalTimeProvider);
|
||||
}
|
||||
}
|
||||
|
  /**
   * 1. File is deleted in the guarded fs.
   * 2. File is replaced in the raw fs.
   * 3. File is deleted in the guarded FS after the expiry time.
   * 4. File MUST NOT exist in raw FS.
   */
  @Test
  public void deleteAfterTombstoneExpiryOobCreate() throws Exception {
    LOG.info("Authoritative mode: {}", authoritative);

    String testFile = methodName + UUID.randomUUID().toString() +
        "/theFileToTry";

    long ttl = 10L;
    final Path filePath = path(testFile);

    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
    ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();

    try {
      guardedFs.setTtlTimeProvider(mockTimeProvider);
      when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);

      // CREATE AND DELETE WITH GUARDED FS
      when(mockTimeProvider.getNow()).thenReturn(100L);
      touch(guardedFs, filePath);
      deleteFile(guardedFs, filePath);

      final PathMetadata pathMetadata =
          guardedFs.getMetadataStore().get(filePath);
      assertNotNull("pathMetadata should not be null after deleting with "
          + "tombstones", pathMetadata);

      // REPLACE WITH RAW FS
      touch(rawFS, filePath);
      awaitFileStatus(rawFS, filePath);

      // SET EXPIRY TIME, SO THE TOMBSTONE IS EXPIRED
      when(mockTimeProvider.getNow()).thenReturn(100L + 2 * ttl);

      // DELETE IN GUARDED FS
      guardedFs.delete(filePath, true);

      // FILE MUST NOT EXIST IN RAW
      intercept(FileNotFoundException.class, filePath.toString(),
          "This file should throw FNFE when reading through "
          + "the raw fs, and the guarded fs deleted the file.",
          () -> rawFS.getFileStatus(filePath));

    } finally {
      guardedFs.delete(filePath, true);
      guardedFs.setTtlTimeProvider(originalTimeProvider);
    }
  }

  private void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath)
      throws IOException {
    final RemoteIterator<LocatedFileStatus> listIter =
        fs.listFiles(filePath.getParent(), false);
    while (listIter.hasNext()) {
      final LocatedFileStatus lfs = listIter.next();
      assertNotEquals("The tombstone has not expired, so the path must not be"
          + " listed.", filePath, lfs.getPath());
    }
    LOG.info("{}; file omitted from listFiles listing as expected.", filePath);

    final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
    for (FileStatus fileStatus : fileStatuses) {
      assertNotEquals("The tombstone has not expired, so the path must not be"
          + " listed.", filePath, fileStatus.getPath());
    }
    LOG.info("{}; file omitted from listStatus as expected.", filePath);
  }

  private void checkListingContainsPath(S3AFileSystem fs, Path filePath)
      throws IOException {
    final RemoteIterator<LocatedFileStatus> listIter =
        fs.listFiles(filePath.getParent(), false);

    while (listIter.hasNext()) {
      final LocatedFileStatus lfs = listIter.next();
      assertEquals(filePath, lfs.getPath());
    }

    final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
    for (FileStatus fileStatus : fileStatuses) {
      assertEquals("The file should be listed in fs.listStatus",
          filePath, fileStatus.getPath());
    }
  }

  /**
   * Perform an out-of-band delete.
   * @param testFilePath filename
@ -384,12 +686,18 @@ private void overwriteFileInListing(String firstText, String secondText)
    // Create initial statusIterator with guarded ms
    writeTextFile(guardedFs, testFilePath, firstText, true);
    // and cache the value for later
    final FileStatus origStatus = awaitFileStatus(rawFS, testFilePath);
    final S3AFileStatus origStatus = awaitFileStatus(rawFS, testFilePath);
    assertNotNull("No etag in raw status " + origStatus,
        origStatus.getETag());

    // Do a listing to cache the lists. Should be authoritative if it's set.
    final FileStatus[] origList = guardedFs.listStatus(testDirPath);
    final S3AFileStatus[] origList = (S3AFileStatus[]) guardedFs.listStatus(
        testDirPath);
    assertArraySize("Added one file to the new dir, so the number of "
        + "files in the dir should be one.", 1, origList);
    S3AFileStatus origGuardedFileStatus = origList[0];
    assertNotNull("No etag in origGuardedFileStatus " + origGuardedFileStatus,
        origGuardedFileStatus.getETag());
    final DirListingMetadata dirListingMetadata =
        realMs.listChildren(guardedFs.qualify(testDirPath));
    assertListingAuthority(allowAuthoritative, dirListingMetadata);
@ -406,7 +714,8 @@ private void overwriteFileInListing(String firstText, String secondText)
    final FileStatus rawFileStatus = awaitFileStatus(rawFS, testFilePath);

    // check listing in guarded store.
    final FileStatus[] modList = guardedFs.listStatus(testDirPath);
    final S3AFileStatus[] modList = (S3AFileStatus[]) guardedFs.listStatus(
        testDirPath);
    assertArraySize("Added one file to the new dir then modified it, "
        + "so the number of files in the dir should be one.", 1,
        modList);
@ -479,6 +788,24 @@ private void verifyFileStatusAsExpected(final String firstText,
          expectedLength, guardedLength);
      }
    }
    // check etag. This relies on first and second text being different.
    final S3AFileStatus rawS3AFileStatus = (S3AFileStatus) rawFileStatus;
    final S3AFileStatus guardedS3AFileStatus = (S3AFileStatus)
        guardedFileStatus;
    final S3AFileStatus origS3AFileStatus = (S3AFileStatus) origStatus;
    assertNotEquals(
        "raw status still not up to date with changes " + stats,
        origS3AFileStatus.getETag(), rawS3AFileStatus.getETag());
    if (allowAuthoritative) {
      // expect the etag to be out of sync
      assertNotEquals(
          "etag in authoritative table with " + stats,
          rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
    } else {
      assertEquals(
          "etag in non-authoritative table with " + stats,
          rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
    }
    // Next: modification time.
    long rawModTime = rawFileStatus.getModificationTime();
    long guardedModTime = guardedFileStatus.getModificationTime();
@ -631,12 +958,18 @@ private void awaitDeletedFileDisappearance(final S3AFileSystem fs,
   * @return the file status.
   * @throws Exception failure
   */
  private FileStatus awaitFileStatus(S3AFileSystem fs,
  private S3AFileStatus awaitFileStatus(S3AFileSystem fs,
      final Path testFilePath)
      throws Exception {
    return eventually(
    return (S3AFileStatus) eventually(
        STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
        () -> fs.getFileStatus(testFilePath));
  }

  private FSDataOutputStream createNonRecursive(FileSystem fs, Path path)
      throws Exception {
    return fs
        .createNonRecursive(path, false, 4096, (short) 3, (short) 4096, null);
  }

}

@ -18,13 +18,22 @@

package org.apache.hadoop.fs.s3a;

import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;

import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
@ -36,8 +45,37 @@
/**
 * These tests verify the S3Guard TTL (time to live) features.
 */
@RunWith(Parameterized.class)
public class ITestS3GuardTtl extends AbstractS3ATestBase {

  private final boolean authoritative;

  /**
   * Test array for parameterized test runs.
   * @return a list of parameter tuples.
   */
  @Parameterized.Parameters
  public static Collection<Object[]> params() {
    return Arrays.asList(new Object[][]{
        {true}, {false}
    });
  }

  /**
   * By changing the method name, the thread name is changed and
   * so you can see in the logs which mode is being tested.
   * @return a string to use for the thread namer.
   */
  @Override
  protected String getMethodName() {
    return super.getMethodName() +
        (authoritative ? "-auth" : "-nonauth");
  }

  public ITestS3GuardTtl(boolean authoritative) {
    this.authoritative = authoritative;
  }

  /**
   * Patch the configuration - this test needs disabled filesystem caching.
   * These tests modify the fs instance, which would otherwise cause flaky tests.
@ -47,11 +85,15 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
  protected Configuration createConfiguration() {
    Configuration configuration = super.createConfiguration();
    S3ATestUtils.disableFilesystemCaching(configuration);
    return S3ATestUtils.prepareTestConfiguration(configuration);
    configuration =
        S3ATestUtils.prepareTestConfiguration(configuration);
    configuration.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
    return configuration;
  }

  @Test
  public void testDirectoryListingAuthoritativeTtl() throws Exception {
    LOG.info("Authoritative mode: {}", authoritative);

    final S3AFileSystem fs = getFileSystem();
    Assume.assumeTrue(fs.hasMetadataStore());
@ -64,12 +106,12 @@ public void testDirectoryListingAuthoritativeTtl() throws Exception {
    Assume.assumeTrue("MetadataStore should be authoritative for this test",
        isMetadataStoreAuthoritative(getFileSystem().getConf()));

    S3Guard.ITtlTimeProvider mockTimeProvider =
        mock(S3Guard.ITtlTimeProvider.class);
    S3Guard.ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider();
    ITtlTimeProvider mockTimeProvider =
        mock(ITtlTimeProvider.class);
    ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider();
    fs.setTtlTimeProvider(mockTimeProvider);
    when(mockTimeProvider.getNow()).thenReturn(100L);
    when(mockTimeProvider.getAuthoritativeDirTtl()).thenReturn(1L);
    when(mockTimeProvider.getMetadataTtl()).thenReturn(1L);

    Path dir = path("ttl/");
    Path file = path("ttl/afile");
@ -102,4 +144,146 @@ public void testDirectoryListingAuthoritativeTtl() throws Exception {
    fs.setTtlTimeProvider(restoreTimeProvider);
  }
}

  @Test
  public void testFileMetadataExpiresTtl() throws Exception {
    LOG.info("Authoritative mode: {}", authoritative);

    Path fileExpire1 = path("expirettl-" + UUID.randomUUID());
    Path fileExpire2 = path("expirettl-" + UUID.randomUUID());
    Path fileRetain = path("expirettl-" + UUID.randomUUID());

    final S3AFileSystem fs = getFileSystem();
    Assume.assumeTrue(fs.hasMetadataStore());
    final MetadataStore ms = fs.getMetadataStore();

    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();

    try {
      fs.setTtlTimeProvider(mockTimeProvider);
      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);

      // set the time so fileExpire1 will expire
      when(mockTimeProvider.getNow()).thenReturn(100L);
      touch(fs, fileExpire1);
      // set the time so fileExpire2 will expire
      when(mockTimeProvider.getNow()).thenReturn(101L);
      touch(fs, fileExpire2);
      // set the time so fileRetain won't expire
      when(mockTimeProvider.getNow()).thenReturn(109L);
      touch(fs, fileRetain);
      final FileStatus origFileRetainStatus = fs.getFileStatus(fileRetain);
      // advance the time so the metadata of the first two files is expired
      when(mockTimeProvider.getNow()).thenReturn(110L);

      // metadata is expired, so this should refresh the metadata with
      // last_updated set to getNow()
      final FileStatus fileExpire1Status = fs.getFileStatus(fileExpire1);
      assertNotNull(fileExpire1Status);
      assertEquals(110L, ms.get(fileExpire1).getLastUpdated());

      // metadata is expired, so this should refresh the metadata with
      // last_updated set to getNow()
      final FileStatus fileExpire2Status = fs.getFileStatus(fileExpire2);
      assertNotNull(fileExpire2Status);
      assertEquals(110L, ms.get(fileExpire2).getLastUpdated());

      final FileStatus fileRetainStatus = fs.getFileStatus(fileRetain);
      assertEquals("Modification time of these files should be equal.",
          origFileRetainStatus.getModificationTime(),
          fileRetainStatus.getModificationTime());
      assertNotNull(fileRetainStatus);
      assertEquals(109L, ms.get(fileRetain).getLastUpdated());
    } finally {
      fs.delete(fileExpire1, true);
      fs.delete(fileExpire2, true);
      fs.delete(fileRetain, true);
      fs.setTtlTimeProvider(originalTimeProvider);
    }
  }

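A hedged sketch of the refresh behaviour the assertions above rely on: when a guarded getFileStatus meets an expired entry, the status is re-read from S3 and written back with lastUpdated = now. The method name getGuardedStatus and the probe s3GetFileStatus are illustrative assumptions, not the commit's exact code path.

// Sketch only: assumed shape of the expired-entry refresh path.
S3AFileStatus getGuardedStatus(Path path) throws IOException {
  PathMetadata meta = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
  if (meta == null) {
    // expired or missing: probe S3, then re-cache with a fresh timestamp
    S3AFileStatus fresh = s3GetFileStatus(path); // hypothetical raw S3 probe
    S3Guard.putWithTtl(metadataStore, new PathMetadata(fresh), ttlTimeProvider);
    return fresh;
  }
  return (S3AFileStatus) meta.getFileStatus();
}
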
  /**
   * create(tombstone file) must succeed irrespective of overwrite flag.
   */
  @Test
  public void testCreateOnTombstonedFileSucceeds() throws Exception {
    LOG.info("Authoritative mode: {}", authoritative);
    final S3AFileSystem fs = getFileSystem();

    String fileToTry = methodName + UUID.randomUUID().toString();

    final Path filePath = path(fileToTry);

    // Set up a mock time provider so expiry can be controlled
    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();

    try {
      fs.setTtlTimeProvider(mockTimeProvider);
      when(mockTimeProvider.getNow()).thenReturn(100L);
      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);

      // CREATE A FILE
      touch(fs, filePath);

      // DELETE THE FILE - TOMBSTONE
      fs.delete(filePath, true);

      // CREATE THE SAME FILE WITHOUT ERROR DESPITE THE TOMBSTONE
      touch(fs, filePath);

    } finally {
      fs.delete(filePath, true);
      fs.setTtlTimeProvider(originalTimeProvider);
    }
  }

  /**
   * create("parent has tombstone") must always succeed (we don't check the
   * parent), but after the file has been written, all entries up the tree
   * must be valid. That is: the putAncestor code will correct everything.
   */
  @Test
  public void testCreateParentHasTombstone() throws Exception {
    LOG.info("Authoritative mode: {}", authoritative);
    final S3AFileSystem fs = getFileSystem();

    String dirToDelete = methodName + UUID.randomUUID().toString();
    String fileToTry = dirToDelete + "/theFileToTry";

    final Path dirPath = path(dirToDelete);
    final Path filePath = path(fileToTry);

    // Set up a mock time provider so expiry can be controlled
    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();

    try {
      fs.setTtlTimeProvider(mockTimeProvider);
      when(mockTimeProvider.getNow()).thenReturn(100L);
      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);

      // CREATE DIRECTORY
      fs.mkdirs(dirPath);

      // DELETE DIRECTORY
      fs.delete(dirPath, true);

      // WRITE TO DELETED DIRECTORY - SUCCESS
      touch(fs, filePath);

      // SET TIME SO METADATA EXPIRES
      when(mockTimeProvider.getNow()).thenReturn(110L);

      // WRITE TO DELETED DIRECTORY - SUCCESS
      touch(fs, filePath);

    } finally {
      fs.delete(filePath, true);
      fs.delete(dirPath, true);
      fs.setTtlTimeProvider(originalTimeProvider);
    }
  }

}

@ -280,7 +280,7 @@ private void testPruneCommand(Configuration cmdConf, Path parent,
          "This child should have been kept (prefix restriction).", 1);
    } finally {
      getFileSystem().delete(parent, true);
      ms.prune(Long.MAX_VALUE);
      ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, Long.MAX_VALUE);
    }
  }

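For readers tracking the API change above: prune now takes an explicit mode, and optionally a key prefix. Both call shapes below appear verbatim at call sites later in this diff; cutoff and prunePath are the local variables used there.

// prune everything older than the cutoff, by modification time
ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, Long.MAX_VALUE);
// prune only expired tombstones by last_updated, under one key prefix
ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff, prunePath);
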
@ -230,6 +230,12 @@ public void tearDown() throws Exception {
    IOUtils.cleanupWithLogger(LOG, fileSystem);
  }

  @Override protected String getPathStringForPrune(String path)
      throws Exception {
    String b = getTestBucketName(getContract().getFileSystem().getConf());
    return "/" + b + "/dir2";
  }

  /**
   * Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
   */
@ -437,7 +443,7 @@ private void doTestBatchWrite(int numDelete, int numPut,
    }

    // move the old paths to new paths and verify
    ms.move(pathsToDelete, newMetas);
    ms.move(pathsToDelete, newMetas, getTtlTimeProvider());
    assertEquals(0, ms.listChildren(oldDir).withoutTombstones().numEntries());
    if (newMetas != null) {
      assertTrue(CollectionUtils
@ -650,7 +656,7 @@ public void testMovePopulatesAncestors() throws IOException {
        1024, false))
    );

    ddbms.move(fullSourcePaths, pathsToCreate);
    ddbms.move(fullSourcePaths, pathsToCreate, getTtlTimeProvider());

    // assert that all the ancestors have been populated automatically
    assertCached(testRoot + "/c");

@ -243,7 +243,8 @@ public void test_030_BatchedWrite() throws Exception {

    if (pruneItems == BATCH_SIZE) {
      describe("pruning files");
      ddbms.prune(Long.MAX_VALUE /* all files */);
      ddbms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME,
          Long.MAX_VALUE /* all files */);
      pruneItems = 0;
    }
    if (tracker.probe()) {
@ -305,7 +306,7 @@ public void test_050_getVersionMarkerItem() throws Throwable {
  private void retryingDelete(final Path path) {
    try {
      ddbms.getInvoker().retry("Delete ", path.toString(), true,
          () -> ddbms.delete(path));
          () -> ddbms.delete(path, new S3Guard.TtlTimeProvider(getConf())));
    } catch (IOException e) {
      LOG.warn("Failed to delete {}: ", path, e);
    }

@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.collect.Sets;
@ -68,6 +69,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
  static final FsPermission PERMISSION = null;
  static final String GROUP = null;
  private final long accessTime = 0;
  private static ITtlTimeProvider ttlTimeProvider;

  /**
   * Each test should override this. Will use a new Configuration instance.
@ -123,6 +125,8 @@ public void setUp() throws Exception {
    assertNotNull("null MetadataStore", ms);
    assertNotNull("null FileSystem", contract.getFileSystem());
    ms.initialize(contract.getFileSystem());
    ttlTimeProvider =
        new S3Guard.TtlTimeProvider(contract.getFileSystem().getConf());
  }

  @After
@ -310,7 +314,7 @@ public void testRootDirPutNew() throws Exception {
  public void testDelete() throws Exception {
    setUpDeleteTest();

    ms.delete(strToPath("/ADirectory1/db1/file2"));
    ms.delete(strToPath("/ADirectory1/db1/file2"), ttlTimeProvider);

    /* Ensure delete happened. */
    assertDirectorySize("/ADirectory1/db1", 1);
@ -338,7 +342,7 @@ private void deleteSubtreeHelper(String pathPrefix) throws Exception {
    if (!allowMissing()) {
      assertCached(p + "/ADirectory1/db1");
    }
    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"));
    ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"), ttlTimeProvider);

    assertEmptyDirectory(p + "/ADirectory1");
    assertDeleted(p + "/ADirectory1/db1");
@ -358,7 +362,7 @@ private void deleteSubtreeHelper(String pathPrefix) throws Exception {
  public void testDeleteRecursiveRoot() throws Exception {
    setUpDeleteTest();

    ms.deleteSubtree(strToPath("/"));
    ms.deleteSubtree(strToPath("/"), ttlTimeProvider);
    assertDeleted("/ADirectory1");
    assertDeleted("/ADirectory2");
    assertDeleted("/ADirectory2/db1");
@ -369,10 +373,10 @@ public void testDeleteRecursiveRoot() throws Exception {
  @Test
  public void testDeleteNonExisting() throws Exception {
    // Path doesn't exist, but should silently succeed
    ms.delete(strToPath("/bobs/your/uncle"));
    ms.delete(strToPath("/bobs/your/uncle"), ttlTimeProvider);

    // Ditto.
    ms.deleteSubtree(strToPath("/internets"));
    ms.deleteSubtree(strToPath("/internets"), ttlTimeProvider);
  }


@ -408,7 +412,7 @@ public void testGet() throws Exception {
    }

    if (!(ms instanceof NullMetadataStore)) {
      ms.delete(strToPath(filePath));
      ms.delete(strToPath(filePath), ttlTimeProvider);
      meta = ms.get(strToPath(filePath));
      assertTrue("Tombstone not left for deleted file", meta.isDeleted());
    }
@ -586,7 +590,7 @@ public void testMove() throws Exception {
    destMetas.add(new PathMetadata(makeDirStatus("/b1")));
    destMetas.add(new PathMetadata(makeFileStatus("/b1/file1", 100)));
    destMetas.add(new PathMetadata(makeFileStatus("/b1/file2", 100)));
    ms.move(srcPaths, destMetas);
    ms.move(srcPaths, destMetas, ttlTimeProvider);

    // Assert src is no longer there
    dirMeta = ms.listChildren(strToPath("/a1"));
@ -636,11 +640,11 @@ public void testMultiBucketPaths() throws Exception {

    // Make sure delete is correct as well
    if (!allowMissing()) {
      ms.delete(new Path(p2));
      ms.delete(new Path(p2), ttlTimeProvider);
      meta = ms.get(new Path(p1));
      assertNotNull("Path should not have been deleted", meta);
    }
    ms.delete(new Path(p1));
    ms.delete(new Path(p1), ttlTimeProvider);
  }

  @Test
@ -668,7 +672,7 @@ public void testPruneFiles() throws Exception {
      assertListingsEqual(ls.getListing(), "/pruneFiles/new",
          "/pruneFiles/old");
    }
    ms.prune(cutoff);
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);
    ls = ms.listChildren(strToPath("/pruneFiles"));
    if (allowMissing()) {
      assertDeleted("/pruneFiles/old");
@ -698,7 +702,7 @@ public void testPruneDirs() throws Exception {
    Thread.sleep(1);
    long cutoff = getTime();

    ms.prune(cutoff);
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);

    assertDeleted("/pruneDirs/dir/file");
  }
@ -728,7 +732,7 @@ public void testPruneUnsetsAuthoritative() throws Exception {
      ms.put(parentDirMd);
    }

    ms.prune(time);
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
    DirListingMetadata listing;
    for (String directory : directories) {
      Path path = strToPath(directory);
@ -765,7 +769,7 @@ public void testPrunePreservesAuthoritative() throws Exception {
    ms.put(parentDirMd);

    // prune the ms
    ms.prune(time);
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);

    // get the directory listings
    DirListingMetadata rootDirMd = ms.listChildren(strToPath(rootDir));
@ -823,6 +827,89 @@ public void testPutRetainsIsDeletedInParentListing() throws Exception {
    }
  }

  @Test
  public void testPruneExpiredTombstones() throws Exception {
    List<String> keepFilenames = new ArrayList<>(
        Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
    List<String> removeFilenames = new ArrayList<>(
        Arrays.asList("/dir1/fileR1", "/dir1/fileR2", "/dir1/fileR3"));

    long cutoff = 9001;

    for (String fN : keepFilenames) {
      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
      pathMetadata.setLastUpdated(9002L);
      ms.put(pathMetadata);
    }

    for (String fN : removeFilenames) {
      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
      pathMetadata.setLastUpdated(9000L);
      // tombstones are the deleted files!
      pathMetadata.setIsDeleted(true);
      ms.put(pathMetadata);
    }

    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff);

    if (!allowMissing()) {
      for (String fN : keepFilenames) {
        final PathMetadata pathMetadata = ms.get(strToPath(fN));
        assertNotNull("Kept files should be in the metastore after prune",
            pathMetadata);
      }
    }

    for (String fN : removeFilenames) {
      final PathMetadata pathMetadata = ms.get(strToPath(fN));
      assertNull("Expired tombstones should be removed from metastore after "
          + "the prune.", pathMetadata);
    }
  }

  @Test
  public void testPruneExpiredTombstonesSpecifiedPath() throws Exception {
    List<String> keepFilenames = new ArrayList<>(
        Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
    List<String> removeFilenames = new ArrayList<>(
        Arrays.asList("/dir2/fileR1", "/dir2/fileR2", "/dir2/fileR3"));

    long cutoff = 9001;

    // Unlike the previous test, prune is restricted to a path, so only
    // the entries under that path are deleted.
    for (String fN : keepFilenames) {
      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
      pathMetadata.setLastUpdated(9002L);
      ms.put(pathMetadata);
    }

    for (String fN : removeFilenames) {
      final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
      pathMetadata.setLastUpdated(9000L);
      // tombstones are the deleted files!
      pathMetadata.setIsDeleted(true);
      ms.put(pathMetadata);
    }

    final String prunePath = getPathStringForPrune("/dir2");
    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff,
        prunePath);

    if (!allowMissing()) {
      for (String fN : keepFilenames) {
        final PathMetadata pathMetadata = ms.get(strToPath(fN));
        assertNotNull("Kept files should be in the metastore after prune",
            pathMetadata);
      }
    }

    for (String fN : removeFilenames) {
      final PathMetadata pathMetadata = ms.get(strToPath(fN));
      assertNull("Expired tombstones should be removed from metastore after "
          + "the prune.", pathMetadata);
    }
  }

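The two tests above fix the selection rule for TOMBSTONES_BY_LASTUPDATED. A sketch of that predicate under the assumption of a store-agnostic helper; keyOf is hypothetical, and each store iterates its entries differently (DynamoDB query vs. local cache scan):

// Illustrative predicate: which entries TOMBSTONES_BY_LASTUPDATED removes.
static boolean shouldPruneTombstone(PathMetadata md, long cutoff,
    String keyPrefix) {
  return md.isDeleted()                 // only tombstones are candidates
      && md.getLastUpdated() < cutoff   // expired relative to the cutoff
      // keyOf() is a hypothetical store-specific key lookup
      && keyOf(md).startsWith(keyPrefix);
}
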
  /*
   * Helper functions.
   */
@ -837,6 +924,16 @@ private String[] buildPathStrings(String parent, String... paths)
    return paths;
  }


  /**
   * The prune operation needs the path with the bucket name as a string in
   * {@link DynamoDBMetadataStore}, but not for {@link LocalMetadataStore}.
   * This is an implementation detail of the ms, so it should be
   * implemented in the subclasses.
   */
  protected abstract String getPathStringForPrune(String path)
      throws Exception;

  private void commonTestPutListStatus(final String parent) throws IOException {
    putListStatusFiles(parent, true, buildPathStrings(parent, "file1", "file2",
        "file3"));
@ -1012,4 +1109,8 @@ protected static long getTime() {
    return System.currentTimeMillis();
  }

  protected static ITtlTimeProvider getTtlTimeProvider() {
    return ttlTimeProvider;
  }

}

@ -75,6 +75,11 @@ public AbstractMSContract createContract(Configuration conf) throws
    return new LocalMSContract(conf);
  }

  @Override protected String getPathStringForPrune(String path)
      throws Exception {
    return path;
  }

  @Test
  public void testClearByAncestor() throws Exception {
    Cache<Path, LocalMetadataEntry> cache = CacheBuilder.newBuilder().build();
@ -184,7 +189,7 @@ private static void assertClearResult(Cache<Path, LocalMetadataEntry> cache,
      String prefixStr, String pathStr, int leftoverSize) throws IOException {
    populateMap(cache, prefixStr);
    LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
        cache, true);
        cache, true, getTtlTimeProvider());
    assertEquals(String.format("Cache should have %d entries", leftoverSize),
        leftoverSize, sizeOfMap(cache));
    cache.invalidateAll();

@ -46,6 +46,11 @@ public boolean allowMissing() {
    return true;
  }

  @Override protected String getPathStringForPrune(String path)
      throws Exception {
    return path;
  }

  @Override
  public AbstractMSContract createContract() {
    return new NullMSContract();

@ -18,18 +18,28 @@

package org.apache.hadoop.fs.s3a.s3guard;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for the {@link S3Guard} utility class.
@ -58,8 +68,8 @@ public void testDirListingUnion() throws Exception {
        makeFileStatus("s3a://bucket/dir/s3-file4", false)
    );

    S3Guard.ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
        DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
    ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
        DEFAULT_METADATASTORE_METADATA_TTL);
    FileStatus[] result = S3Guard.dirListingUnion(ms, dirPath, s3Listing,
        dirMeta, false, timeProvider);

@ -70,6 +80,185 @@ public void testDirListingUnion() throws Exception {
    assertContainsPath(result, "s3a://bucket/dir/s3-file4");
  }

  @Test
  public void testPutWithTtlDirListingMeta() throws Exception {
    // arrange
    DirListingMetadata dlm = new DirListingMetadata(new Path("/"), null,
        false);
    MetadataStore ms = spy(MetadataStore.class);
    ITtlTimeProvider timeProvider =
        mock(ITtlTimeProvider.class);
    when(timeProvider.getNow()).thenReturn(100L);

    // act
    S3Guard.putWithTtl(ms, dlm, timeProvider);

    // assert
    assertEquals("last update in " + dlm, 100L, dlm.getLastUpdated());
    verify(timeProvider, times(1)).getNow();
    verify(ms, times(1)).put(dlm);
  }

  @Test
  public void testPutWithTtlFileMeta() throws Exception {
    // arrange
    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
    when(fileStatus.getPath()).thenReturn(new Path("/"));
    PathMetadata pm = new PathMetadata(fileStatus);
    MetadataStore ms = spy(MetadataStore.class);
    ITtlTimeProvider timeProvider =
        mock(ITtlTimeProvider.class);
    when(timeProvider.getNow()).thenReturn(100L);

    // act
    S3Guard.putWithTtl(ms, pm, timeProvider);

    // assert
    assertEquals("last update in " + pm, 100L, pm.getLastUpdated());
    verify(timeProvider, times(1)).getNow();
    verify(ms, times(1)).put(pm);
  }

  @Test
  public void testPutWithTtlCollection() throws Exception {
    // arrange
    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
    when(fileStatus.getPath()).thenReturn(new Path("/"));
    Collection<PathMetadata> pmCollection = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      pmCollection.add(new PathMetadata(fileStatus));
    }
    MetadataStore ms = spy(MetadataStore.class);
    ITtlTimeProvider timeProvider =
        mock(ITtlTimeProvider.class);
    when(timeProvider.getNow()).thenReturn(100L);

    // act
    S3Guard.putWithTtl(ms, pmCollection, timeProvider);

    // assert
    pmCollection.forEach(
        pm -> assertEquals(100L, pm.getLastUpdated())
    );
    verify(timeProvider, times(1)).getNow();
    verify(ms, times(1)).put(pmCollection);
  }

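Taken together, the three tests above specify putWithTtl: read the clock once, stamp lastUpdated, then delegate to the store's put. A sketch consistent with those assertions (the real overloads for DirListingMetadata and collections follow the same pattern; this is not copied from the commit):

// Consistent with the assertions above; not copied from the commit.
public static void putWithTtl(MetadataStore ms, PathMetadata fileMeta,
    ITtlTimeProvider timeProvider) throws IOException {
  fileMeta.setLastUpdated(timeProvider.getNow()); // exactly one getNow()
  ms.put(fileMeta);                               // exactly one put()
}
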
  @Test
  public void testGetWithTtlExpired() throws Exception {
    // arrange
    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
    Path path = new Path("/file");
    when(fileStatus.getPath()).thenReturn(path);
    PathMetadata pm = new PathMetadata(fileStatus);
    pm.setLastUpdated(100L);

    MetadataStore ms = mock(MetadataStore.class);
    when(ms.get(path)).thenReturn(pm);

    ITtlTimeProvider timeProvider =
        mock(ITtlTimeProvider.class);
    when(timeProvider.getNow()).thenReturn(101L);
    when(timeProvider.getMetadataTtl()).thenReturn(1L);

    // act
    final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider);

    // assert
    assertNull(pmExpired);
  }

  @Test
  public void testGetWithTtlNotExpired() throws Exception {
    // arrange
    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
    Path path = new Path("/file");
    when(fileStatus.getPath()).thenReturn(path);
    PathMetadata pm = new PathMetadata(fileStatus);
    pm.setLastUpdated(100L);

    MetadataStore ms = mock(MetadataStore.class);
    when(ms.get(path)).thenReturn(pm);

    ITtlTimeProvider timeProvider =
        mock(ITtlTimeProvider.class);
    when(timeProvider.getNow()).thenReturn(101L);
    when(timeProvider.getMetadataTtl()).thenReturn(2L);

    // act
    final PathMetadata pmNotExpired =
        S3Guard.getWithTtl(ms, path, timeProvider);

    // assert
    assertNotNull(pmNotExpired);
  }

  @Test
  public void testGetWithZeroLastUpdatedNotExpired() throws Exception {
    // arrange
    S3AFileStatus fileStatus = mock(S3AFileStatus.class);
    Path path = new Path("/file");
    when(fileStatus.getPath()).thenReturn(path);
    PathMetadata pm = new PathMetadata(fileStatus);
    // we set 0 this time as the last updated: can happen e.g. when we use an
    // old dynamo table
    pm.setLastUpdated(0L);

    MetadataStore ms = mock(MetadataStore.class);
    when(ms.get(path)).thenReturn(pm);

    ITtlTimeProvider timeProvider =
        mock(ITtlTimeProvider.class);
    when(timeProvider.getNow()).thenReturn(101L);
    when(timeProvider.getMetadataTtl()).thenReturn(2L);

    // act
    final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider);

    // assert
    assertNotNull(pmExpired);
  }


  /**
   * Makes sure that all uses of TTL timeouts use a consistent time unit.
   * @throws Throwable failure
   */
  @Test
  public void testTTLConstruction() throws Throwable {
    // first one
    ITtlTimeProvider timeProviderExplicit = new S3Guard.TtlTimeProvider(
        DEFAULT_METADATASTORE_METADATA_TTL);

    // mirror the FS construction,
    // from a config guaranteed to be empty (i.e. the in-code default value)
    Configuration conf = new Configuration(false);
    long millitime = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
        DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
    assertEquals(15 * 60_000, millitime);
    S3Guard.TtlTimeProvider fsConstruction = new S3Guard.TtlTimeProvider(
        millitime);
    assertEquals("explicit vs fs construction", timeProviderExplicit,
        fsConstruction);
    assertEquals("first and second constructor", timeProviderExplicit,
        new S3Guard.TtlTimeProvider(conf));
    // set the conf to a time without unit
    conf.setLong(METADATASTORE_METADATA_TTL,
        DEFAULT_METADATASTORE_METADATA_TTL);
    assertEquals("first and second time set through long", timeProviderExplicit,
        new S3Guard.TtlTimeProvider(conf));
    double timeInSeconds = DEFAULT_METADATASTORE_METADATA_TTL / 1000;
    double timeInMinutes = timeInSeconds / 60;
    String timeStr = String.format("%dm", (int) timeInMinutes);
    assertEquals("wrong time in minutes from " + timeInMinutes,
        "15m", timeStr);
    conf.set(METADATASTORE_METADATA_TTL, timeStr);
    assertEquals("Time in millis as string from "
        + conf.get(METADATASTORE_METADATA_TTL),
        timeProviderExplicit,
        new S3Guard.TtlTimeProvider(conf));
  }

  void assertContainsPath(FileStatus[] statuses, String pathStr) {
    assertTrue("listing doesn't contain " + pathStr,
        containsPath(statuses, pathStr));

@ -18,11 +18,15 @@

package org.apache.hadoop.fs.s3a.scale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;

import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
@ -54,6 +58,12 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
  static final long ACCESS_TIME = System.currentTimeMillis();

  static final Path BUCKET_ROOT = new Path("s3a://fake-bucket/");
  private ITtlTimeProvider ttlTimeProvider;

  @Before
  public void initialize() {
    ttlTimeProvider = new S3Guard.TtlTimeProvider(new Configuration());
  }

  /**
   * Subclasses should override this to provide the MetadataStore they wish
@ -129,7 +139,7 @@ public void test_020_Moves() throws Throwable {
        toDelete = movedPaths;
        toCreate = origMetas;
      }
      ms.move(toDelete, toCreate);
      ms.move(toDelete, toCreate, ttlTimeProvider);
    }
    moveTimer.end();
    printTiming(LOG, "move", moveTimer, operations);
@ -194,7 +204,7 @@ protected void clearMetadataStore(MetadataStore ms, long count)
      throws IOException {
    describe("Recursive deletion");
    NanoTimer deleteTimer = new NanoTimer();
    ms.deleteSubtree(BUCKET_ROOT);
    ms.deleteSubtree(BUCKET_ROOT, ttlTimeProvider);
    deleteTimer.end();
    printTiming(LOG, "delete", deleteTimer, count);
  }
