HDFS-4824. FileInputStreamCache.close leaves dangling reference to FileInputStreamCache.cacheCleaner. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1483641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-05-17 03:39:18 +00:00
parent 4e419b3551
commit 9fdb117476
2 changed files with 40 additions and 13 deletions

View File

@ -951,6 +951,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4830. Typo in config settings for AvailableSpaceVolumeChoosingPolicy
in hdfs-default.xml. (atm)
HDFS-4824. FileInputStreamCache.close leaves dangling reference to
FileInputStreamCache.cacheCleaner. (Colin Patrick McCabe via todd)
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

View File

@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hdfs;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -80,17 +84,26 @@ class FileInputStreamCache {
* Expiry thread which makes sure that the file descriptors get closed
* after a while.
*/
class CacheCleaner implements Runnable {
private static class CacheCleaner implements Runnable, Closeable {
private WeakReference<FileInputStreamCache> cacheRef;
private ScheduledFuture<?> future;
CacheCleaner(FileInputStreamCache cache) {
this.cacheRef = new WeakReference<FileInputStreamCache>(cache);
}
@Override
public void run() {
synchronized(FileInputStreamCache.this) {
if (closed) return;
FileInputStreamCache cache = cacheRef.get();
if (cache == null) return;
synchronized(cache) {
if (cache.closed) return;
long curTime = Time.monotonicNow();
for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
iter.hasNext();
iter = map.entries().iterator()) {
for (Iterator<Entry<Key, Value>> iter =
cache.map.entries().iterator(); iter.hasNext();
iter = cache.map.entries().iterator()) {
Entry<Key, Value> entry = iter.next();
if (entry.getValue().getTime() + expiryTimeMs >= curTime) {
if (entry.getValue().getTime() + cache.expiryTimeMs >= curTime) {
break;
}
entry.getValue().close();
@ -98,6 +111,17 @@ public void run() {
}
}
}
@Override
public void close() throws IOException {
if (future != null) {
future.cancel(false);
}
}
public void setFuture(ScheduledFuture<?> future) {
this.future = future;
}
}
/**
@ -189,9 +213,11 @@ public void put(DatanodeID datanodeID, ExtendedBlock block,
iter.remove();
}
if (cacheCleaner == null) {
cacheCleaner = new CacheCleaner();
executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs,
TimeUnit.MILLISECONDS);
cacheCleaner = new CacheCleaner(this);
ScheduledFuture<?> future =
executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs,
TimeUnit.MILLISECONDS);
cacheCleaner.setFuture(future);
}
map.put(new Key(datanodeID, block), new Value(fis));
inserted = true;
@ -229,9 +255,7 @@ public synchronized FileInputStream[] get(DatanodeID datanodeID,
public synchronized void close() {
if (closed) return;
closed = true;
if (cacheCleaner != null) {
executor.remove(cacheCleaner);
}
IOUtils.cleanup(LOG, cacheCleaner);
for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
iter.hasNext();) {
Entry<Key, Value> entry = iter.next();