From cc875f0124d1951a4aab0565442242dac3dd35c8 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 17 Aug 2011 22:00:27 +0000 Subject: [PATCH] HDFS-1257. Fix a race condition on BlockManager.recentInvalidateSets. Contributed by Eric Payne git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1158933 13f79535-47bb-0310-9956-ffa450edef68 --- hdfs/CHANGES.txt | 3 + .../server/blockmanagement/BlockManager.java | 138 ++++++++++-------- 2 files changed, 83 insertions(+), 58 deletions(-) diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt index 75088ff15b..449b97452b 100644 --- a/hdfs/CHANGES.txt +++ b/hdfs/CHANGES.txt @@ -972,6 +972,9 @@ Trunk (unreleased changes) HDFS-73. DFSOutputStream does not close all the sockets. (Uma Maheswara Rao G via eli) + HDFS-1257. Fix a race condition on BlockManager.recentInvalidateSets. + (Eric Payne via szetszwo) + BREAKDOWN OF HDFS-1073 SUBTASKS HDFS-1521. Persist transaction ID on disk between NN restarts. diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7ebd9dd315..252f6f05d3 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -758,18 +758,23 @@ void removeBlocksAssociatedTo(final DatanodeDescriptor node) { } private void removeFromInvalidates(String storageID, Block block) { - Collection v = recentInvalidateSets.get(storageID); - if (v != null && v.remove(block)) { - pendingDeletionBlocksCount--; - if (v.isEmpty()) { - recentInvalidateSets.remove(storageID); + synchronized(recentInvalidateSets) { + Collection v = recentInvalidateSets.get(storageID); + if (v != null && v.remove(block)) { + pendingDeletionBlocksCount--; + if (v.isEmpty()) { + recentInvalidateSets.remove(storageID); + } } } } boolean belongsToInvalidates(String storageID, Block block) { - Collection invalidateSet 
= recentInvalidateSets.get(storageID); - return invalidateSet != null && invalidateSet.contains(block); + Collection invalidateSet; + synchronized(recentInvalidateSets) { + invalidateSet = recentInvalidateSets.get(storageID); + return invalidateSet != null && invalidateSet.contains(block); + } } /** @@ -781,17 +786,19 @@ boolean belongsToInvalidates(String storageID, Block block) { * @param log true to create an entry in the log */ private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) { - Collection invalidateSet = recentInvalidateSets - .get(dn.getStorageID()); - if (invalidateSet == null) { - invalidateSet = new HashSet(); - recentInvalidateSets.put(dn.getStorageID(), invalidateSet); - } - if (invalidateSet.add(b)) { - pendingDeletionBlocksCount++; - if (log) { - NameNode.stateChangeLog.info("BLOCK* addToInvalidates: " - + b + " to " + dn.getName()); + synchronized(recentInvalidateSets) { + Collection invalidateSet = recentInvalidateSets + .get(dn.getStorageID()); + if (invalidateSet == null) { + invalidateSet = new HashSet(); + recentInvalidateSets.put(dn.getStorageID(), invalidateSet); + } + if (invalidateSet.add(b)) { + pendingDeletionBlocksCount++; + if (log) { + NameNode.stateChangeLog.info("BLOCK* addToInvalidates: " + + b + " to " + dn.getName()); + } } } } @@ -830,16 +837,21 @@ private void addToInvalidates(Block b) { */ private void dumpRecentInvalidateSets(PrintWriter out) { assert namesystem.hasWriteLock(); - int size = recentInvalidateSets.values().size(); + int size; + synchronized(recentInvalidateSets) { + size = recentInvalidateSets.values().size(); + } out.println("Metasave: Blocks " + pendingDeletionBlocksCount + " waiting deletion from " + size + " datanodes."); if (size == 0) { return; } - for(Map.Entry> entry : recentInvalidateSets.entrySet()) { - Collection blocks = entry.getValue(); - if (blocks.size() > 0) { - out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks); + synchronized(recentInvalidateSets) { + 
for(Map.Entry> entry : recentInvalidateSets.entrySet()) { + Collection blocks = entry.getValue(); + if (blocks.size() > 0) { + out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks); + } } } } @@ -950,13 +962,16 @@ public int getUnderReplicatedNotMissingBlocks() { * @return total number of block for deletion */ int computeInvalidateWork(int nodesToProcess) { - int numOfNodes = recentInvalidateSets.size(); - nodesToProcess = Math.min(numOfNodes, nodesToProcess); + int numOfNodes; + ArrayList keyArray; - // TODO should using recentInvalidateSets be synchronized? - // get an array of the keys - ArrayList keyArray = - new ArrayList(recentInvalidateSets.keySet()); + synchronized(recentInvalidateSets) { + numOfNodes = recentInvalidateSets.size(); + // get an array of the keys + keyArray = new ArrayList(recentInvalidateSets.keySet()); + } + + nodesToProcess = Math.min(numOfNodes, nodesToProcess); // randomly pick up nodesToProcess nodes // and put them at [0, nodesToProcess) @@ -2428,7 +2443,10 @@ private int getReplication(Block block) { /** Remove a datanode from the invalidatesSet */ private void removeFromInvalidates(String storageID) { - Collection blocks = recentInvalidateSets.remove(storageID); + Collection blocks; + synchronized(recentInvalidateSets) { + blocks = recentInvalidateSets.remove(storageID); + } if (blocks != null) { pendingDeletionBlocksCount -= blocks.size(); } @@ -2454,39 +2472,43 @@ private int invalidateWorkForOneNode(String nodeId) { return 0; } - Collection invalidateSet = recentInvalidateSets.get(nodeId); - if (invalidateSet == null) - return 0; + Collection invalidateSet; + ArrayList blocksToInvalidate; + synchronized(recentInvalidateSets) { + invalidateSet = recentInvalidateSets.get(nodeId); + if (invalidateSet == null) + return 0; - ArrayList blocksToInvalidate = new ArrayList( + blocksToInvalidate = new ArrayList( getDatanodeManager().blockInvalidateLimit); - // # blocks that can be sent in one message is limited - 
Iterator it = invalidateSet.iterator(); - for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit - && it.hasNext(); blkCount++) { - blocksToInvalidate.add(it.next()); - it.remove(); - } - - // If we send everything in this message, remove this node entry - if (!it.hasNext()) { - removeFromInvalidates(nodeId); - } - - dn.addBlocksToBeInvalidated(blocksToInvalidate); - - if (NameNode.stateChangeLog.isInfoEnabled()) { - StringBuilder blockList = new StringBuilder(); - for (Block blk : blocksToInvalidate) { - blockList.append(' '); - blockList.append(blk); + // # blocks that can be sent in one message is limited + Iterator it = invalidateSet.iterator(); + for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit + && it.hasNext(); blkCount++) { + blocksToInvalidate.add(it.next()); + it.remove(); } - NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName() - + " to delete " + blockList); + + // If we send everything in this message, remove this node entry + if (!it.hasNext()) { + removeFromInvalidates(nodeId); + } + + dn.addBlocksToBeInvalidated(blocksToInvalidate); + + if (NameNode.stateChangeLog.isInfoEnabled()) { + StringBuilder blockList = new StringBuilder(); + for (Block blk : blocksToInvalidate) { + blockList.append(' '); + blockList.append(blk); + } + NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName() + + " to delete " + blockList); + } + pendingDeletionBlocksCount -= blocksToInvalidate.size(); + return blocksToInvalidate.size(); } - pendingDeletionBlocksCount -= blocksToInvalidate.size(); - return blocksToInvalidate.size(); } finally { namesystem.writeUnlock(); }