diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 957034bda2..88b19d81a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -427,6 +427,9 @@ Release 2.6.0 - UNRELEASED HDFS-6879. Adding tracing to Hadoop RPC (Masatake Iwasaki via Colin Patrick McCabe) + HDFS-6774. Make FsDataset and DataStore support removing volumes. (Lei Xu + via atm) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index 88f858b94c..b7f688dca4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -201,6 +201,20 @@ public class BlockPoolSliceStorage extends Storage { writeProperties(bpSdir); } + /** + * Remove storage directories. + * @param storageDirs a set of storage directories to be removed. + */ + void removeVolumes(Set storageDirs) { + for (Iterator it = this.storageDirs.iterator(); + it.hasNext(); ) { + StorageDirectory sd = it.next(); + if (storageDirs.contains(sd.getRoot())) { + it.remove(); + } + } + } + /** * Set layoutVersion, namespaceID and blockpoolID into block pool storage * VERSION file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 4b9656eb8e..ceb2aa0795 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -336,6 +336,33 @@ public class DataStorage extends Storage { } } + /** + * Remove volumes from DataStorage. + * @param locations a collection of volumes. + */ + synchronized void removeVolumes(Collection locations) { + if (locations.isEmpty()) { + return; + } + + Set dataDirs = new HashSet(); + for (StorageLocation sl : locations) { + dataDirs.add(sl.getFile()); + } + + for (BlockPoolSliceStorage bpsStorage : this.bpStorageMap.values()) { + bpsStorage.removeVolumes(dataDirs); + } + + for (Iterator it = this.storageDirs.iterator(); + it.hasNext(); ) { + StorageDirectory sd = it.next(); + if (dataDirs.contains(sd.getRoot())) { + it.remove(); + } + } + } + /** * Analyze storage directories. * Recover from previous transitions if required. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index a64f9c0d58..0fbfe19086 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -97,6 +97,9 @@ public interface FsDatasetSpi extends FSDatasetMBean { public void addVolumes(Collection volumes) throws IOException; + /** Removes a collection of volumes from FsDataset. */ + public void removeVolumes(Collection volumes); + /** @return a storage with the given storage ID */ public DatanodeStorage getStorage(final String storageUuid); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index af467b93f0..57744073c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -340,7 +340,7 @@ class BlockPoolSlice { loadRwr = false; } sc.close(); - if (restartMeta.delete()) { + if (!restartMeta.delete()) { FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " + restartMeta.getPath()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index 539e97be4a..bee7bf70c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -118,6 +118,24 @@ class FsDatasetAsyncDiskService { } addExecutorForVolume(volume); } + + /** + * Stops AsyncDiskService for a volume. + * @param volume the root of the volume. + */ + synchronized void removeVolume(File volume) { + if (executors == null) { + throw new RuntimeException("AsyncDiskService is already shutdown"); + } + ThreadPoolExecutor executor = executors.get(volume); + if (executor == null) { + throw new RuntimeException("Can not find volume " + volume + + " to remove."); + } else { + executor.shutdown(); + executors.remove(volume); + } + } synchronized long countPendingDeletions() { long count = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 148055c6f9..5306be7714 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -30,9 +30,11 @@ import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import javax.management.NotCompliantMBeanException; @@ -314,6 +316,51 @@ class FsDatasetImpl implements FsDatasetSpi { } } + /** + * Removes a collection of volumes from FsDataset. + * @param volumes the root directories of the volumes. + * + * DataNode should call this function before calling + * {@link DataStorage#removeVolumes(java.util.Collection)}. + */ + @Override + public synchronized void removeVolumes(Collection volumes) { + Set volumeSet = new HashSet(); + for (StorageLocation sl : volumes) { + volumeSet.add(sl.getFile()); + } + for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { + Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); + if (volumeSet.contains(sd.getRoot())) { + String volume = sd.getRoot().toString(); + LOG.info("Removing " + volume + " from FsDataset."); + + this.volumes.removeVolume(volume); + storageMap.remove(sd.getStorageUuid()); + asyncDiskService.removeVolume(sd.getCurrentDir()); + + // Removed all replica information for the blocks on the volume. Unlike + // updating the volumeMap in addVolume(), this operation does not scan + // disks. + for (String bpid : volumeMap.getBlockPoolList()) { + List blocks = new ArrayList(); + for (Iterator it = volumeMap.replicas(bpid).iterator(); + it.hasNext(); ) { + ReplicaInfo block = it.next(); + if (block.getVolume().getBasePath().equals(volume)) { + invalidate(bpid, block.getBlockId()); + blocks.add(block); + it.remove(); + } + } + // Delete blocks from the block scanner in batch. + datanode.getBlockScanner().deleteBlocks(bpid, + blocks.toArray(new Block[blocks.size()])); + } + } + } + } + private StorageType getStorageTypeFromLocations( Collection dataLocations, File dir) { for (StorageLocation dataLocation : dataLocations) { @@ -1294,6 +1341,28 @@ class FsDatasetImpl implements FsDatasetSpi { } } + /** + * Invalidate a block but does not delete the actual on-disk block file. + * + * It should only be used for decommissioning disks. + * + * @param bpid the block pool ID. + * @param blockId the ID of the block. + */ + public void invalidate(String bpid, long blockId) { + // If a DFSClient has the replica in its cache of short-circuit file + // descriptors (and the client is using ShortCircuitShm), invalidate it. + // The short-circuit registry is null in the unit tests, because the + // datanode is mock object. + if (datanode.getShortCircuitRegistry() != null) { + datanode.getShortCircuitRegistry().processBlockInvalidation( + new ExtendedBlockId(blockId, bpid)); + + // If the block is cached, start uncaching it. + cacheManager.uncacheBlock(bpid, blockId); + } + } + /** * Asynchronously attempts to cache a single block via {@link FsDatasetCache}. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index d4f8adc011..90739c3f41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -212,6 +212,25 @@ class FsVolumeList { FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString()); } + /** + * Dynamically remove volume to the list. + * @param volume the volume to be removed. + */ + synchronized void removeVolume(String volume) { + // Make a copy of volumes to remove one volume. + final List volumeList = new ArrayList(volumes); + for (Iterator it = volumeList.iterator(); it.hasNext(); ) { + FsVolumeImpl fsVolume = it.next(); + if (fsVolume.getBasePath().equals(volume)) { + fsVolume.shutdown(); + it.remove(); + volumes = Collections.unmodifiableList(volumeList); + FsDatasetImpl.LOG.info("Removed volume: " + volume); + break; + } + } + } + void addBlockPool(final String bpid, final Configuration conf) throws IOException { long totalStartTime = Time.monotonicNow(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 109a0394fc..a51342ea75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1120,6 +1120,11 @@ public class SimulatedFSDataset implements FsDatasetSpi { throw new UnsupportedOperationException(); } + @Override + public synchronized void removeVolumes(Collection volumes) { + throw new UnsupportedOperationException(); + } + @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index d9e99078be..2c4c401205 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -18,12 +18,20 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.datanode.DNConf; +import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; import org.junit.Before; import org.junit.Test; @@ -35,25 +43,44 @@ import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestFsDatasetImpl { private static final String BASE_DIR = - System.getProperty("test.build.dir") + "/fsdatasetimpl"; + new FileSystemTestHelper().getTestRootDir(); private static final int NUM_INIT_VOLUMES = 2; + private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"}; + // Use to generate storageUuid + private static final DataStorage dsForStorageUuid = new DataStorage( + new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE)); + + private Configuration conf; private DataStorage storage; + private DataBlockScanner scanner; private FsDatasetImpl dataset; + private static Storage.StorageDirectory createStorageDirectory(File root) { + Storage.StorageDirectory sd = new Storage.StorageDirectory(root); + dsForStorageUuid.createStorageID(sd); + return sd; + } + private static void createStorageDirs(DataStorage storage, Configuration conf, int numDirs) throws IOException { List dirs = new ArrayList(); List dirStrings = new ArrayList(); for (int i = 0; i < numDirs; i++) { - String loc = BASE_DIR + "/data" + i; - dirStrings.add(loc); - dirs.add(new Storage.StorageDirectory(new File(loc))); + File loc = new File(BASE_DIR + "/data" + i); + dirStrings.add(loc.toString()); + loc.mkdirs(); + dirs.add(createStorageDirectory(loc)); when(storage.getStorageDir(i)).thenReturn(dirs.get(i)); } @@ -66,14 +93,19 @@ public class TestFsDatasetImpl { public void setUp() throws IOException { final DataNode datanode = Mockito.mock(DataNode.class); storage = Mockito.mock(DataStorage.class); - Configuration conf = new Configuration(); + scanner = Mockito.mock(DataBlockScanner.class); + this.conf = new Configuration(); final DNConf dnConf = new DNConf(conf); when(datanode.getConf()).thenReturn(conf); when(datanode.getDnConf()).thenReturn(dnConf); + when(datanode.getBlockScanner()).thenReturn(scanner); createStorageDirs(storage, conf, NUM_INIT_VOLUMES); dataset = new FsDatasetImpl(datanode, storage, conf); + for (String bpid : BLOCK_POOL_IDS) { + dataset.addBlockPool(bpid, conf); + } assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size()); assertEquals(0, dataset.getNumFailedVolumes()); @@ -89,15 +121,63 @@ public class TestFsDatasetImpl { String path = BASE_DIR + "/newData" + i; newLocations.add(StorageLocation.parse(path)); when(storage.getStorageDir(numExistingVolumes + i)) - .thenReturn(new Storage.StorageDirectory(new File(path))); + .thenReturn(createStorageDirectory(new File(path))); } when(storage.getNumStorageDirs()).thenReturn(totalVolumes); dataset.addVolumes(newLocations); assertEquals(totalVolumes, dataset.getVolumes().size()); + assertEquals(totalVolumes, dataset.storageMap.size()); for (int i = 0; i < numNewVolumes; i++) { assertEquals(newLocations.get(i).getFile().getPath(), dataset.getVolumes().get(numExistingVolumes + i).getBasePath()); } } + + @Test + public void testRemoveVolumes() throws IOException { + // Feed FsDataset with block metadata. + final int NUM_BLOCKS = 100; + for (int i = 0; i < NUM_BLOCKS; i++) { + String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length]; + ExtendedBlock eb = new ExtendedBlock(bpid, i); + dataset.createRbw(StorageType.DEFAULT, eb); + } + final String[] dataDirs = + conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(","); + final String volumePathToRemove = dataDirs[0]; + List volumesToRemove = new ArrayList(); + volumesToRemove.add(StorageLocation.parse(volumePathToRemove)); + + dataset.removeVolumes(volumesToRemove); + int expectedNumVolumes = dataDirs.length - 1; + assertEquals("The volume has been removed from the volumeList.", + expectedNumVolumes, dataset.getVolumes().size()); + assertEquals("The volume has been removed from the storageMap.", + expectedNumVolumes, dataset.storageMap.size()); + + try { + dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(), + new Runnable() { + @Override + public void run() {} + }); + fail("Expect RuntimeException: the volume has been removed from the " + + "AsyncDiskService."); + } catch (RuntimeException e) { + GenericTestUtils.assertExceptionContains("Cannot find root", e); + } + + int totalNumReplicas = 0; + for (String bpid : dataset.volumeMap.getBlockPoolList()) { + totalNumReplicas += dataset.volumeMap.size(bpid); + } + assertEquals("The replica infos on this volume has been removed from the " + + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES, + totalNumReplicas); + + // Verify that every BlockPool deletes the removed blocks from the volume. + verify(scanner, times(BLOCK_POOL_IDS.length)) + .deleteBlocks(anyString(), any(Block[].class)); + } }