HADOOP-15423. Merge fileCache and dirCache into ine single cache in LocalMetadataStore. Contributed by Gabor Bota.

This commit is contained in:
Sean Mackrory 2018-06-25 11:04:34 -06:00
parent a55d6bba71
commit c687a6617d
4 changed files with 240 additions and 123 deletions

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import javax.annotation.Nullable;
/**
* LocalMetadataEntry is used to store entries in the cache of
* LocalMetadataStore. PathMetadata or dirListingMetadata can be null. The
* entry is not immutable.
*/
public final class LocalMetadataEntry {
@Nullable
private PathMetadata pathMetadata;
@Nullable
private DirListingMetadata dirListingMetadata;
LocalMetadataEntry(PathMetadata pmd){
pathMetadata = pmd;
dirListingMetadata = null;
}
LocalMetadataEntry(DirListingMetadata dlm){
pathMetadata = null;
dirListingMetadata = dlm;
}
public PathMetadata getFileMeta() {
return pathMetadata;
}
public DirListingMetadata getDirListingMeta() {
return dirListingMetadata;
}
public boolean hasPathMeta() {
return this.pathMetadata != null;
}
public boolean hasDirMeta() {
return this.dirListingMetadata != null;
}
public void setPathMetadata(PathMetadata pathMetadata) {
this.pathMetadata = pathMetadata;
}
public void setDirListingMetadata(DirListingMetadata dirListingMetadata) {
this.dirListingMetadata = dirListingMetadata;
}
@Override public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("LocalMetadataEntry{");
if(pathMetadata != null) {
sb.append("pathMetadata=" + pathMetadata.getFileStatus().getPath());
}
if(dirListingMetadata != null){
sb.append("; dirListingMetadata=" + dirListingMetadata.getPath());
}
sb.append("}");
return sb.toString();
}
}

View File

