HDFS-9866. BlockManager#chooseExcessReplicasStriped may weaken rack fault tolerance. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2016-02-28 14:54:49 -08:00
parent 7634d404b7
commit 408f2c807b
9 changed files with 133 additions and 112 deletions

View File

@ -446,6 +446,9 @@ Trunk (Unreleased)
HDFS-9734. Refactoring of checksum failure report related codes. HDFS-9734. Refactoring of checksum failure report related codes.
(Kai Zheng via zhz) (Kai Zheng via zhz)
HDFS-9866. BlockManager#chooseExcessReplicasStriped may weaken rack fault
tolerance. (jing9)
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
HDFS-7347. Configurable erasure coding policy for individual files and HDFS-7347. Configurable erasure coding policy for individual files and

View File

@ -3245,7 +3245,7 @@ private void chooseExcessReplicasContiguous(
DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) { DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
BlockPlacementPolicy replicator = placementPolicies.getPolicy(false); BlockPlacementPolicy replicator = placementPolicies.getPolicy(false);
List<DatanodeStorageInfo> replicasToDelete = replicator List<DatanodeStorageInfo> replicasToDelete = replicator
.chooseReplicasToDelete(nonExcess, replication, excessTypes, .chooseReplicasToDelete(nonExcess, nonExcess, replication, excessTypes,
addedNode, delNodeHint); addedNode, delNodeHint);
for (DatanodeStorageInfo choosenReplica : replicasToDelete) { for (DatanodeStorageInfo choosenReplica : replicasToDelete) {
processChosenExcessReplica(nonExcess, choosenReplica, storedBlock); processChosenExcessReplica(nonExcess, choosenReplica, storedBlock);
@ -3316,8 +3316,8 @@ private void chooseExcessReplicasStriped(BlockCollection bc,
internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex); internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
while (candidates.size() > 1) { while (candidates.size() > 1) {
List<DatanodeStorageInfo> replicasToDelete = placementPolicy List<DatanodeStorageInfo> replicasToDelete = placementPolicy
.chooseReplicasToDelete(candidates, (short) 1, excessTypes, null, .chooseReplicasToDelete(nonExcess, candidates, (short) 1,
null); excessTypes, null, null);
for (DatanodeStorageInfo chosen : replicasToDelete) { for (DatanodeStorageInfo chosen : replicasToDelete) {
processChosenExcessReplica(nonExcess, chosen, storedBlock); processChosenExcessReplica(nonExcess, chosen, storedBlock);
candidates.remove(chosen); candidates.remove(chosen);

View File

@ -77,8 +77,6 @@ public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
BlockStoragePolicy storagePolicy); BlockStoragePolicy storagePolicy);
/** /**
* Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
* with added parameter {@code favoredDatanodes}
* @param favoredNodes datanodes that should be favored as targets. This * @param favoredNodes datanodes that should be favored as targets. This
* is only a hint and due to cluster state, namenode may not be * is only a hint and due to cluster state, namenode may not be
* able to place the blocks on these datanodes. * able to place the blocks on these datanodes.
@ -106,17 +104,21 @@ DatanodeStorageInfo[] chooseTarget(String src,
* @param numOfReplicas replica number of file to be verified * @param numOfReplicas replica number of file to be verified
* @return the result of verification * @return the result of verification
*/ */
abstract public BlockPlacementStatus verifyBlockPlacement( public abstract BlockPlacementStatus verifyBlockPlacement(
DatanodeInfo[] locs, int numOfReplicas); DatanodeInfo[] locs, int numOfReplicas);
/** /**
* Select the excess replica storages for deletion based on either * Select the excess replica storages for deletion based on either
* delNodehint/Excess storage types. * delNodehint/Excess storage types.
* *
* @param candidates * @param availableReplicas
* available replicas * available replicas
* @param delCandidates
* Candidates for deletion. For normal replication, this set is the
* same with availableReplicas. For striped blocks, this set is a
* subset of availableReplicas.
* @param expectedNumOfReplicas * @param expectedNumOfReplicas
* The required number of replicas for this block * The expected number of replicas remaining in the delCandidates
* @param excessTypes * @param excessTypes
* type of the storagepolicy * type of the storagepolicy
* @param addedNode * @param addedNode
@ -125,10 +127,12 @@ abstract public BlockPlacementStatus verifyBlockPlacement(
* Hint for excess storage selection * Hint for excess storage selection
* @return Returns the list of excess replicas chosen for deletion * @return Returns the list of excess replicas chosen for deletion
*/ */
abstract public List<DatanodeStorageInfo> chooseReplicasToDelete( public abstract List<DatanodeStorageInfo> chooseReplicasToDelete(
Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas, Collection<DatanodeStorageInfo> availableReplicas,
Collection<DatanodeStorageInfo> delCandidates, int expectedNumOfReplicas,
List<StorageType> excessTypes, DatanodeDescriptor addedNode, List<StorageType> excessTypes, DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint); DatanodeDescriptor delNodeHint);
/** /**
* Used to setup a BlockPlacementPolicy object. This should be defined by * Used to setup a BlockPlacementPolicy object. This should be defined by
* all implementations of a BlockPlacementPolicy. * all implementations of a BlockPlacementPolicy.
@ -137,7 +141,7 @@ abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(
* @param stats retrieve cluster status from here * @param stats retrieve cluster status from here
* @param clusterMap cluster topology * @param clusterMap cluster topology
*/ */
abstract protected void initialize(Configuration conf, FSClusterStats stats, protected abstract void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap, NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap); Host2NodesMap host2datanodeMap);
@ -149,7 +153,7 @@ abstract protected void initialize(Configuration conf, FSClusterStats stats,
* @param source source replica of the move * @param source source replica of the move
* @param target target replica of the move * @param target target replica of the move
*/ */
abstract public boolean isMovable(Collection<DatanodeInfo> candidates, public abstract boolean isMovable(Collection<DatanodeInfo> candidates,
DatanodeInfo source, DatanodeInfo target); DatanodeInfo source, DatanodeInfo target);
/** /**
@ -191,10 +195,8 @@ protected <T> DatanodeInfo getDatanodeInfo(T datanode) {
"class " + datanode.getClass().getName() + " not allowed"); "class " + datanode.getClass().getName() + " not allowed");
if (datanode instanceof DatanodeInfo) { if (datanode instanceof DatanodeInfo) {
return ((DatanodeInfo)datanode); return ((DatanodeInfo)datanode);
} else if (datanode instanceof DatanodeStorageInfo) {
return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
} else { } else {
return null; return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
} }
} }
@ -209,35 +211,37 @@ protected String getRack(final DatanodeInfo datanode) {
/** /**
* Split data nodes into two sets, one set includes nodes on rack with * Split data nodes into two sets, one set includes nodes on rack with
* more than one replica, the other set contains the remaining nodes. * more than one replica, the other set contains the remaining nodes.
* *
* @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split * @param availableSet all the available DataNodes/storages of the block
* @param candidates DatanodeStorageInfo/DatanodeInfo to be split
* into two sets * into two sets
* @param rackMap a map from rack to datanodes * @param rackMap a map from rack to datanodes
* @param moreThanOne contains nodes on rack with more than one replica * @param moreThanOne contains nodes on rack with more than one replica
* @param exactlyOne remains contains the remaining nodes * @param exactlyOne remains contains the remaining nodes
*/ */
public <T> void splitNodesWithRack( public <T> void splitNodesWithRack(
final Iterable<T> storagesOrDataNodes, final Iterable<T> availableSet,
final Collection<T> candidates,
final Map<String, List<T>> rackMap, final Map<String, List<T>> rackMap,
final List<T> moreThanOne, final List<T> moreThanOne,
final List<T> exactlyOne) { final List<T> exactlyOne) {
for(T s: storagesOrDataNodes) { for(T s: availableSet) {
final String rackName = getRack(getDatanodeInfo(s)); final String rackName = getRack(getDatanodeInfo(s));
List<T> storageList = rackMap.get(rackName); List<T> storageList = rackMap.get(rackName);
if (storageList == null) { if (storageList == null) {
storageList = new ArrayList<T>(); storageList = new ArrayList<>();
rackMap.put(rackName, storageList); rackMap.put(rackName, storageList);
} }
storageList.add(s); storageList.add(s);
} }
// split nodes into two sets for (T candidate : candidates) {
for(List<T> storageList : rackMap.values()) { final String rackName = getRack(getDatanodeInfo(candidate));
if (storageList.size() == 1) { if (rackMap.get(rackName).size() == 1) {
// exactlyOne contains nodes on rack with only one replica // exactlyOne contains nodes on rack with only one replica
exactlyOne.add(storageList.get(0)); exactlyOne.add(candidate);
} else { } else {
// moreThanOne contains nodes on rack with more than one replica // moreThanOne contains nodes on rack with more than one replica
moreThanOne.addAll(storageList); moreThanOne.add(candidate);
} }
} }
} }

View File

@ -972,7 +972,8 @@ public DatanodeStorageInfo chooseReplicaToDelete(
@Override @Override
public List<DatanodeStorageInfo> chooseReplicasToDelete( public List<DatanodeStorageInfo> chooseReplicasToDelete(
Collection<DatanodeStorageInfo> candidates, Collection<DatanodeStorageInfo> availableReplicas,
Collection<DatanodeStorageInfo> delCandidates,
int expectedNumOfReplicas, int expectedNumOfReplicas,
List<StorageType> excessTypes, List<StorageType> excessTypes,
DatanodeDescriptor addedNode, DatanodeDescriptor addedNode,
@ -985,28 +986,29 @@ public List<DatanodeStorageInfo> chooseReplicasToDelete(
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
// split nodes into two sets // split candidate nodes for deletion into two sets
// moreThanOne contains nodes on rack with more than one replica // moreThanOne contains nodes on rack with more than one replica
// exactlyOne contains the remaining nodes // exactlyOne contains the remaining nodes
splitNodesWithRack(candidates, rackMap, moreThanOne, exactlyOne); splitNodesWithRack(availableReplicas, delCandidates, rackMap, moreThanOne,
exactlyOne);
// pick one node to delete that favors the delete hint // pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty // otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains // otherwise one node with least space from remains
boolean firstOne = true; boolean firstOne = true;
final DatanodeStorageInfo delNodeHintStorage = final DatanodeStorageInfo delNodeHintStorage =
DatanodeStorageInfo.getDatanodeStorageInfo(candidates, delNodeHint); DatanodeStorageInfo.getDatanodeStorageInfo(delCandidates, delNodeHint);
final DatanodeStorageInfo addedNodeStorage = final DatanodeStorageInfo addedNodeStorage =
DatanodeStorageInfo.getDatanodeStorageInfo(candidates, addedNode); DatanodeStorageInfo.getDatanodeStorageInfo(delCandidates, addedNode);
while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) { while (delCandidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
final DatanodeStorageInfo cur; final DatanodeStorageInfo cur;
if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage, if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage,
moreThanOne, exactlyOne, excessTypes)) { moreThanOne, exactlyOne, excessTypes)) {
cur = delNodeHintStorage; cur = delNodeHintStorage;
} else { // regular excessive replica removal } else { // regular excessive replica removal
cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes, cur = chooseReplicaToDelete(moreThanOne, exactlyOne,
rackMap); excessTypes, rackMap);
} }
firstOne = false; firstOne = false;
if (cur == null) { if (cur == null) {
@ -1056,7 +1058,7 @@ public boolean isMovable(Collection<DatanodeInfo> locs,
final Map<String, List<DatanodeInfo>> rackMap = new HashMap<>(); final Map<String, List<DatanodeInfo>> rackMap = new HashMap<>();
final List<DatanodeInfo> moreThanOne = new ArrayList<>(); final List<DatanodeInfo> moreThanOne = new ArrayList<>();
final List<DatanodeInfo> exactlyOne = new ArrayList<>(); final List<DatanodeInfo> exactlyOne = new ArrayList<>();
splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne); splitNodesWithRack(locs, locs, rackMap, moreThanOne, exactlyOne);
return notReduceNumOfGroups(moreThanOne, source, target); return notReduceNumOfGroups(moreThanOne, source, target);
} }

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet; import java.util.BitSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -137,6 +138,11 @@ void addTaskToDatanode() {
stripedBlk.getBlockId() + blockIndex, internBlkLen, stripedBlk.getBlockId() + blockIndex, internBlkLen,
stripedBlk.getGenerationStamp()); stripedBlk.getGenerationStamp());
source.addBlockToBeReplicated(targetBlk, getTargets()); source.addBlockToBeReplicated(targetBlk, getTargets());
if (BlockManager.LOG.isDebugEnabled()) {
BlockManager.LOG.debug("Add replication task from source {} to " +
"targets {} for EC block {}", source, Arrays.toString(getTargets()),
targetBlk);
}
} else { } else {
getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded( getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(blockPoolId, stripedBlk), new ExtendedBlock(blockPoolId, stripedBlk),

View File

@ -23,8 +23,9 @@
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -39,9 +40,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Arrays; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE; import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
@ -55,14 +55,13 @@ public class TestReconstructStripedBlocksWithRackAwareness {
static { static {
GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL); GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL); GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
} }
private static final String[] hosts = new String[]{"host1", "host2", "host3", private static final String[] hosts = new String[]{"host1", "host2", "host3",
"host4", "host5", "host6", "host7", "host8", "host9", "host10"}; "host4", "host5", "host6", "host7", "host8", "host9", "host10"};
private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2", private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2",
"/r3", "/r3", "/r4", "/r4", "/r5", "/r6"}; "/r3", "/r3", "/r4", "/r4", "/r5", "/r6"};
private static final List<String> singleNodeRacks = Arrays.asList("host9", "host10");
private static final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private DistributedFileSystem fs; private DistributedFileSystem fs;
@ -94,6 +93,20 @@ public void tearDown() {
} }
} }
private MiniDFSCluster.DataNodeProperties stopDataNode(String hostname)
throws IOException {
MiniDFSCluster.DataNodeProperties dnProp = null;
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
DataNode dn = cluster.getDataNodes().get(i);
if (dn.getDatanodeId().getHostName().equals(hostname)) {
dnProp = cluster.stopDataNode(i);
cluster.setDataNodeDead(dn.getDatanodeId());
LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
}
}
return dnProp;
}
/** /**
* When there are all the internal blocks available but they are not placed on * When there are all the internal blocks available but they are not placed on
* enough racks, NameNode should avoid normal decoding reconstruction but copy * enough racks, NameNode should avoid normal decoding reconstruction but copy
@ -102,24 +115,13 @@ public void tearDown() {
* In this test, we first need to create a scenario that a striped block has * In this test, we first need to create a scenario that a striped block has
* all the internal blocks but distributed in <6 racks. Then we check if the * all the internal blocks but distributed in <6 racks. Then we check if the
* replication monitor can correctly schedule the reconstruction work for it. * replication monitor can correctly schedule the reconstruction work for it.
*
* For the 9 internal blocks + 5 racks setup, the test does the following:
* 1. create a 6 rack cluster with 10 datanodes, where there are 2 racks only
* containing 1 datanodes each
* 2. for a striped block with 9 internal blocks, there must be one internal
* block locating in a single-node rack. find this node and stop it
* 3. namenode will trigger reconstruction for the block and since the cluster
* has only 5 racks remaining, after the reconstruction we have 9 internal
* blocks distributed in 5 racks.
* 4. we bring the datanode back, now the cluster has 6 racks again
* 5. let the datanode call reportBadBlock, this will make the namenode to
* check if the striped block is placed in >= 6 racks, and the namenode will
* put the block into the under-replicated queue
* 6. now we can check if the replication monitor works as expected
*/ */
@Test @Test
public void testReconstructForNotEnoughRacks() throws Exception { public void testReconstructForNotEnoughRacks() throws Exception {
MiniDFSCluster.DataNodeProperties host10 = stopDataNode("host10");
final Path file = new Path("/foo"); final Path file = new Path("/foo");
// the file's block is in 9 dn but 5 racks
DFSTestUtil.createFile(fs, file, DFSTestUtil.createFile(fs, file,
BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L); BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks()); Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
@ -128,39 +130,6 @@ public void testReconstructForNotEnoughRacks() throws Exception {
.getINode4Write(file.toString()).asFile(); .getINode4Write(file.toString()).asFile();
BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock(); BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock();
// find the internal block located in the single node rack
Block internalBlock = null;
String hostToStop = null;
for (DatanodeStorageInfo storage : blockInfo.storages) {
if (singleNodeRacks.contains(storage.getDatanodeDescriptor().getHostName())) {
hostToStop = storage.getDatanodeDescriptor().getHostName();
internalBlock = blockInfo.getBlockOnStorage(storage);
}
}
Assert.assertNotNull(internalBlock);
Assert.assertNotNull(hostToStop);
// delete the block on the chosen datanode
cluster.corruptBlockOnDataNodesByDeletingBlockFile(
new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
// stop the chosen datanode
MiniDFSCluster.DataNodeProperties dnProp = null;
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
DataNode dn = cluster.getDataNodes().get(i);
if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
dnProp = cluster.stopDataNode(i);
cluster.setDataNodeDead(dn.getDatanodeId());
LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
}
}
NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
Assert.assertEquals(5, topology.getNumOfRacks());
// make sure the reconstruction work can finish
// now we have 9 internal blocks in 5 racks
DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
// we now should have 9 internal blocks distributed in 5 racks // we now should have 9 internal blocks distributed in 5 racks
Set<String> rackSet = new HashSet<>(); Set<String> rackSet = new HashSet<>();
for (DatanodeStorageInfo storage : blockInfo.storages) { for (DatanodeStorageInfo storage : blockInfo.storages) {
@ -169,27 +138,25 @@ public void testReconstructForNotEnoughRacks() throws Exception {
Assert.assertEquals(5, rackSet.size()); Assert.assertEquals(5, rackSet.size());
// restart the stopped datanode // restart the stopped datanode
cluster.restartDataNode(dnProp); cluster.restartDataNode(host10);
cluster.waitActive(); cluster.waitActive();
// make sure we have 6 racks again // make sure we have 6 racks again
topology = bm.getDatanodeManager().getNetworkTopology(); NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
Assert.assertEquals(hosts.length, topology.getNumOfLeaves()); Assert.assertEquals(hosts.length, topology.getNumOfLeaves());
Assert.assertEquals(6, topology.getNumOfRacks()); Assert.assertEquals(6, topology.getNumOfRacks());
// pause all the heartbeats // pause all the heartbeats
DataNode badDn = null;
for (DataNode dn : cluster.getDataNodes()) { for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
badDn = dn;
}
} }
assert badDn != null;
// let the DN report the bad block, so that the namenode will put the block fsn.writeLock();
// into under-replicated queue. note that the block still has 9 internal try {
// blocks but in 5 racks bm.processMisReplicatedBlocks();
badDn.reportBadBlocks(new ExtendedBlock(bm.getBlockPoolId(), internalBlock)); } finally {
fsn.writeUnlock();
}
// check if replication monitor correctly schedule the replication work // check if replication monitor correctly schedule the replication work
boolean scheduled = false; boolean scheduled = false;
@ -210,4 +177,42 @@ public void testReconstructForNotEnoughRacks() throws Exception {
} }
Assert.assertTrue(scheduled); Assert.assertTrue(scheduled);
} }
@Test
public void testChooseExcessReplicasToDelete() throws Exception {
MiniDFSCluster.DataNodeProperties host10 = stopDataNode("host10");
final Path file = new Path("/foo");
DFSTestUtil.createFile(fs, file,
BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
// stop host1
MiniDFSCluster.DataNodeProperties host1 = stopDataNode("host1");
// bring host10 back
cluster.restartDataNode(host10);
cluster.waitActive();
// wait for reconstruction to finish
final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
// restart host1
cluster.restartDataNode(host1);
cluster.waitActive();
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getDatanodeId().getHostName().equals("host1")) {
DataNodeTestUtils.triggerBlockReport(dn);
break;
}
}
// make sure the excess replica is detected, and we delete host1's replica
// so that we have 6 racks
DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
LocatedBlocks blks = fs.getClient().getLocatedBlocks(file.toString(), 0);
LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
for (DatanodeInfo dn : block.getLocations()) {
Assert.assertFalse(dn.getHostName().equals("host1"));
}
}
} }

