diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 31b70279cd..82c104b4a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -598,6 +598,9 @@ Trunk (Unreleased)
 
     HDFS-5667. Include DatanodeStorage in StorageReport. (Arpit Agarwal)
 
+    HDFS-5589. Namenode loops caching and uncaching when data should be
+    uncached (awang via cmccabe)
+
 Release 2.4.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 6e6e44b500..aef726fa9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -21,12 +21,14 @@
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -76,7 +78,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   /**
    * Pseudorandom number source
    */
-  private final Random random = new Random();
+  private static final Random random = new Random();
 
   /**
    * The interval at which we scan the namesystem for caching changes.
@@ -310,8 +312,6 @@ private void rescanCacheDirectives() {
     FSDirectory fsDir = namesystem.getFSDirectory();
     final long now = new Date().getTime();
     for (CacheDirective directive : cacheManager.getCacheDirectives()) {
-      // Reset the directive's statistics
-      directive.resetStatistics();
       // Skip processing this entry if it has expired
       if (LOG.isTraceEnabled()) {
         LOG.trace("Directive expiry is at " + directive.getExpiryTime());
@@ -461,7 +461,7 @@ private String findReasonForNotCaching(CachedBlock cblock,
       // there may be a period of time when incomplete blocks remain cached
       // on the DataNodes.
       return "not complete";
-    } else if (cblock.getReplication() == 0) {
+    } else if (cblock.getReplication() == 0) {
       // Since 0 is not a valid value for a cache directive's replication
       // field, seeing a replication of 0 on a CacheBlock means that it
       // has never been reached by any sweep.
@@ -469,6 +469,9 @@ private String findReasonForNotCaching(CachedBlock cblock,
     } else if (cblock.getMark() != mark) {
       // Although the block was needed in the past, we didn't reach it during
       // the current sweep.  Therefore, it doesn't need to be cached any more.
+      // Need to set the replication to 0 so it doesn't flip back to cached
+      // when the mark flips on the next scan
+      cblock.setReplicationAndMark((short)0, mark);
       return "no longer needed by any directives";
     }
     return null;
@@ -595,7 +598,7 @@ private void addNewPendingUncached(int neededUncached,
    * @param pendingCached    A list of DataNodes that will soon cache the
    *                         block.
    */
-  private void addNewPendingCached(int neededCached,
+  private void addNewPendingCached(final int neededCached,
       CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
       List<DatanodeDescriptor> pendingCached) {
     // To figure out which replicas can be cached, we consult the
@@ -616,35 +619,156 @@ private void addNewPendingCached(int neededCached,
       }
       return;
     }
-    List<DatanodeDescriptor> possibilities = new LinkedList<DatanodeDescriptor>();
+    // Filter the list of replicas to only the valid targets
+    List<DatanodeDescriptor> possibilities =
+        new LinkedList<DatanodeDescriptor>();
     int numReplicas = blockInfo.getCapacity();
     Collection<DatanodeDescriptor> corrupt =
         blockManager.getCorruptReplicas(blockInfo);
+    int outOfCapacity = 0;
     for (int i = 0; i < numReplicas; i++) {
       DatanodeDescriptor datanode = blockInfo.getDatanode(i);
-      if ((datanode != null) &&
-          ((!pendingCached.contains(datanode)) &&
-          ((corrupt == null) || (!corrupt.contains(datanode))))) {
-        possibilities.add(datanode);
+      if (datanode == null) {
+        continue;
       }
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+        continue;
+      }
+      if (corrupt != null && corrupt.contains(datanode)) {
+        continue;
+      }
+      if (pendingCached.contains(datanode) || cached.contains(datanode)) {
+        continue;
+      }
+      long pendingCapacity = datanode.getCacheRemaining();
+      // Subtract pending cached blocks from effective capacity
+      Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity -= info.getNumBytes();
+        }
+      }
+      it = datanode.getPendingUncached().iterator();
+      // Add pending uncached blocks from effective capacity
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity += info.getNumBytes();
+        }
+      }
+      if (pendingCapacity < blockInfo.getNumBytes()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Datanode " + datanode + " is not a valid possibility for"
+              + " block " + blockInfo.getBlockId() + " of size "
+              + blockInfo.getNumBytes() + " bytes, only has "
+              + datanode.getCacheRemaining() + " bytes of cache remaining.");
+        }
+        outOfCapacity++;
+        continue;
+      }
+      possibilities.add(datanode);
     }
