diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java index 7a81795545..e043fbd904 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; @@ -68,7 +69,7 @@ public class SingleFilePerBlockCache implements BlockCache { */ private int numGets = 0; - private boolean closed; + private final AtomicBoolean closed; private final PrefetchingStatistics prefetchingStatistics; @@ -174,6 +175,7 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) { */ public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) { this.prefetchingStatistics = requireNonNull(prefetchingStatistics); + this.closed = new AtomicBoolean(false); } /** @@ -207,7 +209,7 @@ public int size() { */ @Override public void get(int blockNumber, ByteBuffer buffer) throws IOException { - if (closed) { + if (closed.get()) { return; } @@ -262,7 +264,7 @@ private Entry getEntry(int blockNumber) { @Override public void put(int blockNumber, ByteBuffer buffer, Configuration conf, LocalDirAllocator localDirAllocator) throws IOException { - if (closed) { + if (closed.get()) { return; } @@ -333,37 +335,31 @@ protected Path getCacheFilePath(final Configuration conf, @Override public void close() throws IOException { - if (closed) { - return; - } + if (closed.compareAndSet(false, true)) { + LOG.debug(getStats()); + int numFilesDeleted = 0; - closed = true; - - LOG.info(getStats()); - int numFilesDeleted = 0; - - for (Entry entry : blocks.values()) { - boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT, - PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); - if (!lockAcquired) { - LOG.error("Cache file {} deletion would not be attempted as write lock could not" - + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT, + for (Entry entry : blocks.values()) { + boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT, PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); - continue; + if (!lockAcquired) { + LOG.error("Cache file {} deletion would not be attempted as write lock could not" + + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT, + PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + continue; + } + try { + Files.deleteIfExists(entry.path); + prefetchingStatistics.blockRemovedFromFileCache(); + numFilesDeleted++; + } catch (IOException e) { + LOG.warn("Failed to delete cache file {}", entry.path, e); + } finally { + entry.releaseLock(Entry.LockType.WRITE); + } } - try { - Files.deleteIfExists(entry.path); - prefetchingStatistics.blockRemovedFromFileCache(); - numFilesDeleted++; - } catch (IOException e) { - LOG.debug("Failed to delete cache file {}", entry.path, e); - } finally { - entry.releaseLock(Entry.LockType.WRITE); - } - } - if (numFilesDeleted > 0) { - LOG.info("Deleted {} cache files", numFilesDeleted); + LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted); } }