HDFS-9818. Correctly handle EC reconstruction work caused by not enough racks. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2016-02-19 19:02:23 -08:00
parent 6eae4337d1
commit e54cc29312
12 changed files with 388 additions and 53 deletions

View File

@ -260,7 +260,12 @@ public class DFSStripedInputStream extends DFSInputStream {
private void closeReader(BlockReaderInfo readerInfo) {
if (readerInfo != null) {
// IOUtils.cleanup(null, readerInfo.reader);
if (readerInfo.reader != null) {
try {
readerInfo.reader.close();
} catch (Throwable ignored) {
}
}
readerInfo.skip();
}
}

View File

@ -432,6 +432,9 @@ Trunk (Unreleased)
HDFS-9794. Streamer threads may leak if failure happens when closing the
striped outputstream. (jing9)
HDFS-9818. Correctly handle EC reconstruction work caused by not enough
racks. (jing9)
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
HDFS-7347. Configurable erasure coding policy for individual files and

View File

@ -1628,7 +1628,7 @@ public class BlockManager implements BlockStatsMXBean {
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
indices[i] = liveBlockIndices.get(i);
}
return new ErasureCodingWork(block, bc, srcNodes,
return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority, indices);
} else {
@ -1638,6 +1638,16 @@ public class BlockManager implements BlockStatsMXBean {
}
}
private boolean isInNewRack(DatanodeDescriptor[] srcs,
DatanodeDescriptor target) {
for (DatanodeDescriptor src : srcs) {
if (src.getNetworkLocation().equals(target.getNetworkLocation())) {
return false;
}
}
return true;
}
private boolean validateReconstructionWork(BlockReconstructionWork rw) {
BlockInfo block = rw.getBlock();
int priority = rw.getPriority();
@ -1665,31 +1675,14 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo[] targets = rw.getTargets();
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!isPlacementPolicySatisfied(block)) ) {
if (rw.getSrcNodes()[0].getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) {
// No use continuing, unless a new rack in this case
return false;
}
}
// Add block to the to be reconstructed list
if (block.isStriped()) {
assert rw instanceof ErasureCodingWork;
assert rw.getTargets().length > 0;
assert pendingNum == 0 : "Should wait the previous reconstruction"
+ " to finish";
final ErasureCodingPolicy ecPolicy =
((BlockInfoStriped) block).getErasureCodingPolicy();
assert ecPolicy != null;
rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(getBlockPoolId(), block),
rw.getSrcNodes(), rw.getTargets(),
((ErasureCodingWork) rw).getLiveBlockIndicies(), ecPolicy);
} else {
rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets);
}
// Add block to the datanode's task list
rw.addTaskToDatanode();
DatanodeStorageInfo.incrementBlocksScheduled(targets);
// Move the block-replication into a "pending" state.
@ -3973,7 +3966,8 @@ public class BlockManager implements BlockStatsMXBean {
.getPolicy(storedBlock.isStriped());
int numReplicas = storedBlock.isStriped() ? ((BlockInfoStriped) storedBlock)
.getRealDataBlockNum() : storedBlock.getReplication();
return placementPolicy.verifyBlockPlacement(locs, numReplicas).isPlacementPolicySatisfied();
return placementPolicy.verifyBlockPlacement(locs, numReplicas)
.isPlacementPolicySatisfied();
}
/**

View File

@ -901,7 +901,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
locs = DatanodeDescriptor.EMPTY_ARRAY;
if (!clusterMap.hasClusterEverBeenMultiRack()) {
// only one rack
return new BlockPlacementStatusDefault(1, 1);
return new BlockPlacementStatusDefault(1, 1, 1);
}
int minRacks = 2;
minRacks = Math.min(minRacks, numberOfReplicas);
@ -910,7 +910,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
Set<String> racks = new TreeSet<>();
for (DatanodeInfo dn : locs)
racks.add(dn.getNetworkLocation());
return new BlockPlacementStatusDefault(racks.size(), minRacks);
return new BlockPlacementStatusDefault(racks.size(), minRacks,
clusterMap.getNumOfRacks());
}
/**
* Decide whether deleting the specified replica of the block still makes

View File

@ -160,14 +160,16 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
locs = DatanodeDescriptor.EMPTY_ARRAY;
if (!clusterMap.hasClusterEverBeenMultiRack()) {
// only one rack
return new BlockPlacementStatusDefault(1, 1);
return new BlockPlacementStatusDefault(1, 1, 1);
}
// 1. Check that all locations are different.
// 2. Count locations on different racks.
Set<String> racks = new TreeSet<String>();
for (DatanodeInfo dn : locs)
Set<String> racks = new TreeSet<>();
for (DatanodeInfo dn : locs) {
racks.add(dn.getNetworkLocation());
return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas);
}
return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
clusterMap.getNumOfRacks());
}
@Override

View File

@ -21,15 +21,18 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
private int requiredRacks = 0;
private int currentRacks = 0;
private final int totalRacks;
public BlockPlacementStatusDefault(int currentRacks, int requiredRacks){
public BlockPlacementStatusDefault(int currentRacks, int requiredRacks,
int totalRacks){
this.requiredRacks = requiredRacks;
this.currentRacks = currentRacks;
this.totalRacks = totalRacks;
}
@Override
public boolean isPlacementPolicySatisfied() {
return requiredRacks <= currentRacks;
return requiredRacks <= currentRacks || currentRacks >= totalRacks;
}
@Override
@ -38,7 +41,8 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
return null;
}
return "Block should be additionally replicated on " +
(requiredRacks - currentRacks) + " more rack(s).";
(requiredRacks - currentRacks) +
" more rack(s). Total number of racks in the cluster: " + totalRacks;
}
}

View File

@ -108,4 +108,9 @@ abstract class BlockReconstructionWork {
abstract void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes);
/**
* add reconstruction task into a source datanode
*/
abstract void addTaskToDatanode();
}

