diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 07213dd2be..e52b8496f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1139,6 +1139,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7830. DataNode does not release the volume lock when adding a volume
     fails. (Lei Xu via Colin P. Mccabe)
 
+    HDFS-6833. DirectoryScanner should not register a deleting block with
+    memory of DataNode. (Shinichi Yamashita via szetszwo)
+
   BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 01f967f6e4..61dfb14b9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -443,13 +443,14 @@ void scan() {
           int d = 0; // index for blockpoolReport
           int m = 0; // index for memReprot
           while (m < memReport.length && d < blockpoolReport.length) {
-            FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
-            ScanInfo info = blockpoolReport[Math.min(
-                d, blockpoolReport.length - 1)];
+            FinalizedReplica memBlock = memReport[m];
+            ScanInfo info = blockpoolReport[d];
             if (info.getBlockId() < memBlock.getBlockId()) {
-              // Block is missing in memory
-              statsRecord.missingMemoryBlocks++;
-              addDifference(diffRecord, statsRecord, info);
+              if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
+                // Block is missing in memory
+                statsRecord.missingMemoryBlocks++;
+                addDifference(diffRecord, statsRecord, info);
+              }
               d++;
               continue;
             }
@@ -495,8 +496,11 @@ void scan() {
                 current.getBlockId(), current.getVolume());
           }
           while (d < blockpoolReport.length) {
-            statsRecord.missingMemoryBlocks++;
-            addDifference(diffRecord, statsRecord, blockpoolReport[d++]);
+            if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
+              statsRecord.missingMemoryBlocks++;
+              addDifference(diffRecord, statsRecord, blockpoolReport[d]);
+            }
+            d++;
           }
           LOG.info(statsRecord.toString());
         } //end for
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 10c83694f9..5b183e67bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -543,4 +543,9 @@ public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
    * Check whether the block was pinned
    */
   public boolean getPinning(ExtendedBlock block) throws IOException;
+
+  /**
+   * Confirm whether the block is deleting
+   */
+  public boolean isDeletingBlock(String bpid, long blockId);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 13e854f0b7..c1d3990e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -22,7 +22,10 @@
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -64,9 +67,14 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
   private final DataNode datanode;
+  private final FsDatasetImpl fsdatasetImpl;
   private final ThreadGroup threadGroup;
   private Map<String, ThreadPoolExecutor> executors
       = new HashMap<String, ThreadPoolExecutor>();
+  private Map<String, Set<Long>> deletedBlockIds
+      = new HashMap<String, Set<Long>>();
+  private static final int MAX_DELETED_BLOCKS = 64;
+  private int numDeletedBlocks = 0;
 
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
@@ -75,8 +83,9 @@ class FsDatasetAsyncDiskService {
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  FsDatasetAsyncDiskService(DataNode datanode) {
+  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
     this.datanode = datanode;
+    this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
@@ -286,7 +295,27 @@ public void run() {
         LOG.info("Deleted " + block.getBlockPoolId() + " "
             + block.getLocalBlock() + " file " + blockFile);
       }
+      updateDeletedBlockId(block);
       IOUtils.cleanup(null, volumeRef);
     }
   }
+
+  private synchronized void updateDeletedBlockId(ExtendedBlock block) {
+    Set<Long> blockIds = deletedBlockIds.get(block.getBlockPoolId());
+    if (blockIds == null) {
+      blockIds = new HashSet<Long>();
+      deletedBlockIds.put(block.getBlockPoolId(), blockIds);
+    }
+    blockIds.add(block.getBlockId());
+    numDeletedBlocks++;
+    if (numDeletedBlocks == MAX_DELETED_BLOCKS) {
+      for (Entry<String, Set<Long>> e : deletedBlockIds.entrySet()) {
+        String bpid = e.getKey();
+        Set<Long> bs = e.getValue();
+        fsdatasetImpl.removeDeletedBlocks(bpid, bs);
+        bs.clear();
+      }
+      numDeletedBlocks = 0;
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0f28aa4e53..48ac6ca34c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -237,6 +237,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
   private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
+  final Map<String, Set<Long>> deletingBlock;
   final RamDiskReplicaTracker ramDiskReplicaTracker;
   final RamDiskAsyncLazyPersistService asyncLazyPersistService;
@@ -298,8 +299,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+    deletingBlock = new HashMap<String, Set<Long>>();
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
@@ -1795,7 +1797,12 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
               + ". Parent not found for file " + f);
           continue;
         }
-        volumeMap.remove(bpid, invalidBlks[i]);
+        ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]);
+        addDeletingBlock(bpid, removing.getBlockId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Block file " + removing.getBlockFile().getName()
+              + " is to be deleted");
+        }
       }
 
       if (v.isTransientStorage()) {
@@ -3005,5 +3012,35 @@ public boolean getPinning(ExtendedBlock block) throws IOException {
     FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
     return fss.getPermission().getStickyBit();
   }
+
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      return s != null ? s.contains(blockId) : false;
+    }
+  }
+
+  public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
+    synchronized (deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s != null) {
+        for (Long id : blockIds) {
+          s.remove(id);
+        }
+      }
+    }
+  }
+
+  private void addDeletingBlock(String bpid, Long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s == null) {
+        s = new HashSet<Long>();
+        deletingBlock.put(bpid, s);
+      }
+      s.add(blockId);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8ae541503d..f0dbd0f51f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1318,5 +1318,10 @@ public void setPinning(ExtendedBlock b) throws IOException {
   public boolean getPinning(ExtendedBlock b) throws IOException {
     return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
   }
+
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index a3c9935ca8..6653ccae07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -429,4 +429,9 @@ public void setPinning(ExtendedBlock block) throws IOException {
   public boolean getPinning(ExtendedBlock block) throws IOException {
     return false;
   }
+
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 3b47dd095d..403cb2a6c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -29,8 +33,11 @@
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
@@ -88,6 +95,8 @@ public class TestFsDatasetImpl {
   private DataNode datanode;
   private DataStorage storage;
   private FsDatasetImpl dataset;
+
+  private final static String BLOCKPOOL = "BP-TEST";
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
     Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
@@ -334,4 +343,54 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
     FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
   }
+
+  @Test
+  public void testDeletingBlocks() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(
+        new HdfsConfiguration()).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+
+      FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      FsVolumeImpl vol = ds.getVolumes().get(0);
+
+      ExtendedBlock eb;
+      ReplicaInfo info;
+      List<Block> blockList = new ArrayList<Block>();
+      for (int i = 1; i <= 63; i++) {
+        eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
+        info = new FinalizedReplica(
+            eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+        ds.volumeMap.add(BLOCKPOOL, info);
+        info.getBlockFile().createNewFile();
+        info.getMetaFile().createNewFile();
+        blockList.add(info);
+      }
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+
+      blockList.clear();
+      eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      ds.volumeMap.add(BLOCKPOOL, info);
+      info.getBlockFile().createNewFile();
+      info.getMetaFile().createNewFile();
+      blockList.add(info);
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
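
For reference, the standalone Java sketch below summarizes the bookkeeping this patch introduces: invalidate() records a block ID when it schedules an asynchronous delete, DirectoryScanner consults that record before reporting an on-disk block with no in-memory replica as "missing in memory", and IDs of blocks whose files have actually been removed are purged from the record in batches. The class and method names here (DeletingBlockTracker, scheduled, completed, isDeleting) are illustrative only and are not part of the patch; the real implementation lives in FsDatasetImpl and FsDatasetAsyncDiskService above.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DeletingBlockTracker {
  // Assumed batch size, mirroring MAX_DELETED_BLOCKS in the patch.
  private static final int MAX_DELETED_BLOCKS = 64;

  // bpid -> IDs of blocks whose files are scheduled for asynchronous deletion.
  private final Map<String, Set<Long>> deleting = new HashMap<String, Set<Long>>();
  // bpid -> IDs whose files have since been removed from disk.
  private final Map<String, Set<Long>> deleted = new HashMap<String, Set<Long>>();
  private int numDeleted = 0;

  // Called when the replica is removed from the in-memory map (invalidate()).
  synchronized void scheduled(String bpid, long blockId) {
    get(deleting, bpid).add(blockId);
  }

  // Called by the async deletion worker once the block file is gone.
  synchronized void completed(String bpid, long blockId) {
    get(deleted, bpid).add(blockId);
    if (++numDeleted >= MAX_DELETED_BLOCKS) {
      // Purge in batches so the "deleting" record does not grow without bound.
      for (Map.Entry<String, Set<Long>> e : deleted.entrySet()) {
        Set<Long> pending = deleting.get(e.getKey());
        if (pending != null) {
          pending.removeAll(e.getValue());
        }
        e.getValue().clear();
      }
      numDeleted = 0;
    }
  }

  // The directory scanner asks this before reporting an on-disk block that
  // has no in-memory replica as missing in memory.
  synchronized boolean isDeleting(String bpid, long blockId) {
    Set<Long> s = deleting.get(bpid);
    return s != null && s.contains(blockId);
  }

  private static Set<Long> get(Map<String, Set<Long>> m, String bpid) {
    Set<Long> s = m.get(bpid);
    if (s == null) {
      s = new HashSet<Long>();
      m.put(bpid, s);
    }
    return s;
  }
}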