From 85c203602993a946fb5f41eadf1cf1484a0ce686 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Mon, 16 Sep 2013 18:41:27 +0000 Subject: [PATCH] HDFS-5210. Fix some failing unit tests on HDFS-4949 branch. (Contributed by Andrew Wang) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1523754 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES-HDFS-4949.txt | 2 ++ .../CacheReplicationManager.java | 13 +++++---- .../CacheReplicationMonitor.java | 2 +- .../CacheReplicationPolicy.java | 14 +++++---- .../hdfs/server/datanode/BPOfferService.java | 6 ---- .../hdfs/server/datanode/BPServiceActor.java | 15 +++------- .../hadoop/hdfs/server/datanode/DataNode.java | 1 - .../fsdataset/impl/MappableBlock.java | 2 +- .../hdfs/server/namenode/CacheManager.java | 2 ++ .../TestCacheReplicationManager.java | 29 ++++++++++++++++--- 10 files changed, 52 insertions(+), 34 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt index c041cf4ed4..78bdc796b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt @@ -53,3 +53,5 @@ HDFS-4949 (Unreleased) HDFS-5201. NativeIO: consolidate getrlimit into NativeIO#getMemlockLimit (Contributed by Colin Patrick McCabe) + HDFS-5210. Fix some failing unit tests on HDFS-4949 branch. + (Contributed by Andrew Wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java index fb269c7689..d58f308146 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java @@ -167,11 +167,13 @@ public void close() { } public void clearQueues() { - blocksToUncache.clear(); - synchronized (neededCacheBlocks) { - neededCacheBlocks.clear(); + if (isCachingEnabled) { + blocksToUncache.clear(); + synchronized (neededCacheBlocks) { + neededCacheBlocks.clear(); + } + pendingCacheBlocks.clear(); } - pendingCacheBlocks.clear(); } public boolean isCachingEnabled() { @@ -571,7 +573,8 @@ private void updateNeededCaching(final Block block, } /** - * Return the safely cached replicas of a block in a BlocksMap + * Return the safe replicas (not corrupt or decomissioning/decommissioned) of + * a block in a BlocksMap */ List getSafeReplicas(BlocksMap map, Block block) { List nodes = new ArrayList(3); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index ce70b3f677..b6255460a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -156,7 +156,7 @@ private void computeCachingWorkForBlocks(List blocksToCache) { } // Choose some replicas to cache if needed additionalRepl = requiredRepl - effectiveRepl; - targets = new ArrayList(storedNodes); + targets = new ArrayList(storedNodes.size()); // Only target replicas that aren't already cached. for (DatanodeDescriptor dn: storedNodes) { if (!cachedNodes.contains(dn)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java index 2674f6a0c7..3bd19331ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java @@ -35,6 +35,9 @@ @InterfaceAudience.LimitedPrivate({"HDFS"}) public class CacheReplicationPolicy { + // Not thread-safe, but only accessed by the CacheReplicationMonitor + private static RandomData random = new RandomDataImpl(); + /** * @return List of datanodes with sufficient capacity to cache the block */ @@ -53,8 +56,7 @@ private static List selectSufficientCapacity(Block block, /** * Returns a random datanode from targets, weighted by the amount of free - * cache capacity on the datanode. Prunes unsuitable datanodes from the - * targets list. + * cache capacity on the datanode. * * @param block Block to be cached * @param targets List of potential cache targets @@ -75,8 +77,7 @@ private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block, lottery.put(totalCacheAvailable, dn); } // Pick our lottery winner - RandomData r = new RandomDataImpl(); - long winningTicket = r.nextLong(0, totalCacheAvailable - 1); + long winningTicket = random.nextLong(0, totalCacheAvailable - 1); Entry winner = lottery.higherEntry(winningTicket); return winner.getValue(); } @@ -94,7 +95,10 @@ static List chooseTargetsToCache(Block block, List chosen = new ArrayList(numTargets); for (int i = 0; i < numTargets && !sufficient.isEmpty(); i++) { - chosen.add(randomDatanodeByRemainingCache(block, sufficient)); + DatanodeDescriptor choice = + randomDatanodeByRemainingCache(block, sufficient); + chosen.add(choice); + sufficient.remove(choice); } return chosen; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 3c01345a3f..bc78eda828 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -368,12 +368,6 @@ void scheduleBlockReport(long delay) { } } - void scheduleCacheReport(long delay) { - for (BPServiceActor actor: bpServices) { - actor.scheduleCacheReport(delay); - } - } - /** * Ask each of the actors to report a bad block hosted on another DN. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index b4912881a0..b96292410e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -242,17 +242,6 @@ void scheduleBlockReport(long delay) { resetBlockReportTime = true; // reset future BRs for randomness } - void scheduleCacheReport(long delay) { - if (delay > 0) { - // Uniform random jitter by the delay - lastCacheReport = Time.monotonicNow() - - dnConf.cacheReportInterval - + DFSUtil.getRandom().nextInt(((int)delay)); - } else { // send at next heartbeat - lastCacheReport = lastCacheReport - dnConf.cacheReportInterval; - } - } - void reportBadBlocks(ExtendedBlock block) { if (bpRegistration == null) { return; @@ -445,6 +434,10 @@ DatanodeCommand blockReport() throws IOException { } DatanodeCommand cacheReport() throws IOException { + // If caching is disabled, do not send a cache report + if (dn.getFSDataset().getCacheCapacity() == 0) { + return null; + } // send cache report if timer has expired. DatanodeCommand cmd = null; long startTime = Time.monotonicNow(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 778820b0a8..7115888134 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1916,7 +1916,6 @@ static StartupOption getStartupOption(Configuration conf) { public void scheduleAllBlockReport(long delay) { for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { bpos.scheduleBlockReport(delay); - bpos.scheduleCacheReport(delay); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java index de0bcd35d7..a2a9e6c5a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java @@ -225,7 +225,7 @@ public void verifyChecksum() throws IOException, ChecksumException { blockBuf.flip(); // Number of read chunks, including partial chunk at end int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; - checksumBuf.limit(chunks*bytesPerChecksum); + checksumBuf.limit(chunks*checksumSize); fillBuffer(metaChannel, checksumBuf); checksumBuf.flip(); checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 5b82848015..945b425038 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -186,6 +186,8 @@ private synchronized Fallible addDirective( // TODO: adjustable cache replication factor namesystem.setCacheReplicationInt(directive.getPath(), file.getBlockReplication()); + } else { + LOG.warn("Path " + directive.getPath() + " is not a file"); } } catch (IOException ioe) { LOG.info("addDirective " + directive +": failed to cache file: " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java index 557a9bf91b..8c7037c1e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java @@ -48,9 +48,11 @@ public class TestCacheReplicationManager { + private static final long BLOCK_SIZE = 512; + private static final int REPL_FACTOR = 3; + private static final int NUM_DATANODES = 4; // Most Linux installs allow a default of 64KB locked memory - private static final long CACHE_CAPACITY = 64 * 1024; - private static final long BLOCK_SIZE = 4096; + private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES; private static Configuration conf; private static MiniDFSCluster cluster = null; @@ -75,7 +77,7 @@ public void setUp() throws Exception { conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000); cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1).build(); + .numDataNodes(NUM_DATANODES).build(); cluster.waitActive(); fs = cluster.getFileSystem(); @@ -106,6 +108,25 @@ private void waitForExpectedNumCachedBlocks(final int expected) Thread.sleep(500); actual = countNumCachedBlocks(); } + waitForExpectedNumCachedReplicas(expected*REPL_FACTOR); + } + + private void waitForExpectedNumCachedReplicas(final int expected) + throws Exception { + BlocksMap cachedBlocksMap = cacheReplManager.cachedBlocksMap; + int actual = 0; + while (expected != actual) { + Thread.sleep(500); + nn.getNamesystem().readLock(); + try { + actual = 0; + for (BlockInfo b : cachedBlocksMap.getBlocks()) { + actual += cachedBlocksMap.numNodes(b); + } + } finally { + nn.getNamesystem().readUnlock(); + } + } } @Test(timeout=60000) @@ -114,7 +135,7 @@ public void testCachePaths() throws Exception { final String pool = "friendlyPool"; nnRpc.addCachePool(new CachePoolInfo("friendlyPool")); // Create some test files - final int numFiles = 3; + final int numFiles = 2; final int numBlocksPerFile = 2; final List paths = new ArrayList(numFiles); for (int i=0; i