HDFS-17488. DN can fail IBRs with NPE when a volume is removed (#6759)
parent 700b3e4800
commit fb0519253d
@@ -532,6 +532,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `NumProcessedCommands` | Num of processed commands of all BPServiceActors |
 | `ProcessedCommandsOpNumOps` | Total number of processed commands operations |
 | `ProcessedCommandsOpAvgTime` | Average time of processed commands operations in milliseconds |
+| `NullStorageBlockReports` | Number of blocks in IBRs that failed due to null storage |
 
 FsVolume
 --------
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.Sets;
 
@@ -324,6 +325,12 @@ private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
     final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(), status, delHint);
     final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
+    if (storage == null) {
+      LOG.warn("Trying to add RDBI for null storage UUID {}. Trace: {}", storageUuid,
+          Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
+      getDataNode().getMetrics().incrNullStorageBlockReports();
+      return;
+    }
 
     for (BPServiceActor actor : bpServices) {
       actor.getIbrManager().notifyNamenodeBlock(info, storage,
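The guard above closes the race named in the commit title: a volume can be removed between the caller capturing its storage UUID and the getStorage() lookup, so the lookup returns null and the incremental block report (IBR) path later fails with an NPE. A minimal, self-contained sketch of the same guard, using a hypothetical IbrGuardSketch class and a plain map rather than Hadoop's actual types:

// Hypothetical illustration of the null-storage guard; not Hadoop code.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class IbrGuardSketch {
  // Stand-in for the dataset's storage lookup, keyed by storage UUID.
  private final Map<String, Object> storageByUuid = new ConcurrentHashMap<>();

  /**
   * Queues an incremental block report for the given storage.
   * Returns false and drops the report when the storage has already been removed,
   * instead of letting a null storage reference propagate and fail later.
   */
  boolean queueIncrementalReport(String storageUuid, Object blockInfo) {
    Object storage = storageByUuid.get(storageUuid); // may race with volume removal
    if (storage == null) {
      System.err.printf("Dropping IBR for removed storage %s%n", storageUuid);
      return false;
    }
    // ... hand (blockInfo, storage) to the per-NameNode IBR queues ...
    return true;
  }
}

The real patch additionally logs the current stack trace via Joiner and bumps the new NullStorageBlockReports counter so the dropped report stays observable.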
@@ -4057,7 +4057,8 @@ public void checkDiskError() throws IOException {
     }
   }
 
-  private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+  @VisibleForTesting
+  public void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
     if (unhealthyVolumes.isEmpty()) {
       LOG.debug("handleVolumeFailures done with empty " +
           "unhealthyVolumes");
@@ -172,4 +172,10 @@ public void delayDiffRecord() {}
    * Just delay getMetaDataInputStream a while.
    */
   public void delayGetMetaDataInputStream() {}
+
+  /**
+   * Used in {@link DirectoryScanner#reconcile()} to wait until a storage is removed,
+   * leaving a stale copy of {@link DirectoryScanner#diffs}.
+   */
+  public void waitUntilStorageRemoved() {}
 }
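waitUntilStorageRemoved() follows the usual DataNodeFaultInjector pattern: a no-op hook in production code that a test swaps in via DataNodeFaultInjector.set() to force a specific interleaving. A stripped-down sketch of that pattern with hypothetical names (FaultHookSketch, onBetweenScanAndReconcile), not the Hadoop classes themselves:

// Hypothetical, minimal version of the fault-injection hook pattern used here.
class FaultHookSketch {
  private static volatile FaultHookSketch instance = new FaultHookSketch();

  static FaultHookSketch get() { return instance; }
  static void set(FaultHookSketch hook) { instance = hook; }

  /** No-op in production; a test overrides it to run code at this exact point. */
  void onBetweenScanAndReconcile() {}
}

class ScannerSketch {
  void reconcile() {
    // ... scan() would run here ...
    FaultHookSketch.get().onBetweenScanAndReconcile(); // test hook fires here
    // ... checkAndUpdate() batches would run here ...
  }
}

The new testNullStorage test at the bottom of this change overrides the hook in exactly this way to remove a volume between scan() and checkAndUpdate().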
@@ -466,7 +466,8 @@ void shutdown() {
   public void reconcile() throws IOException {
     LOG.debug("reconcile start DirectoryScanning");
     scan();
 
+    DataNodeFaultInjector.get().waitUntilStorageRemoved();
     // HDFS-14476: run checkAndUpdate with batch to avoid holding the lock too
     // long
     int loopCount = 0;
@@ -2745,8 +2745,12 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
       curDirScannerNotifyCount = 0;
       lastDirScannerNotifyTime = startTimeMs;
     }
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
-        vol.getStorageID())) {
+    String storageUuid = vol.getStorageID();
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) {
+      if (!storageMap.containsKey(storageUuid)) {
+        // Storage was already removed
+        return;
+      }
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
           memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2833,7 +2837,7 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
         maxDirScannerNotifyCount++;
         datanode.notifyNamenodeReceivedBlock(
             new ExtendedBlock(bpid, diskBlockInfo), null,
-            vol.getStorageID(), vol.isTransientStorage());
+            storageUuid, vol.isTransientStorage());
       }
       if (vol.isTransientStorage()) {
         long lockedBytesReserved =
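The checkAndUpdate() change is a re-validate-under-lock pattern: the directory scan that produced the ScanInfo may be stale, so once the per-volume write lock is held the code re-checks that the storage is still registered before reconciling against the replica map. A generic sketch of the idea under simplified assumptions, using a plain ReentrantReadWriteLock and hypothetical names instead of Hadoop's DataSetLockManager:

// Hypothetical illustration of "re-validate after taking the lock"; not Hadoop code.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CheckAndUpdateSketch {
  private final Map<String, ReentrantReadWriteLock> volumeLocks = new ConcurrentHashMap<>();
  private final Map<String, Object> storageMap = new ConcurrentHashMap<>();

  void checkAndUpdate(String storageUuid, long blockId) {
    ReentrantReadWriteLock lock =
        volumeLocks.computeIfAbsent(storageUuid, u -> new ReentrantReadWriteLock());
    lock.writeLock().lock();
    try {
      // Re-validate: the scan that produced (storageUuid, blockId) ran before we
      // took the lock, so the volume may have been removed in the meantime.
      if (!storageMap.containsKey(storageUuid)) {
        return; // nothing to reconcile; avoids reporting a block on a removed storage
      }
      // ... reconcile the on-disk state for blockId with the in-memory replica ...
    } finally {
      lock.writeLock().unlock();
    }
  }
}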
@@ -185,6 +185,8 @@ public class DataNodeMetrics {
   private MutableCounterLong numProcessedCommands;
   @Metric("Rate of processed commands of all BPServiceActors")
   private MutableRate processedCommandsOp;
+  @Metric("Number of blocks in IBRs that failed due to null storage")
+  private MutableCounterLong nullStorageBlockReports;
 
   // FsDatasetImpl local file process metrics.
   @Metric private MutableRate createRbwOp;
@@ -812,4 +814,7 @@ public void incrReplaceBlockOpToOtherHost() {
     replaceBlockOpToOtherHost.incr();
   }
 
+  public void incrNullStorageBlockReports() {
+    nullStorageBlockReports.incr();
+  }
 }
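The new counter follows the metrics2 pattern DataNodeMetrics already uses: an @Metric-annotated MutableCounterLong plus a small incr helper, which makes the value appear as the NullStorageBlockReports entry documented in the Metrics.md hunk above. A self-contained sketch of that pattern; ExampleSourceMetrics and its registration name are illustrative, not part of Hadoop:

// Illustrative metrics2 source showing the counter pattern; not the real DataNodeMetrics.
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

@Metrics(name = "ExampleSourceMetrics", context = "dfs")
class ExampleSourceMetrics {
  @Metric("Number of IBRs dropped because their storage was already removed")
  private MutableCounterLong nullStorageBlockReports;

  /** Registering the source lets the annotation processor instantiate the counter. */
  static ExampleSourceMetrics create() {
    MetricsSystem ms = DefaultMetricsSystem.instance();
    return ms.register("ExampleSourceMetrics", "Example metrics source", new ExampleSourceMetrics());
  }

  public void incrNullStorageBlockReports() {
    nullStorageBlockReports.incr();
  }
}

A caller would then invoke create() once and call incrNullStorageBlockReports() on the drop path, which is what the null-storage branch in notifyNamenodeBlock does through the DataNode's metrics object.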
@@ -136,6 +136,7 @@ public class TestBPOfferService {
   private FsDatasetSpi<?> mockFSDataset;
   private DataSetLockManager dataSetLockManager = new DataSetLockManager();
   private boolean isSlownode;
+  private String mockStorageID;
 
   @Before
   public void setupMocks() throws Exception {
@@ -157,6 +158,7 @@ public void setupMocks() throws Exception {
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
     mockFSDataset.addBlockPool(FAKE_BPID, conf);
+    mockStorageID = ((SimulatedFSDataset) mockFSDataset).getStorages().get(0).getStorageUuid();
 
     // Wire the dataset to the DN.
     Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
@@ -289,7 +291,7 @@ public void testBasicFunctionality() throws Exception {
     waitForBlockReport(mockNN2);
 
     // When we receive a block, it should report it to both NNs
-    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
     ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
     assertEquals(1, ret.length);
@@ -1099,7 +1101,7 @@ public void testRefreshNameNodes() throws Exception {
     waitForBlockReport(mockNN2);
 
     // When we receive a block, it should report it to both NNs
-    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
     ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
         mockNN1);
@@ -1140,7 +1142,7 @@ public void testRefreshNameNodes() throws Exception {
     Mockito.verify(mockNN3).registerDatanode(Mockito.any());
 
     // When we receive a block, it should report it to both NNs
-    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
     // verify new NN received block report
     ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
@@ -37,9 +37,11 @@
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -1420,4 +1422,50 @@ private void writeFile(FileSystem fs, int numFiles) throws IOException {
       DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0);
     }
   }
+
+  @Test(timeout = 30000)
+  public void testNullStorage() throws Exception {
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+    Configuration conf = getConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+      // Make sure checkAndUpdate will run
+      truncateBlockFile();
+
+      // Mock a volume corruption after DirectoryScanner.scan() but before checkAndUpdate()
+      FsVolumeImpl volumeToRemove = fds.getVolumeList().get(0);
+      DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void waitUntilStorageRemoved() {
+          Set<FsVolumeSpi> volumesToRemove = new HashSet<>();
+          volumesToRemove.add(volumeToRemove);
+          cluster.getDataNodes().get(0).handleVolumeFailures(volumesToRemove);
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(DataNode.LOG);
+      scanner = new DirectoryScanner(fds, conf);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+      assertFalse(logCapturer.getOutput()
+          .contains("Trying to add RDBI for null storage UUID " + volumeToRemove.getStorageID()));
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }