HDFS-17093. Fix block report lease issue to avoid missing some storages report. (#5855). Contributed by Yanlei Yu.
Reviewed-by: Shuyan Zhang <zqingchai@gmail.com> Reviewed-by: Xing Lin <linxingnku@gmail.com> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
28d190b904
commit
b58885624b
@ -2909,7 +2909,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||
+ "discarded non-initial block report from {}"
|
||||
+ " because namenode still in startup phase",
|
||||
strBlockReportId, fullBrLeaseId, nodeID);
|
||||
blockReportLeaseManager.removeLease(node);
|
||||
removeDNLeaseIfNeeded(node);
|
||||
return !node.hasStaleStorages();
|
||||
}
|
||||
|
||||
@ -2957,6 +2957,23 @@ public class BlockManager implements BlockStatsMXBean {
|
||||
return !node.hasStaleStorages();
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the DN lease only when we have received block reports,
|
||||
* for all storages for a particular DN.
|
||||
*/
|
||||
void removeDNLeaseIfNeeded(DatanodeDescriptor node) {
|
||||
boolean needRemoveLease = true;
|
||||
for (DatanodeStorageInfo sInfo : node.getStorageInfos()) {
|
||||
if (sInfo.getBlockReportCount() == 0) {
|
||||
needRemoveLease = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (needRemoveLease) {
|
||||
blockReportLeaseManager.removeLease(node);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
|
||||
final BlockReportContext context) throws IOException {
|
||||
namesystem.writeLock();
|
||||
|
@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
@ -269,4 +271,83 @@ public class TestBlockReportLease {
|
||||
}
|
||||
return storageBlockReports;
|
||||
}
|
||||
|
||||
@Test(timeout = 360000)
|
||||
public void testFirstIncompleteBlockReport() throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
Random rand = new Random();
|
||||
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build()) {
|
||||
cluster.waitActive();
|
||||
|
||||
FSNamesystem fsn = cluster.getNamesystem();
|
||||
|
||||
NameNode nameNode = cluster.getNameNode();
|
||||
// Pretend to be in safemode.
|
||||
NameNodeAdapter.enterSafeMode(nameNode, false);
|
||||
|
||||
BlockManager blockManager = fsn.getBlockManager();
|
||||
BlockManager spyBlockManager = spy(blockManager);
|
||||
fsn.setBlockManagerForTesting(spyBlockManager);
|
||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||
|
||||
NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
|
||||
|
||||
// Test based on one DataNode report to Namenode.
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
DatanodeDescriptor datanodeDescriptor = spyBlockManager
|
||||
.getDatanodeManager().getDatanode(dn.getDatanodeId());
|
||||
|
||||
DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
|
||||
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);
|
||||
|
||||
// Send heartbeat and request full block report lease.
|
||||
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
|
||||
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
|
||||
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
|
||||
|
||||
DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
|
||||
doAnswer(delayer).when(spyBlockManager).processReport(
|
||||
any(DatanodeStorageInfo.class),
|
||||
any(BlockListAsLongs.class));
|
||||
|
||||
// Trigger sendBlockReport.
|
||||
BlockReportContext brContext = new BlockReportContext(1, 0,
|
||||
rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
|
||||
// Build every storage with 100 blocks for sending report.
|
||||
DatanodeStorage[] datanodeStorages
|
||||
= new DatanodeStorage[storages.length];
|
||||
for (int i = 0; i < storages.length; i++) {
|
||||
datanodeStorages[i] = storages[i].getStorage();
|
||||
StorageBlockReport[] reports = createReports(datanodeStorages, 100);
|
||||
|
||||
// The first multiple send once, simulating the failure of the first report,
|
||||
// only send successfully once.
|
||||
if(i == 0){
|
||||
rpcServer.blockReport(dnRegistration, poolId, reports, brContext);
|
||||
}
|
||||
|
||||
// Send blockReport.
|
||||
DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports,
|
||||
brContext);
|
||||
|
||||
// Wait until BlockManager calls processReport.
|
||||
delayer.waitForCall();
|
||||
|
||||
// Allow blockreport to proceed.
|
||||
delayer.proceed();
|
||||
|
||||
// Get result, it will not null if process successfully.
|
||||
assertTrue(datanodeCommand instanceof FinalizeCommand);
|
||||
assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
|
||||
.getBlockPoolId());
|
||||
if(i == 0){
|
||||
assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
|
||||
}else{
|
||||
assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user