From 73451ed2d9fb5eb228d80ad5f3be302a60496527 Mon Sep 17 00:00:00 2001
From: Hairong Kuang
Date: Fri, 26 Aug 2011 04:46:42 +0000
Subject: [PATCH] HDFS-395. DFS Scalability: Incremental block reports.
 Contributed by Tomasz Nykiel.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1161992 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   2 +-
 .../server/blockmanagement/BlockManager.java  |  52 +++++--
 .../hadoop/hdfs/server/datanode/DataNode.java | 130 +++++++++++-------
 .../hdfs/server/datanode/FSDataset.java       |  12 +-
 .../datanode/FSDatasetAsyncDiskService.java   |  52 +++----
 .../hadoop/hdfs/server/namenode/NameNode.java |  16 +--
 .../hdfs/server/protocol/BlockCommand.java    |  10 ++
 .../server/protocol/DatanodeProtocol.java     |  14 +-
 .../protocol/ReceivedDeletedBlockInfo.java    | 101 ++++++++++++++
 .../namenode/NNThroughputBenchmark.java       |  18 +--
 .../server/namenode/TestDeadDatanode.java     |   7 +-
 11 files changed, 295 insertions(+), 119 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a2d2105935..ac90757c3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2,7 +2,7 @@ Hadoop HDFS Change Log
 
 Trunk (unreleased changes)
 
   NEW FEATURES
-    HDFS-349. DFS Scalability: Incremental block reports. (Tomasz Nykiel
+    HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel
     via hairong)
 
 Release 0.23.0 - Unreleased
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 081a60430c..4e45449b1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -59,10 +59,12 @@
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
@@ -2002,7 +2004,7 @@ private void addToExcessReplicate(DatanodeInfo dn, Block block) {
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  private void removeStoredBlock(Block block, DatanodeDescriptor node) {
+  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
           + block + " from " + node.getName());
@@ -2121,27 +2123,48 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint)
     }
   }
 
-  /** The given node is reporting that it received a certain block. */
-  public void blockReceived(final DatanodeID nodeID, final String poolId,
-      final Block block, final String delHint) throws IOException {
+  /** The given node is reporting that it received/deleted certain blocks. */
+  public void blockReceivedAndDeleted(final DatanodeID nodeID,
+      final String poolId,
+      final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+      ) throws IOException {
     namesystem.writeLock();
+    int received = 0;
+    int deleted = 0;
     try {
       final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
-        final String s = block + " is received from dead or unregistered node "
-            + nodeID.getName();
-        NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
-        throw new IOException(s);
-      }
-
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
-            + " is received from " + nodeID.getName());
+        NameNode.stateChangeLog
+            .warn("BLOCK* blockReceivedDeleted"
+                + " is received from dead or unregistered node "
+                + nodeID.getName());
+        throw new IOException(
+            "Got blockReceivedDeleted message from unregistered or dead node");
       }
-      addBlock(node, block, delHint);
+
+      for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
+        if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
+          removeStoredBlock(
+              receivedAndDeletedBlocks[i].getBlock(), node);
+          deleted++;
+        } else {
+          addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
+              receivedAndDeletedBlocks[i].getDelHints());
+          received++;
+        }
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("BLOCK* block"
+              + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
+                  : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+              + " is received from " + nodeID.getName());
+        }
+      }
     } finally {
       namesystem.writeUnlock();
+      NameNode.stateChangeLog
+          .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+              + nodeID.getName() + " received: " + received + ", "
+              + " deleted: " + deleted);
     }
   }
@@ -2316,6 +2339,7 @@ public int getTotalBlocks() {
   }
 
   public void removeBlock(Block block) {
+    block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index f94dc9b72c..a9c29cc821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -106,6 +106,7 @@
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@@ -348,6 +349,8 @@ void refreshNamenodes(Configuration conf)
   ThreadGroup threadGroup = null;
   long blockReportInterval;
   boolean resetBlockReportTime = true;
+  long deleteReportInterval;
+  long lastDeletedReport = 0;
   long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
   long heartBeatInterval;
   private boolean heartbeatsDisabledForTests = false;
@@ -458,6 +461,7 @@ private void initConfig(Configuration conf) {
     this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
+    this.deleteReportInterval = 100 * heartBeatInterval;
 
     // do we need to sync block file contents to disk when blockfile is closed?
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
         DFS_DATANODE_SYNCONCLOSE_DEFAULT);
@@ -643,6 +647,17 @@ protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint)
     }
   }
 
+  // calls specific to BP
+  protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if (bpos != null) {
+      bpos.notifyNamenodeDeletedBlock(block);
+    } else {
+      LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+          + block.getBlockPoolId());
+    }
+  }
+
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos == null || bpos.bpNamenode == null) {
@@ -677,8 +692,9 @@ class BPOfferService implements Runnable {
     private String blockPoolId;
     private long lastHeartbeat = 0;
     private volatile boolean initialized = false;
-    private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-    private final LinkedList<String> delHints = new LinkedList<String>();
+    private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
+        = new LinkedList<ReceivedDeletedBlockInfo>();
+    private volatile int pendingReceivedRequests = 0;
     private volatile boolean shouldServiceRun = true;
     private boolean isBlockTokenInitialized = false;
     UpgradeManagerDatanode upgradeManager = null;
@@ -848,41 +864,33 @@ private void reportBadBlocks(ExtendedBlock block) {
 
     /**
      * Report received blocks and delete hints to the Namenode
+     *
      * @throws IOException
      */
-    private void reportReceivedBlocks() throws IOException {
-      //check if there are newly received blocks
-      Block [] blockArray=null;
-      String [] delHintArray=null;
-      synchronized(receivedBlockList) {
-        synchronized(delHints){
-          int numBlocks = receivedBlockList.size();
-          if (numBlocks > 0) {
-            if(numBlocks!=delHints.size()) {
-              LOG.warn("Panic: receiveBlockList and delHints are not of " +
-                  "the same length" );
-            }
-            //
-            // Send newly-received blockids to namenode
-            //
-            blockArray = receivedBlockList.toArray(new Block[numBlocks]);
-            delHintArray = delHints.toArray(new String[numBlocks]);
-          }
+    private void reportReceivedDeletedBlocks() throws IOException {
+
+      // check if there are newly received blocks
+      ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+      int currentReceivedRequestsCounter;
+      synchronized (receivedAndDeletedBlockList) {
+        currentReceivedRequestsCounter = pendingReceivedRequests;
+        int numBlocks = receivedAndDeletedBlockList.size();
+        if (numBlocks > 0) {
+          //
+          // Send newly-received and deleted blockids to namenode
+          //
+          receivedAndDeletedBlockArray = receivedAndDeletedBlockList
+              .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
         }
       }
-      if (blockArray != null) {
-        if(delHintArray == null || delHintArray.length != blockArray.length ) {
-          LOG.warn("Panic: block array & delHintArray are not the same" );
-        }
-        bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
-            delHintArray);
-        synchronized(receivedBlockList) {
-          synchronized(delHints){
-            for(int i=0; i
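
A note on the mechanism for readers following the patch: the DataNode side stops issuing one blockReceived RPC per block. Instead, BPOfferService queues every received or deleted replica as a ReceivedDeletedBlockInfo and periodically drains the whole queue into a single blockReceivedAndDeleted call, which the NameNode applies under one writeLock acquisition, as the BlockManager hunk above shows. Below is a minimal, self-contained Java sketch of that batching pattern. It is an illustration only, not the Hadoop code: BlockEvent, NameNodeStub, IncrementalReportSketch, and the interval value are hypothetical stand-ins.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * Minimal sketch of the batching pattern behind incremental block
 * reports (HDFS-395). All names are illustrative stand-ins, not the
 * real Hadoop classes or RPC signatures.
 */
public class IncrementalReportSketch {

  /** Stand-in for ReceivedDeletedBlockInfo: one received or deleted replica. */
  static final class BlockEvent {
    final long blockId;
    final boolean deleted;   // true = deletion notice, false = newly received
    final String delHint;    // replication delete-hint; null for deletions
    BlockEvent(long blockId, boolean deleted, String delHint) {
      this.blockId = blockId;
      this.deleted = deleted;
      this.delHint = delHint;
    }
  }

  /** Hypothetical stand-in for the DatanodeProtocol endpoint. */
  interface NameNodeStub {
    void blockReceivedAndDeleted(BlockEvent[] batch);
  }

  private final LinkedList<BlockEvent> pending = new LinkedList<BlockEvent>();
  private volatile int pendingReceived = 0; // received events should go out fast
  private long lastDeletedReport = 0;       // deletions alone may wait longer
  private final long deleteReportIntervalMs;
  private final NameNodeStub nameNode;

  IncrementalReportSketch(NameNodeStub nameNode, long deleteReportIntervalMs) {
    this.nameNode = nameNode;
    this.deleteReportIntervalMs = deleteReportIntervalMs;
  }

  /** Called when a replica finishes being written to disk. */
  void notifyReceived(long blockId, String delHint) {
    synchronized (pending) {
      pending.add(new BlockEvent(blockId, false, delHint));
      pendingReceived++;   // the real offer loop is also woken at this point
    }
  }

  /** Called when the async disk service finishes deleting a replica. */
  void notifyDeleted(long blockId) {
    synchronized (pending) {
      pending.add(new BlockEvent(blockId, true, null));
    }
  }

  /** One iteration of the offer-service loop: drain the queue into one RPC. */
  void maybeReport(long nowMs) {
    BlockEvent[] batch = null;
    synchronized (pending) {
      boolean deletionsDue = nowMs - lastDeletedReport > deleteReportIntervalMs;
      if (!pending.isEmpty() && (pendingReceived > 0 || deletionsDue)) {
        batch = pending.toArray(new BlockEvent[pending.size()]);
        pending.clear();
        pendingReceived = 0;
        lastDeletedReport = nowMs;
      }
    }
    if (batch != null) {
      nameNode.blockReceivedAndDeleted(batch); // one RPC for the whole batch
    }
  }

  public static void main(String[] args) {
    final List<BlockEvent[]> sent = new ArrayList<BlockEvent[]>();
    IncrementalReportSketch dn = new IncrementalReportSketch(
        new NameNodeStub() {
          public void blockReceivedAndDeleted(BlockEvent[] batch) {
            sent.add(batch);
          }
        }, 300000L /* ~100 heartbeats at 3s each; illustrative value */);
    dn.notifyReceived(1001L, null);
    dn.notifyDeleted(1002L);
    dn.maybeReport(System.currentTimeMillis());
    System.out.println("batches: " + sent.size()
        + ", events in batch: " + sent.get(0).length);
  }
}

The asymmetry in maybeReport mirrors the patch's tuning: a received block must reach the NameNode promptly (pendingReceivedRequests triggers the next report) since file completion waits on block reports, while deletion notices only free NameNode bookkeeping and can ride the much longer deleteReportInterval, which the patch sets to 100 heartbeat intervals.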