HDFS-9837. BlockManager#countNodes should be able to detect duplicated internal blocks. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2016-02-24 11:49:56 -08:00
parent 954dd57043
commit 47b92f2b6f
7 changed files with 361 additions and 163 deletions

View File

@ -435,6 +435,9 @@ Trunk (Unreleased)
HDFS-9818. Correctly handle EC reconstruction work caused by not enough
racks. (jing9)
HDFS-9837. BlockManager#countNodes should be able to detect duplicated
internal blocks. (jing9)
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
HDFS-7347. Configurable erasure coding policy for individual files and

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
*
@ -227,4 +230,47 @@ final boolean hasNoStorage() {
}
return true;
}
static class StorageAndBlockIndex {
final DatanodeStorageInfo storage;
final byte blockIndex;
StorageAndBlockIndex(DatanodeStorageInfo storage, byte blockIndex) {
this.storage = storage;
this.blockIndex = blockIndex;
}
}
public Iterable<StorageAndBlockIndex> getStorageAndIndexInfos() {
return new Iterable<StorageAndBlockIndex>() {
@Override
public Iterator<StorageAndBlockIndex> iterator() {
return new Iterator<StorageAndBlockIndex>() {
private int index = 0;
@Override
public boolean hasNext() {
while (index < getCapacity() && getStorageInfo(index) == null) {
index++;
}
return index < getCapacity();
}
@Override
public StorageAndBlockIndex next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
int i = index++;
return new StorageAndBlockIndex(storages[i], indices[i]);
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported");
}
};
}
};
}
}

View File

