From fb0519253d66ec218abc3c2b6bcbf03e9270d07f Mon Sep 17 00:00:00 2001
From: Felix Nguyen <23214709+kokonguyen191@users.noreply.github.com>
Date: Sat, 11 May 2024 15:37:43 +0800
Subject: [PATCH] HDFS-17488. DN can fail IBRs with NPE when a volume is removed (#6759)

---
 .../src/site/markdown/Metrics.md              |  1 +
 .../hdfs/server/datanode/BPOfferService.java  |  7 +++
 .../hadoop/hdfs/server/datanode/DataNode.java |  3 +-
 .../datanode/DataNodeFaultInjector.java       |  6 +++
 .../server/datanode/DirectoryScanner.java     |  2 +-
 .../fsdataset/impl/FsDatasetImpl.java         | 10 ++--
 .../datanode/metrics/DataNodeMetrics.java     |  5 ++
 .../server/datanode/TestBPOfferService.java   |  8 ++--
 .../server/datanode/TestDirectoryScanner.java | 48 +++++++++++++++++++
 9 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index aaead83710..a89d254d93 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -532,6 +532,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `NumProcessedCommands` | Num of processed commands of all BPServiceActors |
 | `ProcessedCommandsOpNumOps` | Total number of processed commands operations |
 | `ProcessedCommandsOpAvgTime` | Average time of processed commands operations in milliseconds |
+| `NullStorageBlockReports` | Number of blocks in IBRs that failed due to null storage |
 
 FsVolume
 --------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 1a2c024c90..11489e919c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.Sets;
 
@@ -324,6 +325,12 @@ private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
     final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(), status, delHint);
     final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
+    if (storage == null) {
+      LOG.warn("Trying to add RDBI for null storage UUID {}. Trace: {}", storageUuid,
Trace: {}", storageUuid, + Joiner.on("\n").join(Thread.currentThread().getStackTrace())); + getDataNode().getMetrics().incrNullStorageBlockReports(); + return; + } for (BPServiceActor actor : bpServices) { actor.getIbrManager().notifyNamenodeBlock(info, storage, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 956f5bbe51..87e8eee681 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -4057,7 +4057,8 @@ public void checkDiskError() throws IOException { } } - private void handleVolumeFailures(Set unhealthyVolumes) { + @VisibleForTesting + public void handleVolumeFailures(Set unhealthyVolumes) { if (unhealthyVolumes.isEmpty()) { LOG.debug("handleVolumeFailures done with empty " + "unhealthyVolumes"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 372271b4fb..9e046cc360 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -172,4 +172,10 @@ public void delayDiffRecord() {} * Just delay getMetaDataInputStream a while. */ public void delayGetMetaDataInputStream() {} + + /** + * Used in {@link DirectoryScanner#reconcile()} to wait until a storage is removed, + * leaving a stale copy of {@link DirectoryScanner#diffs}. 
+   */
+  public void waitUntilStorageRemoved() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 30a2d2e584..a99f3d78e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -466,7 +466,7 @@ void shutdown() {
   public void reconcile() throws IOException {
     LOG.debug("reconcile start DirectoryScanning");
     scan();
-
+    DataNodeFaultInjector.get().waitUntilStorageRemoved();
     // HDFS-14476: run checkAndUpdate with batch to avoid holding the lock too
     // long
     int loopCount = 0;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 5be095118f..0ca222c083 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2745,8 +2745,12 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
       curDirScannerNotifyCount = 0;
       lastDirScannerNotifyTime = startTimeMs;
     }
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
-        vol.getStorageID())) {
+    String storageUuid = vol.getStorageID();
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) {
+      if (!storageMap.containsKey(storageUuid)) {
+        // Storage was already removed
+        return;
+      }
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
           memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2833,7 +2837,7 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
           maxDirScannerNotifyCount++;
           datanode.notifyNamenodeReceivedBlock(
               new ExtendedBlock(bpid, diskBlockInfo), null,
-              vol.getStorageID(), vol.isTransientStorage());
+              storageUuid, vol.isTransientStorage());
         }
         if (vol.isTransientStorage()) {
           long lockedBytesReserved =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 2e902f694a..832a8029f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -185,6 +185,8 @@ public class DataNodeMetrics {
   private MutableCounterLong numProcessedCommands;
   @Metric("Rate of processed commands of all BPServiceActors")
   private MutableRate processedCommandsOp;
+  @Metric("Number of blocks in IBRs that failed due to null storage")
+  private MutableCounterLong nullStorageBlockReports;
 
   // FsDatasetImpl local file process metrics.
   @Metric private MutableRate createRbwOp;
@@ -812,4 +814,7 @@ public void incrReplaceBlockOpToOtherHost() {
     replaceBlockOpToOtherHost.incr();
   }
 
+  public void incrNullStorageBlockReports() {
+    nullStorageBlockReports.incr();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 65855427d7..fd1b5609b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -136,6 +136,7 @@ public class TestBPOfferService {
   private FsDatasetSpi<?> mockFSDataset;
   private DataSetLockManager dataSetLockManager = new DataSetLockManager();
   private boolean isSlownode;
+  private String mockStorageID;
 
   @Before
   public void setupMocks() throws Exception {
@@ -157,6 +158,7 @@ public void setupMocks() throws Exception {
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
     mockFSDataset.addBlockPool(FAKE_BPID, conf);
+    mockStorageID = ((SimulatedFSDataset) mockFSDataset).getStorages().get(0).getStorageUuid();
 
     // Wire the dataset to the DN.
     Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
@@ -289,7 +291,7 @@ public void testBasicFunctionality() throws Exception {
     waitForBlockReport(mockNN2);
 
     // When we receive a block, it should report it to both NNs
-    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
     ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
     assertEquals(1, ret.length);
@@ -1099,7 +1101,7 @@ public void testRefreshNameNodes() throws Exception {
       waitForBlockReport(mockNN2);
 
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
       ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
           mockNN1);
@@ -1140,7 +1142,7 @@ public void testRefreshNameNodes() throws Exception {
       Mockito.verify(mockNN3).registerDatanode(Mockito.any());
 
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
       // veridfy new NN recieved block report
       ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 96b3263963..3392410d1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -37,9 +37,11 @@
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -1420,4 +1422,50 @@ private void writeFile(FileSystem fs, int numFiles) throws IOException {
       DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0);
     }
   }
+
+  @Test(timeout = 30000)
+  public void testNullStorage() throws Exception {
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+    Configuration conf = getConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+      // Make sure checkAndUpdate will run
+      truncateBlockFile();
+
+      // Mock a volume corruption after DirectoryScanner.scan() but before checkAndUpdate()
+      FsVolumeImpl volumeToRemove = fds.getVolumeList().get(0);
+      DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void waitUntilStorageRemoved() {
+          Set<FsVolumeSpi> volumesToRemove = new HashSet<>();
+          volumesToRemove.add(volumeToRemove);
+          cluster.getDataNodes().get(0).handleVolumeFailures(volumesToRemove);
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(DataNode.LOG);
+      scanner = new DirectoryScanner(fds, conf);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+      assertFalse(logCapturer.getOutput()
+          .contains("Trying to add RDBI for null storage UUID " + volumeToRemove.getStorageID()));
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }
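
Note (reviewer commentary, not part of the patch): the sketch below distills the race this change closes. DirectoryScanner.reconcile() resolves blocks against a volume, the volume is removed concurrently via handleVolumeFailures(), and a later storage lookup returns null, which previously surfaced as an NPE while queuing the incremental block report. The fix pattern is to re-check registration under the volume lock and bail out early. The names here (StaleStorageSketch, a plain map for storageMap, a single monitor lock) are simplifications invented for illustration; the real code uses FsDatasetImpl's storageMap guarded by DataSetLockManager volume locks.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StaleStorageSketch {
  private final Map<String, String> storageMap = new ConcurrentHashMap<>();
  private final Object volumeLock = new Object();

  StaleStorageSketch() {
    storageMap.put("storage-1", "volume-1");
  }

  /** Stands in for DataNode#handleVolumeFailures removing a failed volume. */
  void removeStorage(String storageUuid) {
    synchronized (volumeLock) {
      storageMap.remove(storageUuid);
    }
  }

  /** Stands in for FsDatasetImpl#checkAndUpdate after this patch. */
  void checkAndUpdate(String storageUuid) {
    synchronized (volumeLock) {
      if (!storageMap.containsKey(storageUuid)) {
        // Storage was already removed between scan() and this call:
        // skip the block instead of dereferencing a missing storage later.
        return;
      }
      // Safe: the storage is still registered while we hold the lock.
      System.out.println("Would queue IBR for " + storageMap.get(storageUuid));
    }
  }

  public static void main(String[] args) {
    StaleStorageSketch sketch = new StaleStorageSketch();
    sketch.removeStorage("storage-1");  // volume fails after scan(), before checkAndUpdate()
    sketch.checkAndUpdate("storage-1"); // returns quietly instead of failing the IBR
  }
}

The same guard appears twice in the patch: FsDatasetImpl#checkAndUpdate returns early when the storage is no longer in storageMap, and BPOfferService#notifyNamenodeBlock drops the RDBI (and bumps the new NullStorageBlockReports counter) when getStorage() returns null.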