diff --git a/dev-support/test-patch.sh b/dev-support/test-patch.sh index 9d7dff9a87..7143b51406 100755 --- a/dev-support/test-patch.sh +++ b/dev-support/test-patch.sh @@ -425,9 +425,9 @@ checkJavadocWarnings () { echo "" echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build." - #There are 11 warnings that are caused by things that are caused by using sun internal APIs. + #There are 12 warnings that are caused by things that are caused by using sun internal APIs. #There are 2 warnings that are caused by the Apache DS Dn class used in MiniKdc. - OK_JAVADOC_WARNINGS=13; + OK_JAVADOC_WARNINGS=14; ### if current warnings greater than OK_JAVADOC_WARNINGS if [[ $javadocWarnings -ne $OK_JAVADOC_WARNINGS ]] ; then JIRA_COMMENT="$JIRA_COMMENT diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index 3d6ce7b6c0..820f106897 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -23,7 +23,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,10 +36,11 @@ import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException; import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.Shell; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import sun.misc.Unsafe; + import com.google.common.annotations.VisibleForTesting; /** @@ -271,6 +274,26 @@ public static void munlock(ByteBuffer buffer, long len) } munlock_native(buffer, len); } + + /** + * Unmaps the block from memory. See munmap(2). + * + * There isn't any portable way to unmap a memory region in Java. + * So we use the sun.nio method here. + * Note that unmapping a memory region could cause crashes if code + * continues to reference the unmapped code. However, if we don't + * manually unmap the memory, we are dependent on the finalizer to + * do it, and we have no idea when the finalizer will run. + * + * @param buffer The buffer to unmap. + */ + public static void munmap(MappedByteBuffer buffer) { + if (buffer instanceof sun.nio.ch.DirectBuffer) { + sun.misc.Cleaner cleaner = + ((sun.nio.ch.DirectBuffer)buffer).cleaner(); + cleaner.clean(); + } + } /** Linux only methods used for getOwner() implementation */ private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException; @@ -539,6 +562,21 @@ public static long getMemlockLimit() { private static native long getMemlockLimit0(); + /** + * @return the operating system's page size. + */ + public static long getOperatingSystemPageSize() { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe)f.get(null); + return unsafe.pageSize(); + } catch (Throwable e) { + LOG.warn("Unable to get operating system page size. 
Guessing 4096.", e); + return 4096; + } + } + private static class CachedUid { final long timestamp; final String username; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java index df5895553a..bdb6a614ef 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java @@ -18,9 +18,9 @@ package org.apache.hadoop.io.retry; import java.lang.reflect.Method; -import java.net.UnknownHostException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ipc.RetriableException; /** * A dummy invocation handler extending RetryInvocationHandler. It drops the @@ -52,7 +52,7 @@ protected Object invokeMethod(Method method, Object[] args) throws Throwable { if (retryCount < this.numToDrop) { RetryCount.set(++retryCount); LOG.info("Drop the response. Current retryCount == " + retryCount); - throw new UnknownHostException("Fake Exception"); + throw new RetriableException("Fake Exception"); } else { LOG.info("retryCount == " + retryCount + ". It's time to normally process the response"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index aa1f3ec0fa..a248f22cfc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -558,27 +558,25 @@ public RetryAction shouldRetry(Exception e, int retries, isWrappedStandbyException(e)) { return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY, getFailoverOrRetrySleepTime(failovers)); - } else if (e instanceof SocketException || - (e instanceof IOException && !(e instanceof RemoteException))) { + } else if (e instanceof RetriableException + || getWrappedRetriableException(e) != null) { + // RetriableException or RetriableException wrapped + return new RetryAction(RetryAction.RetryDecision.RETRY, + getFailoverOrRetrySleepTime(retries)); + } else if (e instanceof SocketException + || (e instanceof IOException && !(e instanceof RemoteException))) { if (isIdempotentOrAtMostOnce) { return RetryAction.FAILOVER_AND_RETRY; } else { return new RetryAction(RetryAction.RetryDecision.FAIL, 0, - "the invoked method is not idempotent, and unable to determine " + - "whether it was invoked"); + "the invoked method is not idempotent, and unable to determine " + + "whether it was invoked"); } } else { - RetriableException re = getWrappedRetriableException(e); - if (re != null) { - return new RetryAction(RetryAction.RetryDecision.RETRY, - getFailoverOrRetrySleepTime(retries)); - } else { return fallbackPolicy.shouldRetry(e, retries, failovers, isIdempotentOrAtMostOnce); - } } } - } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9e77ab2b61..de8f452562 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -359,6 +359,8 @@ Trunk (Unreleased) HDFS-5468. CacheAdmin help command does not recognize commands (Stephen Chu via Colin Patrick McCabe) + HDFS-5394. 
Fix race conditions in DN caching and uncaching (cmccabe) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -455,6 +457,9 @@ Release 2.3.0 - UNRELEASED HDFS-5436. Move HsFtpFileSystem and HFtpFileSystem into org.apache.hdfs.web (Haohui Mai via Arpit Agarwal) + HDFS-5371. Let client retry the same NN when + "dfs.client.test.drop.namenode.response.number" is enabled. (jing9) + OPTIMIZATIONS HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) @@ -506,6 +511,11 @@ Release 2.3.0 - UNRELEASED HDFS-5443. Delete 0-sized block when deleting an under-construction file that is included in snapshot. (jing9) + HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed + file/directory while deletion. (jing9) + + HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java index 566c2b5457..91a62306f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.nativeio.NativeIO; import java.io.IOException; import java.lang.ref.WeakReference; @@ -147,20 +148,9 @@ public long getLastEvictableTimeNs() { /** * Unmap the memory region. - * - * There isn't any portable way to unmap a memory region in Java. - * So we use the sun.nio method here. - * Note that unmapping a memory region could cause crashes if code - * continues to reference the unmapped code. However, if we don't - * manually unmap the memory, we are dependent on the finalizer to - * do it, and we have no idea when the finalizer will run. 
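The removed ClientMmap comment above explains why an eager unmap is needed at all: without it, the mapping survives until the finalizer happens to run. The new NativeIO.POSIX.munmap helper added earlier in this patch centralizes the JDK-internal trick of pulling the sun.misc.Cleaner off a sun.nio.ch.DirectBuffer and running it immediately. A minimal standalone sketch of that technique, assuming a pre-Java-9 JDK where those internal classes are still accessible (the class name and file argument here are illustrative, not part of the patch):

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;

public class EagerUnmap {
  /** Eagerly release a mapping instead of waiting for the buffer's finalizer. */
  static void munmap(MappedByteBuffer buffer) {
    if (buffer instanceof sun.nio.ch.DirectBuffer) {
      sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
      if (cleaner != null) {
        cleaner.clean();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    try (RandomAccessFile raf = new RandomAccessFile(args[0], "r")) {
      MappedByteBuffer mmap =
          raf.getChannel().map(MapMode.READ_ONLY, 0, raf.length());
      // ... read from the mapping ...
      munmap(mmap); // any further access to mmap after this is undefined
    }
  }
}
```

Once cleaner.clean() has run, touching the buffer can crash the JVM, which is why ClientMmap only unmaps after its reference count has dropped to zero.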
*/ void unmap() { assert(refCount.get() == 0); - if (map instanceof sun.nio.ch.DirectBuffer) { - final sun.misc.Cleaner cleaner = - ((sun.nio.ch.DirectBuffer) map).cleaner(); - cleaner.clean(); - } + NativeIO.POSIX.munmap(map); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index ec943a5a09..f5560b680b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -47,7 +47,6 @@ import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.IntrusiveCollection; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index 6a68e633e8..2af46bb891 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -18,24 +18,35 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Map.Entry; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.nativeio.NativeIO; /** * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2) @@ -45,178 +56,411 @@ @InterfaceAudience.Private @InterfaceStability.Unstable public class FsDatasetCache { + /** + * Keys which identify MappableBlocks. + */ + private static final class Key { + /** + * Block id. + */ + final long id; + + /** + * Block pool id. 
+ */ + final String bpid; + + Key(long id, String bpid) { + this.id = id; + this.bpid = bpid; + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (!(o.getClass() == getClass())) { + return false; + } + Key other = (Key)o; + return ((other.id == this.id) && (other.bpid.equals(this.bpid))); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(id).append(bpid).hashCode(); + } + }; + + /** + * MappableBlocks that we know about. + */ + private static final class Value { + final State state; + final MappableBlock mappableBlock; + + Value(MappableBlock mappableBlock, State state) { + this.mappableBlock = mappableBlock; + this.state = state; + } + } + + private enum State { + /** + * The MappableBlock is in the process of being cached. + */ + CACHING, + + /** + * The MappableBlock was in the process of being cached, but it was + * cancelled. Only the FsDatasetCache#WorkerTask can remove cancelled + * MappableBlock objects. + */ + CACHING_CANCELLED, + + /** + * The MappableBlock is in the cache. + */ + CACHED, + + /** + * The MappableBlock is in the process of uncaching. + */ + UNCACHING; + + /** + * Whether we should advertise this block as cached to the NameNode and + * clients. + */ + public boolean shouldAdvertise() { + return (this == CACHED); + } + } private static final Log LOG = LogFactory.getLog(FsDatasetCache.class); /** - * Map of cached blocks + * Stores MappableBlock objects and the states they're in. */ - private final ConcurrentMap cachedBlocks; + private final HashMap mappableBlockMap = new HashMap(); private final FsDatasetImpl dataset; + + private final ThreadPoolExecutor uncachingExecutor; + /** - * Number of cached bytes + * The approximate amount of cache space in use. + * + * This number is an overestimate, counting bytes that will be used only + * if pending caching operations succeed. It does not take into account + * pending uncaching operations. + * + * This overestimate is more useful to the NameNode than an underestimate, + * since we don't want the NameNode to assign us more replicas than + * we can cache, because of the current batch of operations. */ - private AtomicLong usedBytes; + private final UsedBytesCount usedBytesCount; + + public static class PageRounder { + private final long osPageSize = NativeIO.getOperatingSystemPageSize(); + + /** + * Round up a number to the operating system page size. + */ + public long round(long count) { + long newCount = + (count + (osPageSize - 1)) / osPageSize; + return newCount * osPageSize; + } + } + + private class UsedBytesCount { + private final AtomicLong usedBytes = new AtomicLong(0); + + private PageRounder rounder = new PageRounder(); + + /** + * Try to reserve more bytes. + * + * @param count The number of bytes to add. We will round this + * up to the page size. + * + * @return The new number of usedBytes if we succeeded; + * -1 if we failed. + */ + long reserve(long count) { + count = rounder.round(count); + while (true) { + long cur = usedBytes.get(); + long next = cur + count; + if (next > maxBytes) { + return -1; + } + if (usedBytes.compareAndSet(cur, next)) { + return next; + } + } + } + + /** + * Release some bytes that we're using. + * + * @param count The number of bytes to release. We will round this + * up to the page size. + * + * @return The new number of usedBytes. 
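The reserve() method above, together with release() just below, implements a lock-free byte budget: every request is rounded up to the operating-system page size and added to usedBytes in a compare-and-set loop that refuses to pass maxBytes. A self-contained sketch of the same idea, with a hard-coded 4096-byte page standing in for NativeIO.getOperatingSystemPageSize():

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative lock-free byte budget; 4096 stands in for the real OS page size. */
class CacheBudget {
  private static final long PAGE_SIZE = 4096;
  private final AtomicLong usedBytes = new AtomicLong(0);
  private final long maxBytes;

  CacheBudget(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  private static long roundUpToPage(long count) {
    return ((count + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE;
  }

  /** @return the new usage on success, or -1 if the reservation would exceed maxBytes. */
  long reserve(long count) {
    count = roundUpToPage(count);
    while (true) {
      long cur = usedBytes.get();
      long next = cur + count;
      if (next > maxBytes) {
        return -1;
      }
      if (usedBytes.compareAndSet(cur, next)) {
        return next;
      }
    }
  }

  /** @return the new usage after giving the (page-rounded) bytes back. */
  long release(long count) {
    return usedBytes.addAndGet(-roundUpToPage(count));
  }
}
```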
+ */ + long release(long count) { + count = rounder.round(count); + return usedBytes.addAndGet(-count); + } + + long get() { + return usedBytes.get(); + } + } + /** - * Total cache capacity in bytes + * The total cache capacity in bytes. */ private final long maxBytes; public FsDatasetCache(FsDatasetImpl dataset) { this.dataset = dataset; - this.cachedBlocks = new ConcurrentHashMap(); - this.usedBytes = new AtomicLong(0); this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory(); - } - - /** - * @return if the block is cached - */ - boolean isCached(String bpid, long blockId) { - MappableBlock mapBlock = cachedBlocks.get(blockId); - if (mapBlock != null) { - return mapBlock.getBlockPoolId().equals(bpid); - } - return false; + ThreadFactory workerFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("FsDatasetCache-%d-" + dataset.toString()) + .build(); + this.usedBytesCount = new UsedBytesCount(); + this.uncachingExecutor = new ThreadPoolExecutor( + 0, 1, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(), + workerFactory); + this.uncachingExecutor.allowCoreThreadTimeOut(true); } /** * @return List of cached blocks suitable for translation into a * {@link BlockListAsLongs} for a cache report. */ - List getCachedBlocks(String bpid) { + synchronized List getCachedBlocks(String bpid) { List blocks = new ArrayList(); - // ConcurrentHashMap iteration doesn't see latest updates, which is okay - Iterator it = cachedBlocks.values().iterator(); - while (it.hasNext()) { - MappableBlock mapBlock = it.next(); - if (mapBlock.getBlockPoolId().equals(bpid)) { - blocks.add(mapBlock.getBlock().getBlockId()); + for (Iterator> iter = + mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) { + Entry entry = iter.next(); + if (entry.getKey().bpid.equals(bpid)) { + if (entry.getValue().state.shouldAdvertise()) { + blocks.add(entry.getKey().id); + } } } return blocks; } /** - * Asynchronously attempts to cache a block. This is subject to the - * configured maximum locked memory limit. - * - * @param block block to cache - * @param volume volume of the block - * @param blockIn stream of the block's data file - * @param metaIn stream of the block's meta file + * Attempt to begin caching a block. 
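The constructor above builds the uncaching executor as a lazily created, single daemon thread: Guava's ThreadFactoryBuilder supplies the daemon flag and thread-name format, and a ThreadPoolExecutor with corePoolSize 0, maximumPoolSize 1 and core-thread timeout lets the worker exit after 60 idle seconds. A small sketch of that executor setup in isolation (the name format is illustrative):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class SingleLazyWorker {
  public static void main(String[] args) {
    ThreadFactory workerFactory = new ThreadFactoryBuilder()
        .setDaemon(true)                       // don't block JVM shutdown
        .setNameFormat("uncaching-worker-%d")  // illustrative name format
        .build();
    // At most one worker thread; it is created on demand and, because core
    // threads may time out, it exits after 60 idle seconds.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        0, 1, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), workerFactory);
    executor.allowCoreThreadTimeOut(true);

    executor.execute(new Runnable() {
      @Override
      public void run() {
        System.out.println("ran on " + Thread.currentThread().getName());
      }
    });
    executor.shutdown();
  }
}
```

Giving uncaching its own queue appears intended to keep a slow munmap from competing with the per-volume caching executors, though the patch itself does not spell that out.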
*/ - void cacheBlock(String bpid, Block block, FsVolumeImpl volume, - FileInputStream blockIn, FileInputStream metaIn) { - if (isCached(bpid, block.getBlockId())) { - return; - } - MappableBlock mapBlock = null; - try { - mapBlock = new MappableBlock(bpid, block, volume, blockIn, metaIn); - } catch (IOException e) { - LOG.warn("Failed to cache replica " + block + ": Could not instantiate" - + " MappableBlock", e); - IOUtils.closeQuietly(blockIn); - IOUtils.closeQuietly(metaIn); - return; - } - // Check if there's sufficient cache capacity - boolean success = false; - long bytes = mapBlock.getNumBytes(); - long used = usedBytes.get(); - while (used+bytes < maxBytes) { - if (usedBytes.compareAndSet(used, used+bytes)) { - success = true; - break; + synchronized void cacheBlock(long blockId, String bpid, + String blockFileName, long length, long genstamp, + Executor volumeExecutor) { + Key key = new Key(blockId, bpid); + Value prevValue = mappableBlockMap.get(key); + if (prevValue != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + + " already exists in the FsDatasetCache with state " + + prevValue.state); } - used = usedBytes.get(); - } - if (!success) { - LOG.warn(String.format( - "Failed to cache replica %s: %s exceeded (%d + %d > %d)", - mapBlock.getBlock().toString(), - DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, - used, bytes, maxBytes)); - mapBlock.close(); return; } - // Submit it to the worker pool to be cached - volume.getExecutor().execute(new WorkerTask(mapBlock)); + mappableBlockMap.put(key, new Value(null, State.CACHING)); + volumeExecutor.execute( + new CachingTask(key, blockFileName, length, genstamp)); } - /** - * Uncaches a block if it is cached. - * @param blockId id to uncache - */ - void uncacheBlock(String bpid, long blockId) { - MappableBlock mapBlock = cachedBlocks.get(blockId); - if (mapBlock != null && - mapBlock.getBlockPoolId().equals(bpid) && - mapBlock.getBlock().getBlockId() == blockId) { - mapBlock.close(); - cachedBlocks.remove(blockId); - long bytes = mapBlock.getNumBytes(); - long used = usedBytes.get(); - while (!usedBytes.compareAndSet(used, used - bytes)) { - used = usedBytes.get(); + synchronized void uncacheBlock(String bpid, long blockId) { + Key key = new Key(blockId, bpid); + Value prevValue = mappableBlockMap.get(key); + + if (prevValue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "does not need to be uncached, because it is not currently " + + "in the mappableBlockMap."); } - LOG.info("Successfully uncached block " + blockId); - } else { - LOG.info("Could not uncache block " + blockId + ": unknown block."); + return; + } + switch (prevValue.state) { + case CACHING: + if (LOG.isDebugEnabled()) { + LOG.debug("Cancelling caching for block with id " + blockId + + ", pool " + bpid + "."); + } + mappableBlockMap.put(key, + new Value(prevValue.mappableBlock, State.CACHING_CANCELLED)); + break; + case CACHED: + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "has been scheduled for uncaching."); + } + mappableBlockMap.put(key, + new Value(prevValue.mappableBlock, State.UNCACHING)); + uncachingExecutor.execute(new UncachingTask(key)); + break; + default: + if (LOG.isDebugEnabled()) { + LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "does not need to be uncached, because it is " + + "in state " + prevValue.state + "."); + } + break; } } /** * Background worker that mmaps, 
mlocks, and checksums a block */ - private class WorkerTask implements Runnable { + private class CachingTask implements Runnable { + private final Key key; + private final String blockFileName; + private final long length; + private final long genstamp; - private MappableBlock block; - WorkerTask(MappableBlock block) { - this.block = block; + CachingTask(Key key, String blockFileName, long length, long genstamp) { + this.key = key; + this.blockFileName = blockFileName; + this.length = length; + this.genstamp = genstamp; } @Override public void run() { boolean success = false; - try { - block.map(); - block.lock(); - block.verifyChecksum(); - success = true; - } catch (ChecksumException e) { - // Exception message is bogus since this wasn't caused by a file read - LOG.warn("Failed to cache block " + block.getBlock() + ": Checksum " - + "verification failed."); - } catch (IOException e) { - LOG.warn("Failed to cache block " + block.getBlock() + ": IOException", - e); + FileInputStream blockIn = null, metaIn = null; + MappableBlock mappableBlock = null; + ExtendedBlock extBlk = + new ExtendedBlock(key.bpid, key.id, length, genstamp); + long newUsedBytes = usedBytesCount.reserve(length); + if (newUsedBytes < 0) { + LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid + + ": could not reserve " + length + " more bytes in the " + + "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + + " of " + maxBytes + " exceeded."); + return; } - // If we failed or the block became uncacheable in the meantime, - // clean up and return the reserved cache allocation - if (!success || - !dataset.validToCache(block.getBlockPoolId(), - block.getBlock().getBlockId())) { - block.close(); - long used = usedBytes.get(); - while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) { - used = usedBytes.get(); + try { + try { + blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0); + metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk) + .getWrappedStream(); + } catch (ClassCastException e) { + LOG.warn("Failed to cache block with id " + key.id + ", pool " + + key.bpid + ": Underlying blocks are not backed by files.", e); + return; + } catch (FileNotFoundException e) { + LOG.info("Failed to cache block with id " + key.id + ", pool " + + key.bpid + ": failed to find backing files."); + return; + } catch (IOException e) { + LOG.warn("Failed to cache block with id " + key.id + ", pool " + + key.bpid + ": failed to open file", e); + return; } - } else { - LOG.info("Successfully cached block " + block.getBlock()); - cachedBlocks.put(block.getBlock().getBlockId(), block); + try { + mappableBlock = MappableBlock. 
+ load(length, blockIn, metaIn, blockFileName); + } catch (ChecksumException e) { + // Exception message is bogus since this wasn't caused by a file read + LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " + + "checksum verification failed."); + return; + } catch (IOException e) { + LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e); + return; + } + synchronized (FsDatasetCache.this) { + Value value = mappableBlockMap.get(key); + Preconditions.checkNotNull(value); + Preconditions.checkState(value.state == State.CACHING || + value.state == State.CACHING_CANCELLED); + if (value.state == State.CACHING_CANCELLED) { + mappableBlockMap.remove(key); + LOG.warn("Caching of block " + key.id + " in " + key.bpid + + " was cancelled."); + return; + } + mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully cached block " + key.id + " in " + key.bpid + + ". We are now caching " + newUsedBytes + " bytes in total."); + } + success = true; + } finally { + if (!success) { + newUsedBytes = usedBytesCount.release(length); + if (LOG.isDebugEnabled()) { + LOG.debug("Caching of block " + key.id + " in " + + key.bpid + " was aborted. We are now caching only " + + newUsedBytes + " + bytes in total."); + } + IOUtils.closeQuietly(blockIn); + IOUtils.closeQuietly(metaIn); + if (mappableBlock != null) { + mappableBlock.close(); + } + } + } + } + } + + private class UncachingTask implements Runnable { + private final Key key; + + UncachingTask(Key key) { + this.key = key; + } + + @Override + public void run() { + Value value; + + synchronized (FsDatasetCache.this) { + value = mappableBlockMap.get(key); + } + Preconditions.checkNotNull(value); + Preconditions.checkArgument(value.state == State.UNCACHING); + // TODO: we will eventually need to do revocation here if any clients + // are reading via mmap with checksums enabled. See HDFS-5182. + IOUtils.closeQuietly(value.mappableBlock); + synchronized (FsDatasetCache.this) { + mappableBlockMap.remove(key); + } + long newUsedBytes = + usedBytesCount.release(value.mappableBlock.getLength()); + if (LOG.isDebugEnabled()) { + LOG.debug("Uncaching of block " + key.id + " in " + key.bpid + + " completed. usedBytes = " + newUsedBytes); } } } // Stats related methods for FsDatasetMBean + /** + * Get the approximate amount of cache space used. + */ public long getDnCacheUsed() { - return usedBytes.get(); + return usedBytesCount.get(); } + /** + * Get the maximum amount of bytes we can cache. This is a constant. 
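Taken together, cacheBlock, uncacheBlock, CachingTask and UncachingTask form a small per-block state machine: an uncache request can only mark a CACHING entry as CACHING_CANCELLED (the caching task later removes it and releases the reserved bytes), while a CACHED entry moves to UNCACHING and is handed to the uncaching executor. A condensed sketch of those transitions; the names are illustrative and the real code guards every transition with synchronization on the FsDatasetCache instance:

```java
enum BlockState { CACHING, CACHING_CANCELLED, CACHED, UNCACHING }

/** Illustrative transition rules; the patch applies them under synchronization. */
class BlockStateTransitions {
  /** What an uncache request does to a block in the given state. */
  static BlockState onUncacheRequest(BlockState current) {
    switch (current) {
      case CACHING:
        // The caching task is still running; it will notice the cancellation,
        // remove the entry and release the reserved bytes itself.
        return BlockState.CACHING_CANCELLED;
      case CACHED:
        // Hand the block to the uncaching executor, which removes the entry
        // and releases the bytes once the mapping is closed.
        return BlockState.UNCACHING;
      default:
        // CACHING_CANCELLED or UNCACHING: the request is already in flight.
        return current;
    }
  }

  /** What the caching task does once mmap, mlock and checksum all succeed. */
  static BlockState onCachingSucceeded(BlockState current) {
    // A cancelled block never reaches CACHED; its entry is dropped instead.
    return current == BlockState.CACHING_CANCELLED ? null : BlockState.CACHED;
  }
}
```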
+ */ public long getDnCacheCapacity() { return maxBytes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index c19768a508..f3e4dae2e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -32,12 +32,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; -import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -599,7 +599,7 @@ public synchronized ReplicaInPipeline append(ExtendedBlock b, private synchronized ReplicaBeingWritten append(String bpid, FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) throws IOException { - // uncache the block + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId()); // unlink the finalized replica replicaInfo.unlinkBlock(1); @@ -1244,10 +1244,11 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException { volumeMap.remove(bpid, invalidBlks[i]); perVolumeReplicaMap.get(v.getStorageID()).remove(bpid, invalidBlks[i]); } - - // Uncache the block synchronously + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); - // Delete the block asynchronously to make sure we can do it fast enough + // Delete the block asynchronously to make sure we can do it fast enough. + // It's ok to unlink the block file before the uncache operation + // finishes. asyncDiskService.deleteAsync(v, f, FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), new ExtendedBlock(bpid, invalidBlks[i])); @@ -1257,66 +1258,47 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException { } } - synchronized boolean validToCache(String bpid, long blockId) { - ReplicaInfo info = volumeMap.get(bpid, blockId); - if (info == null) { - LOG.warn("Failed to cache replica in block pool " + bpid + - " with block id " + blockId + ": ReplicaInfo not found."); - return false; - } - FsVolumeImpl volume = (FsVolumeImpl)info.getVolume(); - if (volume == null) { - LOG.warn("Failed to cache block with id " + blockId + - ": Volume not found."); - return false; - } - if (info.getState() != ReplicaState.FINALIZED) { - LOG.warn("Failed to block with id " + blockId + - ": Replica is not finalized."); - return false; - } - return true; - } - /** * Asynchronously attempts to cache a single block via {@link FsDatasetCache}. 
*/ private void cacheBlock(String bpid, long blockId) { - ReplicaInfo info; FsVolumeImpl volume; + String blockFileName; + long length, genstamp; + Executor volumeExecutor; + synchronized (this) { - if (!validToCache(bpid, blockId)) { + ReplicaInfo info = volumeMap.get(bpid, blockId); + if (info == null) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": ReplicaInfo not found."); return; } - info = volumeMap.get(bpid, blockId); - volume = (FsVolumeImpl)info.getVolume(); + if (info.getState() != ReplicaState.FINALIZED) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": replica is not finalized; it is in state " + + info.getState()); + return; + } + try { + volume = (FsVolumeImpl)info.getVolume(); + if (volume == null) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": volume not found."); + return; + } + } catch (ClassCastException e) { + LOG.warn("Failed to cache block with id " + blockId + + ": volume was not an instance of FsVolumeImpl."); + return; + } + blockFileName = info.getBlockFile().getAbsolutePath(); + length = info.getVisibleLength(); + genstamp = info.getGenerationStamp(); + volumeExecutor = volume.getCacheExecutor(); } - // Try to open block and meta streams - FileInputStream blockIn = null; - FileInputStream metaIn = null; - boolean success = false; - ExtendedBlock extBlk = - new ExtendedBlock(bpid, blockId, - info.getBytesOnDisk(), info.getGenerationStamp()); - try { - blockIn = (FileInputStream)getBlockInputStream(extBlk, 0); - metaIn = (FileInputStream)getMetaDataInputStream(extBlk) - .getWrappedStream(); - success = true; - } catch (ClassCastException e) { - LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks" - + " are not backed by files.", e); - } catch (IOException e) { - LOG.warn("Failed to cache replica " + extBlk + ": IOException while" - + " trying to open block or meta files.", e); - } - if (!success) { - IOUtils.closeQuietly(blockIn); - IOUtils.closeQuietly(metaIn); - return; - } - cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(), - volume, blockIn, metaIn); + cacheManager.cacheBlock(blockId, bpid, + blockFileName, length, genstamp, volumeExecutor); } @Override // FsDatasetSpi diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index f31d312447..b7e2ccd10d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -199,7 +198,7 @@ File addBlock(String bpid, Block b, File f) throws IOException { return getBlockPoolSlice(bpid).addBlock(b, f); } - Executor getExecutor() { + Executor getCacheExecutor() { return cacheExecutor; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java index a2a9e6c5a2..09d2ed6d5e 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java @@ -28,149 +28,139 @@ import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** - * Low-level wrapper for a Block and its backing files that provides mmap, - * mlock, and checksum verification operations. - * - * This could be a private class of FsDatasetCache, not meant for other users. + * Represents an HDFS block that is mmapped by the DataNode. */ @InterfaceAudience.Private @InterfaceStability.Unstable -class MappableBlock implements Closeable { - - private final String bpid; - private final Block block; - private final FsVolumeImpl volume; - - private final FileInputStream blockIn; - private final FileInputStream metaIn; - private final FileChannel blockChannel; - private final FileChannel metaChannel; - private final long blockSize; - - private boolean isMapped; - private boolean isLocked; - private boolean isChecksummed; - - private MappedByteBuffer blockMapped = null; - - public MappableBlock(String bpid, Block blk, FsVolumeImpl volume, - FileInputStream blockIn, FileInputStream metaIn) throws IOException { - this.bpid = bpid; - this.block = blk; - this.volume = volume; - - this.blockIn = blockIn; - this.metaIn = metaIn; - this.blockChannel = blockIn.getChannel(); - this.metaChannel = metaIn.getChannel(); - this.blockSize = blockChannel.size(); - - this.isMapped = false; - this.isLocked = false; - this.isChecksummed = false; +public class MappableBlock implements Closeable { + public static interface Mlocker { + void mlock(MappedByteBuffer mmap, long length) throws IOException; + } + + private static class PosixMlocker implements Mlocker { + public void mlock(MappedByteBuffer mmap, long length) + throws IOException { + NativeIO.POSIX.mlock(mmap, length); + } } - public String getBlockPoolId() { - return bpid; + @VisibleForTesting + public static Mlocker mlocker = new PosixMlocker(); + + private MappedByteBuffer mmap; + private final long length; + + MappableBlock(MappedByteBuffer mmap, long length) { + this.mmap = mmap; + this.length = length; + assert length > 0; } - public Block getBlock() { - return block; - } - - public FsVolumeImpl getVolume() { - return volume; - } - - public boolean isMapped() { - return isMapped; - } - - public boolean isLocked() { - return isLocked; - } - - public boolean isChecksummed() { - return isChecksummed; + public long getLength() { + return length; } /** - * Returns the number of bytes on disk for the block file + * Load the block. + * + * mmap and mlock the block, and then verify its checksum. + * + * @param length The current length of the block. + * @param blockIn The block input stream. Should be positioned at the + * start. The caller must close this. + * @param metaIn The meta file input stream. Should be positioned at + * the start. The caller must close this. 
+ * @param blockFileName The block file name, for logging purposes. + * + * @return The Mappable block. */ - public long getNumBytes() { - return blockSize; + public static MappableBlock load(long length, + FileInputStream blockIn, FileInputStream metaIn, + String blockFileName) throws IOException { + MappableBlock mappableBlock = null; + MappedByteBuffer mmap = null; + try { + FileChannel blockChannel = blockIn.getChannel(); + if (blockChannel == null) { + throw new IOException("Block InputStream has no FileChannel."); + } + mmap = blockChannel.map(MapMode.READ_ONLY, 0, length); + mlocker.mlock(mmap, length); + verifyChecksum(length, metaIn, blockChannel, blockFileName); + mappableBlock = new MappableBlock(mmap, length); + } finally { + if (mappableBlock == null) { + if (mmap != null) { + NativeIO.POSIX.munmap(mmap); // unmapping also unlocks + } + } + } + return mappableBlock; } /** - * Maps the block into memory. See mmap(2). + * Verifies the block's checksum. This is an I/O intensive operation. + * @return if the block was successfully checksummed. */ - public void map() throws IOException { - if (isMapped) { - return; + private static void verifyChecksum(long length, + FileInputStream metaIn, FileChannel blockChannel, String blockFileName) + throws IOException, ChecksumException { + // Verify the checksum from the block's meta file + // Get the DataChecksum from the meta file header + BlockMetadataHeader header = + BlockMetadataHeader.readHeader(new DataInputStream( + new BufferedInputStream(metaIn, BlockMetadataHeader + .getHeaderSize()))); + FileChannel metaChannel = metaIn.getChannel(); + if (metaChannel == null) { + throw new IOException("Block InputStream meta file has no FileChannel."); } - blockMapped = blockChannel.map(MapMode.READ_ONLY, 0, blockSize); - isMapped = true; - } - - /** - * Unmaps the block from memory. See munmap(2). - */ - public void unmap() { - if (!isMapped) { - return; + DataChecksum checksum = header.getChecksum(); + final int bytesPerChecksum = checksum.getBytesPerChecksum(); + final int checksumSize = checksum.getChecksumSize(); + final int numChunks = (8*1024*1024) / bytesPerChecksum; + ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum); + ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize); + // Verify the checksum + int bytesVerified = 0; + while (bytesVerified < length) { + Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, + "Unexpected partial chunk before EOF"); + assert bytesVerified % bytesPerChecksum == 0; + int bytesRead = fillBuffer(blockChannel, blockBuf); + if (bytesRead == -1) { + throw new IOException("checksum verification failed: premature EOF"); + } + blockBuf.flip(); + // Number of read chunks, including partial chunk at end + int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; + checksumBuf.limit(chunks*checksumSize); + fillBuffer(metaChannel, checksumBuf); + checksumBuf.flip(); + checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName, + bytesVerified); + // Success + bytesVerified += bytesRead; + blockBuf.clear(); + checksumBuf.clear(); } - if (blockMapped instanceof sun.nio.ch.DirectBuffer) { - sun.misc.Cleaner cleaner = - ((sun.nio.ch.DirectBuffer)blockMapped).cleaner(); - cleaner.clean(); - } - isMapped = false; - isLocked = false; - isChecksummed = false; - } - - /** - * Locks the block into memory. This prevents the block from being paged out. - * See mlock(2). 
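MappableBlock.load above follows a strict map, mlock, verify-checksum order and relies on a "result is still null" check in its finally block to unmap whenever any step throws, since unmapping also drops the mlock. A reduced sketch of that cleanup pattern; the mlock, munmap and verifyChecksum calls below are placeholders standing in for the NativeIO.POSIX helpers and the checksum loop used by the patch:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

class MmapLoader {
  /** Map and pin {@code length} bytes, unwinding the mapping if anything fails. */
  static MappedByteBuffer loadPinned(FileInputStream blockIn, long length)
      throws IOException {
    MappedByteBuffer mmap = null;
    boolean success = false;
    try {
      FileChannel channel = blockIn.getChannel();
      mmap = channel.map(MapMode.READ_ONLY, 0, length);
      mlock(mmap, length);   // pin the pages in RAM
      verifyChecksum(mmap);  // any checksum failure propagates as an exception
      success = true;
      return mmap;
    } finally {
      if (!success && mmap != null) {
        munmap(mmap);        // unmapping also releases the mlock
      }
    }
  }

  // Placeholders for NativeIO.POSIX.mlock / NativeIO.POSIX.munmap and the
  // patch's checksum verification; they are not real APIs defined here.
  private static void mlock(MappedByteBuffer b, long len) throws IOException {}
  private static void munmap(MappedByteBuffer b) {}
  private static void verifyChecksum(MappedByteBuffer b) throws IOException {}
}
```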
- */ - public void lock() throws IOException { - Preconditions.checkArgument(isMapped, - "Block must be mapped before it can be locked!"); - if (isLocked) { - return; - } - NativeIO.POSIX.mlock(blockMapped, blockSize); - isLocked = true; - } - - /** - * Unlocks the block from memory, allowing it to be paged out. See munlock(2). - */ - public void unlock() throws IOException { - if (!isLocked || !isMapped) { - return; - } - NativeIO.POSIX.munlock(blockMapped, blockSize); - isLocked = false; - isChecksummed = false; } /** * Reads bytes into a buffer until EOF or the buffer's limit is reached */ - private int fillBuffer(FileChannel channel, ByteBuffer buf) + private static int fillBuffer(FileChannel channel, ByteBuffer buf) throws IOException { int bytesRead = channel.read(buf); if (bytesRead < 0) { @@ -188,62 +178,11 @@ private int fillBuffer(FileChannel channel, ByteBuffer buf) return bytesRead; } - /** - * Verifies the block's checksum. This is an I/O intensive operation. - * @return if the block was successfully checksummed. - */ - public void verifyChecksum() throws IOException, ChecksumException { - Preconditions.checkArgument(isLocked && isMapped, - "Block must be mapped and locked before checksum verification!"); - // skip if checksum has already been successfully verified - if (isChecksummed) { - return; - } - // Verify the checksum from the block's meta file - // Get the DataChecksum from the meta file header - metaChannel.position(0); - BlockMetadataHeader header = - BlockMetadataHeader.readHeader(new DataInputStream( - new BufferedInputStream(metaIn, BlockMetadataHeader - .getHeaderSize()))); - DataChecksum checksum = header.getChecksum(); - final int bytesPerChecksum = checksum.getBytesPerChecksum(); - final int checksumSize = checksum.getChecksumSize(); - final int numChunks = (8*1024*1024) / bytesPerChecksum; - ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum); - ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize); - // Verify the checksum - int bytesVerified = 0; - while (bytesVerified < blockChannel.size()) { - Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, - "Unexpected partial chunk before EOF"); - assert bytesVerified % bytesPerChecksum == 0; - int bytesRead = fillBuffer(blockChannel, blockBuf); - if (bytesRead == -1) { - throw new IOException("Premature EOF"); - } - blockBuf.flip(); - // Number of read chunks, including partial chunk at end - int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; - checksumBuf.limit(chunks*checksumSize); - fillBuffer(metaChannel, checksumBuf); - checksumBuf.flip(); - checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(), - bytesVerified); - // Success - bytesVerified += bytesRead; - blockBuf.clear(); - checksumBuf.clear(); - } - isChecksummed = true; - // Can close the backing file since everything is safely in memory - blockChannel.close(); - } - @Override public void close() { - unmap(); - IOUtils.closeQuietly(blockIn); - IOUtils.closeQuietly(metaIn); + if (mmap != null) { + NativeIO.POSIX.munmap(mmap); + mmap = null; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java index fcc4b9e7fa..1f4ff7de18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java @@ -646,16 +646,14 @@ public void destroyAndCollectBlocks( FileWithSnapshot sfile = (FileWithSnapshot) referred; // make sure we mark the file as deleted sfile.deleteCurrentFile(); - if (snapshot != null) { - try { - // when calling cleanSubtree of the referred node, since we - // compute quota usage updates before calling this destroy - // function, we use true for countDiffChange - referred.cleanSubtree(snapshot, prior, collectedBlocks, - removedINodes, true); - } catch (QuotaExceededException e) { - LOG.error("should not exceed quota while snapshot deletion", e); - } + try { + // when calling cleanSubtree of the referred node, since we + // compute quota usage updates before calling this destroy + // function, we use true for countDiffChange + referred.cleanSubtree(snapshot, prior, collectedBlocks, + removedINodes, true); + } catch (QuotaExceededException e) { + LOG.error("should not exceed quota while snapshot deletion", e); } } else if (referred instanceof INodeDirectoryWithSnapshot) { // similarly, if referred is a directory, it must be an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java index 808ac75c62..b5f3caa96c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java @@ -716,14 +716,8 @@ public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior, if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) { List cList = priorDiff.diff.getList(ListType.CREATED); List dList = priorDiff.diff.getList(ListType.DELETED); - priorCreated = new HashMap(cList.size()); - for (INode cNode : cList) { - priorCreated.put(cNode, cNode); - } - priorDeleted = new HashMap(dList.size()); - for (INode dNode : dList) { - priorDeleted.put(dNode, dNode); - } + priorCreated = cloneDiffList(cList); + priorDeleted = cloneDiffList(dList); } } @@ -896,6 +890,17 @@ private void computeContentSummary4Snapshot(final Content.Counts counts) { counts.add(Content.DIRECTORY, diffs.asList().size()); } + private static Map cloneDiffList(List diffList) { + if (diffList == null || diffList.size() == 0) { + return null; + } + Map map = new HashMap(diffList.size()); + for (INode node : diffList) { + map.put(node, node); + } + return map; + } + /** * Destroy a subtree under a DstReference node. 
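The new cloneDiffList helper above replaces two hand-written loops that built identity maps from the created and deleted diff lists, and it returns null for an empty list so the callers' existing null checks keep working. An illustrative, generic version of the same helper (the real one operates on INode):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DiffListUtil {
  /**
   * Index a created or deleted diff list by identity so later passes can do
   * O(1) "was this inode touched in the prior snapshot?" lookups.
   * Returns null for a null or empty list.
   */
  static <T> Map<T, T> cloneDiffList(List<T> diffList) {
    if (diffList == null || diffList.isEmpty()) {
      return null;
    }
    Map<T, T> map = new HashMap<T, T>(diffList.size());
    for (T node : diffList) {
      map.put(node, node);
    }
    return map;
  }
}
```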
*/ @@ -914,26 +919,28 @@ public static void destroyDstSubtree(INode inode, final Snapshot snapshot, destroyDstSubtree(inode.asReference().getReferredINode(), snapshot, prior, collectedBlocks, removedINodes); } - } else if (inode.isFile() && snapshot != null) { + } else if (inode.isFile()) { inode.cleanSubtree(snapshot, prior, collectedBlocks, removedINodes, true); } else if (inode.isDirectory()) { Map excludedNodes = null; if (inode instanceof INodeDirectoryWithSnapshot) { INodeDirectoryWithSnapshot sdir = (INodeDirectoryWithSnapshot) inode; + DirectoryDiffList diffList = sdir.getDiffs(); + DirectoryDiff priorDiff = diffList.getDiff(prior); + if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) { + List dList = priorDiff.diff.getList(ListType.DELETED); + excludedNodes = cloneDiffList(dList); + } + if (snapshot != null) { diffList.deleteSnapshotDiff(snapshot, prior, sdir, collectedBlocks, removedINodes, true); } - DirectoryDiff priorDiff = diffList.getDiff(prior); + priorDiff = diffList.getDiff(prior); if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) { priorDiff.diff.destroyCreatedList(sdir, collectedBlocks, removedINodes); - List dList = priorDiff.diff.getList(ListType.DELETED); - excludedNodes = new HashMap(dList.size()); - for (INode dNode : dList) { - excludedNodes.put(dNode, dNode); - } } } for (INode child : inode.asDirectory().getChildrenList(prior)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java index 05077a6c2c..fb7457b732 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java @@ -109,8 +109,10 @@ public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior, final List removedINodes, final boolean countDiffChange) throws QuotaExceededException { if (snapshot == null) { // delete the current file - recordModification(prior, null); - isCurrentFileDeleted = true; + if (!isCurrentFileDeleted()) { + recordModification(prior, null); + deleteCurrentFile(); + } Util.collectBlocksAndClear(this, collectedBlocks, removedINodes); return Quota.Counts.newInstance(); } else { // delete a snapshot diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java index 59a7fc96ba..f2d1f40a7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java @@ -96,8 +96,10 @@ public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior, final List removedINodes, final boolean countDiffChange) throws QuotaExceededException { if (snapshot == null) { // delete the current file - recordModification(prior, null); - isCurrentFileDeleted = true; + if (!isCurrentFileDeleted()) { + recordModification(prior, null); + deleteCurrentFile(); + } Util.collectBlocksAndClear(this, collectedBlocks, 
removedINodes); return Quota.Counts.newInstance(); } else { // delete a snapshot diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 9cab6885a1..9b623c056f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -683,19 +683,6 @@ protected URLRunner(final HttpOpParam.Op op, final URL url, boolean redirected) } } - @VisibleForTesting - final class ConnRunner extends AbstractRunner { - protected ConnRunner(final HttpOpParam.Op op, HttpURLConnection conn) { - super(op, false); - this.conn = conn; - } - - @Override - protected URL getUrl() { - return null; - } - } - private FsPermission applyUMask(FsPermission permission) { if (permission == null) { permission = FsPermission.getDefault(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java index b13c81a156..a5a9521b0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java @@ -23,20 +23,14 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; -import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.NetworkInterface; -import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Enumeration; -import java.util.Map; - -import javax.servlet.http.HttpServletResponse; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -47,23 +41,17 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; -import org.apache.hadoop.hdfs.web.resources.DoAsParam; -import org.apache.hadoop.hdfs.web.resources.ExceptionHandler; -import org.apache.hadoop.hdfs.web.resources.GetOpParam; -import org.apache.hadoop.hdfs.web.resources.PostOpParam; -import org.apache.hadoop.hdfs.web.resources.PutOpParam; import org.apache.hadoop.security.TestDoAsEffectiveUser; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; public class TestDelegationTokenForProxyUser { private static MiniDFSCluster cluster; @@ -155,56 +143,26 @@ public Token[] run() throws IOException { } } - @Test(timeout=20000) + @Test(timeout=5000) public void testWebHdfsDoAs() throws 
Exception { WebHdfsTestUtil.LOG.info("START: testWebHdfsDoAs()"); - ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)ExceptionHandler.LOG).getLogger().setLevel(Level.ALL); WebHdfsTestUtil.LOG.info("ugi.getShortUserName()=" + ugi.getShortUserName()); final WebHdfsFileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, config); final Path root = new Path("/"); cluster.getFileSystem().setPermission(root, new FsPermission((short)0777)); - { - //test GETHOMEDIRECTORY with doAs - final URL url = WebHdfsTestUtil.toUrl(webhdfs, - GetOpParam.Op.GETHOMEDIRECTORY, root, new DoAsParam(PROXY_USER)); - final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - final Map m = WebHdfsTestUtil.connectAndGetJson(conn, HttpServletResponse.SC_OK); - conn.disconnect(); - - final Object responsePath = m.get(Path.class.getSimpleName()); - WebHdfsTestUtil.LOG.info("responsePath=" + responsePath); - Assert.assertEquals("/user/" + PROXY_USER, responsePath); - } + Whitebox.setInternalState(webhdfs, "ugi", proxyUgi); { - //test GETHOMEDIRECTORY with DOas - final URL url = WebHdfsTestUtil.toUrl(webhdfs, - GetOpParam.Op.GETHOMEDIRECTORY, root, new DoAsParam(PROXY_USER) { - @Override - public String getName() { - return "DOas"; - } - }); - final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - final Map m = WebHdfsTestUtil.connectAndGetJson(conn, HttpServletResponse.SC_OK); - conn.disconnect(); - - final Object responsePath = m.get(Path.class.getSimpleName()); + Path responsePath = webhdfs.getHomeDirectory(); WebHdfsTestUtil.LOG.info("responsePath=" + responsePath); - Assert.assertEquals("/user/" + PROXY_USER, responsePath); + Assert.assertEquals(webhdfs.getUri() + "/user/" + PROXY_USER, responsePath.toString()); } final Path f = new Path("/testWebHdfsDoAs/a.txt"); { - //test create file with doAs - final PutOpParam.Op op = PutOpParam.Op.CREATE; - final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); - final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096); + FSDataOutputStream out = webhdfs.create(f); out.write("Hello, webhdfs user!".getBytes()); out.close(); @@ -214,12 +172,7 @@ public String getName() { } { - //test append file with doAs - final PostOpParam.Op op = PostOpParam.Op.APPEND; - final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); - final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096); + final FSDataOutputStream out = webhdfs.append(f); out.write("\nHello again!".getBytes()); out.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index e6e87b9cb3..e889413ec4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -26,8 +26,11 @@ import java.io.FileInputStream; import java.io.IOException; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import org.apache.commons.logging.Log; 
+import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HdfsBlockLocation; @@ -42,6 +45,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; @@ -52,12 +57,18 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Logger; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + public class TestFsDatasetCache { + private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class); // Most Linux installs allow a default of 64KB locked memory private static final long CACHE_CAPACITY = 64 * 1024; @@ -71,12 +82,14 @@ public class TestFsDatasetCache { private static DataNode dn; private static FsDatasetSpi fsd; private static DatanodeProtocolClientSideTranslatorPB spyNN; + private static PageRounder rounder = new PageRounder(); @Before public void setUp() throws Exception { assumeTrue(!Path.WINDOWS); - assumeTrue(NativeIO.isAvailable()); + assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY); conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY); @@ -169,19 +182,34 @@ private static long[] getBlockSizes(HdfsBlockLocation[] locs) * Blocks until cache usage hits the expected new value. */ private long verifyExpectedCacheUsage(final long expected) throws Exception { - long cacheUsed = fsd.getDnCacheUsed(); - while (cacheUsed != expected) { - cacheUsed = fsd.getDnCacheUsed(); - Thread.sleep(100); - } - assertEquals("Unexpected amount of cache used", expected, cacheUsed); - return cacheUsed; + GenericTestUtils.waitFor(new Supplier() { + private int tries = 0; + + @Override + public Boolean get() { + long curDnCacheUsed = fsd.getDnCacheUsed(); + if (curDnCacheUsed != expected) { + if (tries++ > 10) { + LOG.info("verifyExpectedCacheUsage: expected " + + expected + ", got " + curDnCacheUsed + "; " + + "memlock limit = " + NativeIO.getMemlockLimit() + + ". 
Waiting..."); + } + return false; + } + return true; + } + }, 100, 60000); + return expected; } - @Test(timeout=60000) + @Test(timeout=600000) public void testCacheAndUncacheBlock() throws Exception { + LOG.info("beginning testCacheAndUncacheBlock"); final int NUM_BLOCKS = 5; + verifyExpectedCacheUsage(0); + // Write a test file final Path testFile = new Path("/testCacheBlock"); final long testFileLen = BLOCK_SIZE*NUM_BLOCKS; @@ -211,15 +239,23 @@ public void testCacheAndUncacheBlock() throws Exception { setHeartbeatResponse(uncacheBlock(locs[i])); current = verifyExpectedCacheUsage(current - blockSizes[i]); } + LOG.info("finishing testCacheAndUncacheBlock"); } - @Test(timeout=60000) + @Test(timeout=600000) public void testFilesExceedMaxLockedMemory() throws Exception { + LOG.info("beginning testFilesExceedMaxLockedMemory"); + + // We don't want to deal with page rounding issues, so skip this + // test if page size is weird + long osPageSize = NativeIO.getOperatingSystemPageSize(); + assumeTrue(osPageSize == 4096); + // Create some test files that will exceed total cache capacity - // Don't forget that meta files take up space too! - final int numFiles = 4; - final long fileSize = CACHE_CAPACITY / numFiles; - final Path[] testFiles = new Path[4]; + final int numFiles = 5; + final long fileSize = 15000; + + final Path[] testFiles = new Path[numFiles]; final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][]; final long[] fileSizes = new long[numFiles]; for (int i=0; i() { + @Override + public Boolean get() { + int lines = appender.countLinesWithMessage( + "more bytes in the cache: " + + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY); + return lines > 0; + } + }, 500, 30000); // Uncache the n-1 files for (int i=0; i foo2 + final Path foo2 = new Path(test, "foo2"); + hdfs.rename(foo, foo2); + // create snapshot s1, note the file is included in s1 + hdfs.createSnapshot(test, "s1"); + // delete bar and foo2 + hdfs.delete(new Path(foo2, "bar"), true); + hdfs.delete(foo2, true); + + final Path sfileInBar = SnapshotTestHelper.getSnapshotPath(test, "s1", + "foo2/bar/file"); + assertTrue(hdfs.exists(sfileInBar)); + + hdfs.deleteSnapshot(test, "s1"); + assertFalse(hdfs.exists(sfileInBar)); + + restartClusterAndCheckImage(true); + // make sure the file under bar is deleted + final Path barInS0 = SnapshotTestHelper.getSnapshotPath(test, "s0", + "foo/bar"); + INodeDirectoryWithSnapshot barNode = (INodeDirectoryWithSnapshot) fsdir + .getINode(barInS0.toString()); + assertEquals(0, barNode.getChildrenList(null).size()); + List diffList = barNode.getDiffs().asList(); + assertEquals(1, diffList.size()); + DirectoryDiff diff = diffList.get(0); + assertEquals(0, diff.getChildrenDiff().getList(ListType.DELETED).size()); + assertEquals(0, diff.getChildrenDiff().getList(ListType.CREATED).size()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java index 99a8a8dc2f..47614b77cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java @@ -347,4 +347,49 @@ public void testDeletionWithZeroSizeBlock2() throws Exception { assertEquals(1, blks.length); assertEquals(BLOCKSIZE, 
blks[0].getNumBytes()); } + + /** + * 1. rename under-construction file with 0-sized blocks after snapshot. + * 2. delete the renamed directory. + * make sure we delete the 0-sized block. + * see HDFS-5476. + */ + @Test + public void testDeletionWithZeroSizeBlock3() throws Exception { + final Path foo = new Path("/foo"); + final Path subDir = new Path(foo, "sub"); + final Path bar = new Path(subDir, "bar"); + DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, 0L); + + hdfs.append(bar); + + INodeFile barNode = fsdir.getINode4Write(bar.toString()).asFile(); + BlockInfo[] blks = barNode.getBlocks(); + assertEquals(1, blks.length); + ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]); + cluster.getNameNodeRpc() + .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous, + null, barNode.getId(), null); + + SnapshotTestHelper.createSnapshot(hdfs, foo, "s1"); + + // rename bar + final Path bar2 = new Path(subDir, "bar2"); + hdfs.rename(bar, bar2); + + INodeFile bar2Node = fsdir.getINode4Write(bar2.toString()).asFile(); + blks = bar2Node.getBlocks(); + assertEquals(2, blks.length); + assertEquals(BLOCKSIZE, blks[0].getNumBytes()); + assertEquals(0, blks[1].getNumBytes()); + + // delete subDir + hdfs.delete(subDir, true); + + final Path sbar = SnapshotTestHelper.getSnapshotPath(foo, "s1", "sub/bar"); + barNode = fsdir.getINode(sbar.toString()).asFile(); + blks = barNode.getBlocks(); + assertEquals(1, blks.length); + assertEquals(BLOCKSIZE, blks[0].getNumBytes()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java index df87c6b584..9b512e0448 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java @@ -78,11 +78,6 @@ public static URL toUrl(final WebHdfsFileSystem webhdfs, Assert.assertEquals(expectedResponseCode, conn.getResponseCode()); return WebHdfsFileSystem.jsonParse(conn, false); } - - public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs, - final HttpOpParam.Op op, HttpURLConnection conn) throws IOException { - return webhdfs.new ConnRunner(op, conn).twoStepWrite(); - } public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs, final HttpOpParam.Op op, final HttpURLConnection conn, diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 21a2e94b60..6c5d808517 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -88,6 +88,9 @@ Release 2.3.0 - UNRELEASED YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil (Karthik Kambatla via Sandy Ryza) + YARN-1121. Changed ResourceManager's state-store to drain all events on + shut-down. 
(Jian He via vinodkv) + + OPTIMIZATIONS + BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 9301bba081..bf5058a9d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -49,6 +49,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private final BlockingQueue<Event> eventQueue; private volatile boolean stopped = false; + // Configuration flag for enabling/disabling draining dispatcher's events on + // stop functionality. + private volatile boolean drainEventsOnStop = false; + + // Indicates all the remaining dispatcher's events on stop have been drained + // and processed. + private volatile boolean drained = true; + + // For drainEventsOnStop enabled only, block newly coming events into the + // queue while stopping. + private volatile boolean blockNewEvents = false; + private EventHandler handlerInstance = null; + private Thread eventHandlingThread; protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; private boolean exitOnDispatchException; @@ -68,6 +81,7 @@ Runnable createThread() { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); Event event; try { event = eventQueue.take(); @@ -102,8 +116,19 @@ protected void serviceStart() throws Exception { eventHandlingThread.start(); } + public void setDrainEventsOnStop() { + drainEventsOnStop = true; + } + @Override protected void serviceStop() throws Exception { + if (drainEventsOnStop) { + blockNewEvents = true; + LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); + while(!drained) { + Thread.yield(); + } + } stopped = true; if (eventHandlingThread != null) { eventHandlingThread.interrupt(); @@ -173,11 +198,19 @@ public void register(Class<? extends Enum> eventType, @Override public EventHandler getEventHandler() { - return new GenericEventHandler(); + if (handlerInstance == null) { + handlerInstance = new GenericEventHandler(); + } + return handlerInstance; + } class GenericEventHandler implements EventHandler<Event> { public void handle(Event event) { + if (blockNewEvents) { + return; + } + drained = false; + /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); if (qSize !=0 && qSize %1000 == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5a7c7dcbb1..911107daab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -261,17 +261,20 @@ public void setRMDispatcher(Dispatcher dispatcher) { } AsyncDispatcher dispatcher; - - public synchronized void serviceInit(Configuration conf) 
throws Exception{ + + @Override + protected void serviceInit(Configuration conf) throws Exception{ // create async handler dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.register(RMStateStoreEventType.class, new ForwardingEventHandler()); + dispatcher.setDrainEventsOnStop(); initInternal(conf); } - - protected synchronized void serviceStart() throws Exception { + + @Override + protected void serviceStart() throws Exception { dispatcher.start(); startInternal(); } @@ -288,11 +291,12 @@ protected synchronized void serviceStart() throws Exception { */ protected abstract void startInternal() throws Exception; - public synchronized void serviceStop() throws Exception { + @Override + protected void serviceStop() throws Exception { closeInternal(); dispatcher.stop(); } - + /** * Derived classes close themselves using this method. * The base class will be closed and the event dispatcher will be shutdown @@ -509,8 +513,7 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { } // Dispatcher related code - - private synchronized void handleStoreEvent(RMStateStoreEvent event) { + protected void handleStoreEvent(RMStateStoreEvent event) { if (event.getType().equals(RMStateStoreEventType.STORE_APP) || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { ApplicationState appState = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index aba334ad94..cb41ca4e45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -163,6 +163,14 @@ public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, true); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted) throws Exception { ApplicationClientProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -222,7 +230,9 @@ PrivilegedAction setClientReq( }.setClientReq(client, req); fakeUser.doAs(action); // make sure app is immediately available after submit - waitForState(appId, RMAppState.ACCEPTED); + if (waitForAccepted) { + waitForState(appId, RMAppState.ACCEPTED); + } return getRMContext().getRMApps().get(appId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 97f51a2791..f87f6894bb 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -1062,6 +1063,65 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore() { + volatile boolean wait = true; + @Override + public void serviceStop() throws Exception { + // Unblock app saving request. + wait = false; + super.serviceStop(); + } + + @Override + protected void handleStoreEvent(RMStateStoreEvent event) { + // Block app saving request. + while (wait); + super.handleStoreEvent(event); + } + }; + memStore.init(conf); + + // start RM + final MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + + // create apps. + final ArrayList<RMApp> appList = new ArrayList<RMApp>(); + final int NUM_APPS = 5; + + for (int i = 0; i < NUM_APPS; i++) { + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap<ApplicationAccessType, String>(), false, + "default", -1, null, "MAPREDUCE", false); + appList.add(app); + rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING); + } + // all apps' saving requests are now enqueued to RMStateStore's dispatcher + // queue, and will be processed once rm.stop() is called. + + // Nothing exists in state store before stop is called. + Map<ApplicationId, ApplicationState> rmAppState = + memStore.getState().getApplicationState(); + Assert.assertTrue(rmAppState.size() == 0); + + // stop rm + rm1.stop(); + + // Assert app info is still saved even if stop is called with pending saving + // requests on the dispatcher. + for (RMApp app : appList) { + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app.getApplicationSubmissionContext() + .getApplicationId()); + } + Assert.assertTrue(rmAppState.size() == NUM_APPS); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) {