@ -71,8 +71,10 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -1816,76 +1818,62 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
containingNodes.clear();
nodesContainingLiveReplicas.clear();
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
int live = 0;
int readonly = 0;
int decommissioned = 0;
int decommissioning = 0;
int corrupt = 0;
int excess = 0;
liveBlockIndices.clear();
final boolean isStriped = block.isStriped();
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
BitSet bitSet = isStriped ?
new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightHashSet<BlockInfo> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt += countableReplica;
else if (node.isDecommissionInProgress()) {
decommissioning += countableReplica;
} else if (node.isDecommissioned()) {
decommissioned += countableReplica;
} else if (excessBlocks != null && excessBlocks.contains(block)) {
excess += countableReplica;
} else {
final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
storage, corruptReplicas.getNodes(block), false);
if (state == StoredReplicaState.LIVE) {
nodesContainingLiveReplicas.add(storage);
live += countableReplica;
}
if (storage.getState() == State.READ_ONLY_SHARED) {
readonly++;
}
containingNodes.add(node);
// Check if this replica is corrupt
// If so, do not select the node as src node
if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
// do not select corrupted replica as src. also do not select the block
// that is already in excess map
if (state == StoredReplicaState.CORRUPT ||
state == StoredReplicaState.EXCESS) {
continue;
}
if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
&& !node.isDecommissionInProgress()
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
{
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
continue; // already reached replication limit
}
if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
{
if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
continue;
}
// the block must not be scheduled for removal on srcNode
if(excessBlocks != null && excessBlocks.contains(block))
continue;
// never use already decommissioned nodes
if(node.isDecommissioned())
if (node.isDecommissioned()) {
continue;
}
if(isStriped || srcNodes.isEmpty()) {
srcNodes.add(node);
if (isStriped) {
liveBlockIndices.add(((BlockInfoStriped) block).
getStorageBlockIndex(storage));
byte blockIndex = ((BlockInfoStriped) block).
getStorageBlockIndex(storage);
liveBlockIndices.add(blockIndex);
if (!bitSet.get(blockIndex)) {
bitSet.set(blockIndex);
} else if (state == StoredReplicaState.LIVE) {
numReplicas.subtract(StoredReplicaState.LIVE, 1);
numReplicas.add(StoredReplicaState.REDUNDANT, 1);
}
}
continue;
}
// for replicated block, switch to a different node randomly
// this to prevent from deterministically selecting the same node even
// if the node failed to replicate the block on previous iterations
if (!isStriped && ThreadLocalRandom.current().nextBoolean()) {
if (ThreadLocalRandom.current().nextBoolean()) {
srcNodes.set(0, node);
}
}
if(numReplicas != null)
numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
excess, 0);
return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
}
@ -2872,8 +2860,8 @@ private Block addStoredBlock(final BlockInfo block,
// Now check for completion of blocks and safe block count
NumberReplicas num = countNodes(storedBlock);
int numLiveReplicas = num.liveReplicas();
int numCurrentReplica = numLiveReplicas
+ pendingReplications.getNumReplicas(storedBlock);
int pendingNum = pendingReplications.getNumReplicas(storedBlock);
int numCurrentReplica = numLiveReplicas + pendingNum;
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
hasMinStorage(storedBlock, numLiveReplicas)) {
@ -2907,7 +2895,7 @@ private Block addStoredBlock(final BlockInfo block,
} else {
updateNeededReplications(storedBlock, curReplicaDelta, 0);
}
if (numCurrentReplica > fileReplication) {
if (shouldProcessOverReplicated(num, pendingNum, fileReplication)) {
processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
}
// If the file replication has reached desired value
@ -2925,6 +2913,13 @@ private Block addStoredBlock(final BlockInfo block,
return storedBlock;
}
private boolean shouldProcessOverReplicated(NumberReplicas num,
int pendingNum, int expectedNum) {
int numCurrent = num.liveReplicas() + pendingNum;
return numCurrent > expectedNum ||
(numCurrent == expectedNum && num.redundantInternalBlocks() > 0);
}
/**
* Invalidate corrupt replicas.
* <p>
@ -3129,7 +3124,7 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
// calculate current replication
short expectedReplication = getExpectedReplicaNum(block);
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
final int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
if (isNeededReplication(block, numCurrentReplica)) {
if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
@ -3138,7 +3133,7 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
}
}
if (numCurrentReplica > expectedReplication) {
if (shouldProcessOverReplicated(num, 0, expectedReplication)) {
if (num.replicasOnStaleNodes() > 0) {
// If any of the replicas of this block are on nodes that are
// considered "stale", then these replicas may in fact have
@ -3666,46 +3661,94 @@ private void processIncrementalBlockReport(final DatanodeDescriptor node,
* Return the number of nodes hosting a given block, grouped
* by the state of those replicas.
* For a striped block, this includes nodes storing blocks belonging to the
* striped block group.
* striped block group. But note we exclude duplicated internal block replicas
* for calculating {@link NumberReplicas#liveReplicas}.
*/
public NumberReplicas countNodes(Block b) {
int decommissioned = 0;
int decommissioning = 0;
int live = 0;
int readonly = 0;
int corrupt = 0;
int excess = 0;
int stale = 0;
public NumberReplicas countNodes(BlockInfo b) {
return countNodes(b, false);
}
private NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
NumberReplicas numberReplicas = new NumberReplicas();
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
if (storage.getState() == State.FAILED) {
continue;
} else if (storage.getState() == State.READ_ONLY_SHARED) {
readonly++;
continue;
}
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
} else if (node.isDecommissionInProgress()) {
decommissioning++;
} else if (node.isDecommissioned()) {
decommissioned++;
} else {
LightWeightHashSet<BlockInfo> blocksExcess = excessReplicateMap.get(
node.getDatanodeUuid());
if (blocksExcess != null && blocksExcess.contains(b)) {
excess++;
} else {
live++;
}
}
if (storage.areBlockContentsStale()) {
stale++;
if (b.isStriped()) {
countReplicasForStripedBlock(numberReplicas, (BlockInfoStriped) b,
nodesCorrupt, inStartupSafeMode);
} else {
for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
checkReplicaOnStorage(numberReplicas, b, storage, nodesCorrupt,
inStartupSafeMode);
}
}
return new NumberReplicas(live, readonly, decommissioned, decommissioning,
corrupt, excess, stale);
return numberReplicas;
}
private StoredReplicaState checkReplicaOnStorage(NumberReplicas counters,
BlockInfo b, DatanodeStorageInfo storage,
Collection<DatanodeDescriptor> nodesCorrupt, boolean inStartupSafeMode) {
final StoredReplicaState s;
if (storage.getState() == State.NORMAL) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
s = StoredReplicaState.CORRUPT;
} else if (inStartupSafeMode) {
s = StoredReplicaState.LIVE;
counters.add(s, 1);
return s;
} else if (node.isDecommissionInProgress()) {
s = StoredReplicaState.DECOMMISSIONING;
} else if (node.isDecommissioned()) {
s = StoredReplicaState.DECOMMISSIONED;
} else if (isExcess(node, b)) {
s = StoredReplicaState.EXCESS;
} else {
s = StoredReplicaState.LIVE;
}
counters.add(s, 1);
if (storage.areBlockContentsStale()) {
counters.add(StoredReplicaState.STALESTORAGE, 1);
}
} else if (!inStartupSafeMode &&
storage.getState() == State.READ_ONLY_SHARED) {
s = StoredReplicaState.READONLY;
counters.add(s, 1);
} else {
s = null;
}
return s;
}
/**
* For a striped block, it is possible it contains full number of internal
* blocks (i.e., 9 by default), but with duplicated replicas of the same
* internal block. E.g., for the following list of internal blocks
* b0, b0, b1, b2, b3, b4, b5, b6, b7
* we have 9 internal blocks but we actually miss b8.
* We should use this method to detect the above scenario and schedule
* necessary reconstruction.
*/
private void countReplicasForStripedBlock(NumberReplicas counters,
BlockInfoStriped block, Collection<DatanodeDescriptor> nodesCorrupt,
boolean inStartupSafeMode) {
BitSet bitSet = new BitSet(block.getTotalBlockNum());
for (StorageAndBlockIndex si : block.getStorageAndIndexInfos()) {
StoredReplicaState state = checkReplicaOnStorage(counters, block,
si.storage, nodesCorrupt, inStartupSafeMode);
if (state == StoredReplicaState.LIVE) {
if (!bitSet.get(si.blockIndex)) {
bitSet.set(si.blockIndex);
} else {
counters.subtract(StoredReplicaState.LIVE, 1);
counters.add(StoredReplicaState.REDUNDANT, 1);
}
}
}
}
private boolean isExcess(DatanodeDescriptor node, BlockInfo block) {
LightWeightHashSet<BlockInfo> blocksExcess = excessReplicateMap.get(
node.getDatanodeUuid());
return blocksExcess != null && blocksExcess.contains(block);
}
/**
@ -3719,21 +3762,8 @@ public NumberReplicas countNodes(Block b) {
* @return count of live nodes for this block
*/
int countLiveNodes(BlockInfo b) {
if (!namesystem.isInStartupSafeMode()) {
return countNodes(b).liveReplicas();
}
// else proceed with fast case
int live = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
if (storage.getState() != State.NORMAL) {
continue;
}
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
live++;
}
return live;
final boolean inStartupSafeMode = namesystem.isInStartupSafeMode();
return countNodes(b, inStartupSafeMode).liveReplicas();
}
/**
@ -3752,9 +3782,8 @@ void processOverReplicatedBlocksOnReCommission(
final BlockInfo block = it.next();
int expectedReplication = this.getReplication(block);
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
if (numCurrentReplica > expectedReplication) {
// over-replicated block
if (shouldProcessOverReplicated(num, 0, expectedReplication)) {
// over-replicated block
processOverReplicatedBlock(block, (short) expectedReplication, null,
null);
numOverReplicated++;
@ -3890,7 +3919,7 @@ public void checkReplication(BlockCollection bc) {
neededReplications.add(block, n.liveReplicas() + pending,
n.readOnlyReplicas(),
n.decommissionedAndDecommissioning(), expected);
} else if (n.liveReplicas() > expected) {
} else if (shouldProcessOverReplicated(n, 0, expected)) {
processOverReplicatedBlock(block, expected, null, null);
}
}

View File

@ -17,59 +17,49 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.util.EnumCounters;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.CORRUPT;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONED;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.EXCESS;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.REDUNDANT;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.STALESTORAGE;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
/**
* A immutable object that stores the number of live replicas and
* the number of decommissioned Replicas.
*/
public class NumberReplicas {
private int liveReplicas;
private int readOnlyReplicas;
public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaState> {
// Tracks only the decommissioning replicas
private int decommissioning;
// Tracks only the decommissioned replicas
private int decommissioned;
private int corruptReplicas;
private int excessReplicas;
private int replicasOnStaleNodes;
NumberReplicas() {
this(0, 0, 0, 0, 0, 0, 0);
public enum StoredReplicaState {
// live replicas. for a striped block, this value excludes redundant
// replicas for the same internal block
LIVE,
READONLY,
DECOMMISSIONING,
DECOMMISSIONED,
CORRUPT,
// excess replicas already tracked by blockmanager's excess map
EXCESS,
STALESTORAGE,
// for striped blocks only. number of redundant internal block replicas
// that have not been tracked by blockmanager yet (i.e., not in excess)
REDUNDANT
}
NumberReplicas(int live, int readonly, int decommissioned,
int decommissioning, int corrupt, int excess, int stale) {
set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale);
}
void set(int live, int readonly, int decommissioned, int decommissioning,
int corrupt, int excess, int stale) {
liveReplicas = live;
readOnlyReplicas = readonly;
this.decommissioning = decommissioning;
this.decommissioned = decommissioned;
corruptReplicas = corrupt;
excessReplicas = excess;
replicasOnStaleNodes = stale;
public NumberReplicas() {
super(StoredReplicaState.class);
}
public int liveReplicas() {
return liveReplicas;
return (int) get(LIVE);
}
public int readOnlyReplicas() {
return readOnlyReplicas;
}
/**
*
* @return decommissioned replicas + decommissioning replicas
* It is deprecated by decommissionedAndDecommissioning
* due to its misleading name.
*/
@Deprecated
public int decommissionedReplicas() {
return decommissionedAndDecommissioning();
return (int) get(READONLY);
}
/**
@ -77,7 +67,7 @@ public int decommissionedReplicas() {
* @return decommissioned and decommissioning replicas
*/
public int decommissionedAndDecommissioning() {
return decommissioned + decommissioning;
return (int) (get(DECOMMISSIONED) + get(DECOMMISSIONING));
}
/**
@ -85,7 +75,7 @@ public int decommissionedAndDecommissioning() {
* @return decommissioned replicas only
*/
public int decommissioned() {
return decommissioned;
return (int) get(DECOMMISSIONED);
}
/**
@ -93,15 +83,15 @@ public int decommissioned() {
* @return decommissioning replicas only
*/
public int decommissioning() {
return decommissioning;
return (int) get(DECOMMISSIONING);
}
public int corruptReplicas() {
return corruptReplicas;
return (int) get(CORRUPT);
}
public int excessReplicas() {
return excessReplicas;
return (int) get(EXCESS);
}
/**
@ -110,6 +100,10 @@ public int excessReplicas() {
* replica may count as both "live" and "stale".
*/
public int replicasOnStaleNodes() {
return replicasOnStaleNodes;
return (int) get(STALESTORAGE);
}
public int redundantInternalBlocks() {
return (int) get(REDUNDANT);
}
}

View File

@ -437,7 +437,7 @@ private void mockBlockManagerForBlockSafeDecrement() {
doReturn(storedBlock).when(bm).getStoredBlock(any(Block.class));
NumberReplicas numberReplicas = mock(NumberReplicas.class);
when(numberReplicas.liveReplicas()).thenReturn(0);
doReturn(numberReplicas).when(bm).countNodes(any(Block.class));
doReturn(numberReplicas).when(bm).countNodes(any(BlockInfo.class));
}
/**

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -40,9 +41,11 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestAddOverReplicatedStripedBlocks {
@ -68,6 +71,7 @@ public void setup() throws IOException {
// disable block recovery
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
@ -201,26 +205,35 @@ public void testProcessOverReplicatedAndCorruptStripedBlock()
} finally {
cluster.getNamesystem().writeUnlock();
}
assertEquals(1, bm.countNodes(blockInfo).corruptReplicas());
assertEquals(1, bm.countNodes(bm.getStoredBlock(blockInfo))
.corruptReplicas());
// let a internal block be over replicated with 2 redundant block.
blk.setBlockId(groupId + 2);
cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
// update blocksMap
cluster.triggerBlockReports();
// add to invalidates
cluster.triggerHeartbeats();
// datanode delete block
cluster.triggerHeartbeats();
// update blocksMap
cluster.triggerBlockReports();
// verify that all internal blocks exists
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
// verify that all internal blocks exists except b0
// the redundant internal blocks will not be deleted before the corrupted
// block gets reconstructed. but since we set
// DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY to 0, the reconstruction will
// not happen
lbs = cluster.getNameNodeRpc().getBlockLocations(filePath.toString(), 0,
fileLen);
bg = (LocatedStripedBlock) (lbs.get(0));
assertEquals(GROUP_SIZE + 1, bg.getBlockIndices().length);
assertEquals(GROUP_SIZE + 1, bg.getLocations().length);
BitSet set = new BitSet(GROUP_SIZE);
for (byte index : bg.getBlockIndices()) {
set.set(index);
}
Assert.assertFalse(set.get(0));
for (int i = 1; i < GROUP_SIZE; i++) {
assertTrue(set.get(i));
}
}
@Test
@ -260,11 +273,21 @@ public void testProcessOverReplicatedAndMissingStripedBlock()
// update blocksMap
cluster.triggerBlockReports();
// Since one block is missing, when over-replicated blocks got deleted,
// we are left GROUP_SIZE - 1 blocks.
lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, fileLen);
StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
// Since one block is missing, then over-replicated blocks will not be
// deleted until reconstruction happens
lbs = cluster.getNameNodeRpc().getBlockLocations(filePath.toString(), 0,
fileLen);
bg = (LocatedStripedBlock) (lbs.get(0));
assertEquals(GROUP_SIZE + 1, bg.getBlockIndices().length);
assertEquals(GROUP_SIZE + 1, bg.getLocations().length);
BitSet set = new BitSet(GROUP_SIZE);
for (byte index : bg.getBlockIndices()) {
set.set(index);
}
Assert.assertFalse(set.get(GROUP_SIZE - 1));
for (int i = 0; i < GROUP_SIZE - 1; i++) {
assertTrue(set.get(i));
}
}
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@ -36,11 +37,18 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.BitSet;
import java.util.List;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
@ -51,6 +59,8 @@
import static org.junit.Assert.assertTrue;
public class TestReconstructStripedBlocks {
public static final Logger LOG = LoggerFactory.getLogger(
TestReconstructStripedBlocks.class);
private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
private final short GROUP_SIZE =
(short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
@ -233,4 +243,97 @@ private static int getNumberOfBlocksToBeErasureCoded(MiniDFSCluster cluster)
}
return count;
}
/**
* make sure the NN can detect the scenario where there are enough number of
* internal blocks (>=9 by default) but there is still missing data/parity
* block.
*/
@Test
public void testCountLiveReplicas() throws Exception {
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
try {
fs.mkdirs(dirPath);
fs.setErasureCodingPolicy(dirPath, null);
DFSTestUtil.createFile(fs, filePath,
BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
// stop a dn
LocatedBlocks blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
DatanodeInfo dnToStop = block.getLocations()[0];
MiniDFSCluster.DataNodeProperties dnProp =
cluster.stopDataNode(dnToStop.getXferAddr());
cluster.setDataNodeDead(dnToStop);
// wait for reconstruction to happen
DFSTestUtil.waitForReplication(fs, filePath, GROUP_SIZE, 15 * 1000);
// bring the dn back: 10 internal blocks now
cluster.restartDataNode(dnProp);
cluster.waitActive();
// stop another dn: 9 internal blocks, but only cover 8 real one
dnToStop = block.getLocations()[1];
cluster.stopDataNode(dnToStop.getXferAddr());
cluster.setDataNodeDead(dnToStop);
// currently namenode is able to track the missing block. but restart NN
cluster.restartNameNode(true);
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(dn);
}
FSNamesystem fsn = cluster.getNamesystem();
BlockManager bm = fsn.getBlockManager();
Thread.sleep(3000); // wait 3 running cycles of replication monitor
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerHeartbeat(dn);
}
// check if NN can detect the missing internal block and finish the
// reconstruction
boolean reconstructed = false;
for (int i = 0; i < 5; i++) {
NumberReplicas num = null;
fsn.readLock();
try {
BlockInfo blockInfo = cluster.getNamesystem().getFSDirectory()
.getINode4Write(filePath.toString()).asFile().getLastBlock();
num = bm.countNodes(blockInfo);
} finally {
fsn.readUnlock();
}
if (num.liveReplicas() >= GROUP_SIZE) {
reconstructed = true;
break;
} else {
Thread.sleep(1000);
}
}
Assert.assertTrue(reconstructed);
blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
block = (LocatedStripedBlock) blks.getLastLocatedBlock();
BitSet bitSet = new BitSet(GROUP_SIZE);
for (byte index : block.getBlockIndices()) {
bitSet.set(index);
}
for (int i = 0; i < GROUP_SIZE; i++) {
Assert.assertTrue(bitSet.get(i));
}
} finally {
cluster.shutdown();
}
}
}