@ -37,13 +37,12 @@
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* This is a local, in-memory, implementation of MetadataStore.
* This is a local, in-memory implementation of MetadataStore.
* This is <i>not</i> a coherent cache across processes. It is only
* locally-coherent.
*
@ -56,12 +55,12 @@
* non-recursive removal of non-empty directories. It is assumed the caller
* already has to perform these sorts of checks.
*
* Contains cache internally with time based eviction.
* Contains one cache internally with time based eviction.
*/
public class LocalMetadataStore implements MetadataStore {
public static final Logger LOG = LoggerFactory.getLogger(MetadataStore.class);
public static final int DEFAULT_MAX_RECORDS = 128;
public static final int DEFAULT_MAX_RECORDS = 256;
public static final int DEFAULT_CACHE_ENTRY_TTL_MSEC = 10 * 1000;
/**
@ -79,11 +78,8 @@ public class LocalMetadataStore implements MetadataStore {
public static final String CONF_CACHE_ENTRY_TTL =
"fs.metadatastore.local.ttl";
/** Contains directories and files. */
private Cache<Path, PathMetadata> fileCache;
/** Contains directory listings. */
private Cache<Path, DirListingMetadata> dirCache;
/** Contains directory and file listings. */
private Cache<Path, LocalMetadataEntry> localCache;
private FileSystem fs;
/* Null iff this FS does not have an associated URI host. */
@ -116,8 +112,7 @@ public void initialize(Configuration conf) throws IOException {
builder.expireAfterAccess(ttl, TimeUnit.MILLISECONDS);
}
fileCache = builder.build();
dirCache = builder.build();
localCache = builder.build();
}
@Override
@ -155,8 +150,7 @@ private synchronized void doDelete(Path p, boolean recursive, boolean
if (recursive) {
// Remove all entries that have this dir as path prefix.
deleteEntryByAncestor(path, dirCache, tombstone);
deleteEntryByAncestor(path, fileCache, tombstone);
deleteEntryByAncestor(path, localCache, tombstone);
}
}
@ -170,7 +164,7 @@ public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
throws IOException {
Path path = standardize(p);
synchronized (this) {
PathMetadata m = fileCache.getIfPresent(path);
PathMetadata m = getFileMeta(path);
if (wantEmptyDirectoryFlag && m != null &&
m.getFileStatus().isDirectory()) {
@ -191,15 +185,15 @@ public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
* @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
*/
private Tristate isEmptyDirectory(Path p) {
DirListingMetadata dirMeta = dirCache.getIfPresent(p);
return dirMeta.withoutTombstones().isEmpty();
DirListingMetadata dlm = getDirListingMeta(p);
return dlm.withoutTombstones().isEmpty();
}
@Override
public synchronized DirListingMetadata listChildren(Path p) throws
IOException {
Path path = standardize(p);
DirListingMetadata listing = dirCache.getIfPresent(path);
DirListingMetadata listing = getDirListingMeta(path);
if (LOG.isDebugEnabled()) {
LOG.debug("listChildren({}) -> {}", path,
listing == null ? "null" : listing.prettyPrint());
@ -211,6 +205,7 @@ public synchronized DirListingMetadata listChildren(Path p) throws
@Override
public void move(Collection<Path> pathsToDelete,
Collection<PathMetadata> pathsToCreate) throws IOException {
LOG.info("Move {} to {}", pathsToDelete, pathsToCreate);
Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
Preconditions.checkNotNull(pathsToCreate, "pathsToCreate is null");
@ -258,7 +253,12 @@ public void put(PathMetadata meta) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("put {} -> {}", path, meta.prettyPrint());
}
fileCache.put(path, meta);
LocalMetadataEntry entry = localCache.getIfPresent(path);
if(entry == null){
entry = new LocalMetadataEntry(meta);
} else {
entry.setPathMetadata(meta);
}
/* Directory case:
* We also make sure we have an entry in the dirCache, so subsequent
@ -271,27 +271,32 @@ public void put(PathMetadata meta) throws IOException {
* saving round trips to underlying store for subsequent listStatus()
*/
if (status.isDirectory()) {
DirListingMetadata dir = dirCache.getIfPresent(path);
if (dir == null) {
dirCache.put(path, new DirListingMetadata(path, DirListingMetadata
.EMPTY_DIR, false));
}
// only create DirListingMetadata if the entry does not have one
if (status.isDirectory() && !entry.hasDirMeta()) {
DirListingMetadata dlm =
new DirListingMetadata(path, DirListingMetadata.EMPTY_DIR, false);
entry.setDirListingMetadata(dlm);
}
localCache.put(path, entry);
/* Update cached parent dir. */
Path parentPath = path.getParent();
if (parentPath != null) {
DirListingMetadata parent = dirCache.getIfPresent(parentPath);
if (parent == null) {
/* Track this new file's listing in parent. Parent is not
* authoritative, since there may be other items in it we don't know
* about. */
parent = new DirListingMetadata(parentPath,
DirListingMetadata.EMPTY_DIR, false);
dirCache.put(parentPath, parent);
LocalMetadataEntry parentMeta = localCache.getIfPresent(parentPath);
DirListingMetadata parentDirMeta =
new DirListingMetadata(parentPath, DirListingMetadata.EMPTY_DIR,
false);
parentDirMeta.put(status);
getDirListingMeta(parentPath);
if (parentMeta == null){
localCache.put(parentPath, new LocalMetadataEntry(parentDirMeta));
} else if (!parentMeta.hasDirMeta()) {
parentMeta.setDirListingMetadata(parentDirMeta);
} else {
parentMeta.getDirListingMeta().put(status);
}
parent.put(status);
}
}
}
@ -301,7 +306,13 @@ public synchronized void put(DirListingMetadata meta) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("put dirMeta {}", meta.prettyPrint());
}
dirCache.put(standardize(meta.getPath()), meta);
LocalMetadataEntry entry =
localCache.getIfPresent(standardize(meta.getPath()));
if(entry == null){
localCache.put(standardize(meta.getPath()), new LocalMetadataEntry(meta));
} else {
entry.setDirListingMetadata(meta);
}
put(meta.getListing());
}
@ -319,8 +330,8 @@ public void close() throws IOException {
@Override
public void destroy() throws IOException {
if (dirCache != null) {
dirCache.invalidateAll();
if (localCache != null) {
localCache.invalidateAll();
}
}
@ -330,42 +341,44 @@ public void prune(long modTime) throws IOException{
}
@Override
public synchronized void prune(long modTime, String keyPrefix)
throws IOException {
Iterator<Map.Entry<Path, PathMetadata>> files =
fileCache.asMap().entrySet().iterator();
while (files.hasNext()) {
Map.Entry<Path, PathMetadata> entry = files.next();
if (expired(entry.getValue().getFileStatus(), modTime, keyPrefix)) {
files.remove();
}
}
Iterator<Map.Entry<Path, DirListingMetadata>> dirs =
dirCache.asMap().entrySet().iterator();
while (dirs.hasNext()) {
Map.Entry<Path, DirListingMetadata> entry = dirs.next();
Path path = entry.getKey();
DirListingMetadata metadata = entry.getValue();
Collection<PathMetadata> oldChildren = metadata.getListing();
Collection<PathMetadata> newChildren = new LinkedList<>();
public synchronized void prune(long modTime, String keyPrefix) {
// prune files
// filter path_metadata (files), filter expired, remove expired
localCache.asMap().entrySet().stream()
.filter(entry -> entry.getValue().hasPathMeta())
.filter(entry -> expired(
entry.getValue().getFileMeta().getFileStatus(), modTime, keyPrefix))
.forEach(entry -> localCache.invalidate(entry.getKey()));
for (PathMetadata child : oldChildren) {
FileStatus status = child.getFileStatus();
if (!expired(status, modTime, keyPrefix)) {
newChildren.add(child);
}
}
if (newChildren.size() != oldChildren.size()) {
dirCache.put(path, new DirListingMetadata(path, newChildren, false));
if (!path.isRoot()) {
DirListingMetadata parent = null;
parent = dirCache.getIfPresent(path.getParent());
if (parent != null) {
parent.setAuthoritative(false);
// prune dirs
// filter DIR_LISTING_METADATA, remove expired, remove authoritative bit
localCache.asMap().entrySet().stream()
.filter(entry -> entry.getValue().hasDirMeta())
.forEach(entry -> {
Path path = entry.getKey();
DirListingMetadata metadata = entry.getValue().getDirListingMeta();
Collection<PathMetadata> oldChildren = metadata.getListing();
Collection<PathMetadata> newChildren = new LinkedList<>();
for (PathMetadata child : oldChildren) {
FileStatus status = child.getFileStatus();
if (!expired(status, modTime, keyPrefix)) {
newChildren.add(child);
}
}
}
}
}
if (newChildren.size() != oldChildren.size()) {
DirListingMetadata dlm =
new DirListingMetadata(path, newChildren, false);
localCache.put(path, new LocalMetadataEntry(dlm));
if (!path.isRoot()) {
DirListingMetadata parent = getDirListingMeta(path.getParent());
if (parent != null) {
parent.setAuthoritative(false);
}
}
}
});
}
private boolean expired(FileStatus status, long expiry, String keyPrefix) {
@ -390,31 +403,26 @@ private boolean expired(FileStatus status, long expiry, String keyPrefix) {
}
@VisibleForTesting
static <T> void deleteEntryByAncestor(Path ancestor, Cache<Path, T> cache,
boolean tombstone) {
for (Iterator<Map.Entry<Path, T>> it = cache.asMap().entrySet().iterator();
it.hasNext();) {
Map.Entry<Path, T> entry = it.next();
Path f = entry.getKey();
T meta = entry.getValue();
if (isAncestorOf(ancestor, f)) {
if (tombstone) {
if (meta instanceof PathMetadata) {
cache.put(f, (T) PathMetadata.tombstone(f));
} else if (meta instanceof DirListingMetadata) {
it.remove();
static void deleteEntryByAncestor(Path ancestor,
Cache<Path, LocalMetadataEntry> cache, boolean tombstone) {
cache.asMap().entrySet().stream()
.filter(entry -> isAncestorOf(ancestor, entry.getKey()))
.forEach(entry -> {
LocalMetadataEntry meta = entry.getValue();
Path path = entry.getKey();
if(meta.hasDirMeta()){
cache.invalidate(path);
} else if(tombstone && meta.hasPathMeta()){
meta.setPathMetadata(PathMetadata.tombstone(path));
} else {
throw new IllegalStateException("Unknown type in cache");
cache.invalidate(path);
}
} else {
it.remove();
}
}
}
});
}
/**
* @return true iff 'ancestor' is ancestor dir in path 'f'.
* @return true if 'ancestor' is ancestor dir in path 'f'.
* All paths here are absolute. Dir does not count as its own ancestor.
*/
private static boolean isAncestorOf(Path ancestor, Path f) {
@ -431,27 +439,41 @@ private static boolean isAncestorOf(Path ancestor, Path f) {
* lock held.
*/
private void deleteCacheEntries(Path path, boolean tombstone) {
// Remove target file/dir
LOG.debug("delete file entry for {}", path);
if (tombstone) {
fileCache.put(path, PathMetadata.tombstone(path));
} else {
fileCache.invalidate(path);
LocalMetadataEntry entry = localCache.getIfPresent(path);
// If there's no entry, delete should silently succeed
// (based on MetadataStoreTestBase#testDeleteNonExisting)
if(entry == null){
LOG.warn("Delete: path {} is missing from cache.", path);
return;
}
// Update this and parent dir listing, if any
// Remove target file entry
LOG.debug("delete file entry for {}", path);
if(entry.hasPathMeta()){
if (tombstone) {
PathMetadata pmd = PathMetadata.tombstone(path);
entry.setPathMetadata(pmd);
} else {
entry.setPathMetadata(null);
}
}
/* If this path is a dir, remove its listing */
LOG.debug("removing listing of {}", path);
// If this path is a dir, remove its listing
if(entry.hasDirMeta()) {
LOG.debug("removing listing of {}", path);
entry.setDirListingMetadata(null);
}
dirCache.invalidate(path);
// If the entry is empty (contains no dirMeta or pathMeta) remove it from
// the cache.
if(!entry.hasDirMeta() && !entry.hasPathMeta()){
localCache.invalidate(entry);
}
/* Remove this path from parent's dir listing */
Path parent = path.getParent();
if (parent != null) {
DirListingMetadata dir = null;
dir = dirCache.getIfPresent(parent);
DirListingMetadata dir = getDirListingMeta(parent);
if (dir != null) {
LOG.debug("removing parent's entry for {} ", path);
if (tombstone) {
@ -494,4 +516,23 @@ public Map<String, String> getDiagnostics() throws IOException {
public void updateParameters(Map<String, String> parameters)
throws IOException {
}
PathMetadata getFileMeta(Path p){
LocalMetadataEntry entry = localCache.getIfPresent(p);
if(entry != null && entry.hasPathMeta()){
return entry.getFileMeta();
} else {
return null;
}
}
DirListingMetadata getDirListingMeta(Path p){
LocalMetadataEntry entry = localCache.getIfPresent(p);
if(entry != null && entry.hasDirMeta()){
return entry.getDirListingMeta();
} else {
return null;
}
}
}

View File

@ -836,7 +836,7 @@ private void assertDirectorySize(String pathStr, int size)
throws IOException {
DirListingMetadata dirMeta = ms.listChildren(strToPath(pathStr));
if (!allowMissing()) {
assertNotNull("Directory " + pathStr + " in cache", dirMeta);
assertNotNull("Directory " + pathStr + " is null in cache", dirMeta);
}
if (!allowMissing() || dirMeta != null) {
dirMeta = dirMeta.withoutTombstones();

View File

@ -37,7 +37,6 @@
*/
public class TestLocalMetadataStore extends MetadataStoreTestBase {
private static final String MAX_ENTRIES_STR = "16";
private final static class LocalMSContract extends AbstractMSContract {
@ -48,7 +47,6 @@ private LocalMSContract() throws IOException {
}
private LocalMSContract(Configuration config) throws IOException {
config.set(LocalMetadataStore.CONF_MAX_RECORDS, MAX_ENTRIES_STR);
fs = FileSystem.getLocal(config);
}
@ -76,8 +74,8 @@ public AbstractMSContract createContract(Configuration conf) throws
}
@Test
public void testClearByAncestor() {
Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder().build();
public void testClearByAncestor() throws Exception {
Cache<Path, LocalMetadataEntry> cache = CacheBuilder.newBuilder().build();
// 1. Test paths without scheme/host
assertClearResult(cache, "", "/", 0);
@ -122,7 +120,7 @@ public void testCacheTimedEvictionAfterWrite() {
final long ttl = t1 + 50; // between t1 and t2
Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder()
Cache<Path, LocalMetadataEntry> cache = CacheBuilder.newBuilder()
.expireAfterWrite(ttl,
TimeUnit.NANOSECONDS /* nanos to avoid conversions */)
.ticker(testTicker)
@ -143,7 +141,7 @@ public void testCacheTimedEvictionAfterWrite() {
assertEquals("Cache should contain 3 records before eviction",
3, cache.size());
PathMetadata pm1 = cache.getIfPresent(path1);
LocalMetadataEntry pm1 = cache.getIfPresent(path1);
assertNotNull("PathMetadata should not be null before eviction", pm1);
// set the ticker to a time when timed eviction should occur
@ -159,7 +157,7 @@ public void testCacheTimedEvictionAfterWrite() {
assertNull("PathMetadata should be null after eviction", pm1);
}
private static void populateMap(Cache<Path, PathMetadata> cache,
private static void populateMap(Cache<Path, LocalMetadataEntry> cache,
String prefix) {
populateEntry(cache, new Path(prefix + "/dirA/dirB/"));
populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC"));
@ -168,23 +166,20 @@ private static void populateMap(Cache<Path, PathMetadata> cache,
populateEntry(cache, new Path(prefix + "/dirA/file1"));
}
private static void populateEntry(Cache<Path, PathMetadata> cache,
private static void populateEntry(Cache<Path, LocalMetadataEntry> cache,
Path path) {
cache.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
FileStatus fileStatus = new FileStatus(0, true, 0, 0, 0, path);
cache.put(path, new LocalMetadataEntry(new PathMetadata(fileStatus)));
}
private static int sizeOfMap(Cache<Path, PathMetadata> cache) {
int count = 0;
for (PathMetadata meta : cache.asMap().values()) {
if (!meta.isDeleted()) {
count++;
}
}
return count;
private static long sizeOfMap(Cache<Path, LocalMetadataEntry> cache) {
return cache.asMap().values().stream()
.filter(entry -> !entry.getFileMeta().isDeleted())
.count();
}
private static void assertClearResult(Cache<Path, PathMetadata> cache,
String prefixStr, String pathStr, int leftoverSize) {
private static void assertClearResult(Cache<Path, LocalMetadataEntry> cache,
String prefixStr, String pathStr, int leftoverSize) throws IOException {
populateMap(cache, prefixStr);
LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
cache, true);