HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein.

(cherry picked from commit da1b6e3cc286db00b385f3280627d2b2063b4e59)
This commit is contained in:
Kihwal Lee 2020-10-22 19:41:38 -05:00
parent c40f0f1eb3
commit 02709cb054
3 changed files with 82 additions and 14 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -500,6 +501,8 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
* the block count is met or iteration reaches the end of the lowest priority * the block count is met or iteration reaches the end of the lowest priority
* list, in which case bookmarks for each block list are reset to the heads * list, in which case bookmarks for each block list are reset to the heads
* of their respective lists. * of their respective lists.
* If a block is deleted (has invalid bcId), it will be removed from the low
* redundancy queues.
* *
* @param blocksToProcess - number of blocks to fetch from low redundancy * @param blocksToProcess - number of blocks to fetch from low redundancy
* blocks. * blocks.
@ -515,21 +518,32 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
int count = 0; int count = 0;
int priority = 0; int priority = 0;
HashSet<BlockInfo> toRemove = new HashSet<>();
for (; count < blocksToProcess && priority < LEVEL; priority++) { for (; count < blocksToProcess && priority < LEVEL; priority++) {
if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
// do not choose corrupted blocks.
continue;
}
// Go through all blocks that need reconstructions with current priority. // Go through all blocks that need reconstructions with current priority.
// Set the iterator to the first unprocessed block at this priority level // Set the iterator to the first unprocessed block at this priority level
// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
// to look for deleted blocks if any.
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark(); final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
final List<BlockInfo> blocks = new LinkedList<>(); final List<BlockInfo> blocks = new LinkedList<>();
if (!inCorruptLevel) {
blocksToReconstruct.add(blocks); blocksToReconstruct.add(blocks);
// Loop through all remaining blocks in the list.
for(; count < blocksToProcess && i.hasNext(); count++) {
blocks.add(i.next());
} }
for(; count < blocksToProcess && i.hasNext(); count++) {
BlockInfo block = i.next();
if (block.isDeleted()) {
toRemove.add(block);
continue;
}
if (!inCorruptLevel) {
blocks.add(block);
}
}
for (BlockInfo bInfo : toRemove) {
remove(bInfo, priority);
}
toRemove.clear();
} }
if (priority == LEVEL || resetIterators) { if (priority == LEVEL || resetIterators) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -41,6 +42,7 @@ import static org.junit.Assert.fail;
public class TestLowRedundancyBlockQueues { public class TestLowRedundancyBlockQueues {
private final ErasureCodingPolicy ecPolicy; private final ErasureCodingPolicy ecPolicy;
private static AtomicLong mockINodeId = new AtomicLong(0);
public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) { public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
ecPolicy = policy; ecPolicy = policy;
@ -52,7 +54,15 @@ public class TestLowRedundancyBlockQueues {
} }
private BlockInfo genBlockInfo(long id) { private BlockInfo genBlockInfo(long id) {
return new BlockInfoContiguous(new Block(id), (short) 3); return genBlockInfo(id, false);
}
private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) {
BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
if (!isCorruptBlock) {
bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
}
return bInfo;
} }
private BlockInfo genStripedBlockInfo(long id, long numBytes) { private BlockInfo genStripedBlockInfo(long id, long numBytes) {
@ -93,6 +103,41 @@ public class TestLowRedundancyBlockQueues {
queues.getHighestPriorityECBlockCount()); queues.getHighestPriorityECBlockCount());
} }
/**
* Tests that deleted blocks should not be returned by
* {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}.
* @throws Exception
*/
@Test
public void testDeletedBlocks() throws Exception {
int numBlocks = 5;
LowRedundancyBlocks queues = new LowRedundancyBlocks();
// create 5 blockinfos. The first one is corrupt.
for (int ind = 0; ind < numBlocks; ind++) {
BlockInfo blockInfo = genBlockInfo(ind, ind == 0);
queues.add(blockInfo, 2, 0, 0, 3);
}
List<List<BlockInfo>> blocks;
// Get two blocks from the queue, but we should only get one because first
// block is deleted.
blocks = queues.chooseLowRedundancyBlocks(2, false);
assertEquals(1, blocks.get(2).size());
assertEquals(1, blocks.get(2).get(0).getBlockId());
// Get the next blocks - should be ID 2
blocks = queues.chooseLowRedundancyBlocks(1, false);
assertEquals(2, blocks.get(2).get(0).getBlockId());
// Get the next block, but also reset this time - should be ID 3 returned
blocks = queues.chooseLowRedundancyBlocks(1, true);
assertEquals(3, blocks.get(2).get(0).getBlockId());
// Get one more block and due to resetting the queue it will be block id 1
blocks = queues.chooseLowRedundancyBlocks(1, false);
assertEquals(1, blocks.get(2).get(0).getBlockId());
}
@Test @Test
public void testQueuePositionCanBeReset() throws Throwable { public void testQueuePositionCanBeReset() throws Throwable {
LowRedundancyBlocks queues = new LowRedundancyBlocks(); LowRedundancyBlocks queues = new LowRedundancyBlocks();

View File

@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.AddBlockFlag;
@ -82,7 +83,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// The interval for marking a datanode as stale, // The interval for marking a datanode as stale,
private static final long staleInterval = private static final long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT; DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
private static AtomicLong mockINodeId = new AtomicLong(0);
@Rule @Rule
public ExpectedException exception = ExpectedException.none(); public ExpectedException exception = ExpectedException.none();
@ -825,7 +826,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
} }
private BlockInfo genBlockInfo(long id) { private BlockInfo genBlockInfo(long id) {
return new BlockInfoContiguous(new Block(id), (short) 3); return genBlockInfo(id, false);
}
private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) {
BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
if (!isBlockCorrupted) {
bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
}
return bInfo;
} }
/** /**
@ -848,7 +857,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Adding the blocks directly to normal priority // Adding the blocks directly to normal priority
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current(). neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 0, 3); nextLong(), true), 2, 0, 0, 3);
} }
// Lets wait for the replication interval, to start process normal // Lets wait for the replication interval, to start process normal
// priority blocks // priority blocks
@ -856,7 +865,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Adding the block directly to high priority list // Adding the block directly to high priority list
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current(). neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 0, 3); nextLong(), true), 1, 0, 0, 3);
// Lets wait for the replication interval // Lets wait for the replication interval
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);