HDFS-15086. Block scheduled counter never gets decremented if the block is deleted before replication. Contributed by hemanthboyina.

Surendra Singh Lilhore 2020-02-13 16:30:19 +05:30
parent f1b1b332f5
commit a98352ced1
5 changed files with 146 additions and 35 deletions
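
In short: when a file was deleted while replication work for its blocks was still pending, the per-DataNode scheduled-blocks counter was never decremented and leaked. The patch makes PendingReconstructionBlocks track its targets as DatanodeStorageInfo instead of DatanodeDescriptor, releases the scheduled quota when a block is removed in BlockManager.removeBlock(), and, at heartbeat time, drops blocks already marked deleted (size set to BlockCommand.NO_ACK) from the transfer list while decrementing their targets' counters.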

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -83,6 +83,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -1112,15 +1113,15 @@ public void addExpectedReplicasToPending(BlockInfo blk) {
       DatanodeStorageInfo[] expectedStorages =
           blk.getUnderConstructionFeature().getExpectedStorageLocations();
       if (expectedStorages.length - blk.numNodes() > 0) {
-        ArrayList<DatanodeDescriptor> pendingNodes = new ArrayList<>();
+        ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
         for (DatanodeStorageInfo storage : expectedStorages) {
           DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
           if (blk.findStorageInfo(dnd) == null) {
-            pendingNodes.add(dnd);
+            pendingNodes.add(storage);
           }
         }
         pendingReconstruction.increment(blk,
-            pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
+            pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
       }
     }
   }
@@ -2170,8 +2171,7 @@ private boolean validateReconstructionWork(BlockReconstructionWork rw) {
     // Move the block-replication into a "pending" state.
     // The reason we use 'pending' is so we can retry
     // reconstructions that fail after an appropriate amount of time.
-    pendingReconstruction.increment(block,
-        DatanodeStorageInfo.toDatanodeDescriptors(targets));
+    pendingReconstruction.increment(block, targets);

     blockLog.debug("BLOCK* block {} is moved from neededReconstruction to "
         + "pendingReconstruction", block);
@@ -4084,7 +4084,7 @@ public void addBlock(DatanodeStorageInfo storageInfo, Block block,
     BlockInfo storedBlock = getStoredBlock(block);
     if (storedBlock != null &&
         block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
-      if (pendingReconstruction.decrement(storedBlock, node)) {
+      if (pendingReconstruction.decrement(storedBlock, storageInfo)) {
         NameNode.getNameNodeMetrics().incSuccessfulReReplications();
       }
     }
@@ -4499,7 +4499,11 @@ public void removeBlock(BlockInfo block) {
     addToInvalidates(block);
     removeBlockFromMap(block);
     // Remove the block from pendingReconstruction and neededReconstruction
-    pendingReconstruction.remove(block);
+    PendingBlockInfo remove = pendingReconstruction.remove(block);
+    if (remove != null) {
+      DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
+          .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
+    }
     neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
     postponedMisreplicatedBlocks.remove(block);
   }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1713,8 +1713,24 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
     List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
         numReplicationTasks);
     if (pendingList != null && !pendingList.isEmpty()) {
-      cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-          pendingList));
+      // If the block was deleted, its size has been set to
+      // BlockCommand.NO_ACK (Long.MAX_VALUE). Such blocks no longer
+      // need to be sent to the DataNode for replication or
+      // reconstruction.
+      Iterator<BlockTargetPair> iterator = pendingList.iterator();
+      while (iterator.hasNext()) {
+        BlockTargetPair cmd = iterator.next();
+        if (cmd.block != null
+            && cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
+          // block deleted
+          DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
+          iterator.remove();
+        }
+      }
+      if (!pendingList.isEmpty()) {
+        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+            pendingList));
+      }
     }
     // check pending erasure coding tasks
     List<BlockECReconstructionInfo> pendingECList = nodeinfo
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java

@@ -81,7 +81,7 @@ void start() {
    * @param block The corresponding block
    * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(BlockInfo block, DatanodeDescriptor... targets) {
+  void increment(BlockInfo block, DatanodeStorageInfo... targets) {
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
       if (found == null) {
@@ -101,7 +101,7 @@ void increment(BlockInfo block, DatanodeDescriptor... targets) {
    * @param dn The DataNode that finishes the reconstruction
    * @return true if the block is decremented to 0 and got removed.
    */
-  boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
+  boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
     boolean removed = false;
     synchronized (pendingReconstructions) {
       PendingBlockInfo found = pendingReconstructions.get(block);
@@ -124,9 +124,9 @@ boolean decrement(BlockInfo block, DatanodeDescriptor dn) {
    *          The given block whose pending reconstruction requests need to be
    *          removed
    */
-  void remove(BlockInfo block) {
+  PendingBlockInfo remove(BlockInfo block) {
     synchronized (pendingReconstructions) {
-      pendingReconstructions.remove(block);
+      return pendingReconstructions.remove(block);
     }
   }
@@ -200,11 +200,11 @@ BlockInfo[] getTimedOutBlocks() {
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private final List<DatanodeDescriptor> targets;
+    private final List<DatanodeStorageInfo> targets;

-    PendingBlockInfo(DatanodeDescriptor[] targets) {
+    PendingBlockInfo(DatanodeStorageInfo[] targets) {
       this.timeStamp = monotonicNow();
-      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+      this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
           : new ArrayList<>(Arrays.asList(targets));
     }
@@ -216,9 +216,9 @@ void setTimeStamp() {
       timeStamp = monotonicNow();
     }

-    void incrementReplicas(DatanodeDescriptor... newTargets) {
+    void incrementReplicas(DatanodeStorageInfo... newTargets) {
       if (newTargets != null) {
-        for (DatanodeDescriptor newTarget : newTargets) {
+        for (DatanodeStorageInfo newTarget : newTargets) {
           if (!targets.contains(newTarget)) {
             targets.add(newTarget);
           }
@@ -226,13 +226,23 @@ void incrementReplicas(DatanodeDescriptor... newTargets) {
       }
     }

-    void decrementReplicas(DatanodeDescriptor dn) {
-      targets.remove(dn);
+    void decrementReplicas(DatanodeStorageInfo dn) {
+      Iterator<DatanodeStorageInfo> iterator = targets.iterator();
+      while (iterator.hasNext()) {
+        DatanodeStorageInfo next = iterator.next();
+        if (next.getDatanodeDescriptor() == dn.getDatanodeDescriptor()) {
+          iterator.remove();
+        }
+      }
     }

     int getNumReplicas() {
       return targets.size();
     }
+
+    List<DatanodeStorageInfo> getTargets() {
+      return targets;
+    }
   }

   /*
@@ -318,4 +328,14 @@ void metaSave(PrintWriter out) {
       }
     }
   }
+
+  List<DatanodeStorageInfo> getTargets(BlockInfo block) {
+    synchronized (pendingReconstructions) {
+      PendingBlockInfo found = pendingReconstructions.get(block);
+      if (found != null) {
+        return found.targets;
+      }
+    }
+    return null;
+  }
 }
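
Two details of the new bookkeeping are worth noting. Keeping targets as DatanodeStorageInfo rather than DatanodeDescriptor is what lets the removal paths call DatanodeStorageInfo.decrementBlocksScheduled() directly. And decrementReplicas() now matches entries by the owning DatanodeDescriptor rather than by storage identity, presumably because a DataNode may report the finished replica on a different storage than the one that was originally targeted.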

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java

@@ -22,11 +22,19 @@
 import java.io.IOException;
 import java.util.ArrayList;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;

 import org.junit.After;
 import org.junit.Test;
@@ -129,4 +137,69 @@ public void testScheduledBlocksCounterShouldDecrementOnAbandonBlock()
           0, descriptor.getBlocksScheduled());
       }
     }
   }
+
+  /**
+   * Test that the scheduled blocks counter is decremented when the file
+   * owning the scheduled blocks is deleted before replication starts.
+   * @throws Exception
+   */
+  @Test
+  public void testScheduledBlocksCounterDecrementOnDeletedBlock()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
+    cluster.waitActive();
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(dfs, filePath, 1024, (short) 3, 0L);
+
+      // 2. disable the heartbeats
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+
+      DatanodeManager datanodeManager =
+          cluster.getNamesystem().getBlockManager().getDatanodeManager();
+      ArrayList<DatanodeDescriptor> dnList =
+          new ArrayList<DatanodeDescriptor>();
+      datanodeManager.fetchDatanodes(dnList, dnList, false);
+
+      // 3. mark two replicas of the block as corrupt so that
+      // reconstruction work gets scheduled
+      LocatedBlock block = NameNodeAdapter
+          .getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1)
+          .get(0);
+      DatanodeInfo[] locs = block.getLocations();
+      cluster.getNamesystem().writeLock();
+      try {
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[0], "STORAGE_ID",
+            "TEST");
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), locs[1], "STORAGE_ID",
+            "TEST");
+        BlockManagerTestUtil.computeAllPendingWork(bm);
+        BlockManagerTestUtil.updateState(bm);
+        assertEquals(1L, bm.getPendingReconstructionBlocksCount());
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+
+      // 4. delete the file
+      dfs.delete(filePath, true);
+      int blocksScheduled = 0;
+      for (DatanodeDescriptor descriptor : dnList) {
+        if (descriptor.getBlocksScheduled() != 0) {
+          blocksScheduled += descriptor.getBlocksScheduled();
+        }
+      }
+      assertEquals(0, blocksScheduled);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java

@@ -89,8 +89,7 @@ public void testPendingReconstruction() {
       BlockInfo block = genBlockInfo(i, i, 0);
       DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
       System.arraycopy(storages, 0, targets, 0, i);
-      pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(targets));
+      pendingReconstructions.increment(block, targets);
     }
     assertEquals("Size of pendingReconstruction ",
         10, pendingReconstructions.size());
@@ -100,25 +99,24 @@ public void testPendingReconstruction() {
     // remove one item
     //
     BlockInfo blk = genBlockInfo(8, 8, 0);
-    pendingReconstructions.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
+    pendingReconstructions.decrement(blk, storages[7]); // removes one replica
     assertEquals("pendingReconstructions.getNumReplicas ",
         7, pendingReconstructions.getNumReplicas(blk));

     //
     // insert the same item twice should be counted as once
     //
-    pendingReconstructions.increment(blk, storages[0].getDatanodeDescriptor());
+    pendingReconstructions.increment(blk, storages[0]);
     assertEquals("pendingReconstructions.getNumReplicas ",
         7, pendingReconstructions.getNumReplicas(blk));

     for (int i = 0; i < 7; i++) {
       // removes all replicas
-      pendingReconstructions.decrement(blk, storages[i].getDatanodeDescriptor());
+      pendingReconstructions.decrement(blk, storages[i]);
     }
     assertTrue(pendingReconstructions.size() == 9);

     pendingReconstructions.increment(blk,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(8)));
+        DFSTestUtil.createDatanodeStorageInfos(8));
     assertTrue(pendingReconstructions.size() == 10);
@@ -148,8 +146,7 @@ public void testPendingReconstruction() {
     for (int i = 10; i < 15; i++) {
       BlockInfo block = genBlockInfo(i, i, 0);
       pendingReconstructions.increment(block,
-          DatanodeStorageInfo.toDatanodeDescriptors(
-              DFSTestUtil.createDatanodeStorageInfos(i)));
+          DFSTestUtil.createDatanodeStorageInfos(i));
     }
     assertEquals(15, pendingReconstructions.size());
     assertEquals(0L, pendingReconstructions.getNumTimedOuts());
@@ -217,8 +214,7 @@ public void testProcessPendingReconstructions() throws Exception {
     blockInfo = new BlockInfoContiguous(block, (short) 3);
     pendingReconstruction.increment(blockInfo,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(1)));
+        DFSTestUtil.createDatanodeStorageInfos(1));
     BlockCollection bc = Mockito.mock(BlockCollection.class);
     // Place into blocksmap with GenerationStamp = 1
     blockInfo.setGenerationStamp(1);
@@ -234,8 +230,7 @@ public void testProcessPendingReconstructions() throws Exception {
     block = new Block(2, 2, 0);
     blockInfo = new BlockInfoContiguous(block, (short) 3);
     pendingReconstruction.increment(blockInfo,
-        DatanodeStorageInfo.toDatanodeDescriptors(
-            DFSTestUtil.createDatanodeStorageInfos(1)));
+        DFSTestUtil.createDatanodeStorageInfos(1));

     // verify 2 blocks in pendingReconstructions
     assertEquals("Size of pendingReconstructions ", 2,
@@ -281,7 +276,8 @@ public void testProcessPendingReconstructions() throws Exception {
         getDatanodes().iterator().next() };

     // Add a stored block to the pendingReconstruction.
-    pendingReconstruction.increment(storedBlock, desc);
+    pendingReconstruction.increment(blockInfo,
+        DFSTestUtil.createDatanodeStorageInfos(1));
     assertEquals("Size of pendingReconstructions ", 1,
         pendingReconstruction.size());
@@ -310,6 +306,8 @@ public void testProcessPendingReconstructions() throws Exception {
       fsn.writeUnlock();
     }

+    GenericTestUtils.waitFor(() -> pendingReconstruction.size() == 0, 500,
+        10000);
     // The pending queue should be empty.
     assertEquals("Size of pendingReconstructions ", 0,
         pendingReconstruction.size());