View File

@ -965,7 +965,8 @@ public void testChooseReplicaToDelete() throws Exception {
List<DatanodeStorageInfo> first = new ArrayList<>(); List<DatanodeStorageInfo> first = new ArrayList<>();
List<DatanodeStorageInfo> second = new ArrayList<>(); List<DatanodeStorageInfo> second = new ArrayList<>();
replicator.splitNodesWithRack(replicaList, rackMap, first, second); replicator.splitNodesWithRack(replicaList, replicaList, rackMap, first,
second);
// storages[0] and storages[1] are in first set as their rack has two // storages[0] and storages[1] are in first set as their rack has two
// replica nodes, while storages[2] and dataNodes[5] are in second set. // replica nodes, while storages[2] and dataNodes[5] are in second set.
assertEquals(2, first.size()); assertEquals(2, first.size());
@ -1018,7 +1019,7 @@ public void testChooseReplicasToDelete() throws Exception {
DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor(); DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3, List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
assertTrue(excessReplicas.size() == 1); assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[0])); assertTrue(excessReplicas.contains(storages[0]));
@ -1031,7 +1032,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(excessStorage); nonExcess.add(excessStorage);
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), null); excessTypes, storages[3].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.contains(excessStorage)); assertTrue(excessReplicas.contains(excessStorage));
@ -1051,7 +1052,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(storages[5]); nonExcess.add(storages[5]);
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), excessTypes, storages[3].getDatanodeDescriptor(),
storages[5].getDatanodeDescriptor()); storages[5].getDatanodeDescriptor());
assertEquals(1, excessReplicas.size()); assertEquals(1, excessReplicas.size());
@ -1070,7 +1071,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(storages[3]); nonExcess.add(storages[3]);
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[1].getDatanodeDescriptor(), excessTypes, storages[1].getDatanodeDescriptor(),
storages[3].getDatanodeDescriptor()); storages[3].getDatanodeDescriptor());
assertEquals(1, excessReplicas.size()); assertEquals(1, excessReplicas.size());
@ -1084,7 +1085,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(storages[2]); nonExcess.add(storages[2]);
excessTypes = storagePolicy.chooseExcess((short) 1, excessTypes = storagePolicy.chooseExcess((short) 1,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 1, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 1,
excessTypes, storages[2].getDatanodeDescriptor(), null); excessTypes, storages[2].getDatanodeDescriptor(), null);
assertEquals(1, excessReplicas.size()); assertEquals(1, excessReplicas.size());
assertTrue(excessReplicas.contains(excessSSD)); assertTrue(excessReplicas.contains(excessSSD));
@ -1104,7 +1105,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(storages[5]); nonExcess.add(storages[5]);
excessTypes = storagePolicy.chooseExcess((short) 2, excessTypes = storagePolicy.chooseExcess((short) 2,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 2, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 2,
excessTypes, null, null); excessTypes, null, null);
assertEquals(0, excessReplicas.size()); assertEquals(0, excessReplicas.size());
} }

