diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index e9a9c69016..7b116d9e56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -157,4 +157,9 @@ public void delay() {}
   public void badDecoding(ByteBuffer[] outputs) {}
 
   public void markSlow(String dnAddr, int[] replies) {}
+
+  /**
+   * Delay the deletion of a replica for a while, for testing.
+   */
+  public void delayDeleteReplica() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 7bfc24197a..e5eef8e2e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -334,6 +335,13 @@ private boolean moveFiles() {
     @Override
     public void run() {
       try {
+        // For testing: the fault injector may delay the task here, to
+        // simulate asynchronous replica deletion tasks piling up as pending.
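+        // Remove the replica from the in-memory replica map before deleting
+        // the on-disk files; if the replica is already gone from memory,
+        // skip the on-disk deletion as well.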
+        DataNodeFaultInjector.get().delayDeleteReplica();
+        if (!fsdatasetImpl.removeReplicaFromMem(block, volume)) {
+          return;
+        }
+
         final long blockLength = replicaToDelete.getBlockDataLength();
         final long metaLength = replicaToDelete.getMetadataLength();
         boolean result;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 7e875d1867..675ce14f44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2305,10 +2305,10 @@ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
       throws IOException {
     final List<String> errors = new ArrayList<>();
     for (int i = 0; i < invalidBlks.length; i++) {
-      final ReplicaInfo removing;
+      final ReplicaInfo info;
       final FsVolumeImpl v;
-      try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
-        final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
+      try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
+        info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           ReplicaInfo infoByBlockId =
               volumeMap.get(bpid, invalidBlks[i].getBlockId());
@@ -2342,48 +2342,21 @@ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
           LOG.warn("Parent directory check failed; replica {} is " +
               "not backed by a local file", info);
         }
-        removing = volumeMap.remove(bpid, invalidBlks[i]);
-        addDeletingBlock(bpid, removing.getBlockId());
-        LOG.debug("Block file {} is to be deleted", removing.getBlockURI());
-        datanode.getMetrics().incrBlocksRemoved(1);
-        if (removing instanceof ReplicaInPipeline) {
-          ((ReplicaInPipeline) removing).releaseAllBytesReserved();
-        }
       }
 
-      if (v.isTransientStorage()) {
-        RamDiskReplica replicaInfo =
-            ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
-        if (replicaInfo != null) {
-          if (!replicaInfo.getIsPersisted()) {
-            datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
-          }
-          ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
-              replicaInfo.getBlockId(), true);
-        }
-      }
-
-      // If a DFSClient has the replica in its cache of short-circuit file
-      // descriptors (and the client is using ShortCircuitShm), invalidate it.
-      datanode.getShortCircuitRegistry().processBlockInvalidation(
-          new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
-
-      // If the block is cached, start uncaching it.
-      cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
-
       try {
         if (async) {
           // Delete the block asynchronously to make sure we can do it fast
           // enough.
           // It's ok to unlink the block file before the uncache operation
           // finishes.
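+          // The replica is now removed from the in-memory replica map inside
+          // the deletion task itself (see removeReplicaFromMem), rather than
+          // before the task is scheduled.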
-          asyncDiskService.deleteAsync(v.obtainReference(), removing,
+          asyncDiskService.deleteAsync(v.obtainReference(), info,
               new ExtendedBlock(bpid, invalidBlks[i]),
-              dataStorage.getTrashDirectoryForReplica(bpid, removing));
+              dataStorage.getTrashDirectoryForReplica(bpid, info));
         } else {
-          asyncDiskService.deleteSync(v.obtainReference(), removing,
+          asyncDiskService.deleteSync(v.obtainReference(), info,
               new ExtendedBlock(bpid, invalidBlks[i]),
-              dataStorage.getTrashDirectoryForReplica(bpid, removing));
+              dataStorage.getTrashDirectoryForReplica(bpid, info));
         }
       } catch (ClosedChannelException e) {
         LOG.warn("Volume {} is closed, ignore the deletion task for " +
@@ -2422,6 +2395,91 @@ public void invalidate(String bpid, ReplicaInfo block) {
         block.getStorageUuid());
   }
 
+  /**
+   * Remove a replica from the in-memory replica map.
+   *
+   * @param block the block whose replica should be removed.
+   * @param volume the volume the replica is expected to reside on.
+   * @return true if the replica was removed from the replica map,
+   *         false if it was not found or did not match.
+   */
+  boolean removeReplicaFromMem(final ExtendedBlock block, final FsVolumeImpl volume) {
+    final String bpid = block.getBlockPoolId();
+    final Block localBlock = block.getLocalBlock();
+    final long blockId = localBlock.getBlockId();
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+      final ReplicaInfo info = volumeMap.get(bpid, localBlock);
+      if (info == null) {
+        ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId);
+        if (infoByBlockId == null) {
+          // It is okay if the block is not found -- it
+          // may have been deleted earlier.
+          LOG.info("Failed to delete replica {}: ReplicaInfo not found " +
+              "in removeReplicaFromMem.", localBlock);
+        } else {
+          LOG.error("Failed to delete replica {}: GenerationStamp not matched, " +
+              "existing replica is {} in removeReplicaFromMem.",
+              localBlock, Block.toString(infoByBlockId));
+        }
+        return false;
+      }
+
+      FsVolumeImpl v = (FsVolumeImpl) info.getVolume();
+      if (v == null) {
+        LOG.error("Failed to delete replica {}. No volume for this replica {} " +
+            "in removeReplicaFromMem.", localBlock, info);
+        return false;
+      }
+
+      try {
+        File blockFile = new File(info.getBlockURI());
+        if (blockFile.getParentFile() == null) {
+          LOG.error("Failed to delete replica {}. Parent not found for block file: {} " +
+              "in removeReplicaFromMem.", localBlock, blockFile);
+          return false;
+        }
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Parent directory check failed; replica {} is " +
+            "not backed by a local file in removeReplicaFromMem.", info);
+      }
+
+      if (!volume.getStorageID().equals(v.getStorageID())) {
+        LOG.error("Failed to delete replica {}. Volume mismatch: expected the replica " +
+            "on volume {}, but found it on volume {} in removeReplicaFromMem.",
+            localBlock, volume, v);
+        return false;
+      }
+
+      ReplicaInfo removing = volumeMap.remove(bpid, localBlock);
+      addDeletingBlock(bpid, removing.getBlockId());
+      LOG.debug("Block file {} is to be deleted", removing.getBlockURI());
+      datanode.getMetrics().incrBlocksRemoved(1);
+      if (removing instanceof ReplicaInPipeline) {
+        ((ReplicaInPipeline) removing).releaseAllBytesReserved();
+      }
+    }
+
+    if (volume.isTransientStorage()) {
+      RamDiskReplicaTracker.RamDiskReplica replicaInfo = ramDiskReplicaTracker.
+          getReplica(bpid, blockId);
+      if (replicaInfo != null) {
+        if (!replicaInfo.getIsPersisted()) {
+          datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+        }
+        ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
+            replicaInfo.getBlockId(), true);
+      }
+    }
+
+    // If a DFSClient has the replica in its cache of short-circuit file
+    // descriptors (and the client is using ShortCircuitShm), invalidate it.
+    datanode.getShortCircuitRegistry().processBlockInvalidation(
+        ExtendedBlockId.fromExtendedBlock(block));
+
+    // If the block is cached, start uncaching it.
+    cacheManager.uncacheBlock(bpid, blockId);
+    return true;
+  }
+
   /**
    * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
    */
@@ -3628,8 +3686,8 @@ public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
       }
     }
   }
-
-  private void addDeletingBlock(String bpid, Long blockId) {
+
+  protected void addDeletingBlock(String bpid, Long blockId) {
     synchronized(deletingBlock) {
       Set<Long> s = deletingBlock.get(bpid);
       if (s == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 0805257a28..9f2425d937 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -25,10 +25,13 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
@@ -1830,4 +1833,93 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  /**
+   * The block should remain in the replica map while the async deletion
+   * task is still pending.
+   */
+  @Test
+  public void testAsyncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration config = new HdfsConfiguration();
+    // Bump up the redundancy check interval so re-replication does not
+    // kick in during the test.
+    config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Block the replica removal until the test releases the semaphore.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore.
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
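+      // Find the DataNode that holds the first replica; its async deletion
+      // task will block in delayDeleteReplica() until the semaphore is
+      // released.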
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Test get blocks and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      assertTrue(uuids.contains(dn.getDatanodeUuid()));
+
+      // Verify that the first replica has not yet been deleted from memory.
+      // The namenode still reports this replica, so a client may try to read
+      // it; if the replica were already removed from memory, the client
+      // would get a ReplicaNotFoundException.
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+
+      // Let the blocked task resume and run removeReplicaFromMem.
+      semaphore.release(1);
+
+      // Wait for the async deletion task to finish.
+      GenericTestUtils.waitFor(() ->
+          ds.asyncDiskService.countPendingDeletions() == 0, 100, 1000);
+
+      // Sleep for two heartbeat intervals.
+      Thread.sleep(config.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+          DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
+          TimeUnit.SECONDS, TimeUnit.MILLISECONDS) * 2);
+
+      // Test get blocks and datanodes again.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(2, loc.length);
+      uuids = Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      // The namenode no longer reports this replica on this datanode.
+      assertFalse(uuids.contains(dn.getDatanodeUuid()));
+
+      // The replica has been deleted from the datanode's memory.
+      assertNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+    } finally {
+      cluster.shutdown();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }
\ No newline at end of file