HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for striped block. Contributed by Walter Su.
Parent: ee01a09500
Commit: 2c494a8436
CHANGES-HDFS-EC-7285.txt
@@ -329,3 +329,6 @@
     HDFS-8684. Erasure Coding: fix some block number calculation for striped
     block. (yliu)
 
+    HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for
+    striped block. (Walter Su via jing9)
+
UnderReplicatedBlocks.java
@@ -34,7 +34,7 @@
  *
  * <p/>
  * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
  * </p>
  * <p>The queue order is as follows:</p>
  * <ol>
@@ -144,14 +144,28 @@ synchronized boolean contains(BlockInfo block) {
    * @param expectedReplicas expected number of replicas of the block
    * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
    */
-  private int getPriority(int curReplicas,
+  private int getPriority(BlockInfo block,
+                          int curReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
       // Block has enough copies, but not enough racks
       return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
-    } else if (curReplicas == 0) {
+    }
+    if (block.isStriped()) {
+      BlockInfoStriped sblk = (BlockInfoStriped) block;
+      return getPriorityStriped(curReplicas, decommissionedReplicas,
+          sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+    } else {
+      return getPriorityContiguous(curReplicas, decommissionedReplicas,
+          expectedReplicas);
+    }
+  }
+
+  private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
+      int expectedReplicas) {
+    if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
@@ -160,7 +174,7 @@ private int getPriority(int curReplicas,
       //all we have are corrupt blocks
       return QUEUE_WITH_CORRUPT_BLOCKS;
     } else if (curReplicas == 1) {
-      //only on replica -risk of loss
+      // only one replica, highest risk of loss
       // highest priority
       return QUEUE_HIGHEST_PRIORITY;
     } else if ((curReplicas * 3) < expectedReplicas) {
@@ -173,6 +187,27 @@ private int getPriority(int curReplicas,
     }
   }
 
+  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+      short dataBlkNum, short parityBlkNum) {
+    if (curReplicas < dataBlkNum) {
+      // There are some replicas on decommissioned nodes so it's not corrupted
+      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == dataBlkNum) {
+      // highest risk of loss, highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+      // can only afford one replica loss
+      // this is considered very under-replicated
+      return QUEUE_VERY_UNDER_REPLICATED;
+    } else {
+      // add to the normal queue for under replicated blocks
+      return QUEUE_UNDER_REPLICATED;
+    }
+  }
+
   /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
@@ -185,7 +220,7 @@ synchronized boolean add(BlockInfo block,
                            int decomissionedReplicas,
                            int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
-    int priLevel = getPriority(curReplicas, decomissionedReplicas,
+    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
                                expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
@@ -208,7 +243,7 @@ synchronized boolean remove(BlockInfo block,
                              int oldReplicas,
                              int decommissionedReplicas,
                              int oldExpectedReplicas) {
-    int priLevel = getPriority(oldReplicas,
+    int priLevel = getPriority(block, oldReplicas,
                                decommissionedReplicas,
                                oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
@@ -282,8 +317,10 @@ synchronized void update(BlockInfo block, int curReplicas,
                      int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(curReplicas, decommissionedReplicas, curExpectedReplicas);
-    int oldPri = getPriority(oldReplicas, decommissionedReplicas, oldExpectedReplicas);
+    int curPri = getPriority(block, curReplicas, decommissionedReplicas,
+        curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
+        oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
         block +
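
Taken together, these hunks split getPriority(...) into a contiguous path and a new striped path that judges health by live internal blocks rather than by a replication factor. Below is a minimal standalone sketch of the striped decision table, assuming the RS(6,3) default layout of this branch (6 data + 3 parity internal blocks); the queue constant values are assumptions mirroring UnderReplicatedBlocks, not the committed class:

// Hedged sketch, not the committed class: re-implements the striped decision
// table from the diff above so the thresholds can be exercised in isolation.
public class StripedPriorityDemo {
  static final int QUEUE_HIGHEST_PRIORITY = 0;       // assumed constant values
  static final int QUEUE_VERY_UNDER_REPLICATED = 1;
  static final int QUEUE_UNDER_REPLICATED = 2;
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;

  static int getPriorityStriped(int curReplicas, int decommissionedReplicas,
      short dataBlkNum, short parityBlkNum) {
    if (curReplicas < dataBlkNum) {
      // decommissioned nodes may still hold the missing internal blocks,
      // so the group is recoverable rather than corrupt
      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
        return QUEUE_HIGHEST_PRIORITY;
      }
      return QUEUE_WITH_CORRUPT_BLOCKS;
    } else if (curReplicas == dataBlkNum) {
      // losing one more internal block makes the group unreadable
      return QUEUE_HIGHEST_PRIORITY;
    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
      return QUEUE_VERY_UNDER_REPLICATED;
    } else {
      return QUEUE_UNDER_REPLICATED;
    }
  }

  public static void main(String[] args) {
    short data = 6, parity = 3;  // assumed RS(6,3) defaults
    for (int live = 5; live <= 8; live++) {
      System.out.println(live + " live internal blocks -> queue "
          + getPriorityStriped(live, 0, data, parity));
    }
    // prints: 5 -> 4 (corrupt), 6 -> 0 (highest priority),
    //         7 -> 1 (very under-replicated), 8 -> 2 (under-replicated).
    // A full group of 9 never reaches this method: the caller returns
    // QUEUE_REPLICAS_BADLY_DISTRIBUTED when curReplicas >= expectedReplicas.
  }
}

The (curReplicas - dataBlkNum) * 3 < parityBlkNum + 1 test parallels the contiguous rule (curReplicas * 3) < expectedReplicas: a group that has lost roughly two thirds or more of its redundancy counts as very under-replicated.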
BlockManagerTestUtil.java
@@ -306,4 +306,12 @@ public static void recheckDecommissionState(DatanodeManager dm)
       throws ExecutionException, InterruptedException {
     dm.getDecomManager().runMonitor();
   }
+
+  /**
+   * Add a block to the replicateBlocks queue of the given datanode.
+   */
+  public static void addBlockToBeReplicated(DatanodeDescriptor node,
+      Block block, DatanodeStorageInfo[] targets) {
+    node.addBlockToBeReplicated(block, targets);
+  }
 }
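
This test hook exists so the recovery test further down can saturate a datanode's replication queue. A hedged usage sketch, where busyNode and storageInfos are placeholders for objects the real test obtains from a running MiniDFSCluster: once a node holds more than maxReplicationStreams pending items, the replication scheduler treats it as too busy to serve as a recovery source.

// Hedged sketch: enqueue maxReplicationStreams + 1 dummy replication tasks
// so the scheduler considers busyNode overloaded.
int maxStreams = DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
for (int j = 0; j < maxStreams + 1; j++) {
  BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
      new DatanodeStorageInfo[]{storageInfos[0]});
}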
TestUnderReplicatedBlockQueues.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -28,10 +31,21 @@
 
 public class TestUnderReplicatedBlockQueues {
 
+  private final ECSchema ecSchema =
+      ErasureCodingSchemaManager.getSystemDefaultSchema();
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
   private BlockInfo genBlockInfo(long id) {
     return new BlockInfoContiguous(new Block(id), (short) 3);
   }
 
+  private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+    BlockInfoStriped sblk = new BlockInfoStriped(new Block(id), ecSchema,
+        CELLSIZE);
+    sblk.setNumBytes(numBytes);
+    return sblk;
+  }
+
   /**
    * Test that adding blocks with different replication counts puts them
    * into different queues
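
genStripedBlockInfo pins numBytes to CELLSIZE * dataBlkNum so the group spans exactly one full stripe and getRealDataBlockNum() reports the full data-block count. A hedged sketch of the size-to-cells relationship this relies on (the committed formula lives in BlockInfoStriped; this helper name is hypothetical):

// Hedged sketch, assuming getRealDataBlockNum() shrinks for groups smaller
// than a full stripe: only data cells that actually hold bytes count toward
// the striped priority calculation.
static short realDataBlockNum(long numBytes, int cellSize, short dataBlkNum) {
  long cellsUsed = (numBytes + cellSize - 1) / cellSize;  // ceiling division
  return (short) Math.min(cellsUsed, dataBlkNum);
}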
@@ -85,6 +99,54 @@ public void testBlockPriorities() throws Throwable {
     assertEquals(2, queues.getCorruptReplOneBlockSize());
   }
 
+  @Test
+  public void testStripedBlockPriorities() throws Throwable {
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNum = ecSchema.getNumParityUnits();
+    doTestStripedBlockPriorities(1, parityBlkNum);
+    doTestStripedBlockPriorities(dataBlkNum, parityBlkNum);
+  }
+
+  private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+      throws Throwable {
+    int groupSize = dataBlkNum + parityBlkNum;
+    long numBytes = CELLSIZE * dataBlkNum;
+    UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
+
+    // add a striped block which has NUM_DATA_BLOCKS internal blocks left
+    BlockInfo block1 = genStripedBlockInfo(-100, numBytes);
+    assertAdded(queues, block1, dataBlkNum, 0, groupSize);
+    assertEquals(1, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.size());
+    assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+
+    // add a striped block which has NUM_DATA_BLOCKS+1 internal blocks left
+    BlockInfo block2 = genStripedBlockInfo(-200, numBytes);
+    assertAdded(queues, block2, dataBlkNum + 1, 0, groupSize);
+    assertEquals(2, queues.getUnderReplicatedBlockCount());
+    assertEquals(2, queues.size());
+    assertInLevel(queues, block2,
+        UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
+
+    // add a striped block which has NUM_DATA_BLOCKS+2 internal blocks left
+    BlockInfo block3 = genStripedBlockInfo(-300, numBytes);
+    assertAdded(queues, block3, dataBlkNum + 2, 0, groupSize);
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(3, queues.size());
+    assertInLevel(queues, block3,
+        UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+
+    // add a corrupted block
+    BlockInfo block_corrupt = genStripedBlockInfo(-400, numBytes);
+    assertEquals(0, queues.getCorruptBlockSize());
+    assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+    assertEquals(4, queues.size());
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+        UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+  }
+
   private void assertAdded(UnderReplicatedBlocks queues,
                            BlockInfo block,
                            int curReplicas,
TestRecoverStripedBlocks.java
@@ -23,7 +23,7 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -32,29 +32,26 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestRecoverStripedBlocks {
   private final short GROUP_SIZE =
-      NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+      NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
   private MiniDFSCluster cluster;
   private final Path dirPath = new Path("/dir");
   private Path filePath = new Path(dirPath, "file");
+  private int maxReplicationStreams =
+      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
 
-  @Before
-  public void setup() throws IOException {
-    final Configuration conf = new HdfsConfiguration();
+  private void initConf(Configuration conf) {
     // Large value to make sure the pending replication request can stay in
     // DatanodeDescriptor.replicateBlocks before test timeout.
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
@@ -62,63 +59,111 @@ public void setup() throws IOException {
     // chooseUnderReplicatedBlocks at once.
     conf.setInt(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
-
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
-        .build();
-    cluster.waitActive();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
   }
 
   @Test
   public void testMissingStripedBlock() throws Exception {
-    final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, filePath,
-        dirPath, numBlocks, 1, true);
+    doTestMissingStripedBlock(1, 0);
+  }
 
-    // make sure the file is complete in NN
-    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
-        .getINode4Write(filePath.toString()).asFile();
-    assertFalse(fileNode.isUnderConstruction());
-    assertTrue(fileNode.isStriped());
-    BlockInfo[] blocks = fileNode.getBlocks();
-    assertEquals(numBlocks, blocks.length);
-    for (BlockInfo blk : blocks) {
-      assertTrue(blk.isStriped());
-      assertTrue(blk.isComplete());
-      assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
-      final BlockInfoStriped sb = (BlockInfoStriped) blk;
-      assertEquals(GROUP_SIZE, sb.numNodes());
-    }
+  @Test
+  public void testMissingStripedBlockWithBusyNode1() throws Exception {
+    doTestMissingStripedBlock(2, 1);
+  }
 
-    final BlockManager bm = cluster.getNamesystem().getBlockManager();
-    BlockInfo firstBlock = fileNode.getBlocks()[0];
-    DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+  @Test
+  public void testMissingStripedBlockWithBusyNode2() throws Exception {
+    doTestMissingStripedBlock(3, 1);
+  }
 
-    DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor();
-    assertEquals(numBlocks, secondDn.numBlocks());
+  /**
+   * Start GROUP_SIZE + 1 datanodes.
+   * Inject striped blocks into the first GROUP_SIZE datanodes.
+   * Then make numOfBusy datanodes busy and numOfMissed datanodes missed.
+   * Then trigger the BlockManager to compute recovery work, so that all of it
+   * is scheduled on the last datanode.
+   * Finally, verify the recovery work of the last datanode.
+   */
+  private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
+      throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
+        .build();
 
-    bm.getDatanodeManager().removeDatanode(secondDn);
+    try {
+      cluster.waitActive();
+      final int numBlocks = 4;
+      DFSTestUtil.createStripedFile(cluster, filePath,
+          dirPath, numBlocks, 1, true);
+      // all blocks will be located at first GROUP_SIZE DNs, the last DN is
+      // empty because of the util function createStripedFile
 
-    BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      // make sure the file is complete in NN
+      final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+          .getINode4Write(filePath.toString()).asFile();
+      assertFalse(fileNode.isUnderConstruction());
+      assertTrue(fileNode.isStriped());
+      BlockInfo[] blocks = fileNode.getBlocks();
+      assertEquals(numBlocks, blocks.length);
+      for (BlockInfo blk : blocks) {
+        assertTrue(blk.isStriped());
+        assertTrue(blk.isComplete());
+        assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS,
+            blk.getNumBytes());
+        final BlockInfoStriped sb = (BlockInfoStriped) blk;
+        assertEquals(GROUP_SIZE, sb.numNodes());
+      }
 
-    // all the recovery work will be scheduled on the last DN
-    DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
-    DatanodeDescriptor last =
-        bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
-    assertEquals("Counting the number of outstanding EC tasks", numBlocks,
-        last.getNumberOfBlocksToBeErasureCoded());
-    List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
-    for (BlockECRecoveryInfo info : recovery) {
-      assertEquals(1, info.getTargetDnInfos().length);
-      assertEquals(last, info.getTargetDnInfos()[0]);
-      assertEquals(GROUP_SIZE - 1, info.getSourceDnInfos().length);
-      assertEquals(GROUP_SIZE - 1, info.getLiveBlockIndices().length);
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
+      BlockInfo firstBlock = fileNode.getBlocks()[0];
+      DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+
+      // make numOfBusy nodes busy
+      int i = 0;
+      for (; i < numOfBusy; i++) {
+        DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor();
+        for (int j = 0; j < maxReplicationStreams + 1; j++) {
+          BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
+              new DatanodeStorageInfo[]{storageInfos[0]});
+        }
+      }
+
+      // make numOfMissed internal blocks missed
+      for (; i < numOfBusy + numOfMissed; i++) {
+        DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
+        assertEquals(numBlocks, missedNode.numBlocks());
+        bm.getDatanodeManager().removeDatanode(missedNode);
+      }
+
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+
+      // all the recovery work will be scheduled on the last DN
+      DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
+      DatanodeDescriptor last =
+          bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
+      assertEquals("Counting the number of outstanding EC tasks", numBlocks,
+          last.getNumberOfBlocksToBeErasureCoded());
+      List<BlockECRecoveryInfo> recovery =
+          last.getErasureCodeCommand(numBlocks);
+      for (BlockECRecoveryInfo info : recovery) {
+        assertEquals(1, info.getTargetDnInfos().length);
+        assertEquals(last, info.getTargetDnInfos()[0]);
+        assertEquals(info.getSourceDnInfos().length,
+            info.getLiveBlockIndices().length);
+        if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) {
+          // It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs will be chosen
+          // to make sure we have NUM_DATA_BLOCKS DNs to do recovery work.
+          assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length);
+        } else {
+          // The block is not at the highest priority level, so the busy DNs
+          // are not used as sources
+          assertEquals(GROUP_SIZE - numOfMissed - numOfBusy,
+              info.getSourceDnInfos().length);
+        }
+      }
+    } finally {
+      cluster.shutdown();
     }
   }
 }
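
A quick check of the source-count assertions, assuming the branch defaults NUM_DATA_BLOCKS = 6 and NUM_PARITY_BLOCKS = 3, so GROUP_SIZE = 9: with (numOfMissed, numOfBusy) = (1, 0), 8 live internal blocks remain, so each recovery task sees 9 - 1 - 0 = 8 sources; with (2, 1), 7 remain and the busy DN is skipped, so sources = 9 - 2 - 1 = 6; with (3, 1), exactly 6 = NUM_DATA_BLOCKS remain, the block lands in QUEUE_HIGHEST_PRIORITY, the busy DN is pressed into service, and sources = 6.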