-    while (neededCached > 0) {
-      if (possibilities.isEmpty()) {
-        LOG.warn("We need " + neededCached + " more replica(s) than " +
-            "actually exist to provide a cache replication of " +
-            cachedBlock.getReplication() + " for " + cachedBlock);
-        return;
-      }
-      DatanodeDescriptor datanode =
-          possibilities.remove(random.nextInt(possibilities.size()));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AddNewPendingCached: datanode " + datanode +
-            " will now cache block " + cachedBlock);
-      }
+    List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
+        neededCached, blockManager.getDatanodeManager().getStaleInterval());
+    for (DatanodeDescriptor datanode : chosen) {
       pendingCached.add(datanode);
       boolean added = datanode.getPendingCached().add(cachedBlock);
       assert added;
-      neededCached--;
+    }
+    // We were unable to satisfy the requested replication factor
+    if (neededCached > chosen.size()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Only have " +
+            (cachedBlock.getReplication() - neededCached + chosen.size()) +
+            " of " + cachedBlock.getReplication() + " cached replicas for " +
+            cachedBlock + " (" + outOfCapacity + " nodes have insufficient " +
+            "capacity).");
+      }
     }
   }
+
+  /**
+   * Chooses datanode locations for caching from a list of valid possibilities.
+   * Non-stale nodes are chosen before stale nodes.
+   *
+   * @param possibilities List of candidate datanodes
+   * @param neededCached Number of replicas needed
+   * @param staleInterval Age of a stale datanode
+   * @return A list of chosen datanodes
+   */
+  private static List<DatanodeDescriptor> chooseDatanodesForCaching(
+      final List<DatanodeDescriptor> possibilities, final int neededCached,
+      final long staleInterval) {
+    // Make a copy that we can modify
+    List<DatanodeDescriptor> targets =
+        new ArrayList<DatanodeDescriptor>(possibilities);
+    // Selected targets
+    List<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>();
+
+    // Filter out stale datanodes
+    List<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>();
+    Iterator<DatanodeDescriptor> it = targets.iterator();
+    while (it.hasNext()) {
+      DatanodeDescriptor d = it.next();
+      if (d.isStale(staleInterval)) {
+        it.remove();
+        stale.add(d);
+      }
+    }
+    // Select targets
+    while (chosen.size() < neededCached) {
+      // Try to use stale nodes if we're out of non-stale nodes, else we're done
+      if (targets.isEmpty()) {
+        if (!stale.isEmpty()) {
+          targets = stale;
+        } else {
+          break;
+        }
+      }
+      // Select a random target
+      DatanodeDescriptor target =
+          chooseRandomDatanodeByRemainingCapacity(targets);
+      chosen.add(target);
+      targets.remove(target);
+    }
+    return chosen;
+  }
+
+  /**
+   * Choose a single datanode from the provided list of possible
+   * targets, weighted by the percentage of free space remaining on the node.
+   *
+   * @return The chosen datanode
+   */
+  private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(
+      final List<DatanodeDescriptor> targets) {
+    // Use a weighted probability to choose the target datanode
+    float total = 0;
+    for (DatanodeDescriptor d : targets) {
+      total += d.getCacheRemainingPercent();
+    }
+    // Give each datanode a portion of keyspace equal to its relative weight
+    // [0, w1) selects d1, [w1, w2) selects d2, etc.
+    TreeMap<Integer, DatanodeDescriptor> lottery =
+        new TreeMap<Integer, DatanodeDescriptor>();
+    int offset = 0;
+    for (DatanodeDescriptor d : targets) {
+      // Since we're using floats, be paranoid about negative values
+      int weight =
+          Math.max(1, (int)((d.getCacheRemainingPercent() / total) * 1000000));
+      offset += weight;
+      lottery.put(offset, d);
+    }
+    // Choose a number from [0, offset), which is the total amount of weight,
+    // to select the winner
+    DatanodeDescriptor winner =
+        lottery.higherEntry(random.nextInt(offset)).getValue();
+    return winner;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index b6aac810db..c0a93c4aef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -43,10 +43,13 @@
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -109,8 +112,9 @@ public class TestFsDatasetCache {
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
     conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
-        500);
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         CACHE_CAPACITY);
@@ -328,7 +332,7 @@ public void testFilesExceedMaxLockedMemory() throws Exception {
 
     // Create some test files that will exceed total cache capacity
     final int numFiles = 5;
-    final long fileSize = 15000;
+    final long fileSize = CACHE_CAPACITY / (numFiles-1);
 
     final Path[] testFiles = new Path[numFiles];
     final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
@@ -477,4 +481,42 @@ public void testPageRounder() throws Exception {
     setHeartbeatResponse(uncacheBlocks(locs));
     verifyExpectedCacheUsage(0, 0);
   }
+
+  @Test(timeout=60000)
+  public void testUncacheQuiesces() throws Exception {
+    // Create a file
+    Path fileName = new Path("/testUncacheQuiesces");
+    int fileLen = 4096;
+    DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
+    // Cache it
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPool("pool").setPath(fileName).setReplication((short)3).build());
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksCached =
+            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+        return blocksCached > 0;
+      }
+    }, 1000, 30000);
+    // Uncache it
+    dfs.removeCacheDirective(1);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksUncached =
+            MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+        return blocksUncached > 0;
+      }
+    }, 1000, 30000);
+    // Make sure that no additional messages were sent
+    Thread.sleep(10000);
+    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+    MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics);
+    MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index b81fde32ad..d47c275771 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -57,17 +57,18 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.CachePoolStats;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -81,6 +82,7 @@
 import org.apache.hadoop.util.GSet;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -603,8 +605,8 @@ public void testCacheManagerRestart() throws Exception {
    * Wait for the NameNode to have an expected number of cached blocks
    * and replicas.
    * @param nn NameNode
-   * @param expectedCachedBlocks
-   * @param expectedCachedReplicas
+   * @param expectedCachedBlocks if -1, treat as wildcard
+   * @param expectedCachedReplicas if -1, treat as wildcard
    * @throws Exception
    */
   private static void waitForCachedBlocks(NameNode nn,
@@ -633,16 +635,18 @@ public Boolean get() {
         } finally {
           namesystem.readUnlock();
         }
-        if ((numCachedBlocks == expectedCachedBlocks) &&
-            (numCachedReplicas == expectedCachedReplicas)) {
-          return true;
-        } else {
-          LOG.info(logString + " cached blocks: have " + numCachedBlocks +
-              " / " + expectedCachedBlocks + ". " +
-              "cached replicas: have " + numCachedReplicas +
-              " / " + expectedCachedReplicas);
-          return false;
+        if (expectedCachedBlocks == -1 ||
+            numCachedBlocks == expectedCachedBlocks) {
+          if (expectedCachedReplicas == -1 ||
+              numCachedReplicas == expectedCachedReplicas) {
+            return true;
+          }
         }
+        LOG.info(logString + " cached blocks: have " + numCachedBlocks +
+            " / " + expectedCachedBlocks + ". " +
+            "cached replicas: have " + numCachedReplicas +
+            " / " + expectedCachedReplicas);
+        return false;
       }
     }, 500, 60000);
   }
