HDFS-17044. Set the size of a non-existent block to NO_ACK when processing an FBR or IBR, to avoid a useless report from the DataNode. (#5735). Contributed by Haiyang Hu.
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
parent 8a52990150
commit 4bd873b816
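
Why setting numBytes to NO_ACK suppresses the follow-up report: BlockCommand.NO_ACK is the sentinel value Long.MAX_VALUE, and the DataNode's asynchronous deletion path only notifies the NameNode (via an incremental block report) when a deleted replica's length is not that sentinel. Below is a minimal, self-contained sketch of that convention in plain Java; the class and method names are illustrative, not the actual Hadoop ones.

    // Sketch of the NO_ACK convention; illustrative names, not the real Hadoop classes.
    public class NoAckSketch {
      // Mirrors BlockCommand.NO_ACK.
      static final long NO_ACK = Long.MAX_VALUE;

      // A DataNode should report a completed deletion to the NameNode only
      // when the block carried a real length, not the NO_ACK sentinel.
      static boolean shouldReportDeletion(long numBytes) {
        return numBytes != NO_ACK;
      }

      public static void main(String[] args) {
        System.out.println(shouldReportDeletion(10240L));  // true: normal replica, report it
        System.out.println(shouldReportDeletion(NO_ACK));  // false: delete silently
      }
    }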
@@ -3295,8 +3295,11 @@ private BlockInfo processReportedBlock(
     BlockInfo storedBlock = getStoredBlock(block);
     if(storedBlock == null) {
       // If blocksMap does not contain reported block id,
-      // the replica should be removed from the data-node.
-      toInvalidate.add(new Block(block));
+      // the replica should be removed from the DataNode, and its NumBytes set to BlockCommand.NO_ACK
+      // to avoid a useless report to the NameNode once the deletion completes.
+      Block invalidateBlock = new Block(block);
+      invalidateBlock.setNumBytes(BlockCommand.NO_ACK);
+      toInvalidate.add(invalidateBlock);
       return null;
     }
     BlockUCState ucState = storedBlock.getBlockUCState();
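
The blocks collected in toInvalidate above are queued per DataNode; a later heartbeat response drains them into a DNA_INVALIDATE BlockCommand, and each Block carries its numBytes along, which is how the NO_ACK sentinel set here reaches the DataNode's deletion path.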
@@ -346,6 +346,10 @@ public void incrBlocksRemoved(int delta) {
     blocksRemoved.incr(delta);
   }
 
+  public long getBlocksRemoved() {
+    return blocksRemoved.value();
+  }
+
   public void incrBytesWritten(int delta) {
     bytesWritten.incr(delta);
   }
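
The getBlocksRemoved() getter is read-only plumbing for the new test: it lets the test poll datanode.getMetrics().getBlocksRemoved() to confirm the replica deletion has actually completed before triggering the deletion report.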
@@ -117,6 +117,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -2121,4 +2122,83 @@ public void testBlockReportAfterDataNodeRestart() throws Exception {
       assertEquals(2, locs[0].getHosts().length);
     }
   }
+
+  /**
+   * Test processing toInvalidate in a block report: when a reported block does
+   * not exist on the NameNode, its numBytes must be set to NO_ACK so that the
+   * DataNode does not send an incremental block report after deleting it.
+   */
+  @Test(timeout = 360000)
+  public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    try (MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build()) {
+      cluster.waitActive();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Write a file.
+      Path file = new Path("/test");
+      DFSTestUtil.createFile(fs, file, 10240L, (short) 1, 0L);
+      DFSTestUtil.waitReplication(fs, file, (short) 1);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, file).get(0);
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(1, loc.length);
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
+      DataNode datanode = datanodes.get(0);
+      assertEquals(datanode.getDatanodeUuid(), loc[0].getDatanodeUuid());
+
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+      // The write above triggered exactly one incremental block report.
+      assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
+
+      // Delete the file so the block is removed on the NameNode.
+      fs.delete(file, false);
+
+      // Wait for processing of the marked-deleted block to complete.
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(blockManager);
+      assertNull(blockManager.getStoredBlock(lb.getBlock().getLocalBlock()));
+
+      // Expire the heartbeat on the NameNode so the DataNode is marked dead.
+      datanode.setHeartbeatsDisabledForTests(true);
+      cluster.setDataNodeDead(datanode.getDatanodeId());
+      assertFalse(blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()));
+
+      // Wait for re-registration and heartbeat.
+      datanode.setHeartbeatsDisabledForTests(false);
+      final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0)
+          .getBlockManager().getDatanodeManager()
+          .getDatanode(datanode.getDatanodeId());
+      GenericTestUtils.waitFor(
+          () -> dn1Desc.isAlive() && dn1Desc.isHeartbeatedSinceRegistration(),
+          100, 5000);
+
+      // Trigger block reports. The reported block no longer exists on the
+      // NameNode, so it is added to invalidateBlocks with numBytes set to NO_ACK.
+      cluster.triggerBlockReports();
+      GenericTestUtils.waitFor(
+          () -> blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()),
+          100, 1000);
+
+      // Schedule the block for deletion at the DataNode.
+      int workCount = blockManager.computeInvalidateWork(1);
+      assertEquals(1, workCount);
+      assertFalse(blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()));
+
+      // Wait for the blocksRemoved metric on the DataNode to reach 1.
+      GenericTestUtils.waitFor(
+          () -> datanode.getMetrics().getBlocksRemoved() == 1,
+          100, 5000);
+
+      // Trigger an immediate deletion report at the DataNode.
+      cluster.triggerDeletionReports();
+
+      // The deleted block's numBytes is NO_ACK, so no deletion report is sent:
+      // IncrementalBlockReportsNumOps on the DataNode is still 1.
+      assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
+    }
+  }
 }