From 97199baea1c41a66bd2a88bda31742ef6ddcb5dc Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Fri, 8 Nov 2013 03:00:19 +0000 Subject: [PATCH] HDFS-5394: Fix race conditions in DN caching and uncaching (cmccabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1539909 13f79535-47bb-0310-9956-ffa450edef68 --- dev-support/test-patch.sh | 4 +- .../apache/hadoop/io/nativeio/NativeIO.java | 40 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../apache/hadoop/hdfs/client/ClientMmap.java | 14 +- .../blockmanagement/DatanodeManager.java | 1 - .../fsdataset/impl/FsDatasetCache.java | 476 +++++++++++++----- .../fsdataset/impl/FsDatasetImpl.java | 94 ++-- .../datanode/fsdataset/impl/FsVolumeImpl.java | 3 +- .../fsdataset/impl/MappableBlock.java | 271 ++++------ .../server/datanode/TestFsDatasetCache.java | 146 ++++-- 10 files changed, 666 insertions(+), 385 deletions(-) diff --git a/dev-support/test-patch.sh b/dev-support/test-patch.sh index 9d7dff9a87..7143b51406 100755 --- a/dev-support/test-patch.sh +++ b/dev-support/test-patch.sh @@ -425,9 +425,9 @@ checkJavadocWarnings () { echo "" echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build." - #There are 11 warnings that are caused by things that are caused by using sun internal APIs. + #There are 12 warnings that are caused by things that are caused by using sun internal APIs. #There are 2 warnings that are caused by the Apache DS Dn class used in MiniKdc. - OK_JAVADOC_WARNINGS=13; + OK_JAVADOC_WARNINGS=14; ### if current warnings greater than OK_JAVADOC_WARNINGS if [[ $javadocWarnings -ne $OK_JAVADOC_WARNINGS ]] ; then JIRA_COMMENT="$JIRA_COMMENT diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index 3d6ce7b6c0..820f106897 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -23,7 +23,9 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,10 +36,11 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException; import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.Shell; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import sun.misc.Unsafe; + import com.google.common.annotations.VisibleForTesting; /** @@ -271,6 +274,26 @@ public class NativeIO { } munlock_native(buffer, len); } + + /** + * Unmaps the block from memory. See munmap(2). + * + * There isn't any portable way to unmap a memory region in Java. + * So we use the sun.nio method here. + * Note that unmapping a memory region could cause crashes if code + * continues to reference the unmapped code. However, if we don't + * manually unmap the memory, we are dependent on the finalizer to + * do it, and we have no idea when the finalizer will run. + * + * @param buffer The buffer to unmap. + */ + public static void munmap(MappedByteBuffer buffer) { + if (buffer instanceof sun.nio.ch.DirectBuffer) { + sun.misc.Cleaner cleaner = + ((sun.nio.ch.DirectBuffer)buffer).cleaner(); + cleaner.clean(); + } + } /** Linux only methods used for getOwner() implementation */ private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException; @@ -539,6 +562,21 @@ public class NativeIO { private static native long getMemlockLimit0(); + /** + * @return the operating system's page size. + */ + public static long getOperatingSystemPageSize() { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe)f.get(null); + return unsafe.pageSize(); + } catch (Throwable e) { + LOG.warn("Unable to get operating system page size. Guessing 4096.", e); + return 4096; + } + } + private static class CachedUid { final long timestamp; final String username; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9e77ab2b61..df99dbc26b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -359,6 +359,8 @@ Trunk (Unreleased) HDFS-5468. CacheAdmin help command does not recognize commands (Stephen Chu via Colin Patrick McCabe) + HDFS-5394. Fix race conditions in DN caching and uncaching (cmccabe) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java index 566c2b5457..91a62306f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java @@ -22,6 +22,7 @@ import java.io.FileInputStream; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.nativeio.NativeIO; import java.io.IOException; import java.lang.ref.WeakReference; @@ -147,20 +148,9 @@ public class ClientMmap { /** * Unmap the memory region. - * - * There isn't any portable way to unmap a memory region in Java. - * So we use the sun.nio method here. - * Note that unmapping a memory region could cause crashes if code - * continues to reference the unmapped code. However, if we don't - * manually unmap the memory, we are dependent on the finalizer to - * do it, and we have no idea when the finalizer will run. */ void unmap() { assert(refCount.get() == 0); - if (map instanceof sun.nio.ch.DirectBuffer) { - final sun.misc.Cleaner cleaner = - ((sun.nio.ch.DirectBuffer) map).cleaner(); - cleaner.clean(); - } + NativeIO.POSIX.munmap(map); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 2ecfde8c8b..bfbee7e45c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -47,7 +47,6 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.IntrusiveCollection; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index 6a68e633e8..2af46bb891 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -18,24 +18,35 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Map.Entry; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.nativeio.NativeIO; /** * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2) @@ -45,178 +56,411 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @InterfaceAudience.Private @InterfaceStability.Unstable public class FsDatasetCache { + /** + * Keys which identify MappableBlocks. + */ + private static final class Key { + /** + * Block id. + */ + final long id; + + /** + * Block pool id. + */ + final String bpid; + + Key(long id, String bpid) { + this.id = id; + this.bpid = bpid; + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (!(o.getClass() == getClass())) { + return false; + } + Key other = (Key)o; + return ((other.id == this.id) && (other.bpid.equals(this.bpid))); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(id).append(bpid).hashCode(); + } + }; + + /** + * MappableBlocks that we know about. + */ + private static final class Value { + final State state; + final MappableBlock mappableBlock; + + Value(MappableBlock mappableBlock, State state) { + this.mappableBlock = mappableBlock; + this.state = state; + } + } + + private enum State { + /** + * The MappableBlock is in the process of being cached. + */ + CACHING, + + /** + * The MappableBlock was in the process of being cached, but it was + * cancelled. Only the FsDatasetCache#WorkerTask can remove cancelled + * MappableBlock objects. + */ + CACHING_CANCELLED, + + /** + * The MappableBlock is in the cache. + */ + CACHED, + + /** + * The MappableBlock is in the process of uncaching. + */ + UNCACHING; + + /** + * Whether we should advertise this block as cached to the NameNode and + * clients. + */ + public boolean shouldAdvertise() { + return (this == CACHED); + } + } private static final Log LOG = LogFactory.getLog(FsDatasetCache.class); /** - * Map of cached blocks + * Stores MappableBlock objects and the states they're in. */ - private final ConcurrentMap cachedBlocks; + private final HashMap mappableBlockMap = new HashMap(); private final FsDatasetImpl dataset; + + private final ThreadPoolExecutor uncachingExecutor; + /** - * Number of cached bytes + * The approximate amount of cache space in use. + * + * This number is an overestimate, counting bytes that will be used only + * if pending caching operations succeed. It does not take into account + * pending uncaching operations. + * + * This overestimate is more useful to the NameNode than an underestimate, + * since we don't want the NameNode to assign us more replicas than + * we can cache, because of the current batch of operations. */ - private AtomicLong usedBytes; + private final UsedBytesCount usedBytesCount; + + public static class PageRounder { + private final long osPageSize = NativeIO.getOperatingSystemPageSize(); + + /** + * Round up a number to the operating system page size. + */ + public long round(long count) { + long newCount = + (count + (osPageSize - 1)) / osPageSize; + return newCount * osPageSize; + } + } + + private class UsedBytesCount { + private final AtomicLong usedBytes = new AtomicLong(0); + + private PageRounder rounder = new PageRounder(); + + /** + * Try to reserve more bytes. + * + * @param count The number of bytes to add. We will round this + * up to the page size. + * + * @return The new number of usedBytes if we succeeded; + * -1 if we failed. + */ + long reserve(long count) { + count = rounder.round(count); + while (true) { + long cur = usedBytes.get(); + long next = cur + count; + if (next > maxBytes) { + return -1; + } + if (usedBytes.compareAndSet(cur, next)) { + return next; + } + } + } + + /** + * Release some bytes that we're using. + * + * @param count The number of bytes to release. We will round this + * up to the page size. + * + * @return The new number of usedBytes. + */ + long release(long count) { + count = rounder.round(count); + return usedBytes.addAndGet(-count); + } + + long get() { + return usedBytes.get(); + } + } + /** - * Total cache capacity in bytes + * The total cache capacity in bytes. */ private final long maxBytes; public FsDatasetCache(FsDatasetImpl dataset) { this.dataset = dataset; - this.cachedBlocks = new ConcurrentHashMap(); - this.usedBytes = new AtomicLong(0); this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory(); - } - - /** - * @return if the block is cached - */ - boolean isCached(String bpid, long blockId) { - MappableBlock mapBlock = cachedBlocks.get(blockId); - if (mapBlock != null) { - return mapBlock.getBlockPoolId().equals(bpid); - } - return false; + ThreadFactory workerFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("FsDatasetCache-%d-" + dataset.toString()) + .build(); + this.usedBytesCount = new UsedBytesCount(); + this.uncachingExecutor = new ThreadPoolExecutor( + 0, 1, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(), + workerFactory); + this.uncachingExecutor.allowCoreThreadTimeOut(true); } /** * @return List of cached blocks suitable for translation into a * {@link BlockListAsLongs} for a cache report. */ - List getCachedBlocks(String bpid) { + synchronized List getCachedBlocks(String bpid) { List blocks = new ArrayList(); - // ConcurrentHashMap iteration doesn't see latest updates, which is okay - Iterator it = cachedBlocks.values().iterator(); - while (it.hasNext()) { - MappableBlock mapBlock = it.next(); - if (mapBlock.getBlockPoolId().equals(bpid)) { - blocks.add(mapBlock.getBlock().getBlockId()); + for (Iterator> iter = + mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) { + Entry entry = iter.next(); + if (entry.getKey().bpid.equals(bpid)) { + if (entry.getValue().state.shouldAdvertise()) { + blocks.add(entry.getKey().id); + } } } return blocks; } /** - * Asynchronously attempts to cache a block. This is subject to the - * configured maximum locked memory limit. - * - * @param block block to cache - * @param volume volume of the block - * @param blockIn stream of the block's data file - * @param metaIn stream of the block's meta file + * Attempt to begin caching a block. */ - void cacheBlock(String bpid, Block block, FsVolumeImpl volume, - FileInputStream blockIn, FileInputStream metaIn) { - if (isCached(bpid, block.getBlockId())) { - return; - } - MappableBlock mapBlock = null; - try { - mapBlock = new MappableBlock(bpid, block, volume, blockIn, metaIn); - } catch (IOException e) { - LOG.warn("Failed to cache replica " + block + ": Could not instantiate" - + " MappableBlock", e); - IOUtils.closeQuietly(blockIn); - IOUtils.closeQuietly(metaIn); - return; - } - // Check if there's sufficient cache capacity - boolean success = false; - long bytes = mapBlock.getNumBytes(); - long used = usedBytes.get(); - while (used+bytes < maxBytes) { - if (usedBytes.compareAndSet(used, used+bytes)) { - success = true; - break; + synchronized void cacheBlock(long blockId, String bpid, + String blockFileName, long length, long genstamp, + Executor volumeExecutor) { + Key key = new Key(blockId, bpid); + Value prevValue = mappableBlockMap.get(key); + if (prevValue != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + + " already exists in the FsDatasetCache with state " + + prevValue.state); } - used = usedBytes.get(); - } - if (!success) { - LOG.warn(String.format( - "Failed to cache replica %s: %s exceeded (%d + %d > %d)", - mapBlock.getBlock().toString(), - DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, - used, bytes, maxBytes)); - mapBlock.close(); return; } - // Submit it to the worker pool to be cached - volume.getExecutor().execute(new WorkerTask(mapBlock)); + mappableBlockMap.put(key, new Value(null, State.CACHING)); + volumeExecutor.execute( + new CachingTask(key, blockFileName, length, genstamp)); } - /** - * Uncaches a block if it is cached. - * @param blockId id to uncache - */ - void uncacheBlock(String bpid, long blockId) { - MappableBlock mapBlock = cachedBlocks.get(blockId); - if (mapBlock != null && - mapBlock.getBlockPoolId().equals(bpid) && - mapBlock.getBlock().getBlockId() == blockId) { - mapBlock.close(); - cachedBlocks.remove(blockId); - long bytes = mapBlock.getNumBytes(); - long used = usedBytes.get(); - while (!usedBytes.compareAndSet(used, used - bytes)) { - used = usedBytes.get(); + synchronized void uncacheBlock(String bpid, long blockId) { + Key key = new Key(blockId, bpid); + Value prevValue = mappableBlockMap.get(key); + + if (prevValue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "does not need to be uncached, because it is not currently " + + "in the mappableBlockMap."); } - LOG.info("Successfully uncached block " + blockId); - } else { - LOG.info("Could not uncache block " + blockId + ": unknown block."); + return; + } + switch (prevValue.state) { + case CACHING: + if (LOG.isDebugEnabled()) { + LOG.debug("Cancelling caching for block with id " + blockId + + ", pool " + bpid + "."); + } + mappableBlockMap.put(key, + new Value(prevValue.mappableBlock, State.CACHING_CANCELLED)); + break; + case CACHED: + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "has been scheduled for uncaching."); + } + mappableBlockMap.put(key, + new Value(prevValue.mappableBlock, State.UNCACHING)); + uncachingExecutor.execute(new UncachingTask(key)); + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "does not need to be uncached, because it is " + + "in state " + prevValue.state + "."); + } + break; } } /** * Background worker that mmaps, mlocks, and checksums a block */ - private class WorkerTask implements Runnable { + private class CachingTask implements Runnable { + private final Key key; + private final String blockFileName; + private final long length; + private final long genstamp; - private MappableBlock block; - WorkerTask(MappableBlock block) { - this.block = block; + CachingTask(Key key, String blockFileName, long length, long genstamp) { + this.key = key; + this.blockFileName = blockFileName; + this.length = length; + this.genstamp = genstamp; } @Override public void run() { boolean success = false; - try { - block.map(); - block.lock(); - block.verifyChecksum(); - success = true; - } catch (ChecksumException e) { - // Exception message is bogus since this wasn't caused by a file read - LOG.warn("Failed to cache block " + block.getBlock() + ": Checksum " - + "verification failed."); - } catch (IOException e) { - LOG.warn("Failed to cache block " + block.getBlock() + ": IOException", - e); + FileInputStream blockIn = null, metaIn = null; + MappableBlock mappableBlock = null; + ExtendedBlock extBlk = + new ExtendedBlock(key.bpid, key.id, length, genstamp); + long newUsedBytes = usedBytesCount.reserve(length); + if (newUsedBytes < 0) { + LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid + + ": could not reserve " + length + " more bytes in the " + + "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + + " of " + maxBytes + " exceeded."); + return; } - // If we failed or the block became uncacheable in the meantime, - // clean up and return the reserved cache allocation - if (!success || - !dataset.validToCache(block.getBlockPoolId(), - block.getBlock().getBlockId())) { - block.close(); - long used = usedBytes.get(); - while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) { - used = usedBytes.get(); + try { + try { + blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0); + metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk) + .getWrappedStream(); + } catch (ClassCastException e) { + LOG.warn("Failed to cache block with id " + key.id + ", pool " + + key.bpid + ": Underlying blocks are not backed by files.", e); + return; + } catch (FileNotFoundException e) { + LOG.info("Failed to cache block with id " + key.id + ", pool " + + key.bpid + ": failed to find backing files."); + return; + } catch (IOException e) { + LOG.warn("Failed to cache block with id " + key.id + ", pool " + + key.bpid + ": failed to open file", e); + return; } - } else { - LOG.info("Successfully cached block " + block.getBlock()); - cachedBlocks.put(block.getBlock().getBlockId(), block); + try { + mappableBlock = MappableBlock. + load(length, blockIn, metaIn, blockFileName); + } catch (ChecksumException e) { + // Exception message is bogus since this wasn't caused by a file read + LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " + + "checksum verification failed."); + return; + } catch (IOException e) { + LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e); + return; + } + synchronized (FsDatasetCache.this) { + Value value = mappableBlockMap.get(key); + Preconditions.checkNotNull(value); + Preconditions.checkState(value.state == State.CACHING || + value.state == State.CACHING_CANCELLED); + if (value.state == State.CACHING_CANCELLED) { + mappableBlockMap.remove(key); + LOG.warn("Caching of block " + key.id + " in " + key.bpid + + " was cancelled."); + return; + } + mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully cached block " + key.id + " in " + key.bpid + + ". We are now caching " + newUsedBytes + " bytes in total."); + } + success = true; + } finally { + if (!success) { + newUsedBytes = usedBytesCount.release(length); + if (LOG.isDebugEnabled()) { + LOG.debug("Caching of block " + key.id + " in " + + key.bpid + " was aborted. We are now caching only " + + newUsedBytes + " + bytes in total."); + } + IOUtils.closeQuietly(blockIn); + IOUtils.closeQuietly(metaIn); + if (mappableBlock != null) { + mappableBlock.close(); + } + } + } + } + } + + private class UncachingTask implements Runnable { + private final Key key; + + UncachingTask(Key key) { + this.key = key; + } + + @Override + public void run() { + Value value; + + synchronized (FsDatasetCache.this) { + value = mappableBlockMap.get(key); + } + Preconditions.checkNotNull(value); + Preconditions.checkArgument(value.state == State.UNCACHING); + // TODO: we will eventually need to do revocation here if any clients + // are reading via mmap with checksums enabled. See HDFS-5182. + IOUtils.closeQuietly(value.mappableBlock); + synchronized (FsDatasetCache.this) { + mappableBlockMap.remove(key); + } + long newUsedBytes = + usedBytesCount.release(value.mappableBlock.getLength()); + if (LOG.isDebugEnabled()) { + LOG.debug("Uncaching of block " + key.id + " in " + key.bpid + + " completed. usedBytes = " + newUsedBytes); } } } // Stats related methods for FsDatasetMBean + /** + * Get the approximate amount of cache space used. + */ public long getDnCacheUsed() { - return usedBytes.get(); + return usedBytesCount.get(); } + /** + * Get the maximum amount of bytes we can cache. This is a constant. + */ public long getDnCacheCapacity() { return maxBytes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 010be39518..65f57712ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -32,12 +32,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; -import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -553,7 +553,7 @@ class FsDatasetImpl implements FsDatasetSpi { private synchronized ReplicaBeingWritten append(String bpid, FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) throws IOException { - // uncache the block + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId()); // unlink the finalized replica replicaInfo.unlinkBlock(1); @@ -1168,10 +1168,11 @@ class FsDatasetImpl implements FsDatasetSpi { } volumeMap.remove(bpid, invalidBlks[i]); } - - // Uncache the block synchronously + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); - // Delete the block asynchronously to make sure we can do it fast enough + // Delete the block asynchronously to make sure we can do it fast enough. + // It's ok to unlink the block file before the uncache operation + // finishes. asyncDiskService.deleteAsync(v, f, FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), new ExtendedBlock(bpid, invalidBlks[i])); @@ -1181,66 +1182,47 @@ class FsDatasetImpl implements FsDatasetSpi { } } - synchronized boolean validToCache(String bpid, long blockId) { - ReplicaInfo info = volumeMap.get(bpid, blockId); - if (info == null) { - LOG.warn("Failed to cache replica in block pool " + bpid + - " with block id " + blockId + ": ReplicaInfo not found."); - return false; - } - FsVolumeImpl volume = (FsVolumeImpl)info.getVolume(); - if (volume == null) { - LOG.warn("Failed to cache block with id " + blockId + - ": Volume not found."); - return false; - } - if (info.getState() != ReplicaState.FINALIZED) { - LOG.warn("Failed to block with id " + blockId + - ": Replica is not finalized."); - return false; - } - return true; - } - /** * Asynchronously attempts to cache a single block via {@link FsDatasetCache}. */ private void cacheBlock(String bpid, long blockId) { - ReplicaInfo info; FsVolumeImpl volume; + String blockFileName; + long length, genstamp; + Executor volumeExecutor; + synchronized (this) { - if (!validToCache(bpid, blockId)) { + ReplicaInfo info = volumeMap.get(bpid, blockId); + if (info == null) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": ReplicaInfo not found."); return; } - info = volumeMap.get(bpid, blockId); - volume = (FsVolumeImpl)info.getVolume(); + if (info.getState() != ReplicaState.FINALIZED) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": replica is not finalized; it is in state " + + info.getState()); + return; + } + try { + volume = (FsVolumeImpl)info.getVolume(); + if (volume == null) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": volume not found."); + return; + } + } catch (ClassCastException e) { + LOG.warn("Failed to cache block with id " + blockId + + ": volume was not an instance of FsVolumeImpl."); + return; + } + blockFileName = info.getBlockFile().getAbsolutePath(); + length = info.getVisibleLength(); + genstamp = info.getGenerationStamp(); + volumeExecutor = volume.getCacheExecutor(); } - // Try to open block and meta streams - FileInputStream blockIn = null; - FileInputStream metaIn = null; - boolean success = false; - ExtendedBlock extBlk = - new ExtendedBlock(bpid, blockId, - info.getBytesOnDisk(), info.getGenerationStamp()); - try { - blockIn = (FileInputStream)getBlockInputStream(extBlk, 0); - metaIn = (FileInputStream)getMetaDataInputStream(extBlk) - .getWrappedStream(); - success = true; - } catch (ClassCastException e) { - LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks" - + " are not backed by files.", e); - } catch (IOException e) { - LOG.warn("Failed to cache replica " + extBlk + ": IOException while" - + " trying to open block or meta files.", e); - } - if (!success) { - IOUtils.closeQuietly(blockIn); - IOUtils.closeQuietly(metaIn); - return; - } - cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(), - volume, blockIn, metaIn); + cacheManager.cacheBlock(blockId, bpid, + blockFileName, length, genstamp, volumeExecutor); } @Override // FsDatasetSpi diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index a2732ca3d5..33057dc743 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -196,7 +195,7 @@ class FsVolumeImpl implements FsVolumeSpi { return getBlockPoolSlice(bpid).addBlock(b, f); } - Executor getExecutor() { + Executor getCacheExecutor() { return cacheExecutor; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java index a2a9e6c5a2..09d2ed6d5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java @@ -28,149 +28,139 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** - * Low-level wrapper for a Block and its backing files that provides mmap, - * mlock, and checksum verification operations. - * - * This could be a private class of FsDatasetCache, not meant for other users. + * Represents an HDFS block that is mmapped by the DataNode. */ @InterfaceAudience.Private @InterfaceStability.Unstable -class MappableBlock implements Closeable { - - private final String bpid; - private final Block block; - private final FsVolumeImpl volume; - - private final FileInputStream blockIn; - private final FileInputStream metaIn; - private final FileChannel blockChannel; - private final FileChannel metaChannel; - private final long blockSize; - - private boolean isMapped; - private boolean isLocked; - private boolean isChecksummed; - - private MappedByteBuffer blockMapped = null; - - public MappableBlock(String bpid, Block blk, FsVolumeImpl volume, - FileInputStream blockIn, FileInputStream metaIn) throws IOException { - this.bpid = bpid; - this.block = blk; - this.volume = volume; - - this.blockIn = blockIn; - this.metaIn = metaIn; - this.blockChannel = blockIn.getChannel(); - this.metaChannel = metaIn.getChannel(); - this.blockSize = blockChannel.size(); - - this.isMapped = false; - this.isLocked = false; - this.isChecksummed = false; +public class MappableBlock implements Closeable { + public static interface Mlocker { + void mlock(MappedByteBuffer mmap, long length) throws IOException; + } + + private static class PosixMlocker implements Mlocker { + public void mlock(MappedByteBuffer mmap, long length) + throws IOException { + NativeIO.POSIX.mlock(mmap, length); + } } - public String getBlockPoolId() { - return bpid; + @VisibleForTesting + public static Mlocker mlocker = new PosixMlocker(); + + private MappedByteBuffer mmap; + private final long length; + + MappableBlock(MappedByteBuffer mmap, long length) { + this.mmap = mmap; + this.length = length; + assert length > 0; } - public Block getBlock() { - return block; - } - - public FsVolumeImpl getVolume() { - return volume; - } - - public boolean isMapped() { - return isMapped; - } - - public boolean isLocked() { - return isLocked; - } - - public boolean isChecksummed() { - return isChecksummed; + public long getLength() { + return length; } /** - * Returns the number of bytes on disk for the block file + * Load the block. + * + * mmap and mlock the block, and then verify its checksum. + * + * @param length The current length of the block. + * @param blockIn The block input stream. Should be positioned at the + * start. The caller must close this. + * @param metaIn The meta file input stream. Should be positioned at + * the start. The caller must close this. + * @param blockFileName The block file name, for logging purposes. + * + * @return The Mappable block. */ - public long getNumBytes() { - return blockSize; + public static MappableBlock load(long length, + FileInputStream blockIn, FileInputStream metaIn, + String blockFileName) throws IOException { + MappableBlock mappableBlock = null; + MappedByteBuffer mmap = null; + try { + FileChannel blockChannel = blockIn.getChannel(); + if (blockChannel == null) { + throw new IOException("Block InputStream has no FileChannel."); + } + mmap = blockChannel.map(MapMode.READ_ONLY, 0, length); + mlocker.mlock(mmap, length); + verifyChecksum(length, metaIn, blockChannel, blockFileName); + mappableBlock = new MappableBlock(mmap, length); + } finally { + if (mappableBlock == null) { + if (mmap != null) { + NativeIO.POSIX.munmap(mmap); // unmapping also unlocks + } + } + } + return mappableBlock; } /** - * Maps the block into memory. See mmap(2). + * Verifies the block's checksum. This is an I/O intensive operation. + * @return if the block was successfully checksummed. */ - public void map() throws IOException { - if (isMapped) { - return; + private static void verifyChecksum(long length, + FileInputStream metaIn, FileChannel blockChannel, String blockFileName) + throws IOException, ChecksumException { + // Verify the checksum from the block's meta file + // Get the DataChecksum from the meta file header + BlockMetadataHeader header = + BlockMetadataHeader.readHeader(new DataInputStream( + new BufferedInputStream(metaIn, BlockMetadataHeader + .getHeaderSize()))); + FileChannel metaChannel = metaIn.getChannel(); + if (metaChannel == null) { + throw new IOException("Block InputStream meta file has no FileChannel."); } - blockMapped = blockChannel.map(MapMode.READ_ONLY, 0, blockSize); - isMapped = true; - } - - /** - * Unmaps the block from memory. See munmap(2). - */ - public void unmap() { - if (!isMapped) { - return; + DataChecksum checksum = header.getChecksum(); + final int bytesPerChecksum = checksum.getBytesPerChecksum(); + final int checksumSize = checksum.getChecksumSize(); + final int numChunks = (8*1024*1024) / bytesPerChecksum; + ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum); + ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize); + // Verify the checksum + int bytesVerified = 0; + while (bytesVerified < length) { + Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, + "Unexpected partial chunk before EOF"); + assert bytesVerified % bytesPerChecksum == 0; + int bytesRead = fillBuffer(blockChannel, blockBuf); + if (bytesRead == -1) { + throw new IOException("checksum verification failed: premature EOF"); + } + blockBuf.flip(); + // Number of read chunks, including partial chunk at end + int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; + checksumBuf.limit(chunks*checksumSize); + fillBuffer(metaChannel, checksumBuf); + checksumBuf.flip(); + checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName, + bytesVerified); + // Success + bytesVerified += bytesRead; + blockBuf.clear(); + checksumBuf.clear(); } - if (blockMapped instanceof sun.nio.ch.DirectBuffer) { - sun.misc.Cleaner cleaner = - ((sun.nio.ch.DirectBuffer)blockMapped).cleaner(); - cleaner.clean(); - } - isMapped = false; - isLocked = false; - isChecksummed = false; - } - - /** - * Locks the block into memory. This prevents the block from being paged out. - * See mlock(2). - */ - public void lock() throws IOException { - Preconditions.checkArgument(isMapped, - "Block must be mapped before it can be locked!"); - if (isLocked) { - return; - } - NativeIO.POSIX.mlock(blockMapped, blockSize); - isLocked = true; - } - - /** - * Unlocks the block from memory, allowing it to be paged out. See munlock(2). - */ - public void unlock() throws IOException { - if (!isLocked || !isMapped) { - return; - } - NativeIO.POSIX.munlock(blockMapped, blockSize); - isLocked = false; - isChecksummed = false; } /** * Reads bytes into a buffer until EOF or the buffer's limit is reached */ - private int fillBuffer(FileChannel channel, ByteBuffer buf) + private static int fillBuffer(FileChannel channel, ByteBuffer buf) throws IOException { int bytesRead = channel.read(buf); if (bytesRead < 0) { @@ -188,62 +178,11 @@ class MappableBlock implements Closeable { return bytesRead; } - /** - * Verifies the block's checksum. This is an I/O intensive operation. - * @return if the block was successfully checksummed. - */ - public void verifyChecksum() throws IOException, ChecksumException { - Preconditions.checkArgument(isLocked && isMapped, - "Block must be mapped and locked before checksum verification!"); - // skip if checksum has already been successfully verified - if (isChecksummed) { - return; - } - // Verify the checksum from the block's meta file - // Get the DataChecksum from the meta file header - metaChannel.position(0); - BlockMetadataHeader header = - BlockMetadataHeader.readHeader(new DataInputStream( - new BufferedInputStream(metaIn, BlockMetadataHeader - .getHeaderSize()))); - DataChecksum checksum = header.getChecksum(); - final int bytesPerChecksum = checksum.getBytesPerChecksum(); - final int checksumSize = checksum.getChecksumSize(); - final int numChunks = (8*1024*1024) / bytesPerChecksum; - ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum); - ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize); - // Verify the checksum - int bytesVerified = 0; - while (bytesVerified < blockChannel.size()) { - Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, - "Unexpected partial chunk before EOF"); - assert bytesVerified % bytesPerChecksum == 0; - int bytesRead = fillBuffer(blockChannel, blockBuf); - if (bytesRead == -1) { - throw new IOException("Premature EOF"); - } - blockBuf.flip(); - // Number of read chunks, including partial chunk at end - int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; - checksumBuf.limit(chunks*checksumSize); - fillBuffer(metaChannel, checksumBuf); - checksumBuf.flip(); - checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(), - bytesVerified); - // Success - bytesVerified += bytesRead; - blockBuf.clear(); - checksumBuf.clear(); - } - isChecksummed = true; - // Can close the backing file since everything is safely in memory - blockChannel.close(); - } - @Override public void close() { - unmap(); - IOUtils.closeQuietly(blockIn); - IOUtils.closeQuietly(metaIn); + if (mmap != null) { + NativeIO.POSIX.munmap(mmap); + mmap = null; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index e6e87b9cb3..e889413ec4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -26,8 +26,11 @@ import static org.mockito.Mockito.doReturn; import java.io.FileInputStream; import java.io.IOException; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HdfsBlockLocation; @@ -42,6 +45,8 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; @@ -52,12 +57,18 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Logger; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + public class TestFsDatasetCache { + private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class); // Most Linux installs allow a default of 64KB locked memory private static final long CACHE_CAPACITY = 64 * 1024; @@ -71,12 +82,14 @@ public class TestFsDatasetCache { private static DataNode dn; private static FsDatasetSpi fsd; private static DatanodeProtocolClientSideTranslatorPB spyNN; + private static PageRounder rounder = new PageRounder(); @Before public void setUp() throws Exception { assumeTrue(!Path.WINDOWS); - assumeTrue(NativeIO.isAvailable()); + assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY); conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY); @@ -169,19 +182,34 @@ public class TestFsDatasetCache { * Blocks until cache usage hits the expected new value. */ private long verifyExpectedCacheUsage(final long expected) throws Exception { - long cacheUsed = fsd.getDnCacheUsed(); - while (cacheUsed != expected) { - cacheUsed = fsd.getDnCacheUsed(); - Thread.sleep(100); - } - assertEquals("Unexpected amount of cache used", expected, cacheUsed); - return cacheUsed; + GenericTestUtils.waitFor(new Supplier() { + private int tries = 0; + + @Override + public Boolean get() { + long curDnCacheUsed = fsd.getDnCacheUsed(); + if (curDnCacheUsed != expected) { + if (tries++ > 10) { + LOG.info("verifyExpectedCacheUsage: expected " + + expected + ", got " + curDnCacheUsed + "; " + + "memlock limit = " + NativeIO.getMemlockLimit() + + ". Waiting..."); + } + return false; + } + return true; + } + }, 100, 60000); + return expected; } - @Test(timeout=60000) + @Test(timeout=600000) public void testCacheAndUncacheBlock() throws Exception { + LOG.info("beginning testCacheAndUncacheBlock"); final int NUM_BLOCKS = 5; + verifyExpectedCacheUsage(0); + // Write a test file final Path testFile = new Path("/testCacheBlock"); final long testFileLen = BLOCK_SIZE*NUM_BLOCKS; @@ -211,15 +239,23 @@ public class TestFsDatasetCache { setHeartbeatResponse(uncacheBlock(locs[i])); current = verifyExpectedCacheUsage(current - blockSizes[i]); } + LOG.info("finishing testCacheAndUncacheBlock"); } - @Test(timeout=60000) + @Test(timeout=600000) public void testFilesExceedMaxLockedMemory() throws Exception { + LOG.info("beginning testFilesExceedMaxLockedMemory"); + + // We don't want to deal with page rounding issues, so skip this + // test if page size is weird + long osPageSize = NativeIO.getOperatingSystemPageSize(); + assumeTrue(osPageSize == 4096); + // Create some test files that will exceed total cache capacity - // Don't forget that meta files take up space too! - final int numFiles = 4; - final long fileSize = CACHE_CAPACITY / numFiles; - final Path[] testFiles = new Path[4]; + final int numFiles = 5; + final long fileSize = 15000; + + final Path[] testFiles = new Path[numFiles]; final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][]; final long[] fileSizes = new long[numFiles]; for (int i=0; i() { + @Override + public Boolean get() { + int lines = appender.countLinesWithMessage( + "more bytes in the cache: " + + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY); + return lines > 0; + } + }, 500, 30000); // Uncache the n-1 files for (int i=0; i