View File

@ -637,7 +637,7 @@ public void testChooseReplicaToDelete() throws Exception {
List<DatanodeStorageInfo> first = new ArrayList<>(); List<DatanodeStorageInfo> first = new ArrayList<>();
List<DatanodeStorageInfo> second = new ArrayList<>(); List<DatanodeStorageInfo> second = new ArrayList<>();
replicator.splitNodesWithRack( replicator.splitNodesWithRack(replicaList,
replicaList, rackMap, first, second); replicaList, rackMap, first, second);
assertEquals(3, first.size()); assertEquals(3, first.size());
assertEquals(1, second.size()); assertEquals(1, second.size());

View File

@ -330,7 +330,7 @@ public void testChooseReplicasToDelete() throws Exception {
DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor(); DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3, List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
assertTrue(excessReplicas.size() == 1); assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[0])); assertTrue(excessReplicas.contains(storages[0]));
@ -340,7 +340,7 @@ public void testChooseReplicasToDelete() throws Exception {
delHintNode = storages[1].getDatanodeDescriptor(); delHintNode = storages[1].getDatanodeDescriptor();
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
assertTrue(excessReplicas.size() == 1); assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[0])); assertTrue(excessReplicas.contains(storages[0]));
@ -353,7 +353,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(storages[8]); nonExcess.add(storages[8]);
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[8].getDatanodeDescriptor(), null); excessTypes, storages[8].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.size() == 1); assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[1])); assertTrue(excessReplicas.contains(storages[1]));
@ -366,7 +366,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(storages[5]); nonExcess.add(storages[5]);
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[8].getDatanodeDescriptor(), null); excessTypes, storages[8].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.size() == 1); assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[1]) || assertTrue(excessReplicas.contains(storages[1]) ||
@ -384,7 +384,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(excessStorage); nonExcess.add(excessStorage);
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), null); excessTypes, storages[3].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.size() == 2); assertTrue(excessReplicas.size() == 2);
assertTrue(excessReplicas.contains(storages[0])); assertTrue(excessReplicas.contains(storages[0]));
@ -416,7 +416,7 @@ public void testChooseReplicasToDelete() throws Exception {
nonExcess.add(storages[8]); nonExcess.add(storages[8]);
excessTypes = storagePolicy.chooseExcess((short) 3, excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess)); DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), excessTypes, storages[3].getDatanodeDescriptor(),
storages[7].getDatanodeDescriptor()); storages[7].getDatanodeDescriptor());
assertEquals(1, excessReplicas.size()); assertEquals(1, excessReplicas.size());