HDFS-8005. Erasure Coding: simplify striped block recovery work computation and add tests. Contributed by Jing Zhao.
This commit is contained in:
parent
abf833a7b2
commit
cd655ee817
@ -541,7 +541,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
|
|||||||
// source node returned is not used
|
// source node returned is not used
|
||||||
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
|
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
|
||||||
containingLiveReplicasNodes, numReplicas,
|
containingLiveReplicasNodes, numReplicas,
|
||||||
new LinkedList<Short>(), 1, UnderReplicatedBlocks.LEVEL);
|
new LinkedList<Short>(), UnderReplicatedBlocks.LEVEL);
|
||||||
|
|
||||||
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
|
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
|
||||||
// not included in the numReplicas.liveReplicas() count
|
// not included in the numReplicas.liveReplicas() count
|
||||||
@ -1389,7 +1389,7 @@ int computeBlockRecoveryWork(int blocksToProcess) {
|
|||||||
int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
|
int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
|
||||||
int requiredReplication, numEffectiveReplicas;
|
int requiredReplication, numEffectiveReplicas;
|
||||||
List<DatanodeDescriptor> containingNodes;
|
List<DatanodeDescriptor> containingNodes;
|
||||||
BlockCollection bc = null;
|
BlockCollection bc;
|
||||||
int additionalReplRequired;
|
int additionalReplRequired;
|
||||||
|
|
||||||
int scheduledWork = 0;
|
int scheduledWork = 0;
|
||||||
@ -1417,13 +1417,10 @@ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
|
|||||||
containingNodes = new ArrayList<>();
|
containingNodes = new ArrayList<>();
|
||||||
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
|
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
|
||||||
NumberReplicas numReplicas = new NumberReplicas();
|
NumberReplicas numReplicas = new NumberReplicas();
|
||||||
List<Short> missingBlockIndices = new LinkedList<>();
|
List<Short> liveBlockIndices = new ArrayList<>();
|
||||||
DatanodeDescriptor[] srcNodes;
|
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
|
||||||
int numSourceNodes = bc.isStriped() ?
|
containingNodes, liveReplicaNodes, numReplicas,
|
||||||
HdfsConstants.NUM_DATA_BLOCKS : 1;
|
liveBlockIndices, priority);
|
||||||
srcNodes = chooseSourceDatanodes(
|
|
||||||
block, containingNodes, liveReplicaNodes, numReplicas,
|
|
||||||
missingBlockIndices, numSourceNodes, priority);
|
|
||||||
if(srcNodes == null || srcNodes.length == 0) {
|
if(srcNodes == null || srcNodes.length == 0) {
|
||||||
// block can not be replicated from any node
|
// block can not be replicated from any node
|
||||||
LOG.debug("Block " + block + " cannot be recovered " +
|
LOG.debug("Block " + block + " cannot be recovered " +
|
||||||
@ -1455,15 +1452,14 @@ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
|
|||||||
} else {
|
} else {
|
||||||
additionalReplRequired = 1; // Needed on a new rack
|
additionalReplRequired = 1; // Needed on a new rack
|
||||||
}
|
}
|
||||||
if (bc.isStriped()) {
|
if (block.isStriped()) {
|
||||||
|
short[] indices = new short[liveBlockIndices.size()];
|
||||||
|
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
|
||||||
|
indices[i] = liveBlockIndices.get(i);
|
||||||
|
}
|
||||||
ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
|
ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
|
||||||
containingNodes, liveReplicaNodes, additionalReplRequired,
|
containingNodes, liveReplicaNodes, additionalReplRequired,
|
||||||
priority);
|
priority, indices);
|
||||||
short[] missingBlockArray = new short[missingBlockIndices.size()];
|
|
||||||
for (int i = 0 ; i < missingBlockIndices.size(); i++) {
|
|
||||||
missingBlockArray[i] = missingBlockIndices.get(i);
|
|
||||||
}
|
|
||||||
ecw.setMissingBlockIndices(missingBlockArray);
|
|
||||||
recovWork.add(ecw);
|
recovWork.add(ecw);
|
||||||
} else {
|
} else {
|
||||||
recovWork.add(new ReplicationWork(block, bc, srcNodes,
|
recovWork.add(new ReplicationWork(block, bc, srcNodes,
|
||||||
@ -1543,15 +1539,14 @@ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add block to the to be replicated list
|
// Add block to the to be replicated list
|
||||||
if (bc.isStriped()) {
|
if (block.isStriped()) {
|
||||||
assert rw instanceof ErasureCodingWork;
|
assert rw instanceof ErasureCodingWork;
|
||||||
assert rw.targets.length > 0;
|
assert rw.targets.length > 0;
|
||||||
rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
|
rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
|
||||||
new ExtendedBlock(namesystem.getBlockPoolId(), block),
|
new ExtendedBlock(namesystem.getBlockPoolId(), block),
|
||||||
rw.srcNodes, rw.targets,
|
rw.srcNodes, rw.targets,
|
||||||
((ErasureCodingWork)rw).getMissingBlockIndicies());
|
((ErasureCodingWork) rw).liveBlockIndicies);
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
rw.srcNodes[0].addBlockToBeReplicated(block, targets);
|
rw.srcNodes[0].addBlockToBeReplicated(block, targets);
|
||||||
}
|
}
|
||||||
scheduledWork++;
|
scheduledWork++;
|
||||||
@ -1581,9 +1576,9 @@ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
|
|||||||
DatanodeStorageInfo[] targets = rw.targets;
|
DatanodeStorageInfo[] targets = rw.targets;
|
||||||
if (targets != null && targets.length != 0) {
|
if (targets != null && targets.length != 0) {
|
||||||
StringBuilder targetList = new StringBuilder("datanode(s)");
|
StringBuilder targetList = new StringBuilder("datanode(s)");
|
||||||
for (int k = 0; k < targets.length; k++) {
|
for (DatanodeStorageInfo target : targets) {
|
||||||
targetList.append(' ');
|
targetList.append(' ');
|
||||||
targetList.append(targets[k].getDatanodeDescriptor());
|
targetList.append(target.getDatanodeDescriptor());
|
||||||
}
|
}
|
||||||
blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
|
blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
|
||||||
rw.block, targetList);
|
rw.block, targetList);
|
||||||
@ -1694,11 +1689,8 @@ List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
|
|||||||
* @param numReplicas NumberReplicas instance to be initialized with the
|
* @param numReplicas NumberReplicas instance to be initialized with the
|
||||||
* counts of live, corrupt, excess, and decommissioned
|
* counts of live, corrupt, excess, and decommissioned
|
||||||
* replicas of the given block.
|
* replicas of the given block.
|
||||||
* @param missingBlockIndices List to be populated with indices of missing
|
* @param liveBlockIndices List to be populated with indices of healthy
|
||||||
* blocks in a striped block group or missing
|
* blocks in a striped block group
|
||||||
* replicas of a replicated block
|
|
||||||
* @param numSourceNodes integer specifying the number of source nodes to
|
|
||||||
* choose
|
|
||||||
* @param priority integer representing replication priority of the given
|
* @param priority integer representing replication priority of the given
|
||||||
* block
|
* block
|
||||||
* @return the array of DatanodeDescriptor of the chosen nodes from which to
|
* @return the array of DatanodeDescriptor of the chosen nodes from which to
|
||||||
@ -1709,24 +1701,20 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
|
|||||||
List<DatanodeDescriptor> containingNodes,
|
List<DatanodeDescriptor> containingNodes,
|
||||||
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
|
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
|
||||||
NumberReplicas numReplicas,
|
NumberReplicas numReplicas,
|
||||||
List<Short> missingBlockIndices, int numSourceNodes, int priority) {
|
List<Short> liveBlockIndices, int priority) {
|
||||||
containingNodes.clear();
|
containingNodes.clear();
|
||||||
nodesContainingLiveReplicas.clear();
|
nodesContainingLiveReplicas.clear();
|
||||||
LinkedList<DatanodeDescriptor> srcNodes = new LinkedList<>();
|
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
|
||||||
int live = 0;
|
int live = 0;
|
||||||
int decommissioned = 0;
|
int decommissioned = 0;
|
||||||
int decommissioning = 0;
|
int decommissioning = 0;
|
||||||
int corrupt = 0;
|
int corrupt = 0;
|
||||||
int excess = 0;
|
int excess = 0;
|
||||||
missingBlockIndices.clear();
|
liveBlockIndices.clear();
|
||||||
Set<Short> healthyIndices = new HashSet<>();
|
final boolean isStriped = block.isStriped();
|
||||||
|
|
||||||
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
|
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
|
||||||
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
||||||
if (block.isStriped()) {
|
|
||||||
healthyIndices.add((short) ((BlockInfoStriped) block).
|
|
||||||
getStorageBlockIndex(storage));
|
|
||||||
}
|
|
||||||
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
||||||
LightWeightLinkedSet<BlockInfo> excessBlocks =
|
LightWeightLinkedSet<BlockInfo> excessBlocks =
|
||||||
excessReplicateMap.get(node.getDatanodeUuid());
|
excessReplicateMap.get(node.getDatanodeUuid());
|
||||||
@ -1765,27 +1753,19 @@ else if (node.isDecommissionInProgress()) {
|
|||||||
if(node.isDecommissioned())
|
if(node.isDecommissioned())
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
// We got this far, current node is a reasonable choice
|
if(isStriped || srcNodes.isEmpty()) {
|
||||||
if(srcNodes.size() < numSourceNodes) {
|
|
||||||
srcNodes.add(node);
|
srcNodes.add(node);
|
||||||
|
if (isStriped) {
|
||||||
|
liveBlockIndices.add((short) ((BlockInfoStriped) block).
|
||||||
|
getStorageBlockIndex(storage));
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// switch to a different node randomly
|
// for replicated block, switch to a different node randomly
|
||||||
// this to prevent from deterministically selecting the same node even
|
// this to prevent from deterministically selecting the same node even
|
||||||
// if the node failed to replicate the block on previous iterations
|
// if the node failed to replicate the block on previous iterations
|
||||||
if(ThreadLocalRandom.current().nextBoolean()) {
|
if (!isStriped && ThreadLocalRandom.current().nextBoolean()) {
|
||||||
int pos = ThreadLocalRandom.current().nextInt(numSourceNodes);
|
srcNodes.set(0, node);
|
||||||
if(!srcNodes.get(pos).isDecommissionInProgress()) {
|
|
||||||
srcNodes.set(pos, node);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (block.isStriped()) {
|
|
||||||
for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS +
|
|
||||||
HdfsConstants.NUM_PARITY_BLOCKS; i++) {
|
|
||||||
if (!healthyIndices.contains(i)) {
|
|
||||||
missingBlockIndices.add(i);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(numReplicas != null)
|
if(numReplicas != null)
|
||||||
@ -3889,25 +3869,25 @@ public static LocatedBlock newLocatedBlock(
|
|||||||
* to represent a task to recover a block through replication or erasure
|
* to represent a task to recover a block through replication or erasure
|
||||||
* coding. Recovery is done by transferring data from srcNodes to targets
|
* coding. Recovery is done by transferring data from srcNodes to targets
|
||||||
*/
|
*/
|
||||||
private static class BlockRecoveryWork {
|
private abstract static class BlockRecoveryWork {
|
||||||
protected final BlockInfo block;
|
final BlockInfo block;
|
||||||
protected final BlockCollection bc;
|
final BlockCollection bc;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An erasure coding recovery task has multiple source nodes.
|
* An erasure coding recovery task has multiple source nodes.
|
||||||
* A replication task only has 1 source node, stored on top of the array
|
* A replication task only has 1 source node, stored on top of the array
|
||||||
*/
|
*/
|
||||||
protected final DatanodeDescriptor[] srcNodes;
|
final DatanodeDescriptor[] srcNodes;
|
||||||
/** Nodes containing the block; avoid them in choosing new targets */
|
/** Nodes containing the block; avoid them in choosing new targets */
|
||||||
protected final List<DatanodeDescriptor> containingNodes;
|
final List<DatanodeDescriptor> containingNodes;
|
||||||
/** Required by {@link BlockPlacementPolicy#chooseTarget} */
|
/** Required by {@link BlockPlacementPolicy#chooseTarget} */
|
||||||
protected final List<DatanodeStorageInfo> liveReplicaStorages;
|
final List<DatanodeStorageInfo> liveReplicaStorages;
|
||||||
protected final int additionalReplRequired;
|
final int additionalReplRequired;
|
||||||
|
|
||||||
protected DatanodeStorageInfo[] targets;
|
DatanodeStorageInfo[] targets;
|
||||||
protected final int priority;
|
final int priority;
|
||||||
|
|
||||||
public BlockRecoveryWork(BlockInfo block,
|
BlockRecoveryWork(BlockInfo block,
|
||||||
BlockCollection bc,
|
BlockCollection bc,
|
||||||
DatanodeDescriptor[] srcNodes,
|
DatanodeDescriptor[] srcNodes,
|
||||||
List<DatanodeDescriptor> containingNodes,
|
List<DatanodeDescriptor> containingNodes,
|
||||||
@ -3924,15 +3904,13 @@ public BlockRecoveryWork(BlockInfo block,
|
|||||||
this.targets = null;
|
this.targets = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void chooseTargets(BlockPlacementPolicy blockplacement,
|
abstract void chooseTargets(BlockPlacementPolicy blockplacement,
|
||||||
BlockStoragePolicySuite storagePolicySuite,
|
BlockStoragePolicySuite storagePolicySuite,
|
||||||
Set<Node> excludedNodes) {
|
Set<Node> excludedNodes);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ReplicationWork extends BlockRecoveryWork {
|
private static class ReplicationWork extends BlockRecoveryWork {
|
||||||
|
ReplicationWork(BlockInfo block,
|
||||||
public ReplicationWork(BlockInfo block,
|
|
||||||
BlockCollection bc,
|
BlockCollection bc,
|
||||||
DatanodeDescriptor[] srcNodes,
|
DatanodeDescriptor[] srcNodes,
|
||||||
List<DatanodeDescriptor> containingNodes,
|
List<DatanodeDescriptor> containingNodes,
|
||||||
@ -3944,7 +3922,8 @@ public ReplicationWork(BlockInfo block,
|
|||||||
LOG.debug("Creating a ReplicationWork to recover " + block);
|
LOG.debug("Creating a ReplicationWork to recover " + block);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void chooseTargets(BlockPlacementPolicy blockplacement,
|
@Override
|
||||||
|
void chooseTargets(BlockPlacementPolicy blockplacement,
|
||||||
BlockStoragePolicySuite storagePolicySuite,
|
BlockStoragePolicySuite storagePolicySuite,
|
||||||
Set<Node> excludedNodes) {
|
Set<Node> excludedNodes) {
|
||||||
assert srcNodes.length > 0
|
assert srcNodes.length > 0
|
||||||
@ -3961,30 +3940,23 @@ protected void chooseTargets(BlockPlacementPolicy blockplacement,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static class ErasureCodingWork extends BlockRecoveryWork {
|
private static class ErasureCodingWork extends BlockRecoveryWork {
|
||||||
|
final short[] liveBlockIndicies;
|
||||||
|
|
||||||
private short[] missingBlockIndicies = null;
|
ErasureCodingWork(BlockInfo block,
|
||||||
|
|
||||||
public ErasureCodingWork(BlockInfo block,
|
|
||||||
BlockCollection bc,
|
BlockCollection bc,
|
||||||
DatanodeDescriptor[] srcNodes,
|
DatanodeDescriptor[] srcNodes,
|
||||||
List<DatanodeDescriptor> containingNodes,
|
List<DatanodeDescriptor> containingNodes,
|
||||||
List<DatanodeStorageInfo> liveReplicaStorages,
|
List<DatanodeStorageInfo> liveReplicaStorages,
|
||||||
int additionalReplRequired,
|
int additionalReplRequired,
|
||||||
int priority) {
|
int priority, short[] liveBlockIndicies) {
|
||||||
super(block, bc, srcNodes, containingNodes,
|
super(block, bc, srcNodes, containingNodes,
|
||||||
liveReplicaStorages, additionalReplRequired, priority);
|
liveReplicaStorages, additionalReplRequired, priority);
|
||||||
|
this.liveBlockIndicies = liveBlockIndicies;
|
||||||
LOG.debug("Creating an ErasureCodingWork to recover " + block);
|
LOG.debug("Creating an ErasureCodingWork to recover " + block);
|
||||||
}
|
}
|
||||||
|
|
||||||
public short[] getMissingBlockIndicies() {
|
@Override
|
||||||
return missingBlockIndicies;
|
void chooseTargets(BlockPlacementPolicy blockplacement,
|
||||||
}
|
|
||||||
|
|
||||||
public void setMissingBlockIndices(short[] missingBlockIndicies) {
|
|
||||||
this.missingBlockIndicies = missingBlockIndicies;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void chooseTargets(BlockPlacementPolicy blockplacement,
|
|
||||||
BlockStoragePolicySuite storagePolicySuite,
|
BlockStoragePolicySuite storagePolicySuite,
|
||||||
Set<Node> excludedNodes) {
|
Set<Node> excludedNodes) {
|
||||||
try {
|
try {
|
||||||
|
@ -106,14 +106,14 @@ public static class BlockECRecoveryInfo {
|
|||||||
public final ExtendedBlock block;
|
public final ExtendedBlock block;
|
||||||
public final DatanodeDescriptor[] sources;
|
public final DatanodeDescriptor[] sources;
|
||||||
public final DatanodeStorageInfo[] targets;
|
public final DatanodeStorageInfo[] targets;
|
||||||
public final short[] missingBlockIndices;
|
public final short[] liveBlockIndices;
|
||||||
|
|
||||||
BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
|
BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
|
||||||
DatanodeStorageInfo[] targets, short[] missingBlockIndices) {
|
DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
|
||||||
this.block = block;
|
this.block = block;
|
||||||
this.sources = sources;
|
this.sources = sources;
|
||||||
this.targets = targets;
|
this.targets = targets;
|
||||||
this.missingBlockIndices = missingBlockIndices;
|
this.liveBlockIndices = liveBlockIndices;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -122,6 +122,7 @@ public String toString() {
|
|||||||
append("Recovering ").append(block).
|
append("Recovering ").append(block).
|
||||||
append(" From: ").append(Arrays.asList(sources)).
|
append(" From: ").append(Arrays.asList(sources)).
|
||||||
append(" To: ").append(Arrays.asList(targets)).append(")\n").
|
append(" To: ").append(Arrays.asList(targets)).append(")\n").
|
||||||
|
append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)).
|
||||||
toString();
|
toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -635,10 +636,10 @@ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
|
|||||||
* Store block erasure coding work.
|
* Store block erasure coding work.
|
||||||
*/
|
*/
|
||||||
void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
|
void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
|
||||||
DatanodeStorageInfo[] targets, short[] missingBlockIndicies) {
|
DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
|
||||||
assert(block != null && sources != null && sources.length > 0);
|
assert(block != null && sources != null && sources.length > 0);
|
||||||
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
|
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
|
||||||
missingBlockIndicies);
|
liveBlockIndices);
|
||||||
erasurecodeBlocks.offer(task);
|
erasurecodeBlocks.offer(task);
|
||||||
BlockManager.LOG.debug("Adding block recovery task " + task +
|
BlockManager.LOG.debug("Adding block recovery task " + task +
|
||||||
"to " + getName() + ", current queue size is " +
|
"to " + getName() + ", current queue size is " +
|
||||||
@ -679,7 +680,8 @@ int getNumberOfBlocksToBeReplicated() {
|
|||||||
/**
|
/**
|
||||||
* The number of work items that are pending to be replicated
|
* The number of work items that are pending to be replicated
|
||||||
*/
|
*/
|
||||||
int getNumberOfBlocksToBeErasureCoded() {
|
@VisibleForTesting
|
||||||
|
public int getNumberOfBlocksToBeErasureCoded() {
|
||||||
return erasurecodeBlocks.size();
|
return erasurecodeBlocks.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,6 +404,7 @@ public final short getFileReplication(int snapshot) {
|
|||||||
|
|
||||||
/** The same as getFileReplication(null). */
|
/** The same as getFileReplication(null). */
|
||||||
@Override // INodeFileAttributes
|
@Override // INodeFileAttributes
|
||||||
|
// TODO striped
|
||||||
public final short getFileReplication() {
|
public final short getFileReplication() {
|
||||||
return getFileReplication(CURRENT_STATE_ID);
|
return getFileReplication(CURRENT_STATE_ID);
|
||||||
}
|
}
|
||||||
|
@ -513,30 +513,33 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
|
|||||||
cntNodes,
|
cntNodes,
|
||||||
liveNodes,
|
liveNodes,
|
||||||
new NumberReplicas(),
|
new NumberReplicas(),
|
||||||
new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
|
new ArrayList<Short>(),
|
||||||
|
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
|
||||||
|
|
||||||
assertNull("Does not choose a source node for a less-than-highest-priority"
|
assertEquals("Does not choose a source node for a less-than-highest-priority"
|
||||||
+ " replication since all available source nodes have reached"
|
+ " replication since all available source nodes have reached"
|
||||||
+ " their replication limits.",
|
+ " their replication limits.", 0,
|
||||||
bm.chooseSourceDatanodes(
|
bm.chooseSourceDatanodes(
|
||||||
bm.getStoredBlock(aBlock),
|
bm.getStoredBlock(aBlock),
|
||||||
cntNodes,
|
cntNodes,
|
||||||
liveNodes,
|
liveNodes,
|
||||||
new NumberReplicas(),
|
new NumberReplicas(),
|
||||||
new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]);
|
new ArrayList<Short>(),
|
||||||
|
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
|
||||||
|
|
||||||
// Increase the replication count to test replication count > hard limit
|
// Increase the replication count to test replication count > hard limit
|
||||||
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
|
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
|
||||||
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
|
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
|
||||||
|
|
||||||
assertNull("Does not choose a source node for a highest-priority"
|
assertEquals("Does not choose a source node for a highest-priority"
|
||||||
+ " replication when all available nodes exceed the hard limit.",
|
+ " replication when all available nodes exceed the hard limit.", 0,
|
||||||
bm.chooseSourceDatanodes(
|
bm.chooseSourceDatanodes(
|
||||||
bm.getStoredBlock(aBlock),
|
bm.getStoredBlock(aBlock),
|
||||||
cntNodes,
|
cntNodes,
|
||||||
liveNodes,
|
liveNodes,
|
||||||
new NumberReplicas(),
|
new NumberReplicas(),
|
||||||
new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
|
new ArrayList<Short>(),
|
||||||
|
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -561,26 +564,24 @@ public void testFavorDecomUntilHardLimit() throws Exception {
|
|||||||
bm.getStoredBlock(aBlock),
|
bm.getStoredBlock(aBlock),
|
||||||
cntNodes,
|
cntNodes,
|
||||||
liveNodes,
|
liveNodes,
|
||||||
new NumberReplicas(), new LinkedList<Short>(), 1,
|
new NumberReplicas(), new LinkedList<Short>(),
|
||||||
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
|
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
|
||||||
|
|
||||||
|
|
||||||
// Increase the replication count to test replication count > hard limit
|
// Increase the replication count to test replication count > hard limit
|
||||||
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
|
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
|
||||||
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
|
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
|
||||||
|
|
||||||
assertNull("Does not choose a source decommissioning node for a normal"
|
assertEquals("Does not choose a source decommissioning node for a normal"
|
||||||
+ " replication when all available nodes exceed the hard limit.",
|
+ " replication when all available nodes exceed the hard limit.", 0,
|
||||||
bm.chooseSourceDatanodes(
|
bm.chooseSourceDatanodes(
|
||||||
bm.getStoredBlock(aBlock),
|
bm.getStoredBlock(aBlock),
|
||||||
cntNodes,
|
cntNodes,
|
||||||
liveNodes,
|
liveNodes,
|
||||||
new NumberReplicas(), new LinkedList<Short>(), 1,
|
new NumberReplicas(), new LinkedList<Short>(),
|
||||||
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
|
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSafeModeIBR() throws Exception {
|
public void testSafeModeIBR() throws Exception {
|
||||||
DatanodeDescriptor node = spy(nodes.get(0));
|
DatanodeDescriptor node = spy(nodes.get(0));
|
||||||
|
@ -1,107 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Iterator;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
public class TestRecoverStripedBlocks {
|
|
||||||
private final short GROUP_SIZE =
|
|
||||||
HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
|
|
||||||
private final short NUM_OF_DATANODES = GROUP_SIZE + 1;
|
|
||||||
private Configuration conf;
|
|
||||||
private MiniDFSCluster cluster;
|
|
||||||
private DistributedFileSystem fs;
|
|
||||||
private static final int BLOCK_SIZE = 1024;
|
|
||||||
private HdfsAdmin dfsAdmin;
|
|
||||||
private FSNamesystem namesystem;
|
|
||||||
private Path ECFilePath;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setupCluster() throws IOException {
|
|
||||||
conf = new HdfsConfiguration();
|
|
||||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
||||||
// Large value to make sure the pending replication request can stay in
|
|
||||||
// DatanodeDescriptor.replicateBlocks before test timeout.
|
|
||||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
|
|
||||||
// Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
|
|
||||||
// chooseUnderReplicatedBlocks at once.
|
|
||||||
conf.setInt(
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
|
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).
|
|
||||||
numDataNodes(NUM_OF_DATANODES).build();
|
|
||||||
cluster.waitActive();
|
|
||||||
fs = cluster.getFileSystem();
|
|
||||||
dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
|
|
||||||
namesystem = cluster.getNamesystem();
|
|
||||||
ECFilePath = new Path("/ecfile");
|
|
||||||
DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0);
|
|
||||||
dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMissingStripedBlock() throws Exception {
|
|
||||||
final BlockManager bm = cluster.getNamesystem().getBlockManager();
|
|
||||||
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath);
|
|
||||||
Iterator<DatanodeStorageInfo> storageInfos =
|
|
||||||
bm.blocksMap.getStorages(b.getLocalBlock())
|
|
||||||
.iterator();
|
|
||||||
|
|
||||||
DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
|
|
||||||
Iterator<BlockInfo> it = firstDn.getBlockIterator();
|
|
||||||
int missingBlkCnt = 0;
|
|
||||||
while (it.hasNext()) {
|
|
||||||
BlockInfo blk = it.next();
|
|
||||||
BlockManager.LOG.debug("Block " + blk + " will be lost");
|
|
||||||
missingBlkCnt++;
|
|
||||||
}
|
|
||||||
BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks");
|
|
||||||
|
|
||||||
bm.getDatanodeManager().removeDatanode(firstDn);
|
|
||||||
|
|
||||||
bm.computeDatanodeWork();
|
|
||||||
|
|
||||||
short cnt = 0;
|
|
||||||
for (DataNode dn : cluster.getDataNodes()) {
|
|
||||||
DatanodeDescriptor dnDescriptor =
|
|
||||||
bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid());
|
|
||||||
cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded();
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt);
|
|
||||||
}
|
|
||||||
}
|
|
@ -224,7 +224,7 @@ public void testAddUCReplica() throws Exception {
|
|||||||
int i = 0;
|
int i = 0;
|
||||||
for (DataNode dn : cluster.getDataNodes()) {
|
for (DataNode dn : cluster.getDataNodes()) {
|
||||||
final Block block = new Block(lastBlock.getBlockId() + i++,
|
final Block block = new Block(lastBlock.getBlockId() + i++,
|
||||||
lastBlock.getGenerationStamp(), 0);
|
0, lastBlock.getGenerationStamp());
|
||||||
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
|
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
|
||||||
storageIDs.add(storage.getStorageID());
|
storageIDs.add(storage.getStorageID());
|
||||||
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
|
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
|
||||||
|
@ -0,0 +1,210 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class TestRecoverStripedBlocks {
|
||||||
|
private final short GROUP_SIZE =
|
||||||
|
NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
private final Path dirPath = new Path("/dir");
|
||||||
|
private Path filePath = new Path(dirPath, "file");
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
// Large value to make sure the pending replication request can stay in
|
||||||
|
// DatanodeDescriptor.replicateBlocks before test timeout.
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
|
||||||
|
// Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
|
||||||
|
// chooseUnderReplicatedBlocks at once.
|
||||||
|
conf.setInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
|
||||||
|
int numBlocks) throws Exception {
|
||||||
|
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
dfs.mkdirs(dir);
|
||||||
|
dfs.setStoragePolicy(dir, EC_STORAGE_POLICY_NAME);
|
||||||
|
|
||||||
|
FSDataOutputStream out = null;
|
||||||
|
try {
|
||||||
|
out = dfs.create(file, (short) 1); // create an empty file
|
||||||
|
|
||||||
|
FSNamesystem ns = cluster.getNamesystem();
|
||||||
|
FSDirectory fsdir = ns.getFSDirectory();
|
||||||
|
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
|
||||||
|
|
||||||
|
ExtendedBlock previous = null;
|
||||||
|
for (int i = 0; i < numBlocks; i++) {
|
||||||
|
Block newBlock = createBlock(cluster.getDataNodes(), ns,
|
||||||
|
file.toString(), fileNode, dfs.getClient().getClientName(),
|
||||||
|
previous);
|
||||||
|
previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
ns.completeFile(file.toString(), dfs.getClient().getClientName(),
|
||||||
|
previous, fileNode.getId());
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(null, out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
|
||||||
|
String file, INodeFile fileNode, String clientName,
|
||||||
|
ExtendedBlock previous) throws Exception {
|
||||||
|
ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
|
||||||
|
null);
|
||||||
|
|
||||||
|
final BlockInfo lastBlock = fileNode.getLastBlock();
|
||||||
|
final int groupSize = fileNode.getBlockReplication();
|
||||||
|
// 1. RECEIVING_BLOCK IBR
|
||||||
|
int i = 0;
|
||||||
|
for (DataNode dn : dataNodes) {
|
||||||
|
if (i < groupSize) {
|
||||||
|
final Block block = new Block(lastBlock.getBlockId() + i++, 0,
|
||||||
|
lastBlock.getGenerationStamp());
|
||||||
|
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
|
||||||
|
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
|
||||||
|
.makeReportForReceivedBlock(block,
|
||||||
|
ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
|
||||||
|
for (StorageReceivedDeletedBlocks report : reports) {
|
||||||
|
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. RECEIVED_BLOCK IBR
|
||||||
|
i = 0;
|
||||||
|
for (DataNode dn : dataNodes) {
|
||||||
|
if (i < groupSize) {
|
||||||
|
final Block block = new Block(lastBlock.getBlockId() + i++,
|
||||||
|
BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
|
||||||
|
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
|
||||||
|
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
|
||||||
|
.makeReportForReceivedBlock(block,
|
||||||
|
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
|
||||||
|
for (StorageReceivedDeletedBlocks report : reports) {
|
||||||
|
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
|
||||||
|
return lastBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMissingStripedBlock() throws Exception {
|
||||||
|
final int numBlocks = 4;
|
||||||
|
createECFile(cluster, filePath, dirPath, numBlocks);
|
||||||
|
|
||||||
|
// make sure the file is complete in NN
|
||||||
|
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
|
||||||
|
.getINode4Write(filePath.toString()).asFile();
|
||||||
|
assertFalse(fileNode.isUnderConstruction());
|
||||||
|
assertTrue(fileNode.isWithStripedBlocks());
|
||||||
|
BlockInfo[] blocks = fileNode.getBlocks();
|
||||||
|
assertEquals(numBlocks, blocks.length);
|
||||||
|
for (BlockInfo blk : blocks) {
|
||||||
|
assertTrue(blk.isStriped());
|
||||||
|
assertTrue(blk.isComplete());
|
||||||
|
assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
|
||||||
|
final BlockInfoStriped sb = (BlockInfoStriped) blk;
|
||||||
|
assertEquals(GROUP_SIZE, sb.numNodes());
|
||||||
|
}
|
||||||
|
|
||||||
|
final BlockManager bm = cluster.getNamesystem().getBlockManager();
|
||||||
|
BlockInfo firstBlock = fileNode.getBlocks()[0];
|
||||||
|
DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
|
||||||
|
|
||||||
|
DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor();
|
||||||
|
assertEquals(numBlocks, secondDn.numBlocks());
|
||||||
|
|
||||||
|
bm.getDatanodeManager().removeDatanode(secondDn);
|
||||||
|
|
||||||
|
BlockManagerTestUtil.getComputedDatanodeWork(bm);
|
||||||
|
|
||||||
|
// all the recovery work will be scheduled on the last DN
|
||||||
|
DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
|
||||||
|
DatanodeDescriptor last =
|
||||||
|
bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
|
||||||
|
assertEquals("Counting the number of outstanding EC tasks", numBlocks,
|
||||||
|
last.getNumberOfBlocksToBeErasureCoded());
|
||||||
|
List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
|
||||||
|
for (BlockECRecoveryInfo info : recovery) {
|
||||||
|
assertEquals(1, info.targets.length);
|
||||||
|
assertEquals(last, info.targets[0].getDatanodeDescriptor());
|
||||||
|
assertEquals(GROUP_SIZE - 1, info.sources.length);
|
||||||
|
assertEquals(GROUP_SIZE - 1, info.liveBlockIndices.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user