From 5411dc559d5f73e4153e76fdff94a26869c17a37 Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze
Date: Thu, 15 Oct 2015 18:07:09 +0800
Subject: [PATCH] HDFS-9205. Do not schedule corrupt blocks for replication.
(szetszwo)
---
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../server/blockmanagement/BlockManager.java | 35 ++-
.../blockmanagement/DecommissionManager.java | 2 +-
.../blockmanagement/NumberReplicas.java | 18 +-
.../UnderReplicatedBlocks.java | 209 ++++++------------
.../TestReplicationPolicy.java | 71 +++---
.../TestUnderReplicatedBlockQueues.java | 14 +-
7 files changed, 148 insertions(+), 203 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cddb34012c..a6dc78fe32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1531,6 +1531,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei)
+ HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 418522049f..c7dbbd5db9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -785,6 +785,7 @@ public LocatedBlock convertLastBlockToUnderConstruction(
// Remove block from replication queue.
NumberReplicas replicas = countNodes(lastBlock);
neededReplications.remove(lastBlock, replicas.liveReplicas(),
+ replicas.readOnlyReplicas(),
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
pendingReplications.remove(lastBlock);
@@ -1795,6 +1796,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
nodesContainingLiveReplicas.clear();
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
int live = 0;
+ int readonly = 0;
int decommissioned = 0;
int decommissioning = 0;
int corrupt = 0;
@@ -1820,6 +1822,9 @@ else if (node.isDecommissionInProgress()) {
nodesContainingLiveReplicas.add(storage);
live += countableReplica;
}
+ if (storage.getState() == State.READ_ONLY_SHARED) {
+ readonly++;
+ }
containingNodes.add(node);
// Check if this replica is corrupt
// If so, do not select the node as src node
@@ -1858,7 +1863,7 @@ else if (node.isDecommissionInProgress()) {
}
}
if(numReplicas != null)
- numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
+ numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
excess, 0);
return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
}
@@ -1883,7 +1888,7 @@ private void processPendingReplications() {
}
NumberReplicas num = countNodes(timedOutItems[i]);
if (isNeededReplication(bi, num.liveReplicas())) {
- neededReplications.add(bi, num.liveReplicas(),
+ neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), getReplication(bi));
}
}
@@ -2799,6 +2804,7 @@ private Block addStoredBlock(final BlockInfo block,
short fileReplication = getExpectedReplicaNum(storedBlock);
if (!isNeededReplication(storedBlock, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica,
+ num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), fileReplication);
} else {
updateNeededReplications(storedBlock, curReplicaDelta, 0);
@@ -3043,8 +3049,8 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
if (isNeededReplication(block, numCurrentReplica)) {
- if (neededReplications.add(block, numCurrentReplica, num
- .decommissionedAndDecommissioning(), expectedReplication)) {
+ if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
+ num.decommissionedAndDecommissioning(), expectedReplication)) {
return MisReplicationResult.UNDER_REPLICATED;
}
}
@@ -3583,15 +3589,22 @@ public void processIncrementalBlockReport(final DatanodeID nodeID,
* For a striped block, this includes nodes storing blocks belonging to the
* striped block group.
*/
- public NumberReplicas countNodes(BlockInfo b) {
+ public NumberReplicas countNodes(Block b) {
int decommissioned = 0;
int decommissioning = 0;
int live = 0;
+ int readonly = 0;
int corrupt = 0;
int excess = 0;
int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
- for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+ if (storage.getState() == State.FAILED) {
+ continue;
+ } else if (storage.getState() == State.READ_ONLY_SHARED) {
+ readonly++;
+ continue;
+ }
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
@@ -3612,7 +3625,8 @@ public NumberReplicas countNodes(BlockInfo b) {
stale++;
}
}
- return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
+ return new NumberReplicas(live, readonly, decommissioned, decommissioning,
+ corrupt, excess, stale);
}
/**
@@ -3765,13 +3779,13 @@ private void updateNeededReplications(final BlockInfo block,
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, repl.liveReplicas())) {
- neededReplications.update(block, repl.liveReplicas(), repl
- .decommissionedAndDecommissioning(), curExpectedReplicas,
+ neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
+ repl.decommissionedAndDecommissioning(), curExpectedReplicas,
curReplicasDelta, expectedReplicasDelta);
} else {
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
- neededReplications.remove(block, oldReplicas,
+ neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
}
} finally {
@@ -3792,6 +3806,7 @@ public void checkReplication(BlockCollection bc) {
final int pending = pendingReplications.getNumReplicas(block);
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
neededReplications.add(block, n.liveReplicas() + pending,
+ n.readOnlyReplicas(),
n.decommissionedAndDecommissioning(), expected);
} else if (n.liveReplicas() > expected) {
processOverReplicatedBlock(block, expected, null, null);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 1f1ae091a9..4281035072 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -545,7 +545,7 @@ private void processBlocksForDecomInternal(
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
- liveReplicas,
+ liveReplicas, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(),
blockManager.getExpectedReplicaNum(block));
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
index e567bbf3a2..44ae6f6a46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
@@ -23,6 +23,7 @@
*/
public class NumberReplicas {
private int liveReplicas;
+ private int readOnlyReplicas;
// Tracks only the decommissioning replicas
private int decommissioning;
@@ -33,17 +34,18 @@ public class NumberReplicas {
private int replicasOnStaleNodes;
NumberReplicas() {
- initialize(0, 0, 0, 0, 0, 0);
+ this(0, 0, 0, 0, 0, 0, 0);
}
- NumberReplicas(int live, int decommissioned, int decommissioning, int corrupt,
- int excess, int stale) {
- initialize(live, decommissioned, decommissioning, corrupt, excess, stale);
+ NumberReplicas(int live, int readonly, int decommissioned,
+ int decommissioning, int corrupt, int excess, int stale) {
+ set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale);
}
- void initialize(int live, int decommissioned, int decommissioning,
- int corrupt, int excess, int stale) {
+ void set(int live, int readonly, int decommissioned, int decommissioning,
+ int corrupt, int excess, int stale) {
liveReplicas = live;
+ readOnlyReplicas = readonly;
this.decommissioning = decommissioning;
this.decommissioned = decommissioned;
corruptReplicas = corrupt;
@@ -55,6 +57,10 @@ public int liveReplicas() {
return liveReplicas;
}
+ public int readOnlyReplicas() {
+ return readOnlyReplicas;
+ }
+
/**
*
* @return decommissioned replicas + decommissioning replicas
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 7e8f479787..d4938c5436 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -19,9 +19,11 @@
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
/**
* Keep prioritized queues of under replicated blocks.
@@ -34,7 +36,7 @@
*
*
* The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
*
* The queue order is as follows:
*
@@ -147,6 +149,7 @@ synchronized boolean contains(BlockInfo block) {
*/
private int getPriority(BlockInfo block,
int curReplicas,
+ int readOnlyReplicas,
int decommissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
@@ -159,19 +162,24 @@ private int getPriority(BlockInfo block,
return getPriorityStriped(curReplicas, decommissionedReplicas,
sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
} else {
- return getPriorityContiguous(curReplicas, decommissionedReplicas,
- expectedReplicas);
+ return getPriorityContiguous(curReplicas, readOnlyReplicas,
+ decommissionedReplicas, expectedReplicas);
}
}
- private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
- int expectedReplicas) {
+ private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
+ int decommissionedReplicas, int expectedReplicas) {
if (curReplicas == 0) {
// If there are zero non-decommissioned replicas but there are
// some decommissioned replicas, then assign them highest priority
if (decommissionedReplicas > 0) {
return QUEUE_HIGHEST_PRIORITY;
}
+ if (readOnlyReplicas > 0) {
+ // only has read-only replicas, highest risk
+ // since the read-only replicas may go down all together.
+ return QUEUE_HIGHEST_PRIORITY;
+ }
//all we have are corrupt blocks
return QUEUE_WITH_CORRUPT_BLOCKS;
} else if (curReplicas == 1) {
@@ -218,11 +226,12 @@ private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
*/
synchronized boolean add(BlockInfo block,
int curReplicas,
+ int readOnlyReplicas,
int decomissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
- int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
- expectedReplicas);
+ final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
+ decomissionedReplicas, expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
expectedReplicas == 1) {
@@ -242,11 +251,11 @@ synchronized boolean add(BlockInfo block,
/** remove a block from a under replication queue */
synchronized boolean remove(BlockInfo block,
int oldReplicas,
+ int oldReadOnlyReplicas,
int decommissionedReplicas,
int oldExpectedReplicas) {
- int priLevel = getPriority(block, oldReplicas,
- decommissionedReplicas,
- oldExpectedReplicas);
+ final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
+ decommissionedReplicas, oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel);
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
oldExpectedReplicas == 1 &&
@@ -285,10 +294,10 @@ boolean remove(BlockInfo block, int priLevel) {
// Try to remove the block from all queues if the block was
// not found in the queue for the given priority level.
for (int i = 0; i < LEVEL; i++) {
- if (priorityQueues.get(i).remove(block)) {
+ if (i != priLevel && priorityQueues.get(i).remove(block)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
- " {} from priority queue {}", block, priLevel);
+ " {} from priority queue {}", block, i);
return true;
}
}
@@ -313,15 +322,15 @@ boolean remove(BlockInfo block, int priLevel) {
* @param expectedReplicasDelta the change in the expected replica count from before
*/
synchronized void update(BlockInfo block, int curReplicas,
- int decommissionedReplicas,
+ int readOnlyReplicas, int decommissionedReplicas,
int curExpectedReplicas,
int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
- int curPri = getPriority(block, curReplicas, decommissionedReplicas,
- curExpectedReplicas);
- int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
- oldExpectedReplicas);
+ int curPri = getPriority(block, curReplicas, readOnlyReplicas,
+ decommissionedReplicas, curExpectedReplicas);
+ int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
+ decommissionedReplicas, oldExpectedReplicas);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
block +
@@ -371,143 +380,69 @@ synchronized void update(BlockInfo block, int curReplicas,
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
- public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
+ synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
int blocksToProcess) {
- // initialize data structure for the return value
- List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
- for (int i = 0; i < LEVEL; i++) {
- blocksToReplicate.add(new ArrayList<BlockInfo>());
- }
-
- if (size() == 0) { // There are no blocks to collect.
- return blocksToReplicate;
- }
+ final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
- int blockCount = 0;
- for (int priority = 0; priority < LEVEL; priority++) {
- // Go through all blocks that need replications with current priority.
- BlockIterator neededReplicationsIterator = iterator(priority);
- // Set the iterator to the first unprocessed block at this priority level.
- neededReplicationsIterator.setToBookmark();
+ int count = 0;
+ int priority = 0;
+ for (; count < blocksToProcess && priority < LEVEL; priority++) {
+ if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
+ // do not choose corrupted blocks.
+ continue;
+ }
- blocksToProcess = Math.min(blocksToProcess, size());
-
- if (blockCount == blocksToProcess) {
- break; // break if already expected blocks are obtained
- }
-
+ // Go through all blocks that need replications with current priority.
+ // Set the iterator to the first unprocessed block at this priority level.
+ final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
+ final List<BlockInfo> blocks = new LinkedList<>();
+ blocksToReplicate.add(blocks);
// Loop through all remaining blocks in the list.
- while (blockCount < blocksToProcess
- && neededReplicationsIterator.hasNext()) {
- BlockInfo block = neededReplicationsIterator.next();
- blocksToReplicate.get(priority).add(block);
- blockCount++;
- }
-
- if (!neededReplicationsIterator.hasNext()
- && neededReplicationsIterator.getPriority() == LEVEL - 1) {
- // Reset all priorities' bookmarks to the beginning because there were
- // no recently added blocks in any list.
- for (int i = 0; i < LEVEL; i++) {
- this.priorityQueues.get(i).resetBookmark();
- }
- break;
+ for(; count < blocksToProcess && i.hasNext(); count++) {
+ blocks.add(i.next());
}
}
+
+ if (priority == LEVEL) {
+ // Reset all bookmarks because there were no recently added blocks.
+ for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
+ q.resetBookmark();
+ }
+ }
+
return blocksToReplicate;
}
/** returns an iterator of all blocks in a given priority queue */
- synchronized BlockIterator iterator(int level) {
- return new BlockIterator(level);
+ synchronized Iterator<BlockInfo> iterator(int level) {
+ return priorityQueues.get(level).iterator();
}
/** return an iterator of all the under replication blocks */
@Override
- public synchronized BlockIterator iterator() {
- return new BlockIterator();
- }
+ public synchronized Iterator<BlockInfo> iterator() {
+ final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
+ return new Iterator<BlockInfo>() {
+ private Iterator<BlockInfo> b = q.next().iterator();
- /**
- * An iterator over blocks.
- */
- class BlockIterator implements Iterator<BlockInfo> {
- private int level;
- private boolean isIteratorForLevel = false;
- private final List<Iterator<BlockInfo>> iterators = new ArrayList<>();
-
- /**
- * Construct an iterator over all queues.
- */
- private BlockIterator() {
- level=0;
- for(int i=0; i> chosenBlocks, int firstPrioritySize,
- int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
- int fifthPrioritySize) {
- assertEquals(
- "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
- firstPrioritySize, chosenBlocks.get(
- UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
- assertEquals(
- "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks",
- secondPrioritySize, chosenBlocks.get(
- UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
- assertEquals(
- "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
- thirdPrioritySize, chosenBlocks.get(
- UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
- assertEquals(
- "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
- fourthPrioritySize, chosenBlocks.get(
- UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
- assertEquals(
- "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
- fifthPrioritySize, chosenBlocks.get(
- UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
+ List<List<BlockInfo>> chosenBlocks, int... expectedSizes) {
+ int i = 0;
+ for(; i < chosenBlocks.size(); i++) {
+ assertEquals("Not returned the expected number for i=" + i,
+ expectedSizes[i], chosenBlocks.get(i).size());
+ }
+ for(; i < expectedSizes.length; i++) {
+ assertEquals("Expected size is non-zero for i=" + i, 0, expectedSizes[i]);
+ }
}
/**
@@ -1101,14 +1086,14 @@ public void testUpdateDoesNotCauseSkippedReplication() {
// Adding QUEUE_VERY_UNDER_REPLICATED block
final int block1CurReplicas = 2;
final int block1ExpectedReplicas = 7;
- underReplicatedBlocks.add(block1, block1CurReplicas, 0,
+ underReplicatedBlocks.add(block1, block1CurReplicas, 0, 0,
block1ExpectedReplicas);
// Adding QUEUE_VERY_UNDER_REPLICATED block
- underReplicatedBlocks.add(block2, 2, 0, 7);
+ underReplicatedBlocks.add(block2, 2, 0, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
- underReplicatedBlocks.add(block3, 2, 0, 6);
+ underReplicatedBlocks.add(block3, 2, 0, 0, 6);
List<List<BlockInfo>> chosenBlocks;
@@ -1119,7 +1104,7 @@ public void testUpdateDoesNotCauseSkippedReplication() {
// Increasing the replications will move the block down a
// priority. This simulates a replica being completed in between checks.
- underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
+ underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, 0,
block1ExpectedReplicas, 1, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
@@ -1147,10 +1132,10 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
- underReplicatedBlocks.add(block1, 0, 1, 1);
+ underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
- underReplicatedBlocks.add(block2, 0, 1, 1);
+ underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
@@ -1205,10 +1190,10 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
BlockInfo block2 = genBlockInfo(blkID2);
// Adding QUEUE_UNDER_REPLICATED block
- underReplicatedBlocks.add(block1, 0, 1, 1);
+ underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
- underReplicatedBlocks.add(block2, 0, 1, 1);
+ underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
@@ -1268,10 +1253,10 @@ public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
- underReplicatedBlocks.add(block1, 0, 1, 1);
+ underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
- underReplicatedBlocks.add(block2, 0, 1, 1);
+ underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
index 7cd2e19427..3ad45dfde8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
+import java.util.Iterator;
+
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -64,7 +66,7 @@ public void testBlockPriorities() throws Throwable {
assertEquals(1, queues.size());
assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
//repeated additions fail
- assertFalse(queues.add(block1, 1, 0, 3));
+ assertFalse(queues.add(block1, 1, 0, 0, 3));
//add a second block with two replicas
assertAdded(queues, block2, 2, 0, 3);
@@ -88,11 +90,11 @@ public void testBlockPriorities() throws Throwable {
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
assertEquals(2, queues.getCorruptBlockSize());
assertEquals(1, queues.getCorruptReplOneBlockSize());
- queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2);
+ queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
assertEquals(0, queues.getCorruptReplOneBlockSize());
- queues.update(block_corrupt, 0, 0, 1, 0, -2);
+ queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
assertEquals(1, queues.getCorruptReplOneBlockSize());
- queues.update(block_very_under_replicated, 0, 0, 1, -4, -24);
+ queues.update(block_very_under_replicated, 0, 0, 0, 1, -4, -24);
assertEquals(2, queues.getCorruptReplOneBlockSize());
}
@@ -151,7 +153,7 @@ private void assertAdded(UnderReplicatedBlocks queues,
int expectedReplicas) {
assertTrue("Failed to add " + block,
queues.add(block,
- curReplicas,
+ curReplicas, 0,
decomissionedReplicas,
expectedReplicas));
}
@@ -169,7 +171,7 @@ private void assertAdded(UnderReplicatedBlocks queues,
private void assertInLevel(UnderReplicatedBlocks queues,
Block block,
int level) {
- UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level);
+ final Iterator<BlockInfo> bi = queues.iterator(level);
while (bi.hasNext()) {
Block next = bi.next();
if (block.equals(next)) {