diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java index d58a695ec2..7a81795545 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java @@ -37,6 +37,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.slf4j.Logger; @@ -70,6 +72,16 @@ public class SingleFilePerBlockCache implements BlockCache { private final PrefetchingStatistics prefetchingStatistics; + /** + * Timeout to be used by close, while acquiring prefetch block write lock. + */ + private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5; + + /** + * Lock timeout unit to be used by the thread while acquiring prefetch block write lock. + */ + private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS; + /** * File attributes attached to any intermediate temporary file created during index creation. */ @@ -85,12 +97,18 @@ private static final class Entry { private final Path path; private final int size; private final long checksum; + private final ReentrantReadWriteLock lock; + private enum LockType { + READ, + WRITE + } Entry(int blockNumber, Path path, int size, long checksum) { this.blockNumber = blockNumber; this.path = path; this.size = size; this.checksum = checksum; + this.lock = new ReentrantReadWriteLock(); } @Override @@ -99,6 +117,54 @@ public String toString() { "([%03d] %s: size = %d, checksum = %d)", blockNumber, path, size, checksum); } + + /** + * Take the read or write lock. + * + * @param lockType type of the lock. + */ + private void takeLock(LockType lockType) { + if (LockType.READ == lockType) { + lock.readLock().lock(); + } else if (LockType.WRITE == lockType) { + lock.writeLock().lock(); + } + } + + /** + * Release the read or write lock. + * + * @param lockType type of the lock. + */ + private void releaseLock(LockType lockType) { + if (LockType.READ == lockType) { + lock.readLock().unlock(); + } else if (LockType.WRITE == lockType) { + lock.writeLock().unlock(); + } + } + + /** + * Try to take the read or write lock within the given timeout. + * + * @param lockType type of the lock. + * @param timeout the time to wait for the given lock. + * @param unit the time unit of the timeout argument. + * @return true if the lock of the given lock type was acquired. + */ + private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) { + try { + if (LockType.READ == lockType) { + return lock.readLock().tryLock(timeout, unit); + } else if (LockType.WRITE == lockType) { + return lock.writeLock().tryLock(timeout, unit); + } + } catch (InterruptedException e) { + LOG.warn("Thread interrupted while trying to acquire {} lock", lockType, e); + Thread.currentThread().interrupt(); + } + return false; + } } /** @@ -148,11 +214,15 @@ public void get(int blockNumber, ByteBuffer buffer) throws IOException { checkNotNull(buffer, "buffer"); Entry entry = getEntry(blockNumber); - buffer.clear(); - readFile(entry.path, buffer); - buffer.rewind(); - - validateEntry(entry, buffer); + entry.takeLock(Entry.LockType.READ); + try { + buffer.clear(); + readFile(entry.path, buffer); + buffer.rewind(); + validateEntry(entry, buffer); + } finally { + entry.releaseLock(Entry.LockType.READ); + } } protected int readFile(Path path, ByteBuffer buffer) throws IOException { @@ -200,7 +270,12 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf, if (blocks.containsKey(blockNumber)) { Entry entry = blocks.get(blockNumber); - validateEntry(entry, buffer); + entry.takeLock(Entry.LockType.READ); + try { + validateEntry(entry, buffer); + } finally { + entry.releaseLock(Entry.LockType.READ); + } return; } @@ -268,12 +343,22 @@ public void close() throws IOException { int numFilesDeleted = 0; for (Entry entry : blocks.values()) { + boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT, + PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + if (!lockAcquired) { + LOG.error("Cache file {} deletion would not be attempted as write lock could not" + + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT, + PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + continue; + } try { Files.deleteIfExists(entry.path); prefetchingStatistics.blockRemovedFromFileCache(); numFilesDeleted++; } catch (IOException e) { LOG.debug("Failed to delete cache file {}", entry.path, e); + } finally { + entry.releaseLock(Entry.LockType.WRITE); } }