@@ -1351,4 +1355,39 @@ public void testMaxRelativeExpiry() throws Exception {
         .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER - 1))
         .build());
   }
+
+  @Test(timeout=60000)
+  public void testExceedsCapacity() throws Exception {
+    // Create a giant file
+    final Path fileName = new Path("/exceeds");
+    final long fileLen = CACHE_CAPACITY * (NUM_DATANODES*2);
+    int numCachedReplicas = (int) ((CACHE_CAPACITY*NUM_DATANODES)/BLOCK_SIZE);
+    DFSTestUtil.createFile(dfs, fileName, fileLen, (short) NUM_DATANODES,
+        0xFADED);
+    // Set up a log appender watcher
+    final LogVerificationAppender appender = new LogVerificationAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
+        .setPath(fileName).setReplication((short) 1).build());
+    waitForCachedBlocks(namenode, -1, numCachedReplicas,
+        "testExceeds:1");
+    // Check that no DNs saw an excess CACHE message
+    int lines = appender.countLinesWithMessage(
+        "more bytes in the cache: " +
+        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+    assertEquals("Namenode should not send extra CACHE commands", 0, lines);
+    // Try creating a file with giant-sized blocks that exceed cache capacity
+    dfs.delete(fileName, false);
+    DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
+        (short) 1, 0xFADED);
+    // Nothing will get cached, so just force sleep for a bit
+    Thread.sleep(4000);
+    // Still should not see any excess commands
+    lines = appender.countLinesWithMessage(
+        "more bytes in the cache: " +
+        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+    assertEquals("Namenode should not send extra CACHE commands", 0, lines);
+  }
 }