View File

@ -17,15 +17,23 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.Node;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndicies;
private final String blockPoolId;
public ErasureCodingWork(BlockInfo block,
public ErasureCodingWork(String blockPoolId, BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
@ -34,6 +42,7 @@ class ErasureCodingWork extends BlockReconstructionWork {
int priority, byte[] liveBlockIndicies) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
this.blockPoolId = blockPoolId;
this.liveBlockIndicies = liveBlockIndicies;
BlockManager.LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
block);
@ -47,15 +56,92 @@ class ErasureCodingWork extends BlockReconstructionWork {
void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
try {
// TODO: new placement policy for EC considering multiple writers
DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
getLiveReplicaStorages(), false, excludedNodes,
getBlock().getNumBytes(),
storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
setTargets(chosenTargets);
} finally {
// TODO: new placement policy for EC considering multiple writers
DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
getLiveReplicaStorages(), false, excludedNodes,
getBlock().getNumBytes(),
storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
setTargets(chosenTargets);
}
/**
* @return true if the current source nodes cover all the internal blocks.
* I.e., we only need to have more racks.
*/
private boolean hasAllInternalBlocks() {
final BlockInfoStriped block = (BlockInfoStriped) getBlock();
if (getSrcNodes().length < block.getRealTotalBlockNum()) {
return false;
}
BitSet bitSet = new BitSet(block.getTotalBlockNum());
for (byte index : liveBlockIndicies) {
bitSet.set(index);
}
for (int i = 0; i < block.getRealDataBlockNum(); i++) {
if (!bitSet.get(i)) {
return false;
}
}
for (int i = block.getDataBlockNum(); i < block.getTotalBlockNum(); i++) {
if (!bitSet.get(i)) {
return false;
}
}
return true;
}
/**
* We have all the internal blocks but not enough racks. Thus we do not need
* to do decoding but only simply make an extra copy of an internal block. In
* this scenario, use this method to choose the source datanode for simple
* replication.
* @return The index of the source datanode.
*/
private int chooseSource4SimpleReplication() {
Map<String, List<Integer>> map = new HashMap<>();
for (int i = 0; i < getSrcNodes().length; i++) {
final String rack = getSrcNodes()[i].getNetworkLocation();
List<Integer> dnList = map.get(rack);
if (dnList == null) {
dnList = new ArrayList<>();
map.put(rack, dnList);
}
dnList.add(i);
}
List<Integer> max = null;
for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
if (max == null || entry.getValue().size() > max.size()) {
max = entry.getValue();
}
}
assert max != null;
return max.get(0);
}
@Override
void addTaskToDatanode() {
assert getTargets().length > 0;
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
// if we already have all the internal blocks, but not enough racks,
// we only need to replicate one internal block to a new rack
if (hasAllInternalBlocks()) {
int sourceIndex = chooseSource4SimpleReplication();
final byte blockIndex = liveBlockIndicies[sourceIndex];
final DatanodeDescriptor source = getSrcNodes()[sourceIndex];
final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
stripedBlk.getDataBlockNum(), blockIndex);
final Block targetBlk = new Block(
stripedBlk.getBlockId() + blockIndex, internBlkLen,
stripedBlk.getGenerationStamp());
source.addBlockToBeReplicated(targetBlk, getTargets());
} else {
getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(blockPoolId, stripedBlk),
getSrcNodes(), getTargets(), getLiveBlockIndicies(),
stripedBlk.getErasureCodingPolicy());
}
}
}

