HADOOP-16433. S3Guard: Filter expired entries and tombstones when listing with MetadataStore.listChildren().
Contributed by Gabor Bota. This pulls the tracking of the lastUpdated timestamp of metadata entries up from the DDB metastore into all s3guard stores, and then uses this to filter out expired tombstones from listings. Change-Id: I80f121236b49c75a024116f65a3ef29d3580b462
This commit is contained in:
parent
cf9ff08f4b
commit
7b219778e0
@ -31,7 +31,8 @@ public class DDBPathMetadata extends PathMetadata {
|
||||
private boolean isAuthoritativeDir;
|
||||
|
||||
public DDBPathMetadata(PathMetadata pmd) {
|
||||
super(pmd.getFileStatus(), pmd.isEmptyDirectory(), pmd.isDeleted());
|
||||
super(pmd.getFileStatus(), pmd.isEmptyDirectory(), pmd.isDeleted(),
|
||||
pmd.getLastUpdated());
|
||||
this.isAuthoritativeDir = false;
|
||||
this.setLastUpdated(pmd.getLastUpdated());
|
||||
}
|
||||
@ -42,16 +43,15 @@ public DDBPathMetadata(S3AFileStatus fileStatus) {
|
||||
}
|
||||
|
||||
public DDBPathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir,
|
||||
boolean isDeleted) {
|
||||
super(fileStatus, isEmptyDir, isDeleted);
|
||||
boolean isDeleted, long lastUpdated) {
|
||||
super(fileStatus, isEmptyDir, isDeleted, lastUpdated);
|
||||
this.isAuthoritativeDir = false;
|
||||
}
|
||||
|
||||
public DDBPathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir,
|
||||
boolean isDeleted, boolean isAuthoritativeDir, long lastUpdated) {
|
||||
super(fileStatus, isEmptyDir, isDeleted);
|
||||
super(fileStatus, isEmptyDir, isDeleted, lastUpdated);
|
||||
this.isAuthoritativeDir = isAuthoritativeDir;
|
||||
this.setLastUpdated(lastUpdated);
|
||||
}
|
||||
|
||||
public boolean isAuthoritativeDir() {
|
||||
@ -74,7 +74,6 @@ public boolean equals(Object o) {
|
||||
@Override public String toString() {
|
||||
return "DDBPathMetadata{" +
|
||||
"isAuthoritativeDir=" + isAuthoritativeDir +
|
||||
", lastUpdated=" + this.getLastUpdated() +
|
||||
", PathMetadata=" + super.toString() +
|
||||
'}';
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -62,7 +63,7 @@ public class DirListingMetadata extends ExpirableMetadata {
|
||||
* Create a directory listing metadata container.
|
||||
*
|
||||
* @param path Path of the directory. If this path has a host component, then
|
||||
* all paths added later via {@link #put(S3AFileStatus)} must also have
|
||||
* all paths added later via {@link #put(PathMetadata)} must also have
|
||||
* the same host.
|
||||
* @param listing Entries in the directory.
|
||||
* @param isAuthoritative true iff listing is the full contents of the
|
||||
@ -203,9 +204,9 @@ public PathMetadata get(Path childPath) {
|
||||
* Replace an entry with a tombstone.
|
||||
* @param childPath path of entry to replace.
|
||||
*/
|
||||
public void markDeleted(Path childPath) {
|
||||
public void markDeleted(Path childPath, long lastUpdated) {
|
||||
checkChildPath(childPath);
|
||||
listMap.put(childPath, PathMetadata.tombstone(childPath));
|
||||
listMap.put(childPath, PathMetadata.tombstone(childPath, lastUpdated));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -222,16 +223,17 @@ public void remove(Path childPath) {
|
||||
* Add an entry to the directory listing. If this listing already contains a
|
||||
* {@code FileStatus} with the same path, it will be replaced.
|
||||
*
|
||||
* @param childFileStatus entry to add to this directory listing.
|
||||
* @param childPathMetadata entry to add to this directory listing.
|
||||
* @return true if the status was added or replaced with a new value. False
|
||||
* if the same FileStatus value was already present.
|
||||
*/
|
||||
public boolean put(S3AFileStatus childFileStatus) {
|
||||
Preconditions.checkNotNull(childFileStatus,
|
||||
"childFileStatus must be non-null");
|
||||
Path childPath = childStatusToPathKey(childFileStatus);
|
||||
PathMetadata newValue = new PathMetadata(childFileStatus);
|
||||
PathMetadata oldValue = listMap.put(childPath, newValue);
|
||||
public boolean put(PathMetadata childPathMetadata) {
|
||||
Preconditions.checkNotNull(childPathMetadata,
|
||||
"childPathMetadata must be non-null");
|
||||
final S3AFileStatus fileStatus = childPathMetadata.getFileStatus();
|
||||
Path childPath = childStatusToPathKey(fileStatus);
|
||||
PathMetadata newValue = childPathMetadata;
|
||||
PathMetadata oldValue = listMap.put(childPath, childPathMetadata);
|
||||
return oldValue == null || !oldValue.equals(newValue);
|
||||
}
|
||||
|
||||
@ -245,6 +247,25 @@ public String toString() {
|
||||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove expired entries from the listing based on TTL.
|
||||
* @param ttl the ttl time
|
||||
* @param now the current time
|
||||
*/
|
||||
public synchronized void removeExpiredEntriesFromListing(long ttl,
|
||||
long now) {
|
||||
final Iterator<Map.Entry<Path, PathMetadata>> iterator =
|
||||
listMap.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
final Map.Entry<Path, PathMetadata> entry = iterator.next();
|
||||
// we filter iff the lastupdated is not 0 and the entry is expired
|
||||
if (entry.getValue().getLastUpdated() != 0
|
||||
&& (entry.getValue().getLastUpdated() + ttl) <= now) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log contents to supplied StringBuilder in a pretty fashion.
|
||||
* @param sb target StringBuilder
|
||||
|
@ -585,9 +585,8 @@ private void innerDelete(final Path path,
|
||||
if (tombstone) {
|
||||
Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider "
|
||||
+ "must not be null");
|
||||
final PathMetadata pmTombstone = PathMetadata.tombstone(path);
|
||||
// update the last updated field of record when putting a tombstone
|
||||
pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
|
||||
final PathMetadata pmTombstone = PathMetadata.tombstone(path,
|
||||
ttlTimeProvider.getNow());
|
||||
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
|
||||
new DDBPathMetadata(pmTombstone));
|
||||
writeOp.retry(
|
||||
@ -785,8 +784,16 @@ public DirListingMetadata listChildren(final Path path) throws IOException {
|
||||
// get a null in DDBPathMetadata.
|
||||
DDBPathMetadata dirPathMeta = get(path);
|
||||
|
||||
return getDirListingMetadataFromDirMetaAndList(path, metas,
|
||||
dirPathMeta);
|
||||
// Filter expired entries.
|
||||
final DirListingMetadata dirListing =
|
||||
getDirListingMetadataFromDirMetaAndList(path, metas,
|
||||
dirPathMeta);
|
||||
if(dirListing != null) {
|
||||
dirListing.removeExpiredEntriesFromListing(
|
||||
ttlTimeProvider.getMetadataTtl(),
|
||||
ttlTimeProvider.getNow());
|
||||
}
|
||||
return dirListing;
|
||||
});
|
||||
}
|
||||
|
||||
@ -947,7 +954,7 @@ public void addAncestors(final Path qualifiedPath,
|
||||
S3AFileStatus status = makeDirStatus(username, parent);
|
||||
LOG.debug("Adding new ancestor entry {}", status);
|
||||
DDBPathMetadata meta = new DDBPathMetadata(status, Tristate.FALSE,
|
||||
false);
|
||||
false, ttlTimeProvider.getNow());
|
||||
newDirs.add(meta);
|
||||
// Do not update ancestor state here, as it
|
||||
// will happen in the innerPut() call. Were we to add it
|
||||
@ -1039,8 +1046,8 @@ public void move(@Nullable Collection<Path> pathsToDelete,
|
||||
for (Path meta : pathsToDelete) {
|
||||
Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider"
|
||||
+ " must not be null");
|
||||
final PathMetadata pmTombstone = PathMetadata.tombstone(meta);
|
||||
pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
|
||||
final PathMetadata pmTombstone = PathMetadata.tombstone(meta,
|
||||
ttlTimeProvider.getNow());
|
||||
tombstones.add(new DDBPathMetadata(pmTombstone));
|
||||
}
|
||||
// sort all the tombstones lowest first.
|
||||
|
@ -129,32 +129,31 @@ public String toString() {
|
||||
@Override
|
||||
public void delete(Path p)
|
||||
throws IOException {
|
||||
doDelete(p, false, true, ttlTimeProvider);
|
||||
doDelete(p, false, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forgetMetadata(Path p) throws IOException {
|
||||
doDelete(p, false, false, null);
|
||||
doDelete(p, false, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteSubtree(Path path)
|
||||
throws IOException {
|
||||
doDelete(path, true, true, ttlTimeProvider);
|
||||
doDelete(path, true, true);
|
||||
}
|
||||
|
||||
private synchronized void doDelete(Path p, boolean recursive,
|
||||
boolean tombstone, ITtlTimeProvider ttlTp) {
|
||||
boolean tombstone) {
|
||||
|
||||
Path path = standardize(p);
|
||||
|
||||
// Delete entry from file cache, then from cached parent directory, if any
|
||||
|
||||
deleteCacheEntries(path, tombstone, ttlTp);
|
||||
deleteCacheEntries(path, tombstone);
|
||||
|
||||
if (recursive) {
|
||||
// Remove all entries that have this dir as path prefix.
|
||||
deleteEntryByAncestor(path, localCache, tombstone, ttlTp);
|
||||
deleteEntryByAncestor(path, localCache, tombstone, ttlTimeProvider);
|
||||
}
|
||||
}
|
||||
|
||||
@ -202,8 +201,16 @@ public synchronized DirListingMetadata listChildren(Path p) throws
|
||||
LOG.debug("listChildren({}) -> {}", path,
|
||||
listing == null ? "null" : listing.prettyPrint());
|
||||
}
|
||||
// Make a copy so callers can mutate without affecting our state
|
||||
return listing == null ? null : new DirListingMetadata(listing);
|
||||
|
||||
if (listing != null) {
|
||||
listing.removeExpiredEntriesFromListing(
|
||||
ttlTimeProvider.getMetadataTtl(), ttlTimeProvider.getNow());
|
||||
LOG.debug("listChildren [after removing expired entries] ({}) -> {}",
|
||||
path, listing.prettyPrint());
|
||||
// Make a copy so callers can mutate without affecting our state
|
||||
return new DirListingMetadata(listing);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -309,15 +316,17 @@ public void put(PathMetadata meta,
|
||||
DirListingMetadata parentDirMeta =
|
||||
new DirListingMetadata(parentPath, DirListingMetadata.EMPTY_DIR,
|
||||
false);
|
||||
parentDirMeta.setLastUpdated(meta.getLastUpdated());
|
||||
parentMeta.setDirListingMetadata(parentDirMeta);
|
||||
}
|
||||
|
||||
// Add the child status to the listing
|
||||
parentMeta.getDirListingMeta().put(status);
|
||||
// Add the child pathMetadata to the listing
|
||||
parentMeta.getDirListingMeta().put(meta);
|
||||
|
||||
// Mark the listing entry as deleted if the meta is set to deleted
|
||||
if(meta.isDeleted()) {
|
||||
parentMeta.getDirListingMeta().markDeleted(path);
|
||||
parentMeta.getDirListingMeta().markDeleted(path,
|
||||
ttlTimeProvider.getNow());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -463,8 +472,8 @@ static void deleteEntryByAncestor(Path ancestor,
|
||||
if(meta.hasDirMeta()){
|
||||
cache.invalidate(path);
|
||||
} else if(tombstone && meta.hasPathMeta()){
|
||||
final PathMetadata pmTombstone = PathMetadata.tombstone(path);
|
||||
pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
|
||||
final PathMetadata pmTombstone = PathMetadata.tombstone(path,
|
||||
ttlTimeProvider.getNow());
|
||||
meta.setPathMetadata(pmTombstone);
|
||||
} else {
|
||||
cache.invalidate(path);
|
||||
@ -489,8 +498,7 @@ private static boolean isAncestorOf(Path ancestor, Path f) {
|
||||
* Update fileCache and dirCache to reflect deletion of file 'f'. Call with
|
||||
* lock held.
|
||||
*/
|
||||
private void deleteCacheEntries(Path path, boolean tombstone,
|
||||
ITtlTimeProvider ttlTp) {
|
||||
private void deleteCacheEntries(Path path, boolean tombstone) {
|
||||
LocalMetadataEntry entry = localCache.getIfPresent(path);
|
||||
// If there's no entry, delete should silently succeed
|
||||
// (based on MetadataStoreTestBase#testDeleteNonExisting)
|
||||
@ -503,8 +511,8 @@ private void deleteCacheEntries(Path path, boolean tombstone,
|
||||
LOG.debug("delete file entry for {}", path);
|
||||
if(entry.hasPathMeta()){
|
||||
if (tombstone) {
|
||||
PathMetadata pmd = PathMetadata.tombstone(path);
|
||||
pmd.setLastUpdated(ttlTp.getNow());
|
||||
PathMetadata pmd = PathMetadata.tombstone(path,
|
||||
ttlTimeProvider.getNow());
|
||||
entry.setPathMetadata(pmd);
|
||||
} else {
|
||||
entry.setPathMetadata(null);
|
||||
@ -530,8 +538,7 @@ private void deleteCacheEntries(Path path, boolean tombstone,
|
||||
if (dir != null) {
|
||||
LOG.debug("removing parent's entry for {} ", path);
|
||||
if (tombstone) {
|
||||
dir.markDeleted(path);
|
||||
dir.setLastUpdated(ttlTp.getNow());
|
||||
dir.markDeleted(path, ttlTimeProvider.getNow());
|
||||
} else {
|
||||
dir.remove(path);
|
||||
}
|
||||
@ -613,7 +620,8 @@ public void addAncestors(final Path qualifiedPath,
|
||||
if (directory == null || directory.isDeleted()) {
|
||||
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, parent,
|
||||
username);
|
||||
PathMetadata meta = new PathMetadata(status, Tristate.FALSE, false);
|
||||
PathMetadata meta = new PathMetadata(status, Tristate.FALSE, false,
|
||||
ttlTimeProvider.getNow());
|
||||
newDirs.add(meta);
|
||||
} else {
|
||||
break;
|
||||
|
@ -27,7 +27,9 @@
|
||||
|
||||
/**
|
||||
* {@code PathMetadata} models path metadata stored in the
|
||||
* {@link MetadataStore}.
|
||||
* {@link MetadataStore}. The lastUpdated field is implicitly set to 0 in the
|
||||
* constructors without that parameter to show that it will be initialized
|
||||
* with 0 if not set otherwise.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
@ -39,38 +41,85 @@ public class PathMetadata extends ExpirableMetadata {
|
||||
|
||||
/**
|
||||
* Create a tombstone from the current time.
|
||||
* It is mandatory to set the lastUpdated field to update when the
|
||||
* tombstone state has changed to set when the entry got deleted.
|
||||
*
|
||||
* @param path path to tombstone
|
||||
* @param lastUpdated last updated time on which expiration is based.
|
||||
* @return the entry.
|
||||
*/
|
||||
public static PathMetadata tombstone(Path path) {
|
||||
public static PathMetadata tombstone(Path path, long lastUpdated) {
|
||||
S3AFileStatus s3aStatus = new S3AFileStatus(0,
|
||||
System.currentTimeMillis(), path, 0, null,
|
||||
null, null);
|
||||
return new PathMetadata(s3aStatus, Tristate.UNKNOWN, true);
|
||||
return new PathMetadata(s3aStatus, Tristate.UNKNOWN, true, lastUpdated);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@code PathMetadata} containing given {@code FileStatus}.
|
||||
* lastUpdated field will be updated to 0 implicitly in this constructor.
|
||||
*
|
||||
* @param fileStatus file status containing an absolute path.
|
||||
*/
|
||||
public PathMetadata(S3AFileStatus fileStatus) {
|
||||
this(fileStatus, Tristate.UNKNOWN, false);
|
||||
this(fileStatus, Tristate.UNKNOWN, false, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@code PathMetadata} containing given {@code FileStatus}.
|
||||
*
|
||||
* @param fileStatus file status containing an absolute path.
|
||||
* @param lastUpdated last updated time on which expiration is based.
|
||||
*/
|
||||
public PathMetadata(S3AFileStatus fileStatus, long lastUpdated) {
|
||||
this(fileStatus, Tristate.UNKNOWN, false, lastUpdated);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@code PathMetadata}.
|
||||
* lastUpdated field will be updated to 0 implicitly in this constructor.
|
||||
*
|
||||
* @param fileStatus file status containing an absolute path.
|
||||
* @param isEmptyDir empty directory {@link Tristate}
|
||||
*/
|
||||
public PathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir) {
|
||||
this(fileStatus, isEmptyDir, false);
|
||||
this(fileStatus, isEmptyDir, false, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@code PathMetadata}.
|
||||
* lastUpdated field will be updated to 0 implicitly in this constructor.
|
||||
*
|
||||
* @param fileStatus file status containing an absolute path.
|
||||
* @param isEmptyDir empty directory {@link Tristate}
|
||||
* @param isDeleted deleted / tombstoned flag
|
||||
*/
|
||||
public PathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir,
|
||||
boolean isDeleted) {
|
||||
this(fileStatus, isEmptyDir, isDeleted, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@code PathMetadata}.
|
||||
*
|
||||
* @param fileStatus file status containing an absolute path.
|
||||
* @param isEmptyDir empty directory {@link Tristate}
|
||||
* @param isDeleted deleted / tombstoned flag
|
||||
* @param lastUpdated last updated time on which expiration is based.
|
||||
*/
|
||||
public PathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir, boolean
|
||||
isDeleted) {
|
||||
isDeleted, long lastUpdated) {
|
||||
Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null");
|
||||
Preconditions.checkNotNull(fileStatus.getPath(), "fileStatus path must be" +
|
||||
" non-null");
|
||||
Preconditions.checkArgument(fileStatus.getPath().isAbsolute(), "path must" +
|
||||
" be absolute");
|
||||
Preconditions.checkArgument(lastUpdated >=0, "lastUpdated parameter must "
|
||||
+ "be greater or equal to 0.");
|
||||
this.fileStatus = fileStatus;
|
||||
this.isEmptyDirectory = isEmptyDir;
|
||||
this.isDeleted = isDeleted;
|
||||
this.setLastUpdated(lastUpdated);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -122,6 +171,7 @@ public String toString() {
|
||||
"fileStatus=" + fileStatus +
|
||||
"; isEmptyDirectory=" + isEmptyDirectory +
|
||||
"; isDeleted=" + isDeleted +
|
||||
"; lastUpdated=" + super.getLastUpdated() +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
@ -296,12 +296,14 @@ public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
|
||||
continue;
|
||||
}
|
||||
|
||||
final PathMetadata pathMetadata = new PathMetadata(s);
|
||||
|
||||
if (!isAuthoritative){
|
||||
FileStatus status = dirMetaMap.get(s.getPath());
|
||||
if (status != null
|
||||
&& s.getModificationTime() > status.getModificationTime()) {
|
||||
LOG.debug("Update ms with newer metadata of: {}", status);
|
||||
S3Guard.putWithTtl(ms, new PathMetadata(s), timeProvider, null);
|
||||
S3Guard.putWithTtl(ms, pathMetadata, timeProvider, null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -312,7 +314,7 @@ public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
|
||||
// Any FileSystem has similar race conditions, but we could persist
|
||||
// a stale entry longer. We could expose an atomic
|
||||
// DirListingMetadata#putIfNotPresent()
|
||||
boolean updated = dirMeta.put(s);
|
||||
boolean updated = dirMeta.put(pathMetadata);
|
||||
changed = changed || updated;
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -47,6 +48,7 @@
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
@ -61,6 +63,7 @@
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -288,7 +291,7 @@ public void testListingDelete() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that tombstone expiry is implemented, so if a file is created raw
|
||||
* Tests that tombstone expiry is implemented. If a file is created raw
|
||||
* while the tombstone exist in ms for with the same name then S3Guard will
|
||||
* check S3 for the file.
|
||||
*
|
||||
@ -538,6 +541,47 @@ public void deleteAfterTombstoneExpiryOobCreate() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a tombstone won't hide an entry after it's expired in the
|
||||
* listing.
|
||||
*/
|
||||
@Test
|
||||
public void testRootTombstones() throws Exception {
|
||||
long ttl = 10L;
|
||||
ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
|
||||
when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
|
||||
when(mockTimeProvider.getNow()).thenReturn(100L);
|
||||
ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
|
||||
guardedFs.setTtlTimeProvider(mockTimeProvider);
|
||||
|
||||
Path base = path(getMethodName() + UUID.randomUUID());
|
||||
Path testFile = new Path(base, "test.file");
|
||||
|
||||
try {
|
||||
touch(guardedFs, testFile);
|
||||
ContractTestUtils.assertDeleted(guardedFs, testFile, false);
|
||||
|
||||
touch(rawFS, testFile);
|
||||
awaitFileStatus(rawFS, testFile);
|
||||
|
||||
// the rawFS will include the file=
|
||||
LambdaTestUtils.eventually(5000, 1000, () -> {
|
||||
checkListingContainsPath(rawFS, testFile);
|
||||
});
|
||||
|
||||
// it will be hidden because of the tombstone
|
||||
checkListingDoesNotContainPath(guardedFs, testFile);
|
||||
|
||||
// the tombstone is expired, so we should detect the file
|
||||
when(mockTimeProvider.getNow()).thenReturn(100 + ttl);
|
||||
checkListingContainsPath(guardedFs, testFile);
|
||||
} finally {
|
||||
// cleanup
|
||||
guardedFs.delete(base, true);
|
||||
guardedFs.setTtlTimeProvider(originalTimeProvider);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform an out-of-band delete.
|
||||
* @param testFilePath filename
|
||||
|
@ -18,9 +18,12 @@
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@ -30,6 +33,7 @@
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@ -286,4 +290,70 @@ public void testCreateParentHasTombstone() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that listing of metadatas is filtered from expired items.
|
||||
*/
|
||||
@Test
|
||||
public void testListingFilteredExpiredItems() throws Exception {
|
||||
LOG.info("Authoritative mode: {}", authoritative);
|
||||
final S3AFileSystem fs = getFileSystem();
|
||||
|
||||
long oldTime = 100L;
|
||||
long newTime = 110L;
|
||||
long ttl = 9L;
|
||||
final String basedir = "testListingFilteredExpiredItems";
|
||||
final Path tombstonedPath = path(basedir + "/tombstonedPath");
|
||||
final Path baseDirPath = path(basedir);
|
||||
final List<Path> filesToCreate = new ArrayList<>();
|
||||
final MetadataStore ms = fs.getMetadataStore();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
filesToCreate.add(path(basedir + "/file" + i));
|
||||
}
|
||||
|
||||
ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
|
||||
ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
|
||||
|
||||
try {
|
||||
fs.setTtlTimeProvider(mockTimeProvider);
|
||||
when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
|
||||
|
||||
// add and delete entry with the oldtime
|
||||
when(mockTimeProvider.getNow()).thenReturn(oldTime);
|
||||
touch(fs, tombstonedPath);
|
||||
fs.delete(tombstonedPath, false);
|
||||
|
||||
// create items with newTime
|
||||
when(mockTimeProvider.getNow()).thenReturn(newTime);
|
||||
for (Path path : filesToCreate) {
|
||||
touch(fs, path);
|
||||
}
|
||||
|
||||
// listing will contain the tombstone with oldtime
|
||||
when(mockTimeProvider.getNow()).thenReturn(oldTime);
|
||||
final DirListingMetadata fullDLM = ms.listChildren(baseDirPath);
|
||||
List<Path> containedPaths = fullDLM.getListing().stream()
|
||||
.map(pm -> pm.getFileStatus().getPath())
|
||||
.collect(Collectors.toList());
|
||||
Assertions.assertThat(containedPaths)
|
||||
.describedAs("Full listing of path %s", baseDirPath)
|
||||
.hasSize(11)
|
||||
.contains(tombstonedPath);
|
||||
|
||||
// listing will be filtered, and won't contain the tombstone with oldtime
|
||||
when(mockTimeProvider.getNow()).thenReturn(newTime);
|
||||
final DirListingMetadata filteredDLM = ms.listChildren(baseDirPath);
|
||||
containedPaths = filteredDLM.getListing().stream()
|
||||
.map(pm -> pm.getFileStatus().getPath())
|
||||
.collect(Collectors.toList());
|
||||
Assertions.assertThat(containedPaths)
|
||||
.describedAs("Full listing of path %s", baseDirPath)
|
||||
.hasSize(10)
|
||||
.doesNotContain(tombstonedPath);
|
||||
} finally {
|
||||
fs.delete(baseDirPath, true);
|
||||
fs.setTtlTimeProvider(originalTimeProvider);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1021,7 +1021,7 @@ private void testGetEmptyDirFlagCanSetTrueOrUnknown(boolean auth)
|
||||
}
|
||||
|
||||
// Test with non-authoritative listing, non-empty dir
|
||||
dlm.put(basicFileStatus(fileToPut, 1, false));
|
||||
dlm.put(new PathMetadata(basicFileStatus(fileToPut, 1, false)));
|
||||
ms.put(dlm, null);
|
||||
final PathMetadata pmdResultNotEmpty = ms.get(dirToPut, true);
|
||||
assertEquals(Tristate.FALSE, pmdResultNotEmpty.isEmptyDirectory());
|
||||
|
@ -109,7 +109,7 @@ public boolean supportsPruning() {
|
||||
/** The MetadataStore contract used to test against. */
|
||||
private AbstractMSContract contract;
|
||||
|
||||
private MetadataStore ms;
|
||||
protected MetadataStore ms;
|
||||
|
||||
/**
|
||||
* @return reference to the test contract.
|
||||
@ -554,7 +554,8 @@ public void testListChildrenAuthoritative() throws IOException {
|
||||
|
||||
DirListingMetadata dirMeta = ms.listChildren(strToPath("/a1/b1"));
|
||||
dirMeta.setAuthoritative(true);
|
||||
dirMeta.put(makeFileStatus("/a1/b1/file_new", 100));
|
||||
dirMeta.put(new PathMetadata(
|
||||
makeFileStatus("/a1/b1/file_new", 100)));
|
||||
ms.put(dirMeta, null);
|
||||
|
||||
dirMeta = ms.listChildren(strToPath("/a1/b1"));
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
@ -198,7 +199,7 @@ public void testPut() {
|
||||
assertFalse(meta.isAuthoritative());
|
||||
PathMetadata pathMeta4 = new PathMetadata(
|
||||
new S3AFileStatus(true, new Path(path, "dir3"), TEST_OWNER));
|
||||
meta.put(pathMeta4.getFileStatus());
|
||||
meta.put(pathMeta4);
|
||||
assertTrue(meta.getListing().contains(pathMeta4));
|
||||
assertEquals(pathMeta4, meta.get(pathMeta4.getFileStatus().getPath()));
|
||||
}
|
||||
@ -218,7 +219,7 @@ public void testPutNullPath() {
|
||||
DirListingMetadata meta = new DirListingMetadata(path, null, false);
|
||||
exception.expect(NullPointerException.class);
|
||||
exception.expectMessage(notNullValue(String.class));
|
||||
meta.put(new S3AFileStatus(true, null, TEST_OWNER));
|
||||
meta.put(new PathMetadata(new S3AFileStatus(true, null, TEST_OWNER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -227,7 +228,8 @@ public void testPutRoot() {
|
||||
DirListingMetadata meta = new DirListingMetadata(path, null, false);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage(notNullValue(String.class));
|
||||
meta.put(new S3AFileStatus(true, new Path("/"), TEST_OWNER));
|
||||
meta.put(new PathMetadata(new S3AFileStatus(true, new Path("/"),
|
||||
TEST_OWNER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -236,8 +238,8 @@ public void testPutNotChild() {
|
||||
DirListingMetadata meta = new DirListingMetadata(path, null, false);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage(notNullValue(String.class));
|
||||
meta.put(new S3AFileStatus(true, new Path("/different/ancestor"),
|
||||
TEST_OWNER));
|
||||
meta.put(new PathMetadata(
|
||||
new S3AFileStatus(true, new Path("/different/ancestor"), TEST_OWNER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -291,6 +293,38 @@ public void testRemoveNotChild() {
|
||||
meta.remove(new Path("/different/ancestor"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRemoveExpiredEntriesFromListing() {
|
||||
long ttl = 9;
|
||||
long oldTime = 100;
|
||||
long newTime = 110;
|
||||
long now = 110;
|
||||
|
||||
Path path = new Path("/path");
|
||||
PathMetadata pathMeta1 = new PathMetadata(
|
||||
new S3AFileStatus(true, new Path(path, "dir1"), TEST_OWNER));
|
||||
PathMetadata pathMeta2 = new PathMetadata(
|
||||
new S3AFileStatus(true, new Path(path, "dir2"), TEST_OWNER));
|
||||
PathMetadata pathMeta3 = new PathMetadata(
|
||||
new S3AFileStatus(123, 456, new Path(path, "file1"), 8192, TEST_OWNER,
|
||||
TEST_ETAG, TEST_VERSION_ID));
|
||||
pathMeta1.setLastUpdated(oldTime);
|
||||
pathMeta2.setLastUpdated(0);
|
||||
pathMeta3.setLastUpdated(newTime);
|
||||
|
||||
List<PathMetadata> listing = Arrays.asList(pathMeta1, pathMeta2, pathMeta3);
|
||||
DirListingMetadata meta = new DirListingMetadata(path, listing, false);
|
||||
|
||||
meta.removeExpiredEntriesFromListing(ttl, now);
|
||||
|
||||
Assertions.assertThat(meta.getListing())
|
||||
.describedAs("Metadata listing for %s", path)
|
||||
.doesNotContain(pathMeta1)
|
||||
.contains(pathMeta2)
|
||||
.contains(pathMeta3);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create DirListingMetadata with two dirs and one file living in directory
|
||||
* 'parent'
|
||||
|
@ -19,11 +19,13 @@
|
||||
package org.apache.hadoop.fs.s3a.s3guard;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.base.Ticker;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -34,6 +36,9 @@
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.apache.hadoop.fs.s3a.Tristate;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* MetadataStore unit test for {@link LocalMetadataStore}.
|
||||
*/
|
||||
@ -164,6 +169,41 @@ public void testCacheTimedEvictionAfterWrite() {
|
||||
assertNull("PathMetadata should be null after eviction", pm1);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUpdateParentLastUpdatedOnPutNewParent() throws Exception {
|
||||
Assume.assumeTrue("This test only applies if metadatastore does not allow"
|
||||
+ " missing values (skip for NullMS).", !allowMissing());
|
||||
|
||||
ITtlTimeProvider tp = mock(ITtlTimeProvider.class);
|
||||
ITtlTimeProvider originalTimeProvider = getTtlTimeProvider();
|
||||
|
||||
long now = 100L;
|
||||
|
||||
final String parent = "/parentUpdated-" + UUID.randomUUID();
|
||||
final String child = parent + "/file1";
|
||||
|
||||
try {
|
||||
when(tp.getNow()).thenReturn(now);
|
||||
|
||||
// create a file
|
||||
ms.put(new PathMetadata(makeFileStatus(child, 100), tp.getNow()),
|
||||
null);
|
||||
final PathMetadata fileMeta = ms.get(strToPath(child));
|
||||
assertEquals("lastUpdated field of first file should be equal to the "
|
||||
+ "mocked value", now, fileMeta.getLastUpdated());
|
||||
|
||||
final DirListingMetadata listing = ms.listChildren(strToPath(parent));
|
||||
assertEquals("Listing lastUpdated field should be equal to the mocked "
|
||||
+ "time value.", now, listing.getLastUpdated());
|
||||
|
||||
} finally {
|
||||
ms.setTtlTimeProvider(originalTimeProvider);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static void populateMap(Cache<Path, LocalMetadataEntry> cache,
|
||||
String prefix) {
|
||||
populateEntry(cache, new Path(prefix + "/dirA/dirB/"));
|
||||
|
Loading…
Reference in New Issue
Block a user