diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 0061b567b7..742c41aac1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -21,7 +21,10 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +40,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This is a local, in-memory, implementation of MetadataStore.
@@ -51,24 +55,35 @@
  * This MetadataStore does not enforce filesystem rules such as disallowing
  * non-recursive removal of non-empty directories. It is assumed the caller
  * already has to perform these sorts of checks.
+ *
+ * Uses an internal cache with time-based eviction.
  */
 public class LocalMetadataStore implements MetadataStore {
 
   public static final Logger LOG = LoggerFactory.getLogger(MetadataStore.class);
-  // TODO HADOOP-13649: use time instead of capacity for eviction.
   public static final int DEFAULT_MAX_RECORDS = 128;
+  public static final int DEFAULT_CACHE_ENTRY_TTL_MSEC = 10 * 1000;
 
   /**
    * Maximum number of records.
    */
+  @InterfaceStability.Evolving
   public static final String CONF_MAX_RECORDS =
       "fs.metadatastore.local.max_records";
 
+  /**
+   * Time to live in milliseconds. If negative, time-based expiration is
+   * disabled.
+   */
+  @InterfaceStability.Evolving
+  public static final String CONF_CACHE_ENTRY_TTL =
+      "fs.metadatastore.local.ttl";
+
   /** Contains directories and files. */
-  private LruHashMap<Path, PathMetadata> fileHash;
+  private Cache<Path, PathMetadata> fileCache;
 
   /** Contains directory listings. */
-  private LruHashMap<Path, DirListingMetadata> dirHash;
+  private Cache<Path, DirListingMetadata> dirCache;
 
   private FileSystem fs;
   /* Null iff this FS does not have an associated URI host. */
@@ -94,9 +109,15 @@ public void initialize(Configuration conf) throws IOException {
     if (maxRecords < 4) {
       maxRecords = 4;
     }
-    // Start w/ less than max capacity. Space / time trade off.
-    fileHash = new LruHashMap<>(maxRecords/2, maxRecords);
-    dirHash = new LruHashMap<>(maxRecords/4, maxRecords);
+    int ttl = conf.getInt(CONF_CACHE_ENTRY_TTL, DEFAULT_CACHE_ENTRY_TTL_MSEC);
+
+    CacheBuilder builder = CacheBuilder.newBuilder().maximumSize(maxRecords);
+    if (ttl >= 0) {
+      builder.expireAfterAccess(ttl, TimeUnit.MILLISECONDS);
+    }
+
+    fileCache = builder.build();
+    dirCache = builder.build();
   }
 
   @Override
@@ -130,12 +151,12 @@ private synchronized void doDelete(Path p, boolean recursive, boolean
 
     // Delete entry from file cache, then from cached parent directory, if any
 
-    deleteHashEntries(path, tombstone);
+    deleteCacheEntries(path, tombstone);
 
     if (recursive) {
       // Remove all entries that have this dir as path prefix.
-      deleteHashByAncestor(path, dirHash, tombstone);
-      deleteHashByAncestor(path, fileHash, tombstone);
+      deleteEntryByAncestor(path, dirCache, tombstone);
+      deleteEntryByAncestor(path, fileCache, tombstone);
     }
   }
 
@@ -149,7 +170,7 @@ public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
       throws IOException {
     Path path = standardize(p);
     synchronized (this) {
-      PathMetadata m = fileHash.mruGet(path);
+      PathMetadata m = fileCache.getIfPresent(path);
 
       if (wantEmptyDirectoryFlag && m != null &&
           m.getFileStatus().isDirectory()) {
@@ -170,7 +191,7 @@ public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
    * @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
    */
   private Tristate isEmptyDirectory(Path p) {
-    DirListingMetadata dirMeta = dirHash.get(p);
+    DirListingMetadata dirMeta = dirCache.getIfPresent(p);
     return dirMeta.withoutTombstones().isEmpty();
   }
 
@@ -178,7 +199,7 @@ private Tristate isEmptyDirectory(Path p) {
   public synchronized DirListingMetadata listChildren(Path p) throws
       IOException {
     Path path = standardize(p);
-    DirListingMetadata listing = dirHash.mruGet(path);
+    DirListingMetadata listing = dirCache.getIfPresent(path);
     if (LOG.isDebugEnabled()) {
       LOG.debug("listChildren({}) -> {}", path,
           listing == null ? "null" : listing.prettyPrint());
@@ -237,10 +258,10 @@ public void put(PathMetadata meta) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("put {} -> {}", path, meta.prettyPrint());
     }
-    fileHash.put(path, meta);
+    fileCache.put(path, meta);
 
     /* Directory case:
-     * We also make sure we have an entry in the dirHash, so subsequent
+     * We also make sure we have an entry in the dirCache, so subsequent
      * listStatus(path) at least see the directory.
      *
     * If we had a boolean flag argument "isNew", we would know whether this
@@ -251,9 +272,9 @@ public void put(PathMetadata meta) throws IOException {
      */
 
     if (status.isDirectory()) {
-      DirListingMetadata dir = dirHash.mruGet(path);
+      DirListingMetadata dir = dirCache.getIfPresent(path);
       if (dir == null) {
-        dirHash.put(path, new DirListingMetadata(path, DirListingMetadata
+        dirCache.put(path, new DirListingMetadata(path, DirListingMetadata
             .EMPTY_DIR, false));
       }
     }
@@ -261,14 +282,14 @@ public void put(PathMetadata meta) throws IOException {
     /* Update cached parent dir. */
     Path parentPath = path.getParent();
     if (parentPath != null) {
-      DirListingMetadata parent = dirHash.mruGet(parentPath);
+      DirListingMetadata parent = dirCache.getIfPresent(parentPath);
       if (parent == null) {
         /* Track this new file's listing in parent. Parent is not
          * authoritative, since there may be other items in it we don't know
          * about.
          */
        parent = new DirListingMetadata(parentPath,
            DirListingMetadata.EMPTY_DIR, false);
-        dirHash.put(parentPath, parent);
+        dirCache.put(parentPath, parent);
       }
       parent.put(status);
     }
   }
 
@@ -280,7 +301,7 @@ public synchronized void put(DirListingMetadata meta) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("put dirMeta {}", meta.prettyPrint());
     }
-    dirHash.put(standardize(meta.getPath()), meta);
+    dirCache.put(standardize(meta.getPath()), meta);
     put(meta.getListing());
   }
 
@@ -298,8 +319,8 @@ public void close() throws IOException {
 
   @Override
   public void destroy() throws IOException {
-    if (dirHash != null) {
-      dirHash.clear();
+    if (dirCache != null) {
+      dirCache.invalidateAll();
     }
   }
 
@@ -312,7 +333,7 @@ public void prune(long modTime) throws IOException{
   public synchronized void prune(long modTime, String keyPrefix)
       throws IOException {
     Iterator<Map.Entry<Path, PathMetadata>> files =
-        fileHash.entrySet().iterator();
+        fileCache.asMap().entrySet().iterator();
     while (files.hasNext()) {
       Map.Entry<Path, PathMetadata> entry = files.next();
       if (expired(entry.getValue().getFileStatus(), modTime, keyPrefix)) {
@@ -320,7 +341,7 @@ public synchronized void prune(long modTime, String keyPrefix)
       }
     }
     Iterator<Map.Entry<Path, DirListingMetadata>> dirs =
-        dirHash.entrySet().iterator();
+        dirCache.asMap().entrySet().iterator();
     while (dirs.hasNext()) {
       Map.Entry<Path, DirListingMetadata> entry = dirs.next();
       Path path = entry.getKey();
@@ -335,9 +356,10 @@ public synchronized void prune(long modTime, String keyPrefix)
         }
       }
       if (newChildren.size() != oldChildren.size()) {
-        dirHash.put(path, new DirListingMetadata(path, newChildren, false));
+        dirCache.put(path, new DirListingMetadata(path, newChildren, false));
         if (!path.isRoot()) {
-          DirListingMetadata parent = dirHash.get(path.getParent());
+          DirListingMetadata parent = null;
+          parent = dirCache.getIfPresent(path.getParent());
           if (parent != null) {
             parent.setAuthoritative(false);
           }
@@ -354,9 +376,9 @@ private boolean expired(FileStatus status, long expiry, String keyPrefix) {
   }
 
   @VisibleForTesting
-  static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
+  static <T> void deleteEntryByAncestor(Path ancestor, Cache<Path, T> cache,
      boolean tombstone) {
-    for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
+    for (Iterator<Map.Entry<Path, T>> it = cache.asMap().entrySet().iterator();
        it.hasNext();) {
       Map.Entry<Path, T> entry = it.next();
       Path f = entry.getKey();
@@ -364,11 +386,11 @@ static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
       if (isAncestorOf(ancestor, f)) {
         if (tombstone) {
           if (meta instanceof PathMetadata) {
-            entry.setValue((T) PathMetadata.tombstone(f));
+            cache.put(f, (T) PathMetadata.tombstone(f));
           } else if (meta instanceof DirListingMetadata) {
             it.remove();
           } else {
-            throw new IllegalStateException("Unknown type in hash");
+            throw new IllegalStateException("Unknown type in cache");
           }
         } else {
           it.remove();
@@ -391,17 +413,17 @@ private static boolean isAncestorOf(Path ancestor, Path f) {
   }
 
   /**
-   * Update fileHash and dirHash to reflect deletion of file 'f'. Call with
+   * Update fileCache and dirCache to reflect deletion of file 'f'. Call with
    * lock held.
    */
-  private void deleteHashEntries(Path path, boolean tombstone) {
+  private void deleteCacheEntries(Path path, boolean tombstone) {
 
     // Remove target file/dir
     LOG.debug("delete file entry for {}", path);
     if (tombstone) {
-      fileHash.put(path, PathMetadata.tombstone(path));
+      fileCache.put(path, PathMetadata.tombstone(path));
     } else {
-      fileHash.remove(path);
+      fileCache.invalidate(path);
     }
 
     // Update this and parent dir listing, if any
@@ -409,12 +431,13 @@ private void deleteHashEntries(Path path, boolean tombstone) {
 
     /* If this path is a dir, remove its listing */
     LOG.debug("removing listing of {}", path);
-    dirHash.remove(path);
+    dirCache.invalidate(path);
 
     /* Remove this path from parent's dir listing */
     Path parent = path.getParent();
     if (parent != null) {
-      DirListingMetadata dir = dirHash.get(parent);
+      DirListingMetadata dir = null;
+      dir = dirCache.getIfPresent(parent);
       if (dir != null) {
         LOG.debug("removing parent's entry for {} ", path);
         if (tombstone) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
deleted file mode 100644
index e355095062..0000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.s3guard;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * LinkedHashMap that implements a maximum size and LRU eviction policy.
- */
-public class LruHashMap<K, V> extends LinkedHashMap<K, V> {
-  private final int maxSize;
-  public LruHashMap(int initialCapacity, int maxSize) {
-    super(initialCapacity);
-    this.maxSize = maxSize;
-  }
-
-  @Override
-  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-    return size() > maxSize;
-  }
-
-  /**
-   * get() plus side-effect of making the element Most Recently Used.
-   * @param key lookup key
-   * @return value
-   */
-
-  public V mruGet(K key) {
-    V val = remove(key);
-    if (val != null) {
-      put(key, val);
-    }
-    return val;
-  }
-}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index 1b765afec2..074319f582 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -75,38 +77,105 @@ public AbstractMSContract createContract(Configuration conf) throws
 
   @Test
   public void testClearByAncestor() {
-    Map<Path, PathMetadata> map = new HashMap<>();
+    Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder().build();
 
     // 1. Test paths without scheme/host
-    assertClearResult(map, "", "/", 0);
-    assertClearResult(map, "", "/dirA/dirB", 2);
-    assertClearResult(map, "", "/invalid", 5);
+    assertClearResult(cache, "", "/", 0);
+    assertClearResult(cache, "", "/dirA/dirB", 2);
+    assertClearResult(cache, "", "/invalid", 5);
 
     // 2. Test paths w/ scheme/host
     String p = "s3a://fake-bucket-name";
-    assertClearResult(map, p, "/", 0);
-    assertClearResult(map, p, "/dirA/dirB", 2);
-    assertClearResult(map, p, "/invalid", 5);
+    assertClearResult(cache, p, "/", 0);
+    assertClearResult(cache, p, "/dirA/dirB", 2);
+    assertClearResult(cache, p, "/invalid", 5);
   }
 
-  private static void populateMap(Map<Path, PathMetadata> map,
+  static class TestTicker extends Ticker {
+    private long myTicker = 0;
+    @Override public long read() {
+      return myTicker;
+    }
+    public void set(long val) {
+      this.myTicker = val;
+    }
+
+  }
+
+  /**
+   * Test that timed eviction in the cache used by the
+   * {@link LocalMetadataStore} implementation works properly.
+   *
+   * The test creates a Ticker instance, which is used to control the
+   * internal clock of the cache so that eviction can be triggered without
+   * having to wait on the system clock.
+   * The test creates 3 entries; the 2nd and 3rd entries survive the
+   * eviction because they are created later than the 1st, by advancing
+   * the ticker.
+   */
+  @Test
+  public void testCacheTimedEvictionAfterWrite() {
+    TestTicker testTicker = new TestTicker();
+    final long t0 = testTicker.read();
+    final long t1 = t0 + 100;
+    final long t2 = t1 + 100;
+
+    final long ttl = t1 + 50; // between t1 and t2
+
+    Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder()
+        .expireAfterWrite(ttl,
+            TimeUnit.NANOSECONDS /* nanos to avoid conversions */)
+        .ticker(testTicker)
+        .build();
+
+    String p = "s3a://fake-bucket-name";
+    Path path1 = new Path(p + "/dirA/dirB/file1");
+    Path path2 = new Path(p + "/dirA/dirB/file2");
+    Path path3 = new Path(p + "/dirA/dirB/file3");
+
+    // Test time is t0
+    populateEntry(cache, path1);
+
+    // Set a new value on the ticker, so the next two entries are added later
+    testTicker.set(t1); // Test time is now t1
+    populateEntry(cache, path2);
+    populateEntry(cache, path3);
+
+    assertEquals("Cache should contain 3 records before eviction",
+        3, cache.size());
+    PathMetadata pm1 = cache.getIfPresent(path1);
+    assertNotNull("PathMetadata should not be null before eviction", pm1);
+
+    // Set the ticker to a time when timed eviction should occur
+    // for the first entry
+    testTicker.set(t2);
+
+    // Call cleanUp() explicitly, as timed expiration is only performed
+    // as periodic maintenance during writes and occasionally during reads
+    cache.cleanUp();
+
+    assertEquals("Cache size should be 2 after eviction", 2, cache.size());
+    pm1 = cache.getIfPresent(path1);
+    assertNull("PathMetadata should be null after eviction", pm1);
+  }
+
+  private static void populateMap(Cache<Path, PathMetadata> cache,
       String prefix) {
-    populateEntry(map, new Path(prefix + "/dirA/dirB/"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file1"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file2"));
-    populateEntry(map, new Path(prefix + "/dirA/file1"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC/file1"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC/file2"));
+    populateEntry(cache, new Path(prefix + "/dirA/file1"));
   }
 
-  private static void populateEntry(Map<Path, PathMetadata> map,
+  private static void populateEntry(Cache<Path, PathMetadata> cache,
       Path path) {
-    map.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
+    cache.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
   }
 
-  private static int sizeOfMap(Map<Path, PathMetadata> map) {
+  private static int sizeOfMap(Cache<Path, PathMetadata> cache) {
     int count = 0;
-    for (PathMetadata meta : map.values()) {
+    for (PathMetadata meta : cache.asMap().values()) {
       if (!meta.isDeleted()) {
         count++;
       }
@@ -114,14 +183,14 @@ private static int sizeOfMap(Map<Path, PathMetadata> map) {
     return count;
   }
 
-  private static void assertClearResult(Map<Path, PathMetadata> map,
+  private static void assertClearResult(Cache<Path, PathMetadata> cache,
       String prefixStr, String pathStr, int leftoverSize) {
-    populateMap(map, prefixStr);
-    LocalMetadataStore.deleteHashByAncestor(new Path(prefixStr + pathStr), map,
-        true);
-    assertEquals(String.format("Map should have %d entries", leftoverSize),
-        leftoverSize, sizeOfMap(map));
-    map.clear();
+    populateMap(cache, prefixStr);
+    LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
+        cache, true);
+    assertEquals(String.format("Cache should have %d entries", leftoverSize),
+        leftoverSize, sizeOfMap(cache));
+    cache.invalidateAll();
   }
 
   @Override
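
For reference, a minimal standalone sketch (not part of the patch) of the Guava behaviour the new LocalMetadataStore relies on: a size-bounded cache with access-based TTL plus an explicit cleanUp(), as in initialize() above. The class name TtlCacheSketch, the String keys and the sleep-based timing are illustrative assumptions only.

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class TtlCacheSketch {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as LocalMetadataStore.initialize(): bounded size plus an
    // access-based TTL, here 10 ms instead of the 10 s default.
    Cache<String, String> cache = CacheBuilder.newBuilder()
        .maximumSize(128)
        .expireAfterAccess(10, TimeUnit.MILLISECONDS)
        .build();

    cache.put("/dirA/file1", "metadata");
    Thread.sleep(20);   // let the entry pass its TTL
    cache.cleanUp();    // expiration runs during maintenance, as the test notes

    // Prints "null": the entry was evicted by time, not by capacity.
    System.out.println(cache.getIfPresent("/dirA/file1"));
  }
}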