View File

@ -53,4 +53,9 @@ class ReplicationWork extends BlockReconstructionWork {
getSrcNodes()[0].decrementPendingReplicationWithoutTargets();
}
}
@Override
void addTaskToDatanode() {
getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
}
}

View File

@ -177,8 +177,14 @@ public final class ErasureCodingWorker {
Collection<BlockECReconstructionInfo> ecTasks) {
for (BlockECReconstructionInfo reconstructionInfo : ecTasks) {
try {
EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL
.submit(new ReconstructAndTransferBlock(reconstructionInfo));
ReconstructAndTransferBlock task =
new ReconstructAndTransferBlock(reconstructionInfo);
if (task.hasValidTargets()) {
EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.submit(task);
} else {
LOG.warn("No missing internal block. Skip reconstruction for task:{}",
reconstructionInfo);
}
} catch (Throwable e) {
LOG.warn("Failed to reconstruct striped block {}",
reconstructionInfo.getExtendedBlock().getLocalBlock(), e);
@ -292,6 +298,7 @@ public final class ErasureCodingWorker {
private final CompletionService<Void> readService =
new ExecutorCompletionService<>(
EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL);
private final boolean hasValidTargets;
ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) {
ErasureCodingPolicy ecPolicy = reconstructionInfo
@ -339,10 +346,14 @@ public final class ErasureCodingWorker {
seqNo4Targets[i] = 0;
}
getTargetIndices();
hasValidTargets = getTargetIndices();
cachingStrategy = CachingStrategy.newDefaultStrategy();
}
boolean hasValidTargets() {
return hasValidTargets;
}
private ByteBuffer allocateBuffer(int length) {
return ByteBuffer.allocate(length);
}
@ -505,24 +516,30 @@ public final class ErasureCodingWorker {
}
}
private void getTargetIndices() {
/**
* @return true if there is valid target for reconstruction
*/
private boolean getTargetIndices() {
BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
for (int i = 0; i < sources.length; i++) {
bitset.set(liveIndices[i]);
}
int m = 0;
int k = 0;
boolean hasValidTarget = false;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
if (!bitset.get(i)) {
if (getBlockLen(blockGroup, i) > 0) {
if (m < targets.length) {
targetIndices[m++] = (short)i;
hasValidTarget = true;
}
} else {
zeroStripeIndices[k++] = (short)i;
}
}
}
return hasValidTarget;
}
/** the reading length should not exceed the length for reconstruction. */

View File

@ -285,7 +285,7 @@ public class TestBlocksWithNotEnoughRacks {
final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
// Create a file with one block with a replication factor of 2
// Create a file with one block with a replication factor of 3
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
@ -315,8 +315,9 @@ public class TestBlocksWithNotEnoughRacks {
dm.removeDatanode(dnId);
// Make sure we have enough live replicas even though we are
// short one rack and therefore need one replica
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
// short one rack. The cluster now has only 1 rack thus we just make sure
// we still have 3 replicas.
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
@ -357,9 +358,8 @@ public class TestBlocksWithNotEnoughRacks {
// The block gets re-replicated to another datanode so it has a
// sufficient # replicas, but not across racks, so there should
// be 1 rack, and 1 needed replica (even though there are 2 hosts
// available and only 2 replicas required).
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
// be 1 rack.
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
// Start the "failed" datanode, which has a replica so the block is
// now over-replicated and therefore a replica should be removed but

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
public class TestReconstructStripedBlocksWithRackAwareness {
public static final Logger LOG = LoggerFactory.getLogger(
TestReconstructStripedBlocksWithRackAwareness.class);
static {
GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
}
private static final String[] hosts = new String[]{"host1", "host2", "host3",
"host4", "host5", "host6", "host7", "host8", "host9", "host10"};
private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2",
"/r3", "/r3", "/r4", "/r4", "/r5", "/r6"};
private static final List<String> singleNodeRacks = Arrays.asList("host9", "host10");
private static final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private FSNamesystem fsn;
private BlockManager bm;
@Before
public void setup() throws Exception {
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
.numDataNodes(hosts.length).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
bm = fsn.getBlockManager();
fs = cluster.getFileSystem();
fs.setErasureCodingPolicy(new Path("/"), null);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* When there are all the internal blocks available but they are not placed on
* enough racks, NameNode should avoid normal decoding reconstruction but copy
* an internal block to a new rack.
*
* In this test, we first need to create a scenario that a striped block has
* all the internal blocks but distributed in <6 racks. Then we check if the
* replication monitor can correctly schedule the reconstruction work for it.
*
* For the 9 internal blocks + 5 racks setup, the test does the following:
* 1. create a 6 rack cluster with 10 datanodes, where there are 2 racks only
* containing 1 datanodes each
* 2. for a striped block with 9 internal blocks, there must be one internal
* block locating in a single-node rack. find this node and stop it
* 3. namenode will trigger reconstruction for the block and since the cluster
* has only 5 racks remaining, after the reconstruction we have 9 internal
* blocks distributed in 5 racks.
* 4. we bring the datanode back, now the cluster has 6 racks again
* 5. let the datanode call reportBadBlock, this will make the namenode to
* check if the striped block is placed in >= 6 racks, and the namenode will
* put the block into the under-replicated queue
* 6. now we can check if the replication monitor works as expected
*/
@Test
public void testReconstructForNotEnoughRacks() throws Exception {
final Path file = new Path("/foo");
DFSTestUtil.createFile(fs, file,
BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
final INodeFile fileNode = fsn.getFSDirectory()
.getINode4Write(file.toString()).asFile();
BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock();
// find the internal block located in the single node rack
Block internalBlock = null;
String hostToStop = null;
for (DatanodeStorageInfo storage : blockInfo.storages) {
if (singleNodeRacks.contains(storage.getDatanodeDescriptor().getHostName())) {
hostToStop = storage.getDatanodeDescriptor().getHostName();
internalBlock = blockInfo.getBlockOnStorage(storage);
}
}
Assert.assertNotNull(internalBlock);
Assert.assertNotNull(hostToStop);
// delete the block on the chosen datanode
cluster.corruptBlockOnDataNodesByDeletingBlockFile(
new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
// stop the chosen datanode
MiniDFSCluster.DataNodeProperties dnProp = null;
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
DataNode dn = cluster.getDataNodes().get(i);
if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
dnProp = cluster.stopDataNode(i);
cluster.setDataNodeDead(dn.getDatanodeId());
LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
}
}
NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
Assert.assertEquals(5, topology.getNumOfRacks());
// make sure the reconstruction work can finish
// now we have 9 internal blocks in 5 racks
DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
// we now should have 9 internal blocks distributed in 5 racks
Set<String> rackSet = new HashSet<>();
for (DatanodeStorageInfo storage : blockInfo.storages) {
rackSet.add(storage.getDatanodeDescriptor().getNetworkLocation());
}
Assert.assertEquals(5, rackSet.size());
// restart the stopped datanode
cluster.restartDataNode(dnProp);
cluster.waitActive();
// make sure we have 6 racks again
topology = bm.getDatanodeManager().getNetworkTopology();
Assert.assertEquals(hosts.length, topology.getNumOfLeaves());
Assert.assertEquals(6, topology.getNumOfRacks());
// pause all the heartbeats
DataNode badDn = null;
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
badDn = dn;
}
}
assert badDn != null;
// let the DN report the bad block, so that the namenode will put the block
// into under-replicated queue. note that the block still has 9 internal
// blocks but in 5 racks
badDn.reportBadBlocks(new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
// check if replication monitor correctly schedule the replication work
boolean scheduled = false;
for (int i = 0; i < 5; i++) { // retry 5 times
for (DatanodeStorageInfo storage : blockInfo.storages) {
if (storage != null) {
DatanodeDescriptor dn = storage.getDatanodeDescriptor();
Assert.assertEquals(0, dn.getNumberOfBlocksToBeErasureCoded());
if (dn.getNumberOfBlocksToBeReplicated() == 1) {
scheduled = true;
}
}
}
if (scheduled) {
break;
}
Thread.sleep(1000);
}
Assert.assertTrue(scheduled);
}
}