HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo)

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-10-15 18:07:09 +08:00
parent 63020c54c1
commit 5411dc559d
7 changed files with 148 additions and 203 deletions

View File

@ -1531,6 +1531,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei)
HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -785,6 +785,7 @@ public LocatedBlock convertLastBlockToUnderConstruction(
// Remove block from replication queue.
NumberReplicas replicas = countNodes(lastBlock);
neededReplications.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(),
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
pendingReplications.remove(lastBlock);
@ -1795,6 +1796,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
nodesContainingLiveReplicas.clear();
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
int live = 0;
int readonly = 0;
int decommissioned = 0;
int decommissioning = 0;
int corrupt = 0;
@ -1820,6 +1822,9 @@ else if (node.isDecommissionInProgress()) {
nodesContainingLiveReplicas.add(storage);
live += countableReplica;
}
if (storage.getState() == State.READ_ONLY_SHARED) {
readonly++;
}
containingNodes.add(node);
// Check if this replica is corrupt
// If so, do not select the node as src node
@ -1858,7 +1863,7 @@ else if (node.isDecommissionInProgress()) {
}
}
if(numReplicas != null)
numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
excess, 0);
return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
}
@ -1883,7 +1888,7 @@ private void processPendingReplications() {
}
NumberReplicas num = countNodes(timedOutItems[i]);
if (isNeededReplication(bi, num.liveReplicas())) {
neededReplications.add(bi, num.liveReplicas(),
neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), getReplication(bi));
}
}
@ -2799,6 +2804,7 @@ private Block addStoredBlock(final BlockInfo block,
short fileReplication = getExpectedReplicaNum(storedBlock);
if (!isNeededReplication(storedBlock, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica,
num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), fileReplication);
} else {
updateNeededReplications(storedBlock, curReplicaDelta, 0);
@ -3043,8 +3049,8 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
if (isNeededReplication(block, numCurrentReplica)) {
if (neededReplications.add(block, numCurrentReplica, num
.decommissionedAndDecommissioning(), expectedReplication)) {
if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), expectedReplication)) {
return MisReplicationResult.UNDER_REPLICATED;
}
}
@ -3583,15 +3589,22 @@ public void processIncrementalBlockReport(final DatanodeID nodeID,
* For a striped block, this includes nodes storing blocks belonging to the
* striped block group.
*/
public NumberReplicas countNodes(BlockInfo b) {
public NumberReplicas countNodes(Block b) {
int decommissioned = 0;
int decommissioning = 0;
int live = 0;
int readonly = 0;
int corrupt = 0;
int excess = 0;
int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
if (storage.getState() == State.FAILED) {
continue;
} else if (storage.getState() == State.READ_ONLY_SHARED) {
readonly++;
continue;
}
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
@ -3612,7 +3625,8 @@ public NumberReplicas countNodes(BlockInfo b) {
stale++;
}
}
return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
return new NumberReplicas(live, readonly, decommissioned, decommissioning,
corrupt, excess, stale);
}
/**
@ -3765,13 +3779,13 @@ private void updateNeededReplications(final BlockInfo block,
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, repl.liveReplicas())) {
neededReplications.update(block, repl.liveReplicas(), repl
.decommissionedAndDecommissioning(), curExpectedReplicas,
neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
repl.decommissionedAndDecommissioning(), curExpectedReplicas,
curReplicasDelta, expectedReplicasDelta);
} else {
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
neededReplications.remove(block, oldReplicas,
neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
}
} finally {
@ -3792,6 +3806,7 @@ public void checkReplication(BlockCollection bc) {
final int pending = pendingReplications.getNumReplicas(block);
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
neededReplications.add(block, n.liveReplicas() + pending,
n.readOnlyReplicas(),
n.decommissionedAndDecommissioning(), expected);
} else if (n.liveReplicas() > expected) {
processOverReplicatedBlock(block, expected, null, null);

View File

@ -545,7 +545,7 @@ private void processBlocksForDecomInternal(
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
liveReplicas,
liveReplicas, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(),
blockManager.getExpectedReplicaNum(block));
}

View File

@ -23,6 +23,7 @@
*/
public class NumberReplicas {
private int liveReplicas;
private int readOnlyReplicas;
// Tracks only the decommissioning replicas
private int decommissioning;
@ -33,17 +34,18 @@ public class NumberReplicas {
private int replicasOnStaleNodes;
NumberReplicas() {
initialize(0, 0, 0, 0, 0, 0);
this(0, 0, 0, 0, 0, 0, 0);
}
NumberReplicas(int live, int decommissioned, int decommissioning, int corrupt,
int excess, int stale) {
initialize(live, decommissioned, decommissioning, corrupt, excess, stale);
NumberReplicas(int live, int readonly, int decommissioned,
int decommissioning, int corrupt, int excess, int stale) {
set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale);
}
void initialize(int live, int decommissioned, int decommissioning,
int corrupt, int excess, int stale) {
void set(int live, int readonly, int decommissioned, int decommissioning,
int corrupt, int excess, int stale) {
liveReplicas = live;
readOnlyReplicas = readonly;
this.decommissioning = decommissioning;
this.decommissioned = decommissioned;
corruptReplicas = corrupt;
@ -55,6 +57,10 @@ public int liveReplicas() {
return liveReplicas;
}
public int readOnlyReplicas() {
return readOnlyReplicas;
}
/**
*
* @return decommissioned replicas + decommissioning replicas

View File

@ -19,9 +19,11 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
/**
* Keep prioritized queues of under replicated blocks.
@ -34,7 +36,7 @@
*
* <p/>
* The policy for choosing which priority to give added blocks
* is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
* is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
* </p>
* <p>The queue order is as follows:</p>
* <ol>
@ -147,6 +149,7 @@ synchronized boolean contains(BlockInfo block) {
*/
private int getPriority(BlockInfo block,
int curReplicas,
int readOnlyReplicas,
int decommissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
@ -159,19 +162,24 @@ private int getPriority(BlockInfo block,
return getPriorityStriped(curReplicas, decommissionedReplicas,
sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
} else {
return getPriorityContiguous(curReplicas, decommissionedReplicas,
expectedReplicas);
return getPriorityContiguous(curReplicas, readOnlyReplicas,
decommissionedReplicas, expectedReplicas);
}
}
private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
int expectedReplicas) {
private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
int decommissionedReplicas, int expectedReplicas) {
if (curReplicas == 0) {
// If there are zero non-decommissioned replicas but there are
// some decommissioned replicas, then assign them highest priority
if (decommissionedReplicas > 0) {
return QUEUE_HIGHEST_PRIORITY;
}
if (readOnlyReplicas > 0) {
// only has read-only replicas, highest risk
// since the read-only replicas may go down all together.
return QUEUE_HIGHEST_PRIORITY;
}
//all we have are corrupt blocks
return QUEUE_WITH_CORRUPT_BLOCKS;
} else if (curReplicas == 1) {
@ -218,11 +226,12 @@ private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
*/
synchronized boolean add(BlockInfo block,
int curReplicas,
int readOnlyReplicas,
int decomissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
expectedReplicas);
final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
decomissionedReplicas, expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
expectedReplicas == 1) {
@ -242,11 +251,11 @@ synchronized boolean add(BlockInfo block,
/** remove a block from a under replication queue */
synchronized boolean remove(BlockInfo block,
int oldReplicas,
int oldReadOnlyReplicas,
int decommissionedReplicas,
int oldExpectedReplicas) {
int priLevel = getPriority(block, oldReplicas,
decommissionedReplicas,
oldExpectedReplicas);
final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
decommissionedReplicas, oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel);
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
oldExpectedReplicas == 1 &&
@ -285,10 +294,10 @@ boolean remove(BlockInfo block, int priLevel) {
// Try to remove the block from all queues if the block was
// not found in the queue for the given priority level.
for (int i = 0; i < LEVEL; i++) {
if (priorityQueues.get(i).remove(block)) {
if (i != priLevel && priorityQueues.get(i).remove(block)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
" {} from priority queue {}", block, priLevel);
" {} from priority queue {}", block, i);
return true;
}
}
@ -313,15 +322,15 @@ boolean remove(BlockInfo block, int priLevel) {
* @param expectedReplicasDelta the change in the expected replica count from before
*/
synchronized void update(BlockInfo block, int curReplicas,
int decommissionedReplicas,
int readOnlyReplicas, int decommissionedReplicas,
int curExpectedReplicas,
int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
int curPri = getPriority(block, curReplicas, decommissionedReplicas,
curExpectedReplicas);
int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
oldExpectedReplicas);
int curPri = getPriority(block, curReplicas, readOnlyReplicas,
decommissionedReplicas, curExpectedReplicas);
int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
decommissionedReplicas, oldExpectedReplicas);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
block +
@ -371,143 +380,69 @@ synchronized void update(BlockInfo block, int curReplicas,
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
int blocksToProcess) {
// initialize data structure for the return value
List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
for (int i = 0; i < LEVEL; i++) {
blocksToReplicate.add(new ArrayList<BlockInfo>());
}
if (size() == 0) { // There are no blocks to collect.
return blocksToReplicate;
}
final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
int blockCount = 0;
for (int priority = 0; priority < LEVEL; priority++) {
// Go through all blocks that need replications with current priority.
BlockIterator neededReplicationsIterator = iterator(priority);
// Set the iterator to the first unprocessed block at this priority level.
neededReplicationsIterator.setToBookmark();
int count = 0;
int priority = 0;
for (; count < blocksToProcess && priority < LEVEL; priority++) {
if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
// do not choose corrupted blocks.
continue;
}
blocksToProcess = Math.min(blocksToProcess, size());
if (blockCount == blocksToProcess) {
break; // break if already expected blocks are obtained
}
// Go through all blocks that need replications with current priority.
// Set the iterator to the first unprocessed block at this priority level.
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
final List<BlockInfo> blocks = new LinkedList<>();
blocksToReplicate.add(blocks);
// Loop through all remaining blocks in the list.
while (blockCount < blocksToProcess
&& neededReplicationsIterator.hasNext()) {
BlockInfo block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
blockCount++;
}
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
// Reset all priorities' bookmarks to the beginning because there were
// no recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
this.priorityQueues.get(i).resetBookmark();
}
break;
for(; count < blocksToProcess && i.hasNext(); count++) {
blocks.add(i.next());
}
}
if (priority == LEVEL) {
// Reset all bookmarks because there were no recently added blocks.
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
q.resetBookmark();
}
}
return blocksToReplicate;
}
/** returns an iterator of all blocks in a given priority queue */
synchronized BlockIterator iterator(int level) {
return new BlockIterator(level);
synchronized Iterator<BlockInfo> iterator(int level) {
return priorityQueues.get(level).iterator();
}
/** return an iterator of all the under replication blocks */
@Override
public synchronized BlockIterator iterator() {
return new BlockIterator();
}
public synchronized Iterator<BlockInfo> iterator() {
final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
return new Iterator<BlockInfo>() {
private Iterator<BlockInfo> b = q.next().iterator();
/**
* An iterator over blocks.
*/
class BlockIterator implements Iterator<BlockInfo> {
private int level;
private boolean isIteratorForLevel = false;
private final List<Iterator<BlockInfo>> iterators = new ArrayList<>();
/**
* Construct an iterator over all queues.
*/
private BlockIterator() {
level=0;
for(int i=0; i<LEVEL; i++) {
iterators.add(priorityQueues.get(i).iterator());
@Override
public BlockInfo next() {
hasNext();
return b.next();
}
}
/**
* Constrict an iterator for a single queue level
* @param l the priority level to iterate over
*/
private BlockIterator(int l) {
level = l;
isIteratorForLevel = true;
iterators.add(priorityQueues.get(level).iterator());
}
private void update() {
if (isIteratorForLevel) {
return;
}
while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
level++;
}
}
@Override
public BlockInfo next() {
if (isIteratorForLevel) {
return iterators.get(0).next();
}
update();
return iterators.get(level).next();
}
@Override
public boolean hasNext() {
if (isIteratorForLevel) {
return iterators.get(0).hasNext();
}
update();
return iterators.get(level).hasNext();
}
@Override
public void remove() {
if (isIteratorForLevel) {
iterators.get(0).remove();
} else {
iterators.get(level).remove();
}
}
int getPriority() {
return level;
}
/**
* Sets iterator(s) to bookmarked elements.
*/
private synchronized void setToBookmark() {
if (this.isIteratorForLevel) {
this.iterators.set(0, priorityQueues.get(this.level)
.getBookmark());
} else {
for(int i=0; i<LEVEL; i++) {
this.iterators.set(i, priorityQueues.get(i).getBookmark());
@Override
public boolean hasNext() {
for(; !b.hasNext() && q.hasNext(); ) {
b = q.next().iterator();
}
return b.hasNext();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}

View File

@ -836,7 +836,7 @@ public void testReplicationWithPriority() throws Exception {
// Adding the blocks directly to normal priority
neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 3);
nextLong()), 2, 0, 0, 3);
}
// Lets wait for the replication interval, to start process normal
// priority blocks
@ -844,7 +844,7 @@ public void testReplicationWithPriority() throws Exception {
// Adding the block directly to high priority list
neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 3);
nextLong()), 1, 0, 0, 3);
// Lets wait for the replication interval
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
@ -868,23 +868,23 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
for (int i = 0; i < 5; i++) {
// Adding QUEUE_HIGHEST_PRIORITY block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 3);
nextLong()), 1, 0, 0, 3);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 7);
nextLong()), 2, 0, 0, 7);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 6, 0, 6);
nextLong()), 6, 0, 0, 6);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 5, 0, 6);
nextLong()), 5, 0, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 0, 0, 3);
nextLong()), 0, 0, 0, 3);
}
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
@ -902,13 +902,12 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
// Adding QUEUE_HIGHEST_PRIORITY
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 3);
nextLong()), 0, 1, 0, 3);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
// and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4);
// Since it is reached to end of all lists,
// should start picking the blocks from start.
@ -920,29 +919,15 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
/** asserts the chosen blocks with expected priority blocks */
private void assertTheChosenBlocks(
List<List<BlockInfo>> chosenBlocks, int firstPrioritySize,
int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
int fifthPrioritySize) {
assertEquals(
"Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
firstPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
assertEquals(
"Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks",
secondPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
thirdPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
fourthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
assertEquals(
"Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
fifthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
List<List<BlockInfo>> chosenBlocks, int... expectedSizes) {
int i = 0;
for(; i < chosenBlocks.size(); i++) {
assertEquals("Not returned the expected number for i=" + i,
expectedSizes[i], chosenBlocks.get(i).size());
}
for(; i < expectedSizes.length; i++) {
assertEquals("Expected size is non-zero for i=" + i, 0, expectedSizes[i]);
}
}
/**
@ -1101,14 +1086,14 @@ public void testUpdateDoesNotCauseSkippedReplication() {
// Adding QUEUE_VERY_UNDER_REPLICATED block
final int block1CurReplicas = 2;
final int block1ExpectedReplicas = 7;
underReplicatedBlocks.add(block1, block1CurReplicas, 0,
underReplicatedBlocks.add(block1, block1CurReplicas, 0, 0,
block1ExpectedReplicas);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 2, 0, 7);
underReplicatedBlocks.add(block2, 2, 0, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block3, 2, 0, 6);
underReplicatedBlocks.add(block3, 2, 0, 0, 6);
List<List<BlockInfo>> chosenBlocks;
@ -1119,7 +1104,7 @@ public void testUpdateDoesNotCauseSkippedReplication() {
// Increasing the replications will move the block down a
// priority. This simulates a replica being completed in between checks.
underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, 0,
block1ExpectedReplicas, 1, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
@ -1147,10 +1132,10 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
@ -1205,10 +1190,10 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
BlockInfo block2 = genBlockInfo(blkID2);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
@ -1268,10 +1253,10 @@ public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -64,7 +66,7 @@ public void testBlockPriorities() throws Throwable {
assertEquals(1, queues.size());
assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
//repeated additions fail
assertFalse(queues.add(block1, 1, 0, 3));
assertFalse(queues.add(block1, 1, 0, 0, 3));
//add a second block with two replicas
assertAdded(queues, block2, 2, 0, 3);
@ -88,11 +90,11 @@ public void testBlockPriorities() throws Throwable {
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
assertEquals(2, queues.getCorruptBlockSize());
assertEquals(1, queues.getCorruptReplOneBlockSize());
queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2);
queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
assertEquals(0, queues.getCorruptReplOneBlockSize());
queues.update(block_corrupt, 0, 0, 1, 0, -2);
queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
assertEquals(1, queues.getCorruptReplOneBlockSize());
queues.update(block_very_under_replicated, 0, 0, 1, -4, -24);
queues.update(block_very_under_replicated, 0, 0, 0, 1, -4, -24);
assertEquals(2, queues.getCorruptReplOneBlockSize());
}
@ -151,7 +153,7 @@ private void assertAdded(UnderReplicatedBlocks queues,
int expectedReplicas) {
assertTrue("Failed to add " + block,
queues.add(block,
curReplicas,
curReplicas, 0,
decomissionedReplicas,
expectedReplicas));
}
@ -169,7 +171,7 @@ private void assertAdded(UnderReplicatedBlocks queues,
private void assertInLevel(UnderReplicatedBlocks queues,
Block block,
int level) {
UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level);
final Iterator<BlockInfo> bi = queues.iterator(level);
while (bi.hasNext()) {
Block next = bi.next();
if (block.equals(next)) {