HDFS-15998. Fix NullPointerException in listOpenFiles (#3036)

Co-authored-by: huhaiyang <huhaiyang@didichuxing.com>
huhaiyang 2021-06-01 15:26:47 +08:00 committed by GitHub
parent 9a0a808338
commit b38b00e528
4 changed files with 112 additions and 3 deletions
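
For context, listOpenFiles could hit a NullPointerException when a file still held a lease but its inode had already been removed, or when it was no longer under construction. Below is a minimal, hypothetical client-side sketch (not part of this commit; the class name and MiniDFSCluster setup are illustrative assumptions) of the listOpenFiles API whose NameNode-side code paths are hardened by the hunks that follow.

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

// Hypothetical demo class; requires the hadoop-hdfs test artifact for MiniDFSCluster.
public class ListOpenFilesDemo {
  public static void main(String[] args) throws Exception {
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      // Keep a file open so it holds a lease and shows up in the listing.
      FSDataOutputStream out = fs.create(new Path("/openFile"));
      out.write(0);
      out.hflush();
      // This client call fans out to FSNamesystem/LeaseManager on the NameNode,
      // the code paths patched below.
      RemoteIterator<OpenFileEntry> it =
          fs.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), "/");
      while (it.hasNext()) {
        OpenFileEntry e = it.next();
        System.out.println(e.getFilePath() + " held by " + e.getClientName());
      }
      out.close();
    } finally {
      cluster.shutdown();
    }
  }
}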

FSNamesystem.java (org.apache.hadoop.hdfs.server.namenode)

@@ -2006,8 +2006,14 @@ public BatchedListEntries<OpenFileEntry> getFilesBlockingDecom(long prevId,
         continue;
       }
       Preconditions.checkState(ucFile instanceof INodeFile);
-      openFileIds.add(ucFileId);
       INodeFile inodeFile = ucFile.asFile();
+      if (!inodeFile.isUnderConstruction()) {
+        LOG.warn("The file {} is not under construction but has lease.",
+            inodeFile.getFullPathName());
+        continue;
+      }
+      openFileIds.add(ucFileId);
       String fullPathName = inodeFile.getFullPathName();
       if (org.apache.commons.lang3.StringUtils.isEmpty(path)
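
With the check reordered as above, a leased file that is no longer under construction is skipped rather than counted among files blocking decommission. The listing served by getFilesBlockingDecom is what `hdfs dfsadmin -listOpenFiles -blockingDecommission` and the equivalent client API return; a hedged sketch of that client call follows (the helper class and method names are assumptions, not part of this patch).

import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

// Hypothetical helper, shown only to illustrate the client-side entry point.
public class BlockingDecomLister {
  /** Print every open file currently blocking datanode decommission. */
  static void printFilesBlockingDecommission(DistributedFileSystem dfs)
      throws IOException {
    RemoteIterator<OpenFileEntry> it = dfs.listOpenFiles(
        EnumSet.of(OpenFilesType.BLOCKING_DECOMMISSION),
        OpenFilesIterator.FILTER_PATH_DEFAULT);
    while (it.hasNext()) {
      OpenFileEntry entry = it.next();
      System.out.println(entry.getFilePath()
          + " (client: " + entry.getClientName() + ")");
    }
  }
}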

LeaseManager.java (org.apache.hadoop.hdfs.server.namenode)

@@ -300,8 +300,13 @@ public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles(
     Iterator<Long> inodeIdIterator = inodeIds.iterator();
     while (inodeIdIterator.hasNext()) {
       Long inodeId = inodeIdIterator.next();
-      final INodeFile inodeFile =
-          fsnamesystem.getFSDirectory().getInode(inodeId).asFile();
+      INode ucFile = fsnamesystem.getFSDirectory().getInode(inodeId);
+      if (ucFile == null) {
+        // probably got deleted
+        continue;
+      }
+      final INodeFile inodeFile = ucFile.asFile();
       if (!inodeFile.isUnderConstruction()) {
         LOG.warn("The file {} is not under construction but has lease.",
             inodeFile.getFullPathName());

TestDecommission.java (org.apache.hadoop.hdfs)

@@ -37,6 +37,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import java.util.EnumSet;
 import java.util.function.Supplier;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
@@ -46,6 +47,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -55,6 +57,8 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -867,6 +871,69 @@ public void run() {
         closedFileSet, openFilesMap, 0);
   }
 
+  /**
+   * Verify decommission-in-progress against listOpenFiles:
+   * 1. start decommissioning a node (sets LeavingServiceStatus),
+   * 2. close the open file while decommissioning is still in progress,
+   * 3. listing files blocking decommission must not throw NPE.
+   * @throws Exception
+   */
+  @Test(timeout=180000)
+  public void testDecommissionWithCloseFileAndListOpenFiles()
+      throws Exception {
+    LOG.info("Starting test testDecommissionWithCloseFileAndListOpenFiles");
+    // Disable redundancy monitor check so that open files blocking
+    // decommission can be listed and verified.
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1000);
+    getConf().setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 1);
+
+    startSimpleCluster(1, 3);
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+
+    Path file = new Path("/openFile");
+    FSDataOutputStream st = AdminStatesBaseTest.writeIncompleteFile(fileSys,
+        file, (short)3, (short)(fileSize / blockSize));
+    for (DataNode d: getCluster().getDataNodes()) {
+      DataNodeTestUtils.triggerBlockReport(d);
+    }
+
+    LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+        getCluster().getNameNode(0), file.toUri().getPath(),
+        0, blockSize * 10);
+    DatanodeInfo dnToDecommission = lbs.getLastLocatedBlock().getLocations()[0];
+
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
+    dnToDecommission = dm.getDatanode(dnToDecommission.getDatanodeUuid());
+    initExcludeHost(dnToDecommission.getXferAddr());
+    refreshNodes(0);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSION_INPROGRESS);
+
+    // Make sure the DatanodeAdminMonitor (DatanodeAdminBackoffMonitor) has
+    // run at least twice before listing the blocking files.
+    Thread.sleep(3000);
+    BatchedEntries<OpenFileEntry> batchedListEntries = getCluster().
+        getNameNodeRpc(0).listOpenFiles(0,
+        EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
+    assertEquals(1, batchedListEntries.size());
+
+    // Close the file while the datanode is still DECOMMISSION_INPROGRESS.
+    st.close();
+    try {
+      batchedListEntries = getCluster().getNameNodeRpc().listOpenFiles(0,
+          EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION),
+          OpenFilesIterator.FILTER_PATH_DEFAULT);
+      assertEquals(0, batchedListEntries.size());
+    } catch (NullPointerException e) {
+      Assert.fail("Should not throw NPE when the file is not under " +
+          "construction but has lease!");
+    }
+
+    initExcludeHost("");
+    refreshNodes(0);
+    fileSys.delete(file, false);
+  }
+
   @Test(timeout = 360000)
   public void testDecommissionWithOpenFileAndBlockRecovery()
       throws IOException, InterruptedException {

TestListOpenFiles.java (org.apache.hadoop.hdfs.server.namenode)

@@ -54,9 +54,11 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.ChunkedArrayList;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Assert;
 
 /**
  * Verify open files listing.
@@ -321,4 +323,33 @@ public void testListOpenFilesWithInvalidPathClientSide() throws Exception {
         "hdfs://non-cluster/"));
     fs.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), "/path");
   }
+
+  @Test
+  public void testListOpenFilesWithDeletedPath() throws Exception {
+    HashMap<Path, FSDataOutputStream> openFiles = new HashMap<>();
+    openFiles.putAll(
+        DFSTestUtil.createOpenFiles(fs, new Path("/"), "open-1", 1));
+    BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries = nnRpc
+        .listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+            OpenFilesIterator.FILTER_PATH_DEFAULT);
+    assertEquals(1, openFileEntryBatchedEntries.size());
+    String path = openFileEntryBatchedEntries.get(0).getFilePath();
+    FSNamesystem fsNamesystem = cluster.getNamesystem();
+    FSDirectory dir = fsNamesystem.getFSDirectory();
+    List<INode> removedINodes = new ChunkedArrayList<>();
+    removedINodes.add(dir.getINode(path));
+    fsNamesystem.writeLock();
+    try {
+      // Drop the inode from the inode map while its lease is still held,
+      // simulating a delete racing with listOpenFiles.
+      dir.removeFromInodeMap(removedINodes);
+      openFileEntryBatchedEntries = nnRpc
+          .listOpenFiles(0, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+              OpenFilesIterator.FILTER_PATH_DEFAULT);
+      assertEquals(0, openFileEntryBatchedEntries.size());
+      fsNamesystem.leaseManager.removeLease(dir.getINode(path).getId());
+    } catch (NullPointerException e) {
+      Assert.fail("Should not throw NPE when the file is deleted but has lease!");
+    } finally {
+      fsNamesystem.writeUnlock();
+    }
+  }
 }