From 5d9d702607913685eab0d8ad077040ddc82bf085 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 28 Aug 2013 06:30:15 +0000 Subject: [PATCH] HDFS-4987. Namenode changes to track multiple storages per datanode. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1518087 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES_HDFS-2832.txt | 4 + .../server/blockmanagement/BlockInfo.java | 132 +++++++++------ .../server/blockmanagement/BlockManager.java | 130 +++++++-------- .../blockmanagement/DatanodeDescriptor.java | 152 ++++++++++++------ .../blockmanagement/DatanodeStorageInfo.java | 142 ++++++++++++++++ .../PendingDataNodeMessages.java | 12 +- .../hdfs/server/namenode/FSNamesystem.java | 33 ++-- .../server/namenode/NameNodeRpcServer.java | 26 +-- .../hadoop/hdfs/TestFileCorruption.java | 2 +- .../server/blockmanagement/TestBlockInfo.java | 124 -------------- .../blockmanagement/TestBlockManager.java | 20 +-- .../TestDatanodeDescriptor.java | 7 +- .../TestPendingDataNodeMessages.java | 4 +- .../TestPendingReplication.java | 4 +- .../TestReplicationPolicy.java | 2 +- .../namenode/metrics/TestNameNodeMetrics.java | 4 +- 16 files changed, 458 insertions(+), 340 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt new file mode 100644 index 0000000000..7ab7c4c165 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt @@ -0,0 +1,4 @@ + BREAKDOWN OF HDFS-2832 ENABLE SUPPORT FOR HETEROGENEOUS STORAGES IN HDFS + + HDFS-4987. Namenode changes to track multiple storages per datanode. + (szetszwo) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 8307180ced..7aa7721a02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.util.LightWeightGSet; @@ -39,11 +40,11 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement { private LightWeightGSet.LinkedElement nextLinkedElement; /** - * This array contains triplets of references. For each i-th datanode the - * block belongs to triplets[3*i] is the reference to the DatanodeDescriptor - * and triplets[3*i+1] and triplets[3*i+2] are references to the previous and - * the next blocks, respectively, in the list of blocks belonging to this - * data-node. + * This array contains triplets of references. 
For each i-th storage, the + * block belongs to triplets[3*i] is the reference to the + * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are + * references to the previous and the next blocks, respectively, in the list + * of blocks belonging to this storage. * * Using previous and next in Object triplets is done instead of a * {@link LinkedList} list to efficiently use memory. With LinkedList the cost @@ -86,9 +87,15 @@ public void setBlockCollection(BlockCollection bc) { } DatanodeDescriptor getDatanode(int index) { + DatanodeStorageInfo storage = getStorageInfo(index); + return storage == null ? null : storage.getDatanodeDescriptor(); + } + + DatanodeStorageInfo getStorageInfo(int index) { assert this.triplets != null : "BlockInfo is not initialized"; assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - return (DatanodeDescriptor)triplets[index*3]; + DatanodeStorageInfo storage = (DatanodeStorageInfo)triplets[index*3]; + return storage; } private BlockInfo getPrevious(int index) { @@ -111,14 +118,10 @@ BlockInfo getNext(int index) { return info; } - private void setDatanode(int index, DatanodeDescriptor node, BlockInfo previous, - BlockInfo next) { + void setStorageInfo(int index, DatanodeStorageInfo storage) { assert this.triplets != null : "BlockInfo is not initialized"; - int i = index * 3; - assert index >= 0 && i+2 < triplets.length : "Index is out of bound"; - triplets[i] = node; - triplets[i+1] = previous; - triplets[i+2] = next; + assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; + triplets[index*3] = storage; } /** @@ -190,22 +193,24 @@ public int numNodes() { } /** - * Add data-node this block belongs to. + * Add a {@link DatanodeStorageInfo} location for a block */ - public boolean addNode(DatanodeDescriptor node) { - if(findDatanode(node) >= 0) // the node is already there + boolean addStorage(DatanodeStorageInfo storage) { + if(findStorageInfo(storage) >= 0) // the node is already there return false; // find the last null node int lastNode = ensureCapacity(1); - setDatanode(lastNode, node, null, null); + setStorageInfo(lastNode, storage); + setNext(lastNode, null); + setPrevious(lastNode, null); return true; } /** - * Remove data-node from the block. + * Remove {@link DatanodeStorageInfo} location for a block */ - public boolean removeNode(DatanodeDescriptor node) { - int dnIndex = findDatanode(node); + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfo(storage); if(dnIndex < 0) // the node is not found return false; assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : @@ -213,10 +218,13 @@ assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : // find the last not null node int lastNode = numNodes()-1; // replace current node triplet by the lastNode one - setDatanode(dnIndex, getDatanode(lastNode), getPrevious(lastNode), - getNext(lastNode)); + setStorageInfo(dnIndex, getStorageInfo(lastNode)); + setNext(dnIndex, getNext(lastNode)); + setPrevious(dnIndex, getPrevious(lastNode)); // set the last triplet to null - setDatanode(lastNode, null, null, null); + setStorageInfo(lastNode, null); + setNext(lastNode, null); + setPrevious(lastNode, null); return true; } @@ -236,37 +244,70 @@ int findDatanode(DatanodeDescriptor dn) { } return -1; } + /** + * Find specified DatanodeStorageInfo. + * @param dn + * @return index or -1 if not found. 
+ */ + int findStorageInfo(DatanodeInfo dn) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if(cur == null) + break; + if(cur.getDatanodeDescriptor() == dn) + return idx; + } + return -1; + } + + /** + * Find specified DatanodeStorageInfo. + * @param storageInfo + * @return index or -1 if not found. + */ + int findStorageInfo(DatanodeStorageInfo storageInfo) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if(cur == storageInfo) + return idx; + if(cur == null) + break; + } + return -1; + } /** * Insert this block into the head of the list of blocks - * related to the specified DatanodeDescriptor. + * related to the specified DatanodeStorageInfo. * If the head is null then form a new list. * @return current block as the new head of the list. */ - public BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) { - int dnIndex = this.findDatanode(dn); + BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) { + int dnIndex = this.findStorageInfo(storage); assert dnIndex >= 0 : "Data node is not found: current"; assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : "Block is already in the list and cannot be inserted."; this.setPrevious(dnIndex, null); this.setNext(dnIndex, head); if(head != null) - head.setPrevious(head.findDatanode(dn), this); + head.setPrevious(head.findStorageInfo(storage), this); return this; } /** * Remove this block from the list of blocks - * related to the specified DatanodeDescriptor. + * related to the specified DatanodeStorageInfo. * If this block is the head of the list then return the next block as * the new head. * @return the new head of the list or null if the list becomes - * empty after deletion. + * empy after deletion. */ - public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) { + BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) { if(head == null) return null; - int dnIndex = this.findDatanode(dn); + int dnIndex = this.findStorageInfo(storage); if(dnIndex < 0) // this block is not on the data-node list return head; @@ -275,33 +316,20 @@ public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) { this.setNext(dnIndex, null); this.setPrevious(dnIndex, null); if(prev != null) - prev.setNext(prev.findDatanode(dn), next); + prev.setNext(prev.findStorageInfo(storage), next); if(next != null) - next.setPrevious(next.findDatanode(dn), prev); + next.setPrevious(next.findStorageInfo(storage), prev); if(this == head) // removing the head head = next; return head; } - /** - * Remove this block from the list of blocks related to the specified - * DatanodeDescriptor. Insert it into the head of the list of blocks. - * - * @return the new head of the list. 
- */ - public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn, - int curIndex, int headIndex) { - if (head == this) { - return this; - } - BlockInfo next = this.setNext(curIndex, head); - BlockInfo prev = this.setPrevious(curIndex, null); - - head.setPrevious(headIndex, this); - prev.setNext(prev.findDatanode(dn), next); - if (next != null) - next.setPrevious(next.findDatanode(dn), prev); - return this; + int listCount(DatanodeStorageInfo storage) { + int count = 0; + for(BlockInfo cur = this; cur != null; + cur = cur.getNext(cur.findStorageInfo(storage))) + count++; + return count; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 512a052ddb..77d75a4eb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -33,6 +33,7 @@ import java.util.Queue; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -68,8 +69,10 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; @@ -1034,7 +1037,7 @@ private void addToInvalidates(Block b) { * for logging purposes */ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, - final DatanodeInfo dn, String reason) throws IOException { + final DatanodeInfo dn, String storageID, String reason) throws IOException { assert namesystem.hasWriteLock(); final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock()); if (storedBlock == null) { @@ -1046,11 +1049,11 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, + blk + " not found"); return; } - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn); + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn, storageID); } private void markBlockAsCorrupt(BlockToMarkCorrupt b, - DatanodeInfo dn) throws IOException { + DatanodeInfo dn, String storageID) throws IOException { DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { throw new IOException("Cannot mark " + b @@ -1066,7 +1069,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b, } // Add replica to the data-node if it is not already there - node.addBlock(b.stored); + node.addBlock(storageID, b.stored); // Add this replica to corruptReplicas Map corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason); @@ -1601,10 +1604,11 @@ public String toString() { } /** - * The given datanode is reporting all its blocks. - * Update the (machine-->blocklist) and (block-->machinelist) maps. + * The given storage is reporting all its blocks. 
+ * Update the (storage-->block list) and (block-->storage list) maps. */ - public void processReport(final DatanodeID nodeID, final String poolId, + public void processReport(final DatanodeID nodeID, + final DatanodeStorage storage, final String poolId, final BlockListAsLongs newReport) throws IOException { namesystem.writeLock(); final long startTime = Time.now(); //after acquiring write lock @@ -1628,9 +1632,9 @@ public void processReport(final DatanodeID nodeID, final String poolId, if (node.numBlocks() == 0) { // The first block report can be processed a lot more efficiently than // ordinary block reports. This shortens restart times. - processFirstBlockReport(node, newReport); + processFirstBlockReport(node, storage.getStorageID(), newReport); } else { - processReport(node, newReport); + processReport(node, storage, newReport); } // Now that we have an up-to-date block report, we know that any @@ -1691,28 +1695,31 @@ private void rescanPostponedMisreplicatedBlocks() { } private void processReport(final DatanodeDescriptor node, + final DatanodeStorage storage, final BlockListAsLongs report) throws IOException { // Normal case: // Modify the (block-->datanode) map, according to the difference // between the old and new block report. // Collection toAdd = new LinkedList(); - Collection toRemove = new LinkedList(); + Collection toRemove = new TreeSet(); Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); Collection toUC = new LinkedList(); - reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); + reportDiff(node, storage, report, + toAdd, toRemove, toInvalidate, toCorrupt, toUC); // Process the blocks on each queue for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState); + addStoredBlockUnderConstruction(b.storedBlock, node, + storage.getStorageID(), b.reportedState); } for (Block b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -1726,7 +1733,7 @@ private void processReport(final DatanodeDescriptor node, addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node); + markBlockAsCorrupt(b, node, storage.getStorageID()); } } @@ -1742,6 +1749,7 @@ private void processReport(final DatanodeDescriptor node, * @throws IOException */ private void processFirstBlockReport(final DatanodeDescriptor node, + final String storageID, final BlockListAsLongs report) throws IOException { if (report == null) return; assert (namesystem.hasWriteLock()); @@ -1754,7 +1762,7 @@ private void processFirstBlockReport(final DatanodeDescriptor node, if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk)) { - queueReportedBlock(node, iblk, reportedState, + queueReportedBlock(node, storageID, iblk, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); continue; } @@ -1771,10 +1779,10 @@ private void processFirstBlockReport(final DatanodeDescriptor node, if (shouldPostponeBlocksFromFuture) { // In the Standby, we may receive a block report for a file that we // just have an out-of-date gen-stamp or state for, for example. 
- queueReportedBlock(node, iblk, reportedState, + queueReportedBlock(node, storageID, iblk, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { - markBlockAsCorrupt(c, node); + markBlockAsCorrupt(c, node, storageID); } continue; } @@ -1787,25 +1795,26 @@ private void processFirstBlockReport(final DatanodeDescriptor node, } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, node); + addStoredBlockImmediate(storedBlock, node, storageID); } } } - private void reportDiff(DatanodeDescriptor dn, + private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, BlockListAsLongs newReport, Collection toAdd, // add to DatanodeDescriptor Collection toRemove, // remove from DatanodeDescriptor Collection toInvalidate, // should be removed from DN Collection toCorrupt, // add to corrupt replicas list Collection toUC) { // add to under-construction list - // place a delimiter in the list which separates blocks - // that have been reported from those that have not - BlockInfo delimiter = new BlockInfo(new Block(), 1); - boolean added = dn.addBlock(delimiter); - assert added : "Delimiting block cannot be present in the node"; - int headIndex = 0; //currently the delimiter is in the head of the list - int curIndex; + + dn.updateStorage(storage); + + // add all blocks to remove list + for(Iterator it = dn.getBlockIterator(storage.getStorageID()); + it.hasNext(); ) { + toRemove.add(it.next()); + } if (newReport == null) newReport = new BlockListAsLongs(); @@ -1814,20 +1823,10 @@ private void reportDiff(DatanodeDescriptor dn, while(itBR.hasNext()) { Block iblk = itBR.next(); ReplicaState iState = itBR.getCurrentReplicaState(); - BlockInfo storedBlock = processReportedBlock(dn, iblk, iState, - toAdd, toInvalidate, toCorrupt, toUC); - // move block to the head of the list - if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) { - headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex); - } + BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(), + iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); + toRemove.remove(storedBlock); } - // collect blocks that have not been reported - // all of them are next to the delimiter - Iterator it = new DatanodeDescriptor.BlockIterator( - delimiter.getNext(0), dn); - while(it.hasNext()) - toRemove.add(it.next()); - dn.removeBlock(delimiter); } /** @@ -1861,7 +1860,8 @@ private void reportDiff(DatanodeDescriptor dn, * @return the up-to-date stored block, if it should be kept. * Otherwise, null. */ - private BlockInfo processReportedBlock(final DatanodeDescriptor dn, + private BlockInfo processReportedBlock(final DatanodeDescriptor dn, + final String storageID, final Block block, final ReplicaState reportedState, final Collection toAdd, final Collection toInvalidate, @@ -1876,7 +1876,7 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn, if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { - queueReportedBlock(dn, block, reportedState, + queueReportedBlock(dn, storageID, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); return null; } @@ -1911,7 +1911,7 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn, // If the block is an out-of-date generation stamp or state, // but we're the standby, we shouldn't treat it as corrupt, // but instead just queue it for later processing. 
- queueReportedBlock(dn, storedBlock, reportedState, + queueReportedBlock(dn, storageID, storedBlock, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { toCorrupt.add(c); @@ -1938,7 +1938,7 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn, * standby node. @see PendingDataNodeMessages. * @param reason a textual reason to report in the debug logs */ - private void queueReportedBlock(DatanodeDescriptor dn, Block block, + private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, ReplicaState reportedState, String reason) { assert shouldPostponeBlocksFromFuture; @@ -1948,7 +1948,7 @@ private void queueReportedBlock(DatanodeDescriptor dn, Block block, " from datanode " + dn + " for later processing " + "because " + reason + "."); } - pendingDNMessages.enqueueReportedBlock(dn, block, reportedState); + pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState); } /** @@ -1971,8 +1971,8 @@ private void processQueuedMessages(Iterable rbis) if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } - processAndHandleReportedBlock( - rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null); + processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), + rbi.getBlock(), rbi.getReportedState(), null); } } @@ -2090,18 +2090,18 @@ private boolean isBlockUnderConstruction(BlockInfo storedBlock, void addStoredBlockUnderConstruction( BlockInfoUnderConstruction block, - DatanodeDescriptor node, + DatanodeDescriptor node, String storageID, ReplicaState reportedState) throws IOException { block.addReplicaIfNotPresent(node, block, reportedState); if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) { - addStoredBlock(block, node, null, true); + addStoredBlock(block, node, storageID, null, true); } } /** * Faster version of - * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)} + * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)} * , intended for use with initial block report at startup. If not in startup * safe mode, will call standard addStoredBlock(). 
Assumes this method is * called "immediately" so there is no need to refresh the storedBlock from @@ -2112,17 +2112,17 @@ void addStoredBlockUnderConstruction( * @throws IOException */ private void addStoredBlockImmediate(BlockInfo storedBlock, - DatanodeDescriptor node) + DatanodeDescriptor node, String storageID) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, node, null, false); + addStoredBlock(storedBlock, node, storageID, null, false); return; } // just add it - node.addBlock(storedBlock); + node.addBlock(storageID, storedBlock); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -2145,6 +2145,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, */ private Block addStoredBlock(final BlockInfo block, DatanodeDescriptor node, + String storageID, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { @@ -2170,7 +2171,7 @@ private Block addStoredBlock(final BlockInfo block, assert bc != null : "Block must belong to a file"; // add block to the datanode - boolean added = node.addBlock(storedBlock); + boolean added = node.addBlock(storageID, storedBlock); int curReplicaDelta; if (added) { @@ -2614,7 +2615,7 @@ private long addBlock(Block block, List results) { * The given node is reporting that it received a certain block. */ @VisibleForTesting - void addBlock(DatanodeDescriptor node, Block block, String delHint) + void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint) throws IOException { // decrement number of blocks scheduled to this datanode. // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with @@ -2635,11 +2636,12 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint) // Modify the blocks->datanode map and node's map. 
// pendingReplications.decrement(block, node); - processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED, + processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED, delHintNode); } - private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block, + private void processAndHandleReportedBlock(DatanodeDescriptor node, + String storageID, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block @@ -2647,7 +2649,7 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block, Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); Collection toUC = new LinkedList(); - processReportedBlock(node, block, reportedState, + processReportedBlock(node, storageID, block, reportedState, toAdd, toInvalidate, toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it @@ -2655,11 +2657,11 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block, : "The block should be only in one of the lists."; for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState); + addStoredBlockUnderConstruction(b.storedBlock, node, storageID, b.reportedState); } long numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2673,7 +2675,7 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block, addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node); + markBlockAsCorrupt(b, node, storageID); } } @@ -2685,7 +2687,7 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block, * This method must be called with FSNamesystem lock held. 
*/ public void processIncrementalBlockReport(final DatanodeID nodeID, - final String poolId, final ReceivedDeletedBlockInfo blockInfos[]) + final String poolId, final StorageReceivedDeletedBlocks srdb) throws IOException { assert namesystem.hasWriteLock(); int received = 0; @@ -2701,19 +2703,19 @@ public void processIncrementalBlockReport(final DatanodeID nodeID, "Got incremental block report from unregistered or dead node"); } - for (ReceivedDeletedBlockInfo rdbi : blockInfos) { + for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { switch (rdbi.getStatus()) { case DELETED_BLOCK: removeStoredBlock(rdbi.getBlock(), node); deleted++; break; case RECEIVED_BLOCK: - addBlock(node, rdbi.getBlock(), rdbi.getDelHints()); + addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints()); received++; break; case RECEIVING_BLOCK: receiving++; - processAndHandleReportedBlock(node, rdbi.getBlock(), + processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(), ReplicaState.RBW, null); break; default: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index c542ae343e..0c3ff98fc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -18,15 +18,20 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.util.Time; @@ -93,8 +98,9 @@ synchronized void clear() { } } - private volatile BlockInfo blockList = null; - private int numBlocks = 0; + private final Map storageMap = + new HashMap(); + // isAlive == heartbeats.contains(this) // This is an optimization, because contains takes O(n) time on Arraylist public boolean isAlive = false; @@ -217,39 +223,47 @@ public DatanodeDescriptor(DatanodeID nodeID, } /** - * Add datanode to the block. - * Add block to the head of the list of blocks belonging to the data-node. + * Add data-node to the block. Add block to the head of the list of blocks + * belonging to the data-node. */ - public boolean addBlock(BlockInfo b) { - if(!b.addNode(this)) - return false; - // add to the head of the data-node list - blockList = b.listInsert(blockList, this); - numBlocks++; - return true; - } - - /** - * Remove block from the list of blocks belonging to the data-node. - * Remove datanode from the block. 
- */ - public boolean removeBlock(BlockInfo b) { - blockList = b.listRemove(blockList, this); - if ( b.removeNode(this) ) { - numBlocks--; - return true; - } else { - return false; + public boolean addBlock(String storageID, BlockInfo b) { + DatanodeStorageInfo s = getStorageInfo(storageID); + if (s != null) { + return s.addBlock(b); } + return false; + } + + DatanodeStorageInfo getStorageInfo(String storageID) { + return storageMap.get(storageID); + } + public Collection getStorageInfos() { + return storageMap.values(); } /** - * Move block to the head of the list of blocks belonging to the data-node. - * @return the index of the head of the blockList + * Remove block from the list of blocks belonging to the data-node. Remove + * data-node from the block. */ - int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) { - blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex); - return curIndex; + boolean removeBlock(BlockInfo b) { + int index = b.findStorageInfo(this); + DatanodeStorageInfo s = b.getStorageInfo(index); + if (s != null) { + return s.removeBlock(b); + } + return false; + } + + /** + * Remove block from the list of blocks belonging to the data-node. Remove + * data-node from the block. + */ + boolean removeBlock(String storageID, BlockInfo b) { + DatanodeStorageInfo s = getStorageInfo(storageID); + if (s != null) { + return s.removeBlock(b); + } + return false; } /** @@ -257,7 +271,7 @@ int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) { * @return the head of the blockList */ protected BlockInfo getHead(){ - return blockList; + return getBlockIterator().next(); } /** @@ -268,9 +282,12 @@ protected BlockInfo getHead(){ * @return the new block */ public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) { - boolean done = removeBlock(oldBlock); + int index = oldBlock.findStorageInfo(this); + DatanodeStorageInfo s = oldBlock.getStorageInfo(index); + boolean done = s.removeBlock(oldBlock); assert done : "Old block should belong to the data-node when replacing"; - done = addBlock(newBlock); + + done = s.addBlock(newBlock); assert done : "New block should not belong to the data-node when replacing"; return newBlock; } @@ -281,7 +298,6 @@ public void resetBlocks() { setBlockPoolUsed(0); setDfsUsed(0); setXceiverCount(0); - this.blockList = null; this.invalidateBlocks.clear(); this.volumeFailures = 0; } @@ -295,7 +311,12 @@ public void clearBlockQueues() { } public int numBlocks() { - return numBlocks; + // TODO: synchronization + int blocks = 0; + for (DatanodeStorageInfo entry : storageMap.values()) { + blocks += entry.numBlocks(); + } + return blocks; } /** @@ -314,38 +335,52 @@ public void updateHeartbeat(long capacity, long dfsUsed, long remaining, rollBlocksScheduled(getLastUpdate()); } - /** - * Iterates over the list of blocks belonging to the datanode. 
- */ - public static class BlockIterator implements Iterator { - private BlockInfo current; - private DatanodeDescriptor node; - - BlockIterator(BlockInfo head, DatanodeDescriptor dn) { - this.current = head; - this.node = dn; + private static class BlockIterator implements Iterator { + private final int maxIndex; + private int index = 0; + private List> iterators = new ArrayList>(); + + private BlockIterator(final Iterable storages) { + for (DatanodeStorageInfo e : storages) { + iterators.add(e.getBlockIterator()); + } + maxIndex = iterators.size() - 1; + } + + private BlockIterator(final DatanodeStorageInfo storage) { + iterators.add(storage.getBlockIterator()); + maxIndex = iterators.size() - 1; } @Override public boolean hasNext() { - return current != null; + update(); + return iterators.get(index).hasNext(); } @Override public BlockInfo next() { - BlockInfo res = current; - current = current.getNext(current.findDatanode(node)); - return res; + update(); + return iterators.get(index).next(); } - + @Override - public void remove() { - throw new UnsupportedOperationException("Sorry. can't remove."); + public void remove() { + throw new UnsupportedOperationException("Remove unsupported."); + } + + private void update() { + while(index < maxIndex && !iterators.get(index).hasNext()) { + index++; + } } } - public Iterator getBlockIterator() { - return new BlockIterator(this.blockList, this); + Iterator getBlockIterator() { + return new BlockIterator(storageMap.values()); + } + Iterator getBlockIterator(final String storageID) { + return new BlockIterator(storageMap.get(storageID)); } /** @@ -601,4 +636,15 @@ public String dumpDatanode() { } return sb.toString(); } + + DatanodeStorageInfo updateStorage(DatanodeStorage s) { + DatanodeStorageInfo storage = getStorageInfo(s.getStorageID()); + if (storage == null) { + storage = new DatanodeStorageInfo(this, s); + storageMap.put(s.getStorageID(), storage); + } else { + storage.setState(s.getState()); + } + return storage; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java new file mode 100644 index 0000000000..805e334acb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.util.Iterator; + +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; + +/** + * A Datanode has one or more storages. A storage in the Datanode is represented + * by this class. + */ +public class DatanodeStorageInfo { + /** + * Iterates over the list of blocks belonging to the data-node. + */ + static class BlockIterator implements Iterator { + private BlockInfo current; + private DatanodeStorageInfo node; + + BlockIterator(BlockInfo head, DatanodeStorageInfo dn) { + this.current = head; + this.node = dn; + } + + public boolean hasNext() { + return current != null; + } + + public BlockInfo next() { + BlockInfo res = current; + current = current.getNext(current.findStorageInfo(node)); + return res; + } + + public void remove() { + throw new UnsupportedOperationException("Sorry. can't remove."); + } + } + + private final DatanodeDescriptor dn; + private final String storageID; + private final StorageType storageType; + private State state; + private long capacity; + private long dfsUsed; + private long remaining; + private volatile BlockInfo blockList = null; + + DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) { + this.dn = dn; + this.storageID = s.getStorageID(); + this.storageType = s.getStorageType(); + this.state = s.getState(); + } + + public void setUtilization(long capacity, long dfsUsed, long remaining) { + this.capacity = capacity; + this.dfsUsed = dfsUsed; + this.remaining = remaining; + } + + public void setState(State s) { + this.state = s; + + // TODO: if goes to failed state cleanup the block list + } + + public State getState() { + return this.state; + } + + public String getStorageID() { + return storageID; + } + + public long getCapacity() { + return capacity; + } + + public long getDfsUsed() { + return dfsUsed; + } + + public long getRemaining() { + return remaining; + } + + public boolean addBlock(BlockInfo b) { + if(!b.addStorage(this)) + return false; + // add to the head of the data-node list + blockList = b.listInsert(blockList, this); + return true; + } + + public boolean removeBlock(BlockInfo b) { + blockList = b.listRemove(blockList, this); + return b.removeStorage(this); + } + + public int numBlocks() { + return blockList == null ? 
0 : blockList.listCount(this); + } + + Iterator getBlockIterator() { + return new BlockIterator(this.blockList, this); + } + + public void updateState(StorageReport r) { + capacity = r.getCapacity(); + dfsUsed = r.getDfsUsed(); + remaining = r.getRemaining(); + } + + public DatanodeDescriptor getDatanodeDescriptor() { + return dn; + } + + @Override + public String toString() { + return "[" + storageType + "]" + storageID + ":" + state; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java index 860d1d261f..8afd3ce6ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java @@ -42,11 +42,13 @@ class PendingDataNodeMessages { static class ReportedBlockInfo { private final Block block; private final DatanodeDescriptor dn; + private final String storageID; private final ReplicaState reportedState; - ReportedBlockInfo(DatanodeDescriptor dn, Block block, + ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block, ReplicaState reportedState) { this.dn = dn; + this.storageID = storageID; this.block = block; this.reportedState = reportedState; } @@ -58,6 +60,10 @@ Block getBlock() { DatanodeDescriptor getNode() { return dn; } + + String getStorageID() { + return storageID; + } ReplicaState getReportedState() { return reportedState; @@ -70,11 +76,11 @@ public String toString() { } } - void enqueueReportedBlock(DatanodeDescriptor dn, Block block, + void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, ReplicaState reportedState) { block = new Block(block); getBlockQueue(block).add( - new ReportedBlockInfo(dn, block, reportedState)); + new ReportedBlockInfo(dn, storageID, block, reportedState)); count++; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 030893028c..d83d53a5f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -162,7 +162,13 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; -import org.apache.hadoop.hdfs.server.blockmanagement.*; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; +import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; @@ -174,14 +180,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; -import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; -import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; -import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; -import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status; -import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; -import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAState; @@ -193,6 +192,12 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -201,7 +206,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RetryCache; @@ -3785,7 +3790,7 @@ void commitBlockSynchronization(ExtendedBlock lastblock, // Otherwise fsck will report these blocks as MISSING, especially if the // blocksReceived from Datanodes take a long time to arrive. 
for (int i = 0; i < descriptors.length; i++) { - descriptors[i].addBlock(storedBlock); + descriptors[i].addBlock(newtargetstorages[i], storedBlock); } } // add pipeline locations into the INodeUnderConstruction @@ -5088,11 +5093,11 @@ NamenodeCommand startCheckpoint(NamenodeRegistration backupNode, } public void processIncrementalBlockReport(final DatanodeID nodeID, - final String poolId, final ReceivedDeletedBlockInfo blockInfos[]) + final String poolId, final StorageReceivedDeletedBlocks srdb) throws IOException { writeLock(); try { - blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos); + blockManager.processIncrementalBlockReport(nodeID, poolId, srdb); } finally { writeUnlock(); } @@ -5578,8 +5583,8 @@ void reportBadBlocks(LocatedBlock[] blocks) throws IOException { ExtendedBlock blk = blocks[i].getBlock(); DatanodeInfo[] nodes = blocks[i].getLocations(); for (int j = 0; j < nodes.length; j++) { - DatanodeInfo dn = nodes[j]; - blockManager.findAndMarkBlockAsCorrupt(blk, dn, + //TODO: add "storageID to LocatedBlock + blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], "STORAGE_ID", "client machine reported it"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 0a3309fdbb..62a17a96f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -42,8 +42,8 @@ import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -59,21 +59,21 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; @@ -89,9 +89,10 @@ import 
org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; @@ -943,14 +944,18 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg, public DatanodeCommand blockReport(DatanodeRegistration nodeReg, String poolId, StorageBlockReport[] reports) throws IOException { verifyRequest(nodeReg); - BlockListAsLongs blist = new BlockListAsLongs(reports[0].getBlocks()); if(blockStateChangeLog.isDebugEnabled()) { blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: " - + "from " + nodeReg + " " + blist.getNumberOfBlocks() - + " blocks"); + + "from " + nodeReg + ", reports.length=" + reports.length); + } + final BlockManager bm = namesystem.getBlockManager(); + for(StorageBlockReport r : reports) { + final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks()); + bm.processReport(nodeReg, r.getStorage(), poolId, blocks); } - namesystem.getBlockManager().processReport(nodeReg, poolId, blist); + DatanodeDescriptor datanode = bm.getDatanodeManager().getDatanode(nodeReg); + datanode.receivedBlockReport(); if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState()) return new FinalizeCommand(poolId); return null; @@ -965,8 +970,9 @@ public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, +"from "+nodeReg+" "+receivedAndDeletedBlocks.length +" blocks."); } - namesystem.processIncrementalBlockReport( - nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks()); + for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) { + namesystem.processIncrementalBlockReport(nodeReg, poolId, r); + } } @Override // DatanodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java index 458880af56..a829096c32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java @@ -157,7 +157,7 @@ public void testArrayOutOfBoundsException() throws Exception { ns.writeLock(); try { cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt( - blk, new DatanodeInfo(dnR), "TEST"); + blk, new DatanodeInfo(dnR), "TEST", "STORAGE_ID"); } finally { ns.writeUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java deleted file mode 100644 index 8b7111f010..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.junit.Test; - -/** - * This class provides tests for BlockInfo class, which is used in BlocksMap. - * The test covers BlockList.listMoveToHead, used for faster block report - * processing in DatanodeDescriptor.reportDiff. - */ - -public class TestBlockInfo { - - private static final Log LOG = LogFactory - .getLog("org.apache.hadoop.hdfs.TestBlockInfo"); - - @Test - public void testBlockListMoveToHead() throws Exception { - LOG.info("BlockInfo moveToHead tests..."); - - final int MAX_BLOCKS = 10; - - DatanodeDescriptor dd = DFSTestUtil.getLocalDatanodeDescriptor(); - ArrayList blockList = new ArrayList(MAX_BLOCKS); - ArrayList blockInfoList = new ArrayList(); - int headIndex; - int curIndex; - - LOG.info("Building block list..."); - for (int i = 0; i < MAX_BLOCKS; i++) { - blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP)); - blockInfoList.add(new BlockInfo(blockList.get(i), 3)); - dd.addBlock(blockInfoList.get(i)); - - // index of the datanode should be 0 - assertEquals("Find datanode should be 0", 0, blockInfoList.get(i) - .findDatanode(dd)); - } - - // list length should be equal to the number of blocks we inserted - LOG.info("Checking list length..."); - assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks()); - Iterator it = dd.getBlockIterator(); - int len = 0; - while (it.hasNext()) { - it.next(); - len++; - } - assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len); - - headIndex = dd.getHead().findDatanode(dd); - - LOG.info("Moving each block to the head of the list..."); - for (int i = 0; i < MAX_BLOCKS; i++) { - curIndex = blockInfoList.get(i).findDatanode(dd); - headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex); - // the moved element must be at the head of the list - assertEquals("Block should be at the head of the list now.", - blockInfoList.get(i), dd.getHead()); - } - - // move head of the list to the head - this should not change the list - LOG.info("Moving head to the head..."); - - BlockInfo temp = dd.getHead(); - curIndex = 0; - headIndex = 0; - dd.moveBlockToHead(temp, curIndex, headIndex); - assertEquals( - "Moving head to the head of the list shopuld not change the list", - temp, dd.getHead()); - - // check all elements of the list against the original blockInfoList - 
-    LOG.info("Checking elements of the list...");
-    temp = dd.getHead();
-    assertNotNull("Head should not be null", temp);
-    int c = MAX_BLOCKS - 1;
-    while (temp != null) {
-      assertEquals("Expected element is not on the list",
-          blockInfoList.get(c--), temp);
-      temp = temp.getNext(0);
-    }
-
-    LOG.info("Moving random blocks to the head of the list...");
-    headIndex = dd.getHead().findDatanode(dd);
-    Random rand = new Random();
-    for (int i = 0; i < MAX_BLOCKS; i++) {
-      int j = rand.nextInt(MAX_BLOCKS);
-      curIndex = blockInfoList.get(j).findDatanode(dd);
-      headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
-      // the moved element must be at the head of the list
-      assertEquals("Block should be at the head of the list now.",
-          blockInfoList.get(j), dd.getHead());
-    }
-  }
-}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index e88ec92e39..67306cf782 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -124,7 +124,7 @@ public void testBasicReplication() throws Exception {
   private void doBasicTest(int testIndex) {
     List origNodes = getNodes(0, 1);
 
-    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
 
     DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
     assertEquals(2, pipeline.length);
@@ -307,7 +307,7 @@ public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
   private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
     // Originally on only nodes in rack A.
     List origNodes = rackA;
-    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
 
     assertEquals(2, pipeline.length); // single new copy
@@ -340,7 +340,7 @@ private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
       List origNodes)
       throws Exception {
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
-    addBlockOnNodes((long)testIndex, origNodes);
+    addBlockOnNodes(testIndex, origNodes);
     bm.processMisReplicatedBlocks();
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
   }
@@ -353,7 +353,7 @@ private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
   private void fulfillPipeline(BlockInfo blockInfo,
       DatanodeDescriptor[] pipeline) throws IOException {
     for (int i = 1; i < pipeline.length; i++) {
-      bm.addBlock(pipeline[i], blockInfo, null);
+      bm.addBlock(pipeline[i], "STORAGE_ID", blockInfo, null);
     }
   }
@@ -362,7 +362,9 @@ private BlockInfo blockOnNodes(long blkId, List nodes) {
     BlockInfo blockInfo = new BlockInfo(block, 3);
 
     for (DatanodeDescriptor dn : nodes) {
-      blockInfo.addNode(dn);
+      for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
+        blockInfo.addStorage(storage);
+      }
     }
     return blockInfo;
   }
@@ -508,12 +510,12 @@ public void testSafeModeIBR() throws Exception {
     assertTrue(node.isFirstBlockReport());
     // send block report, should be processed
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
 
     // send block report again, should NOT be processed
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node, never()).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
@@ -525,7 +527,7 @@ public void testSafeModeIBR() throws Exception {
     assertTrue(node.isFirstBlockReport()); // ready for report again
     // send block report, should be processed after restart
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
   }
@@ -550,7 +552,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
     // send block report while pretending to already have blocks
     reset(node);
     doReturn(1).when(node).numBlocks();
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
     verify(node).receivedBlockReport();
     assertFalse(node.isFirstBlockReport());
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
index bd030fb94d..41afa739d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
@@ -59,17 +59,18 @@ public void testBlocksCounter() throws Exception {
     assertEquals(0, dd.numBlocks());
     BlockInfo blk = new BlockInfo(new Block(1L), 1);
     BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
+    final String storageID = "STORAGE_ID";
     // add first block
-    assertTrue(dd.addBlock(blk));
+    assertTrue(dd.addBlock(storageID, blk));
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(dd.addBlock(blk));
+    assertFalse(dd.addBlock(storageID, blk));
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(dd.addBlock(blk1));
+    assertTrue(dd.addBlock(storageID, blk1));
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
index 3c7ad8ca02..dbff77402f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
@@ -43,8 +43,8 @@ public class TestPendingDataNodeMessages {
   @Test
   public void testQueues() {
     DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
-    msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
 
     assertEquals(2, msgs.count());
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index 0c1fee9864..8eb9ced9e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -291,9 +291,9 @@ public void testPendingAndInvalidate() throws Exception {
     cluster.getNamesystem().writeLock();
     try {
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-          "TEST");
+          "STORAGE_ID", "TEST");
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
-          "TEST");
+          "STORAGE_ID", "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index ba6c373726..547e8536f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -1096,7 +1096,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
     // Adding this block will increase its current replication, and that will
     // remove it from the queue.
     bm.addStoredBlockUnderConstruction(info,
-        TestReplicationPolicy.dataNodes[0], ReplicaState.FINALIZED);
+        TestReplicationPolicy.dataNodes[0], "STORAGE", ReplicaState.FINALIZED);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 007fda5fb5..d8aed46641 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -235,7 +235,7 @@ public void testCorruptBlock() throws Exception {
     cluster.getNamesystem().writeLock();
     try {
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-          "TEST");
+          "STORAGE_ID", "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
@@ -286,7 +286,7 @@ public void testMissingBlock() throws Exception {
     cluster.getNamesystem().writeLock();
     try {
       bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-          "TEST");
+          "STORAGE_ID", "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
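
The test hunks above all follow one pattern: replica bookkeeping that used to be keyed by a DatanodeDescriptor is now keyed by a storage, either a DatanodeStorageInfo object or its storage ID string. The following standalone sketch illustrates that pattern using only calls that appear in the hunks (BlockInfo.addStorage, DatanodeDescriptor.getStorageInfos, DatanodeDescriptor.addBlock(storageID, block), DFSTestUtil.getLocalDatanodeDescriptor). It is a hypothetical example, not part of the patch: the class name is invented, it sits in the blockmanagement package only because addStorage is package-private, and it assumes a locally created descriptor already reports at least one storage.

package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;

/** Hypothetical sketch of the storage-keyed calls exercised by the tests above. */
public class StoragePerDatanodeSketch {
  public static void main(String[] args) {
    DatanodeDescriptor dn = DFSTestUtil.getLocalDatanodeDescriptor();

    // A replica location is now one of the datanode's storages, not the
    // datanode itself; a single datanode may expose several storages.
    BlockInfo blockInfo = new BlockInfo(new Block(1L), 3);
    for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
      blockInfo.addStorage(storage);
    }

    // Datanode-side block counting is keyed by a storage ID as well.
    BlockInfo blk = new BlockInfo(new Block(2L), 1);
    dn.addBlock("STORAGE_ID", blk);
    System.out.println("numBlocks = " + dn.numBlocks());
  }
}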