diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 8f720fc558..58b91b60b7 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -329,3 +329,6 @@
HDFS-8684. Erasure Coding: fix some block number calculation for striped
block. (yliu)
+
+ HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for
+ striped block. (Walter Su via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index f9bce268c4..47afb05e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -34,7 +34,7 @@
*
*
* The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
*
* The queue order is as follows:
*
@@ -144,14 +144,28 @@ synchronized boolean contains(BlockInfo block) {
* @param expectedReplicas expected number of replicas of the block
* @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
*/
- private int getPriority(int curReplicas,
+ private int getPriority(BlockInfo block,
+ int curReplicas,
int decommissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
if (curReplicas >= expectedReplicas) {
// Block has enough copies, but not enough racks
return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
- } else if (curReplicas == 0) {
+ }
+ if (block.isStriped()) {
+ BlockInfoStriped sblk = (BlockInfoStriped) block;
+ return getPriorityStriped(curReplicas, decommissionedReplicas,
+ sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+ } else {
+ return getPriorityContiguous(curReplicas, decommissionedReplicas,
+ expectedReplicas);
+ }
+ }
+
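+  /**
+   * Priority for a contiguous block, e.g. with replication factor 3:
+   * 0 live replicas is corrupt (or highest priority if decommissioned
+   * replicas remain), 1 is highest priority, and 2 is under-replicated.
+   */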
+ private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
+ int expectedReplicas) {
+ if (curReplicas == 0) {
// If there are zero non-decommissioned replicas but there are
// some decommissioned replicas, then assign them highest priority
if (decommissionedReplicas > 0) {
@@ -160,7 +174,7 @@ private int getPriority(int curReplicas,
//all we have are corrupt blocks
return QUEUE_WITH_CORRUPT_BLOCKS;
} else if (curReplicas == 1) {
- //only on replica -risk of loss
+ // only one replica, highest risk of loss
// highest priority
return QUEUE_HIGHEST_PRIORITY;
} else if ((curReplicas * 3) < expectedReplicas) {
@@ -173,6 +187,27 @@ private int getPriority(int curReplicas,
}
}
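+
+  /**
+   * Priority for a striped block, e.g. with the default RS-6-3 schema
+   * (dataBlkNum = 6, parityBlkNum = 3) and no decommissioned replicas:
+   * 5 live internal blocks is corrupt, 6 is highest priority, 7 is very
+   * under-replicated ((7 - 6) * 3 < 3 + 1), and 8 is under-replicated.
+   */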
+ private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+ short dataBlkNum, short parityBlkNum) {
+ if (curReplicas < dataBlkNum) {
+      // If decommissioned replicas make up the shortfall, the block is not
+      // corrupt, but schedule recovery with the highest priority.
+ if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+ return QUEUE_HIGHEST_PRIORITY;
+ }
+ return QUEUE_WITH_CORRUPT_BLOCKS;
+ } else if (curReplicas == dataBlkNum) {
+ // highest risk of loss, highest priority
+ return QUEUE_HIGHEST_PRIORITY;
+ } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+      // at most a third of the parity redundancy remains;
+      // this is considered very under-replicated
+ return QUEUE_VERY_UNDER_REPLICATED;
+ } else {
+ // add to the normal queue for under replicated blocks
+ return QUEUE_UNDER_REPLICATED;
+ }
+ }
+
  /** add a block to an under replication queue according to its priority
   * @param block an under replicated block
* @param curReplicas current number of replicas of the block
@@ -185,7 +220,7 @@ synchronized boolean add(BlockInfo block,
int decomissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
- int priLevel = getPriority(curReplicas, decomissionedReplicas,
+ int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
@@ -208,7 +243,7 @@ synchronized boolean remove(BlockInfo block,
int oldReplicas,
int decommissionedReplicas,
int oldExpectedReplicas) {
- int priLevel = getPriority(oldReplicas,
+ int priLevel = getPriority(block, oldReplicas,
decommissionedReplicas,
oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel);
@@ -282,8 +317,10 @@ synchronized void update(BlockInfo block, int curReplicas,
int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
- int curPri = getPriority(curReplicas, decommissionedReplicas, curExpectedReplicas);
- int oldPri = getPriority(oldReplicas, decommissionedReplicas, oldExpectedReplicas);
+ int curPri = getPriority(block, curReplicas, decommissionedReplicas,
+ curExpectedReplicas);
+ int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
+ oldExpectedReplicas);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
block +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index e25ee31643..64d80bdbe2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -306,4 +306,12 @@ public static void recheckDecommissionState(DatanodeManager dm)
throws ExecutionException, InterruptedException {
dm.getDecomManager().runMonitor();
}
+
+  /**
+   * Add a block to the replicateBlocks queue of the given datanode, e.g. to
+   * simulate a datanode that is busy with pending replication work.
+   */
+ public static void addBlockToBeReplicated(DatanodeDescriptor node,
+ Block block, DatanodeStorageInfo[] targets) {
+ node.addBlockToBeReplicated(block, targets);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
index de36e077ae..0f419ef550 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -28,10 +31,21 @@
public class TestUnderReplicatedBlockQueues {
+ private final ECSchema ecSchema =
+ ErasureCodingSchemaManager.getSystemDefaultSchema();
+ private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
private BlockInfo genBlockInfo(long id) {
return new BlockInfoContiguous(new Block(id), (short) 3);
}
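+
+  // numBytes determines getRealDataBlockNum(): CELLSIZE * dataBlkNum spans
+  // dataBlkNum cells, so the block needs exactly dataBlkNum data blocks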
+ private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+ BlockInfoStriped sblk = new BlockInfoStriped(new Block(id), ecSchema,
+ CELLSIZE);
+ sblk.setNumBytes(numBytes);
+ return sblk;
+ }
+
/**
* Test that adding blocks with different replication counts puts them
* into different queues
@@ -85,6 +99,54 @@ public void testBlockPriorities() throws Throwable {
assertEquals(2, queues.getCorruptReplOneBlockSize());
}
+ @Test
+ public void testStripedBlockPriorities() throws Throwable {
+ int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNum = ecSchema.getNumParityUnits();
+    doTestStripedBlockPriorities(1, parityBlkNum);
+    doTestStripedBlockPriorities(dataBlkNum, parityBlkNum);
+ }
+
+ private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+ throws Throwable {
+ int groupSize = dataBlkNum + parityBlkNum;
+ long numBytes = CELLSIZE * dataBlkNum;
+ UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
+
+    // add a striped block with only dataBlkNum internal blocks left
+ BlockInfo block1 = genStripedBlockInfo(-100, numBytes);
+ assertAdded(queues, block1, dataBlkNum, 0, groupSize);
+ assertEquals(1, queues.getUnderReplicatedBlockCount());
+ assertEquals(1, queues.size());
+ assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+
+    // add a striped block with dataBlkNum + 1 internal blocks left
+ BlockInfo block2 = genStripedBlockInfo(-200, numBytes);
+ assertAdded(queues, block2, dataBlkNum + 1, 0, groupSize);
+ assertEquals(2, queues.getUnderReplicatedBlockCount());
+ assertEquals(2, queues.size());
+ assertInLevel(queues, block2,
+ UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
+
+    // add a striped block with dataBlkNum + 2 internal blocks left
+ BlockInfo block3 = genStripedBlockInfo(-300, numBytes);
+ assertAdded(queues, block3, dataBlkNum + 2, 0, groupSize);
+ assertEquals(3, queues.getUnderReplicatedBlockCount());
+ assertEquals(3, queues.size());
+ assertInLevel(queues, block3,
+ UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+
+    // add a corrupt block that has fewer than dataBlkNum internal blocks left
+ BlockInfo block_corrupt = genStripedBlockInfo(-400, numBytes);
+ assertEquals(0, queues.getCorruptBlockSize());
+ assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+ assertEquals(4, queues.size());
+ assertEquals(3, queues.getUnderReplicatedBlockCount());
+ assertEquals(1, queues.getCorruptBlockSize());
+ assertInLevel(queues, block_corrupt,
+ UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+ }
+
private void assertAdded(UnderReplicatedBlocks queues,
BlockInfo block,
int curReplicas,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index ca4fbbc99c..3134373dab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -32,29 +32,26 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-
-import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestRecoverStripedBlocks {
private final short GROUP_SIZE =
- NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+ NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
private MiniDFSCluster cluster;
private final Path dirPath = new Path("/dir");
private Path filePath = new Path(dirPath, "file");
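+  // a DN with more than maxReplicationStreams pending replications is
+  // treated as busy and is skipped as a recovery source unless the block
+  // is at the highest priority level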
+  private final int maxReplicationStreams =
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
- @Before
- public void setup() throws IOException {
- final Configuration conf = new HdfsConfiguration();
+ private void initConf(Configuration conf) {
// Large value to make sure the pending replication request can stay in
// DatanodeDescriptor.replicateBlocks before test timeout.
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
@@ -62,63 +59,111 @@ public void setup() throws IOException {
// chooseUnderReplicatedBlocks at once.
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
-
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
- .build();
- cluster.waitActive();
- }
-
- @After
- public void tearDown() throws Exception {
- if (cluster != null) {
- cluster.shutdown();
- }
}
@Test
public void testMissingStripedBlock() throws Exception {
- final int numBlocks = 4;
- DFSTestUtil.createStripedFile(cluster, filePath,
- dirPath, numBlocks, 1, true);
+ doTestMissingStripedBlock(1, 0);
+ }
- // make sure the file is complete in NN
- final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
- .getINode4Write(filePath.toString()).asFile();
- assertFalse(fileNode.isUnderConstruction());
- assertTrue(fileNode.isStriped());
- BlockInfo[] blocks = fileNode.getBlocks();
- assertEquals(numBlocks, blocks.length);
- for (BlockInfo blk : blocks) {
- assertTrue(blk.isStriped());
- assertTrue(blk.isComplete());
- assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
- final BlockInfoStriped sb = (BlockInfoStriped) blk;
- assertEquals(GROUP_SIZE, sb.numNodes());
- }
+ @Test
+ public void testMissingStripedBlockWithBusyNode1() throws Exception {
+ doTestMissingStripedBlock(2, 1);
+ }
- final BlockManager bm = cluster.getNamesystem().getBlockManager();
- BlockInfo firstBlock = fileNode.getBlocks()[0];
- DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+ @Test
+ public void testMissingStripedBlockWithBusyNode2() throws Exception {
+ doTestMissingStripedBlock(3, 1);
+ }
- DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor();
- assertEquals(numBlocks, secondDn.numBlocks());
+ /**
+ * Start GROUP_SIZE + 1 datanodes.
+   * Inject striped blocks into the first GROUP_SIZE datanodes.
+   * Then make numOfBusy datanodes busy and remove numOfMissed datanodes.
+   * Then trigger the BlockManager to compute recovery work, all of which
+   * will be scheduled on the last datanode.
+ * Finally, verify the recovery work of the last datanode.
+ */
+ private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
+ throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
+ .build();
- bm.getDatanodeManager().removeDatanode(secondDn);
+ try {
+ cluster.waitActive();
+ final int numBlocks = 4;
+ DFSTestUtil.createStripedFile(cluster, filePath,
+ dirPath, numBlocks, 1, true);
+      // all blocks will be located on the first GROUP_SIZE DNs; the last DN
+      // stays empty because of the util function createStripedFile
- BlockManagerTestUtil.getComputedDatanodeWork(bm);
+ // make sure the file is complete in NN
+ final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+ .getINode4Write(filePath.toString()).asFile();
+ assertFalse(fileNode.isUnderConstruction());
+ assertTrue(fileNode.isStriped());
+ BlockInfo[] blocks = fileNode.getBlocks();
+ assertEquals(numBlocks, blocks.length);
+ for (BlockInfo blk : blocks) {
+ assertTrue(blk.isStriped());
+ assertTrue(blk.isComplete());
+ assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS,
+ blk.getNumBytes());
+ final BlockInfoStriped sb = (BlockInfoStriped) blk;
+ assertEquals(GROUP_SIZE, sb.numNodes());
+ }
- // all the recovery work will be scheduled on the last DN
- DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
- DatanodeDescriptor last =
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
+ BlockInfo firstBlock = fileNode.getBlocks()[0];
+ DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+
+      // make numOfBusy nodes busy by queueing more than
+      // maxReplicationStreams pending replications on each of them
+ int i = 0;
+ for (; i < numOfBusy; i++) {
+ DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor();
+ for (int j = 0; j < maxReplicationStreams + 1; j++) {
+ BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
+ new DatanodeStorageInfo[]{storageInfos[0]});
+ }
+ }
+
+      // remove numOfMissed datanodes so their internal blocks go missing
+ for (; i < numOfBusy + numOfMissed; i++) {
+ DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
+ assertEquals(numBlocks, missedNode.numBlocks());
+ bm.getDatanodeManager().removeDatanode(missedNode);
+ }
+
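+      // compute replication and recovery work on the NameNode, as the
+      // ReplicationMonitor does periodically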
+ BlockManagerTestUtil.getComputedDatanodeWork(bm);
+
+ // all the recovery work will be scheduled on the last DN
+ DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
+ DatanodeDescriptor last =
bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
- assertEquals("Counting the number of outstanding EC tasks", numBlocks,
- last.getNumberOfBlocksToBeErasureCoded());
-    List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
- for (BlockECRecoveryInfo info : recovery) {
- assertEquals(1, info.getTargetDnInfos().length);
- assertEquals(last, info.getTargetDnInfos()[0]);
- assertEquals(GROUP_SIZE - 1, info.getSourceDnInfos().length);
- assertEquals(GROUP_SIZE - 1, info.getLiveBlockIndices().length);
+ assertEquals("Counting the number of outstanding EC tasks", numBlocks,
+ last.getNumberOfBlocksToBeErasureCoded());
+      List<BlockECRecoveryInfo> recovery =
+          last.getErasureCodeCommand(numBlocks);
+ for (BlockECRecoveryInfo info : recovery) {
+ assertEquals(1, info.getTargetDnInfos().length);
+ assertEquals(last, info.getTargetDnInfos()[0]);
+ assertEquals(info.getSourceDnInfos().length,
+ info.getLiveBlockIndices().length);
+ if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) {
+          // It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs are also
+          // chosen as sources to make sure NUM_DATA_BLOCKS DNs are available
+          // for the recovery work.
+ assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length);
+ } else {
+          // The block is not at the highest priority, so the busy DNs are
+          // not used as sources
+ assertEquals(GROUP_SIZE - numOfMissed - numOfBusy,
+ info.getSourceDnInfos().length);
+ }
+ }
+ } finally {
+ cluster.shutdown();
}
}
}