From fa5ba6d977520f1faaa97c55a50a22c98b3ee109 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Wed, 6 Nov 2013 06:50:33 +0000
Subject: [PATCH] HDFS-5439. Fix TestPendingReplication. (Contributed by Junping Du, Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1539247 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES_HDFS-2832.txt          |  3 +++
 .../server/blockmanagement/BlockManager.java   |  7 +++---
 .../BlockPlacementPolicyDefault.java           | 14 +++++++-----
 .../blockmanagement/DatanodeStorageInfo.java   |  9 ++++++++
 .../PendingReplicationBlocks.java              | 22 +++++++++----------
 .../hdfs/server/datanode/BPServiceActor.java   |  2 --
 .../hadoop/hdfs/server/datanode/DataNode.java  |  2 +-
 .../server/protocol/DatanodeRegistration.java  |  2 +-
 .../TestPendingReplication.java                | 21 ++++++++++--------
 9 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
index f5ede31804..1bcd677af6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
@@ -76,3 +76,6 @@ IMPROVEMENTS:
 
     HDFS-5466. Update storage IDs when the pipeline is updated. (Contributed
     by szetszwo)
+
+    HDFS-5439. Fix TestPendingReplication. (Contributed by Junping Du, Arpit
+    Agarwal)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 867f4e1802..54b24d1a22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1332,7 +1332,8 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.increment(block, targets);
+          pendingReplications.increment(block,
+              DatanodeStorageInfo.toDatanodeDescriptors(targets));
           if(blockLog.isDebugEnabled()) {
             blockLog.debug(
                 "BLOCK* block " + block
@@ -1357,7 +1358,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
         StringBuilder targetList = new StringBuilder("datanode(s)");
         for (int k = 0; k < targets.length; k++) {
           targetList.append(' ');
-          targetList.append(targets[k]);
+          targetList.append(targets[k].getDatanodeDescriptor());
         }
         blockLog.info("BLOCK* ask " + rw.srcNode
             + " to replicate " + rw.block + " to " + targetList);
@@ -2645,7 +2646,7 @@ void addBlock(DatanodeDescriptor node, String storageID, Block block, String del
     //
     // Modify the blocks->datanode map and node's map.
    //
-    pendingReplications.decrement(block, node, storageID);
+    pendingReplications.decrement(block, node);
     processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
         delHintNode);
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 8cd5dd6be5..fa1bfe645d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -496,7 +496,7 @@ protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
       builder.setLength(0);
       builder.append("[");
     }
-    boolean goodTarget = false;
+    boolean badTarget = false;
     DatanodeStorageInfo firstChosen = null;
     while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
       DatanodeDescriptor chosenNode =
@@ -506,26 +506,30 @@ protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
         final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
             chosenNode.getStorageInfos());
-        for(int i = 0; i < storages.length && !goodTarget; i++) {
+        int i;
+        for(i = 0; i < storages.length; i++) {
           final int newExcludedNodes = addIfIsGoodTarget(storages[i],
               excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
              avoidStaleNodes, storageType);
-          goodTarget = newExcludedNodes >= 0;
-          if (goodTarget) {
+          if (newExcludedNodes >= 0) {
             numOfReplicas--;
             if (firstChosen == null) {
               firstChosen = storages[i];
             }
             numOfAvailableNodes -= newExcludedNodes;
+            break;
           }
         }
+
+        // If no candidate storage was found on this DN then set badTarget.
+        badTarget = (i == storages.length);
       }
     }
 
     if (numOfReplicas>0) {
       String detail = enableDebugLogging;
       if (LOG.isDebugEnabled()) {
-        if (!goodTarget && builder != null) {
+        if (badTarget && builder != null) {
           detail = builder.append("]").toString();
           builder.setLength(0);
         } else detail = "";

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 440a3cfe81..7286755459 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -46,6 +46,15 @@ static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
     return datanodes;
   }
 
+  static DatanodeDescriptor[] toDatanodeDescriptors(
+      DatanodeStorageInfo[] storages) {
+    DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
+    for (int i = 0; i < storages.length; ++i) {
+      datanodes[i] = storages[i].getDatanodeDescriptor();
+    }
+    return datanodes;
+  }
+
   public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
     String[] storageIDs = new String[storages.length];
     for(int i = 0; i < storageIDs.length; i++) {

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
index c99d53fc14..9a05c711fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
@@ -76,7 +76,7 @@ void start() {
    * @param block The corresponding block
    * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(Block block, DatanodeStorageInfo[] targets) {
+  void increment(Block block, DatanodeDescriptor[] targets) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
@@ -95,14 +95,14 @@ void increment(Block block, DatanodeStorageInfo[] targets) {
    *
    * @param The DataNode that finishes the replication
    */
-  void decrement(Block block, DatanodeDescriptor dn, String storageID) {
+  void decrement(Block block, DatanodeDescriptor dn) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
         if(LOG.isDebugEnabled()) {
           LOG.debug("Removing pending replication for " + block);
         }
-        found.decrementReplicas(dn.getStorageInfo(storageID));
+        found.decrementReplicas(dn);
         if (found.getNumReplicas() <= 0) {
           pendingReplications.remove(block);
         }
@@ -174,12 +174,12 @@ Block[] getTimedOutBlocks() {
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private final List<DatanodeStorageInfo> targets;
+    private final List<DatanodeDescriptor> targets;
 
-    PendingBlockInfo(DatanodeStorageInfo[] targets) {
+    PendingBlockInfo(DatanodeDescriptor[] targets) {
       this.timeStamp = now();
-      this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
-          : new ArrayList<DatanodeStorageInfo>(Arrays.asList(targets));
+      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+          : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
     }
 
     long getTimeStamp() {
@@ -190,16 +190,16 @@ void setTimeStamp() {
       timeStamp = now();
     }
 
-    void incrementReplicas(DatanodeStorageInfo... newTargets) {
+    void incrementReplicas(DatanodeDescriptor... newTargets) {
       if (newTargets != null) {
-        for (DatanodeStorageInfo dn : newTargets) {
+        for (DatanodeDescriptor dn : newTargets) {
           targets.add(dn);
         }
       }
     }
 
-    void decrementReplicas(DatanodeStorageInfo storage) {
-      targets.remove(storage);
+    void decrementReplicas(DatanodeDescriptor target) {
+      targets.remove(target);
     }
 
     int getNumReplicas() {

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 8eb6527053..652c990f43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -267,8 +267,6 @@ void reportBadBlocks(ExtendedBlock block,
 
   /**
    * Report received blocks and delete hints to the Namenode
-   * TODO: Fix reportReceivedDeletedBlocks to send reports per-volume.
-   *
    * @throws IOException
    */
   private void reportReceivedDeletedBlocks() throws IOException {

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 22778c3cc6..e2615f4ca5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -867,7 +867,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException {
       final StorageInfo bpStorage = storage.getBPStorage(bpid);
       LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
           + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
-          + ";nsInfo=" + nsInfo);
+          + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
     }
 
     synchronized(this) {

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
index c4c30f8bb8..8ab18f4b3c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
@@ -82,7 +82,7 @@ public String getAddress() {
   public String toString() {
     return getClass().getSimpleName()
       + "(" + getIpAddr()
-      + ", storageID=" + getDatanodeUuid()
+      + ", datanodeUuid=" + getDatanodeUuid()
       + ", infoPort=" + getInfoPort()
       + ", ipcPort=" + getIpcPort()
       + ", storageInfo=" + storageInfo

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index 71da2b3c22..c63badc9eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -67,7 +67,8 @@ public void testPendingReplication() {
       Block block = new Block(i, i, 0);
       DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
       System.arraycopy(storages, 0, targets, 0, i);
-      pendingReplications.increment(block, targets);
+      pendingReplications.increment(block,
+          DatanodeStorageInfo.toDatanodeDescriptors(targets));
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -77,18 +78,18 @@ public void testPendingReplication() {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor(),
-        storages[7].getStorageID()); // removes one replica
+    pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
       // removes all replicas
-      pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor(),
-          storages[i].getStorageID());
+      pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor());
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.increment(blk, DFSTestUtil.createDatanodeStorageInfos(8));
+    pendingReplications.increment(blk,
+        DatanodeStorageInfo.toDatanodeDescriptors(
+            DFSTestUtil.createDatanodeStorageInfos(8)));
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -116,7 +117,9 @@ public void testPendingReplication() {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, DFSTestUtil.createDatanodeStorageInfos(i));
+      pendingReplications.increment(block,
+          DatanodeStorageInfo.toDatanodeDescriptors(
+              DFSTestUtil.createDatanodeStorageInfos(i)));
     }
     assertTrue(pendingReplications.size() == 15);
@@ -198,7 +201,7 @@ public void testBlockReceived() throws Exception {
       DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
           poolId);
       StorageReceivedDeletedBlocks[] report = {
-          new StorageReceivedDeletedBlocks(dnR.getDatanodeUuid(),
+          new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
          new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
              blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
       cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
@@ -215,7 +218,7 @@ public void testBlockReceived() throws Exception {
       DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
           poolId);
       StorageReceivedDeletedBlocks[] report =
-          { new StorageReceivedDeletedBlocks(dnR.getDatanodeUuid(),
+          { new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
          new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
              blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
       cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
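
Note on the PendingReplicationBlocks change: the substance of this patch is the switch from per-storage to per-DataNode accounting. increment() now takes DatanodeDescriptor[] targets and decrement() no longer needs a storage ID, which is also why the test can pass the placeholder string "Fake-storage-ID-Ignored" to StorageReceivedDeletedBlocks. The following is a minimal, self-contained sketch of that counting scheme, not the Hadoop class itself: Block and DatanodeDescriptor here are simplified stand-ins, and the real class's timestamp/timeout machinery is omitted.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-ins for the HDFS types; only the counting logic matters. */
class Block {
  final long blockId;
  Block(long blockId) { this.blockId = blockId; }
  @Override public boolean equals(Object o) {
    return o instanceof Block && ((Block) o).blockId == blockId;
  }
  @Override public int hashCode() { return Long.hashCode(blockId); }
}

class DatanodeDescriptor {
  final String name;
  DatanodeDescriptor(String name) { this.name = name; }
}

/** Sketch of the patched accounting: pending targets are DataNodes, not storages. */
class PendingReplicationSketch {
  private final Map<Block, List<DatanodeDescriptor>> pending = new HashMap<>();

  /** Record that replicas of block are in flight to the given DataNodes. */
  synchronized void increment(Block block, DatanodeDescriptor... targets) {
    pending.computeIfAbsent(block, b -> new ArrayList<>())
        .addAll(Arrays.asList(targets));
  }

  /** One replica finished on dn; no storage ID is needed any more. */
  synchronized void decrement(Block block, DatanodeDescriptor dn) {
    List<DatanodeDescriptor> targets = pending.get(block);
    if (targets != null) {
      targets.remove(dn);        // drop exactly one pending target (by identity)
      if (targets.isEmpty()) {
        pending.remove(block);   // all expected replicas have been reported
      }
    }
  }

  synchronized int getNumReplicas(Block block) {
    List<DatanodeDescriptor> targets = pending.get(block);
    return targets == null ? 0 : targets.size();
  }
}

Under this model each decrement(block, dn) removes exactly one pending target, and the block's entry disappears once every expected replica has been reported; that is the behavior the size() and getNumReplicas() assertions in testPendingReplication() exercise.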
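Note on the chooseRandom() change in BlockPlacementPolicyDefault: in the old code the goodTarget flag, once set to true by any chosen node, kept the loop condition `i < storages.length && !goodTarget` false and so suppressed the storage scan for every node chosen afterwards. The patch instead breaks out of the storage loop after the first good storage on a node and recomputes a per-node badTarget flag, which is consulted only for debug logging. Below is a minimal, runnable sketch of the corrected loop shape under stated assumptions: plain strings stand in for DatanodeStorageInfo, a predicate stands in for addIfIsGoodTarget(), and the real placement constraints (racks, load, excluded nodes) are left out.

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/**
 * Sketch of the corrected chooseRandom() loop shape; not the Hadoop method.
 * isGoodStorage plays the role of addIfIsGoodTarget().
 */
class TargetSelectionSketch {

  static int chooseTargets(List<String[]> candidateNodes,
      Predicate<String> isGoodStorage, int numOfReplicas) {
    boolean badTarget = false;
    for (String[] storages : candidateNodes) {
      if (numOfReplicas <= 0) {
        break;
      }
      int i;
      for (i = 0; i < storages.length; i++) {
        if (isGoodStorage.test(storages[i])) {
          numOfReplicas--;   // one replica placed on this node
          break;             // at most one storage per chosen node
        }
      }
      // Recomputed for every node. The old sticky goodTarget flag stayed
      // true after the first success and suppressed later storage scans.
      badTarget = (i == storages.length);
      if (badTarget) {
        System.out.println("No good storage on: " + Arrays.toString(storages));
      }
    }
    return numOfReplicas; // replicas still unplaced
  }

  public static void main(String[] args) {
    List<String[]> nodes = Arrays.asList(
        new String[] { "bad", "good" },
        new String[] { "bad" },
        new String[] { "good" });
    int left = chooseTargets(nodes, s -> s.equals("good"), 2);
    System.out.println("Unplaced replicas: " + left); // prints 0
  }
}

Running main() places one replica on the first node, skips the node with no acceptable storage (reporting it the way the real code uses badTarget for its debug log), and places the second replica on the third node.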