HADOOP-9757. Har metadata cache can grow without limit (Cristina Abad via daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1512465 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2f988135e3
commit
deec7ca21a
@ -361,6 +361,8 @@ Release 2.1.1-beta - UNRELEASED
|
||||
HADOOP-9675. use svn:eol-style native for html to prevent line ending
|
||||
issues (Colin Patrick McCabe)
|
||||
|
||||
HADOOP-9757. Har metadata cache can grow without limit (Cristina Abad via daryn)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -24,11 +24,12 @@
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URLDecoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -56,10 +57,12 @@ public class HarFileSystem extends FilterFileSystem {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HarFileSystem.class);
|
||||
|
||||
public static final String METADATA_CACHE_ENTRIES_KEY = "fs.har.metadatacache.entries";
|
||||
public static final int METADATA_CACHE_ENTRIES_DEFAULT = 10;
|
||||
|
||||
public static final int VERSION = 3;
|
||||
|
||||
private static final Map<URI, HarMetaData> harMetaCache =
|
||||
new ConcurrentHashMap<URI, HarMetaData>();
|
||||
private static Map<URI, HarMetaData> harMetaCache;
|
||||
|
||||
// uri representation of this Har filesystem
|
||||
private URI uri;
|
||||
@ -99,6 +102,13 @@ public HarFileSystem(FileSystem fs) {
|
||||
super(fs);
|
||||
}
|
||||
|
||||
private synchronized void initializeMetadataCache(Configuration conf) {
|
||||
if (harMetaCache == null) {
|
||||
int cacheSize = conf.getInt(METADATA_CACHE_ENTRIES_KEY, METADATA_CACHE_ENTRIES_DEFAULT);
|
||||
harMetaCache = Collections.synchronizedMap(new LruCache<URI, HarMetaData>(cacheSize));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a Har filesystem per har archive. The
|
||||
* archive home directory is the top level directory
|
||||
@ -114,6 +124,9 @@ public HarFileSystem(FileSystem fs) {
|
||||
*/
|
||||
@Override
|
||||
public void initialize(URI name, Configuration conf) throws IOException {
|
||||
// initialize the metadata cache, if needed
|
||||
initializeMetadataCache(conf);
|
||||
|
||||
// decode the name
|
||||
URI underLyingURI = decodeHarURI(name, conf);
|
||||
// we got the right har Path- now check if this is
|
||||
@ -1117,4 +1130,18 @@ private void parseMetaData() throws IOException {
|
||||
HarMetaData getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
|
||||
private static class LruCache<K, V> extends LinkedHashMap<K, V> {
|
||||
private final int MAX_ENTRIES;
|
||||
|
||||
public LruCache(int maxEntries) {
|
||||
super(maxEntries + 1, 1.0f, true);
|
||||
MAX_ENTRIES = maxEntries;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
|
||||
return size() > MAX_ENTRIES;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ private HarFileSystem createHarFileSysten(final Configuration conf)
|
||||
localFileSystem.createNewFile(masterIndexPath);
|
||||
assertTrue(localFileSystem.exists(masterIndexPath));
|
||||
|
||||
writeVersionToMasterIndexImpl(HarFileSystem.VERSION);
|
||||
writeVersionToMasterIndexImpl(HarFileSystem.VERSION, masterIndexPath);
|
||||
|
||||
final HarFileSystem harFileSystem = new HarFileSystem(localFileSystem);
|
||||
final URI uri = new URI("har://" + harPath.toString());
|
||||
@ -90,8 +90,25 @@ private HarFileSystem createHarFileSysten(final Configuration conf)
|
||||
return harFileSystem;
|
||||
}
|
||||
|
||||
private void writeVersionToMasterIndexImpl(int version) throws IOException {
|
||||
final Path masterIndexPath = new Path(harPath, "_masterindex");
|
||||
private HarFileSystem createHarFileSystem(final Configuration conf, Path aHarPath)
|
||||
throws Exception {
|
||||
localFileSystem.mkdirs(aHarPath);
|
||||
final Path indexPath = new Path(aHarPath, "_index");
|
||||
final Path masterIndexPath = new Path(aHarPath, "_masterindex");
|
||||
localFileSystem.createNewFile(indexPath);
|
||||
assertTrue(localFileSystem.exists(indexPath));
|
||||
localFileSystem.createNewFile(masterIndexPath);
|
||||
assertTrue(localFileSystem.exists(masterIndexPath));
|
||||
|
||||
writeVersionToMasterIndexImpl(HarFileSystem.VERSION, masterIndexPath);
|
||||
|
||||
final HarFileSystem harFileSystem = new HarFileSystem(localFileSystem);
|
||||
final URI uri = new URI("har://" + aHarPath.toString());
|
||||
harFileSystem.initialize(uri, conf);
|
||||
return harFileSystem;
|
||||
}
|
||||
|
||||
private void writeVersionToMasterIndexImpl(int version, Path masterIndexPath) throws IOException {
|
||||
// write Har version into the master index:
|
||||
final FSDataOutputStream fsdos = localFileSystem.create(masterIndexPath);
|
||||
try {
|
||||
@ -172,6 +189,29 @@ public void testPositiveNewHarFsOnTheSameUnderlyingFs() throws Exception {
|
||||
assertTrue(hfs.getMetadata() == harFileSystem.getMetadata());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPositiveLruMetadataCacheFs() throws Exception {
|
||||
// Init 2nd har file system on the same underlying FS, so the
|
||||
// metadata gets reused:
|
||||
HarFileSystem hfs = new HarFileSystem(localFileSystem);
|
||||
URI uri = new URI("har://" + harPath.toString());
|
||||
hfs.initialize(uri, new Configuration());
|
||||
// the metadata should be reused from cache:
|
||||
assertTrue(hfs.getMetadata() == harFileSystem.getMetadata());
|
||||
|
||||
// Create more hars, until the cache is full + 1; the last creation should evict the first entry from the cache
|
||||
for (int i = 0; i <= hfs.METADATA_CACHE_ENTRIES_DEFAULT; i++) {
|
||||
Path p = new Path(rootPath, "path1/path2/my" + i +".har");
|
||||
createHarFileSystem(conf, p);
|
||||
}
|
||||
|
||||
// The first entry should not be in the cache anymore:
|
||||
hfs = new HarFileSystem(localFileSystem);
|
||||
uri = new URI("har://" + harPath.toString());
|
||||
hfs.initialize(uri, new Configuration());
|
||||
assertTrue(hfs.getMetadata() != harFileSystem.getMetadata());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPositiveInitWithoutUnderlyingFS() throws Exception {
|
||||
// Init HarFS with no constructor arg, so that the underlying FS object
|
||||
@ -218,7 +258,7 @@ public void testNegativeInitWithAnUnsupportedVersion() throws Exception {
|
||||
// time with 1 second accuracy:
|
||||
Thread.sleep(1000);
|
||||
// write an unsupported version:
|
||||
writeVersionToMasterIndexImpl(7777);
|
||||
writeVersionToMasterIndexImpl(7777, new Path(harPath, "_masterindex"));
|
||||
// init the Har:
|
||||
final HarFileSystem hfs = new HarFileSystem(localFileSystem);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user