HDFS-15963. Unreleased volume references cause an infinite loop. (#2889) Contributed by Shuyan Zhang.

Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
zhangshuyan0 2021-04-17 00:08:31 +08:00 committed by GitHub
parent f182798695
commit 14816be0b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 188 additions and 41 deletions

View File

@ -432,6 +432,7 @@ class BlockSender implements java.io.Closeable {
ris = new ReplicaInputStreams( ris = new ReplicaInputStreams(
blockIn, checksumIn, volumeRef, fileIoProvider); blockIn, checksumIn, volumeRef, fileIoProvider);
} catch (IOException ioe) { } catch (IOException ioe) {
IOUtils.cleanupWithLogger(null, volumeRef);
IOUtils.closeStream(this); IOUtils.closeStream(this);
org.apache.commons.io.IOUtils.closeQuietly(blockIn); org.apache.commons.io.IOUtils.closeQuietly(blockIn);
org.apache.commons.io.IOUtils.closeQuietly(checksumIn); org.apache.commons.io.IOUtils.closeQuietly(checksumIn);

View File

@ -167,18 +167,26 @@ synchronized long countPendingDeletions() {
* Execute the task sometime in the future, using ThreadPools. * Execute the task sometime in the future, using ThreadPools.
*/ */
synchronized void execute(FsVolumeImpl volume, Runnable task) { synchronized void execute(FsVolumeImpl volume, Runnable task) {
if (executors == null) { try {
throw new RuntimeException("AsyncDiskService is already shutdown"); if (executors == null) {
} throw new RuntimeException("AsyncDiskService is already shutdown");
if (volume == null) { }
throw new RuntimeException("A null volume does not have a executor"); if (volume == null) {
} throw new RuntimeException("A null volume does not have a executor");
ThreadPoolExecutor executor = executors.get(volume.getStorageID()); }
if (executor == null) { ThreadPoolExecutor executor = executors.get(volume.getStorageID());
throw new RuntimeException("Cannot find volume " + volume if (executor == null) {
+ " for execution of task " + task); throw new RuntimeException("Cannot find volume " + volume
} else { + " for execution of task " + task);
executor.execute(task); } else {
executor.execute(task);
}
} catch (RuntimeException re) {
if (task instanceof ReplicaFileDeleteTask) {
IOUtils.cleanupWithLogger(null,
((ReplicaFileDeleteTask) task).volumeRef);
}
throw re;
} }
} }
@ -314,28 +322,31 @@ private boolean moveFiles() {
@Override @Override
public void run() { public void run() {
final long blockLength = replicaToDelete.getBlockDataLength(); try {
final long metaLength = replicaToDelete.getMetadataLength(); final long blockLength = replicaToDelete.getBlockDataLength();
boolean result; final long metaLength = replicaToDelete.getMetadataLength();
boolean result;
result = (trashDirectory == null) ? deleteFiles() : moveFiles(); result = (trashDirectory == null) ? deleteFiles() : moveFiles();
if (!result) { if (!result) {
LOG.warn("Unexpected error trying to " LOG.warn("Unexpected error trying to "
+ (trashDirectory == null ? "delete" : "move") + (trashDirectory == null ? "delete" : "move")
+ " block " + block.getBlockPoolId() + " " + block.getLocalBlock() + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
+ " at file " + replicaToDelete.getBlockURI() + ". Ignored."); + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
} else { } else {
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ if (block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK) {
datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID()); datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
}
volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
LOG.info("Deleted " + block.getBlockPoolId() + " " +
block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
} }
volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength); updateDeletedBlockId(block);
volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength); } finally {
LOG.info("Deleted " + block.getBlockPoolId() + " " IOUtils.cleanupWithLogger(null, this.volumeRef);
+ block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
} }
updateDeletedBlockId(block);
IOUtils.cleanupWithLogger(null, volumeRef);
} }
} }

View File

@ -319,7 +319,7 @@ private void checkReference() {
} }
@VisibleForTesting @VisibleForTesting
int getReferenceCount() { public int getReferenceCount() {
return this.reference.getReferenceCount(); return this.reference.getReferenceCount();
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -153,16 +154,24 @@ synchronized boolean queryVolume(FsVolumeImpl volume) {
* Execute the task sometime in the future, using ThreadPools. * Execute the task sometime in the future, using ThreadPools.
*/ */
synchronized void execute(String storageId, Runnable task) { synchronized void execute(String storageId, Runnable task) {
if (executors == null) { try {
throw new RuntimeException( if (executors == null) {
"AsyncLazyPersistService is already shutdown"); throw new RuntimeException(
} "AsyncLazyPersistService is already shutdown");
ThreadPoolExecutor executor = executors.get(storageId); }
if (executor == null) { ThreadPoolExecutor executor = executors.get(storageId);
throw new RuntimeException("Cannot find root storage volume with id " + if (executor == null) {
storageId + " for execution of task " + task); throw new RuntimeException("Cannot find root storage volume with id " +
} else { storageId + " for execution of task " + task);
executor.execute(task); } else {
executor.execute(task);
}
} catch (RuntimeException re) {
if (task instanceof ReplicaLazyPersistTask) {
IOUtils.cleanupWithLogger(null,
((ReplicaLazyPersistTask) task).targetVolume);
}
throw re;
} }
} }

View File

@ -33,6 +33,9 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -562,4 +565,56 @@ void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
checksum, CachingStrategy.newDefaultStrategy(), false, false, checksum, CachingStrategy.newDefaultStrategy(), false, false,
null, null, new String[0]); null, null, new String[0]);
} }
@Test(timeout = 30000)
public void testReleaseVolumeRefIfExceptionThrown()
throws IOException, InterruptedException {
Path file = new Path("dataprotocol.dat");
int numDataNodes = 1;
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
numDataNodes).build();
try {
cluster.waitActive();
datanode = cluster.getFileSystem().getDataNodeStats(
DatanodeReportType.LIVE)[0];
dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
FileSystem fileSys = cluster.getFileSystem();
int fileLen = Math.min(
conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
fileSys.getDefaultBlockSize(file),
fileSys.getDefaultReplication(file), 0L);
// Get the first blockid for the file.
final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
sendBuf.reset();
recvBuf.reset();
// Delete the meta file to create a exception in BlockSender constructor.
DataNode dn = cluster.getDataNodes().get(0);
cluster.getMaterializedReplica(0, blk).deleteMeta();
FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
dn).getVolume(blk);
int beforeCnt = volume.getReferenceCount();
sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
sendRecvData("Copy a block.", false);
Thread.sleep(3000);
int afterCnt = volume.getReferenceCount();
assertEquals(beforeCnt, afterCnt);
} finally {
cluster.shutdown();
}
}
} }

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.DF;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import java.io.OutputStream; import java.io.OutputStream;
@ -1805,4 +1806,37 @@ public void testNotifyNamenodeMissingOrNewBlock() throws Exception {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout = 20000)
public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(
new HdfsConfiguration()).build();
cluster.waitActive();
FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0);
ExtendedBlock eb;
ReplicaInfo info;
int beforeCnt = 0;
try {
List<Block> blockList = new ArrayList<Block>();
eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001);
info = new FinalizedReplica(
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
dataset.volumeMap.add(BLOCKPOOL, info);
((LocalReplica) info).getBlockFile().createNewFile();
((LocalReplica) info).getMetaFile().createNewFile();
blockList.add(info);
// Create a runtime exception.
dataset.asyncDiskService.shutdown();
beforeCnt = vol.getReferenceCount();
dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
} catch (RuntimeException re) {
int afterCnt = vol.getReferenceCount();
assertEquals(beforeCnt, afterCnt);
} finally {
cluster.shutdown();
}
}
} }

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ThreadUtil; import org.apache.hadoop.util.ThreadUtil;
import org.junit.Assert; import org.junit.Assert;
@ -280,4 +283,38 @@ public void run() {
} }
} }
} }
@Test(timeout = 20000)
public void testReleaseVolumeRefIfExceptionThrown()
throws IOException, InterruptedException {
getClusterBuilder().setRamDiskReplicaCapacity(2).build();
final String methodName = GenericTestUtils.getMethodName();
final int seed = 0xFADED;
Path path = new Path("/" + methodName + ".Writer.File.dat");
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetSpi.FsVolumeReferences volumes =
DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences();
int[] beforeCnts = new int[volumes.size()];
FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
// Create a runtime exception.
ds.asyncLazyPersistService.shutdown();
for (int i = 0; i < volumes.size(); ++i) {
beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
}
makeRandomTestFile(path, BLOCK_SIZE, true, seed);
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
for (int i = 0; i < volumes.size(); ++i) {
int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
// LazyWriter keeps trying to save copies even if
// asyncLazyPersistService is already shutdown.
// If we do not release references, the number of
// references will increase infinitely.
Assert.assertTrue(
beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1));
}
}
} }