From b008348dbf9bdd5070930be5d182116c5d370f6b Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Mon, 18 May 2015 19:06:34 -0700 Subject: [PATCH] HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN. Contributed by Yi Liu. --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 ++ .../server/blockmanagement/BlockManager.java | 54 ++++++++++++------- .../blockmanagement/DecommissionManager.java | 11 ++-- .../hdfs/server/namenode/NamenodeFsck.java | 2 +- 4 files changed, 43 insertions(+), 27 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index e016ba0679..1549930518 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -220,3 +220,6 @@ HDFS-8417. Erasure Coding: Pread failed to read data starting from not-first stripe. (Walter Su via jing9) + + HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN. + (Yi Liu via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 8b5144821e..d296aa8707 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -43,7 +43,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -85,6 +84,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.io.erasurecode.ECSchema; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; import org.apache.hadoop.net.Node; @@ -603,16 +603,7 @@ public class BlockManager { public short getMinStorageNum(BlockInfo block) { if (block.isStriped()) { - final BlockInfoStriped sblock = (BlockInfoStriped) block; - short dataBlockNum = sblock.getDataBlockNum(); - if (sblock.isComplete() || - sblock.getBlockUCState() == BlockUCState.COMMITTED) { - // if the sblock is committed/completed and its length is less than a - // full stripe, the minimum storage number needs to be adjusted - dataBlockNum = (short) Math.min(dataBlockNum, - (sblock.getNumBytes() - 1) / HdfsConstants.BLOCK_STRIPED_CELL_SIZE + 1); - } - return dataBlockNum; + return getStripedDataBlockNum(block); } else { return minReplication; } @@ -1258,7 +1249,7 @@ public class BlockManager { return; } short expectedReplicas = - b.stored.getBlockCollection().getPreferredBlockReplication(); + getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored); // Add replica to the data-node if it is not already there if (storageInfo != null) { @@ -1437,7 +1428,7 @@ public class BlockManager { continue; } - requiredReplication = bc.getPreferredBlockReplication(); + requiredReplication = getExpectedReplicaNum(bc, block); // get a source data-node containingNodes = new ArrayList<>(); @@ -1537,7 +1528,7 @@ public class BlockManager { rw.targets = null; continue; } - requiredReplication = bc.getPreferredBlockReplication(); + requiredReplication = getExpectedReplicaNum(bc, block); // do not schedule more if enough replicas is already pending NumberReplicas numReplicas = countNodes(block); @@ -2539,7 +2530,7 @@ public class BlockManager { int reportedBlkIdx = BlockIdManager.getBlockIndex(reported); wrongSize = reported.getNumBytes() != getInternalBlockLength(stripedBlock.getNumBytes(), - HdfsConstants.BLOCK_STRIPED_CELL_SIZE, + BLOCK_STRIPED_CELL_SIZE, stripedBlock.getDataBlockNum(), reportedBlkIdx); } else { wrongSize = storedBlock.getNumBytes() != reported.getNumBytes(); @@ -2763,7 +2754,7 @@ public class BlockManager { } // handle underReplication/overReplication - short fileReplication = bc.getPreferredBlockReplication(); + short fileReplication = getExpectedReplicaNum(bc, storedBlock); if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) { neededReplications.remove(storedBlock, numCurrentReplica, num.decommissionedAndDecommissioning(), fileReplication); @@ -3003,7 +2994,7 @@ public class BlockManager { } // calculate current replication short expectedReplication = - block.getBlockCollection().getPreferredBlockReplication(); + getExpectedReplicaNum(block.getBlockCollection(), block); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); // add to under-replicated queue if need to be @@ -3638,8 +3629,8 @@ public class BlockManager { * process it as an over replicated block. */ public void checkReplication(BlockCollection bc) { - final short expected = bc.getPreferredBlockReplication(); for (BlockInfo block : bc.getBlocks()) { + short expected = getExpectedReplicaNum(bc, block); final NumberReplicas n = countNodes(block); if (isNeededReplication(block, expected, n.liveReplicas())) { neededReplications.add(block, n.liveReplicas(), @@ -3674,9 +3665,9 @@ public class BlockManager { * @return 0 if the block is not found; * otherwise, return the replication factor of the block. */ - private int getReplication(Block block) { + private int getReplication(BlockInfo block) { final BlockCollection bc = blocksMap.getBlockCollection(block); - return bc == null? 0: bc.getPreferredBlockReplication(); + return bc == null? 0: getExpectedReplicaNum(bc, block); } @@ -3759,6 +3750,29 @@ public class BlockManager { return current < expected || !blockHasEnoughRacks(storedBlock, expected); } + public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) { + if (block.isStriped()) { + return (short) (getStripedDataBlockNum(block) + + ((BlockInfoStriped) block).getParityBlockNum()); + } else { + return bc.getPreferredBlockReplication(); + } + } + + short getStripedDataBlockNum(BlockInfo block) { + assert block.isStriped(); + final BlockInfoStriped sblock = (BlockInfoStriped) block; + short dataBlockNum = sblock.getDataBlockNum(); + if (sblock.isComplete() || + sblock.getBlockUCState() == BlockUCState.COMMITTED) { + // if the sblock is committed/completed and its length is less than a + // full stripe, the minimum storage number needs to be adjusted + dataBlockNum = (short) Math.min(dataBlockNum, + (sblock.getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1); + } + return dataBlockNum; + } + public long getMissingBlocksCount() { // not locking return this.neededReplications.getCorruptBlockSize(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index 37ce8e34bc..b1cc9bc386 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -36,7 +36,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.util.CyclicIteration; @@ -251,7 +250,7 @@ public class DecommissionManager { */ private boolean isSufficient(BlockInfo block, BlockCollection bc, NumberReplicas numberReplicas) { - final int numExpected = bc.getPreferredBlockReplication(); + final int numExpected = blockManager.getExpectedReplicaNum(bc, block); final int numLive = numberReplicas.liveReplicas(); if (!blockManager.isNeededReplication(block, numExpected, numLive)) { // Block doesn't need replication. Skip. @@ -285,11 +284,11 @@ public class DecommissionManager { return false; } - private static void logBlockReplicationInfo(Block block, BlockCollection bc, + private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc, DatanodeDescriptor srcNode, NumberReplicas num, Iterable storages) { int curReplicas = num.liveReplicas(); - int curExpectedReplicas = bc.getPreferredBlockReplication(); + int curExpectedReplicas = blockManager.getExpectedReplicaNum(bc, block); StringBuilder nodeList = new StringBuilder(); for (DatanodeStorageInfo storage : storages) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); @@ -561,7 +560,7 @@ public class DecommissionManager { // Schedule under-replicated blocks for replication if not already // pending if (blockManager.isNeededReplication(block, - bc.getPreferredBlockReplication(), liveReplicas)) { + blockManager.getExpectedReplicaNum(bc, block), liveReplicas)) { if (!blockManager.neededReplications.contains(block) && blockManager.pendingReplications.getNumReplicas(block) == 0 && namesystem.isPopulatingReplQueues()) { @@ -569,7 +568,7 @@ public class DecommissionManager { blockManager.neededReplications.add(block, liveReplicas, num.decommissionedAndDecommissioning(), - bc.getPreferredBlockReplication()); + blockManager.getExpectedReplicaNum(bc, block)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 24a38e5e16..fccef1795c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -256,7 +256,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { out.println("Block Id: " + blockId); out.println("Block belongs to: "+iNode.getFullPathName()); out.println("No. of Expected Replica: " + - bc.getPreferredBlockReplication()); + bm.getExpectedReplicaNum(bc, blockInfo)); out.println("No. of live Replica: " + numberReplicas.liveReplicas()); out.println("No. of excess Replica: " + numberReplicas.excessReplicas()); out.println("No. of stale Replica: " +