From 1ebcc378af644fc35e61ef91b0f2bcd35fcc8819 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Mon, 26 Mar 2012 21:01:47 +0000
Subject: [PATCH] HDFS-3089. Move FSDatasetInterface and the related classes to a package.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1305590 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java      |   4 +-
 .../datanode/BlockPoolSliceScanner.java            |   7 +-
 .../server/datanode/DataBlockScanner.java          |   5 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      |  11 +-
 .../server/datanode/DirectoryScanner.java          |   7 +-
 .../hdfs/server/datanode/FSDataset.java            |  84 ++--
 .../datanode/ReplicaInPipelineInterface.java       |   2 +-
 .../FsDatasetSpi.java}                             | 376 ++++++++++++++++++
 .../datanode/{ => fsdataset}/RollingLogs.java      |  67 ++++
 .../RoundRobinVolumeChoosingPolicy.java}           |  66 +++
 .../VolumeChoosingPolicy.java}                     |  43 ++
 .../src/main/resources/hdfs-default.xml            |   9 -
 .../apache/hadoop/hdfs/DataNodeCluster.java        |   7 +-
 .../apache/hadoop/hdfs/MiniDFSCluster.java         |  35 +-
 .../apache/hadoop/hdfs/TestFileCreation.java       |   4 +-
 .../server/datanode/DataNodeTestUtils.java         |   3 +-
 .../server/datanode/SimulatedFSDataset.java        |  52 +--
 .../server/datanode/TestBPOfferService.java        |   3 +-
 .../datanode/TestSimulatedFSDataset.java           |   5 +-
 .../TestRoundRobinVolumeChoosingPolicy.java}       |  96 +++++
 21 files changed, 780 insertions(+), 109 deletions(-)
 rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/{FSDatasetInterface.java => fsdataset/FsDatasetSpi.java} (50%)
 rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/{ => fsdataset}/RollingLogs.java (51%)
 rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/{RoundRobinVolumesPolicy.java => fsdataset/RoundRobinVolumeChoosingPolicy.java} (50%)
 rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/{BlockVolumeChoosingPolicy.java => fsdataset/VolumeChoosingPolicy.java} (55%)
 rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/{TestRoundRobinVolumesPolicy.java => fsdataset/TestRoundRobinVolumeChoosingPolicy.java} (50%)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7026879d0d..70c9089d68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -263,6 +263,9 @@ Release 0.23.3 - UNRELEASED
     HDFS-3071. haadmin failover command does not provide enough detail when
     target NN is not ready to be active. (todd)
 
+    HDFS-3089. Move FSDatasetInterface and the related classes to a package.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-3024.
Improve performance of stringification in addStoredBlock (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index db38bdf122..aad8c8fd56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -237,9 +237,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0; public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; - public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy"; - public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY_DEFAULT = - "org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy"; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3; public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval"; @@ -305,6 +302,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { //Keys with no defaults public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins"; public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory"; + public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy"; public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout"; public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup"; public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index fdc78c9b9a..4a6ccfe497 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -44,7 +44,9 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -72,7 +74,7 @@ class BlockPoolSliceScanner { private final AtomicLong lastScanTime = new AtomicLong(); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FsDatasetSpi dataset; private final SortedSet blockInfoSet = new TreeSet(); @@ -134,8 +136,7 @@ public int compareTo(BlockScanInfo other) { } BlockPoolSliceScanner(String bpid, DataNode datanode, - FSDatasetInterface dataset, - Configuration conf) { + FsDatasetSpi dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; this.blockPoolId = bpid; diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java index 63b2464e72..72aae099dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /** @@ -43,7 +44,7 @@ public class DataBlockScanner implements Runnable { public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FsDatasetSpi dataset; private final Configuration conf; /** @@ -55,7 +56,7 @@ public class DataBlockScanner implements Runnable { Thread blockScannerThread = null; DataBlockScanner(DataNode datanode, - FSDatasetInterface dataset, + FsDatasetSpi dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 9de306e178..f7dd2a5ac3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -122,6 +122,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; @@ -231,7 +232,7 @@ public static InetSocketAddress createSocketAddr(String target) { volatile boolean shouldRun = true; private BlockPoolManager blockPoolManager; - volatile FSDatasetInterface data = null; + volatile FsDatasetSpi data = null; private String clusterId = null; public final static String EMPTY_DEL_HINT = ""; @@ -809,8 +810,8 @@ int getBpOsCount() { * handshake with the the first namenode is completed. 
*/ private void initStorage(final NamespaceInfo nsInfo) throws IOException { - final FSDatasetInterface.Factory> factory - = FSDatasetInterface.Factory.getFactory(conf); + final FsDatasetSpi.Factory> factory + = FsDatasetSpi.Factory.getFactory(conf); if (!factory.isSimulated()) { final StartupOption startOpt = getStartupOption(conf); @@ -828,7 +829,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException { synchronized(this) { if (data == null) { - data = factory.createFSDatasetInterface(this, storage, conf); + data = factory.newInstance(this, storage, conf); } } } @@ -1695,7 +1696,7 @@ public void scheduleAllBlockReport(long delay) { * * @return the fsdataset that stores the blocks */ - FSDatasetInterface getFSDataset() { + FsDatasetSpi getFSDataset() { return data; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 91ce4092d7..eac68d970b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.Daemon; @@ -55,7 +56,7 @@ public class DirectoryScanner implements Runnable { private static final Log LOG = LogFactory.getLog(DirectoryScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FsDatasetSpi dataset; private final ExecutorService reportCompileThreadPool; private final ScheduledExecutorService masterThread; private final long scanPeriodMsecs; @@ -219,7 +220,7 @@ public long getGenStamp() { } } - DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) { + DirectoryScanner(DataNode dn, FsDatasetSpi dataset, Configuration conf) { this.datanode = dn; this.dataset = dataset; int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, @@ -411,7 +412,7 @@ private void addDifference(LinkedList diffRecord, } /** Is the given volume still valid in the dataset? 
*/ - private static boolean isValid(final FSDatasetInterface dataset, + private static boolean isValid(final FsDatasetSpi dataset, final FsVolumeSpi volume) { for (FsVolumeSpi vol : dataset.getVolumes()) { if (vol == volume) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index 10d8cbbc7c..3a4a4b612e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -61,10 +61,14 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -82,13 +86,13 @@ * ***************************************************/ @InterfaceAudience.Private -class FSDataset implements FSDatasetInterface { +public class FSDataset implements FsDatasetSpi { /** * A factory for creating FSDataset objects. */ - static class Factory extends FSDatasetInterface.Factory { + public static class Factory extends FsDatasetSpi.Factory { @Override - public FSDataset createFSDatasetInterface(DataNode datanode, + public FSDataset newInstance(DataNode datanode, DataStorage storage, Configuration conf) throws IOException { return new FSDataset(datanode, storage, conf); } @@ -823,11 +827,11 @@ static class FSVolumeSet { */ private volatile List volumes = null; - BlockVolumeChoosingPolicy blockChooser; + final VolumeChoosingPolicy blockChooser; int numFailedVolumes; FSVolumeSet(List volumes, int failedVols, - BlockVolumeChoosingPolicy blockChooser) { + VolumeChoosingPolicy blockChooser) { this.volumes = Collections.unmodifiableList(volumes); this.blockChooser = blockChooser; this.numFailedVolumes = failedVols; @@ -1018,7 +1022,7 @@ private static long parseGenerationStamp(File blockFile, File metaFile } } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public List getVolumes() { return volumes.volumes; } @@ -1029,7 +1033,7 @@ public synchronized FSVolume getVolume(final ExtendedBlock b) { return r != null? 
(FSVolume)r.getVolume(): null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized Block getStoredBlock(String bpid, long blkid) throws IOException { File blockfile = getFile(bpid, blkid); @@ -1066,7 +1070,7 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { return null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { final File meta = getMetaFile(b); @@ -1125,11 +1129,11 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf volumeMap = new ReplicasMap(this); @SuppressWarnings("unchecked") - final BlockVolumeChoosingPolicy blockChooserImpl = + final VolumeChoosingPolicy blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( - DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY, - RoundRobinVolumesPolicy.class, - BlockVolumeChoosingPolicy.class), conf); + DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, + RoundRobinVolumeChoosingPolicy.class, + VolumeChoosingPolicy.class), conf); volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl); volumes.getVolumeMap(volumeMap); @@ -1164,7 +1168,7 @@ public long getBlockPoolUsed(String bpid) throws IOException { /** * Return true - if there are still valid volumes on the DataNode. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public boolean hasEnoughResource() { return getVolumes().size() >= validVolsRequired; } @@ -1199,7 +1203,7 @@ public int getNumFailedVolumes() { /** * Find the block's on-disk length */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public long getLength(ExtendedBlock b) throws IOException { return getBlockFile(b).length(); } @@ -1243,7 +1247,7 @@ private File getBlockFileNoExistsCheck(ExtendedBlock b) return f; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { File blockFile = getBlockFileNoExistsCheck(b); @@ -1301,7 +1305,7 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid) /** * Returns handles to the block file and its metadata file */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); @@ -1406,7 +1410,7 @@ static private void truncateBlock(File blockFile, File metaFile, } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets @@ -1547,7 +1551,7 @@ private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, return replicaInfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { DataNode.LOG.info("Recover failed append to " + b); @@ -1564,7 +1568,7 @@ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, } } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { DataNode.LOG.info("Recover failed close " + b); @@ -1606,7 +1610,7 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo, } } - @Override // FSDatasetInterface + @Override // 
FsDatasetSpi public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), @@ -1626,7 +1630,7 @@ public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) return newReplicaInfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { @@ -1671,7 +1675,7 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, return rbw; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface convertTemporaryToRbw( final ExtendedBlock b) throws IOException { final long blockId = b.getBlockId(); @@ -1732,7 +1736,7 @@ public synchronized ReplicaInPipelineInterface convertTemporaryToRbw( return rbw; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); @@ -1756,7 +1760,7 @@ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) * Sets the offset in the meta file so that the * last checksum will be overwritten. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, int checksumSize) throws IOException { FileOutputStream file = (FileOutputStream) streams.getChecksumOut(); @@ -1781,7 +1785,7 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea /** * Complete the block write! */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo.getState() == ReplicaState.FINALIZED) { @@ -1818,7 +1822,7 @@ private synchronized FinalizedReplica finalizeReplica(String bpid, /** * Remove the temporary block file (if any) */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); @@ -1863,7 +1867,7 @@ private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) { /** * Generates a block report from the in-memory block map. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public BlockListAsLongs getBlockReport(String bpid) { int size = volumeMap.size(bpid); ArrayList finalized = new ArrayList(size); @@ -1914,7 +1918,7 @@ public synchronized List getFinalizedBlocks(String bpid) { * Check whether the given block is a valid one. * valid means finalized */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public boolean isValidBlock(ExtendedBlock b) { return isValid(b, ReplicaState.FINALIZED); } @@ -1922,7 +1926,7 @@ public boolean isValidBlock(ExtendedBlock b) { /** * Check whether the given block is a valid RBW. */ - @Override // {@link FSDatasetInterface} + @Override // {@link FsDatasetSpi} public boolean isValidRbw(final ExtendedBlock b) { return isValid(b, ReplicaState.RBW); } @@ -1987,7 +1991,7 @@ static void checkReplicaFiles(final ReplicaInfo r) throws IOException { * could lazily garbage-collect the block, but why bother? * just get rid of it. 
*/ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void invalidate(String bpid, Block invalidBlks[]) throws IOException { boolean error = false; for (int i = 0; i < invalidBlks.length; i++) { @@ -2053,7 +2057,7 @@ public void notifyNamenodeDeletedBlock(ExtendedBlock block){ datanode.notifyNamenodeDeletedBlock(block); } - @Override // {@link FSDatasetInterface} + @Override // {@link FsDatasetSpi} public synchronized boolean contains(final ExtendedBlock block) { final long blockId = block.getLocalBlock().getBlockId(); return getFile(block.getBlockPoolId(), blockId) != null; @@ -2078,7 +2082,7 @@ File getFile(final String bpid, final long blockId) { * to these volumes * @throws DiskErrorException */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void checkDataDir() throws DiskErrorException { long totalBlocks=0, removedBlocks=0; List failedVols = volumes.checkDirs(); @@ -2122,7 +2126,7 @@ public void checkDataDir() throws DiskErrorException { } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public String toString() { return "FSDataset{dirpath='"+volumes+"'}"; } @@ -2153,7 +2157,7 @@ void registerMBean(final String storageId) { DataNode.LOG.info("Registered FSDatasetState MBean"); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void shutdown() { if (mbeanName != null) MBeans.unregister(mbeanName); @@ -2334,7 +2338,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile, /** * @deprecated use {@link #fetchReplicaInfo(String, long)} instead. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi @Deprecated public ReplicaInfo getReplica(String bpid, long blockId) { return volumeMap.get(bpid, blockId); @@ -2346,7 +2350,7 @@ public synchronized String getReplicaString(String bpid, long blockId) { return r == null? 
"null": r.toString(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaRecoveryInfo initReplicaRecovery( RecoveringBlock rBlock) throws IOException { return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), @@ -2419,7 +2423,7 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, return rur.createInfo(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized String updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, @@ -2501,7 +2505,7 @@ private FinalizedReplica updateReplicaUnderRecovery( return finalizeReplica(bpid, rur); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { final Replica replica = getReplicaInfo(block.getBlockPoolId(), @@ -2584,7 +2588,7 @@ public Map getVolumeInfoMap() { return info; } - @Override //FSDatasetInterface + @Override //FsDatasetSpi public synchronized void deleteBlockPool(String bpid, boolean force) throws IOException { if (!force) { @@ -2602,7 +2606,7 @@ public synchronized void deleteBlockPool(String bpid, boolean force) } } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { File datafile = getBlockFile(block); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java index 235b4f6c42..acc3113af5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java @@ -25,7 +25,7 @@ /** * This defines the interface of a replica in Pipeline that's being written to */ -interface ReplicaInPipelineInterface extends Replica { +public interface ReplicaInPipelineInterface extends Replica { /** * Set the number of bytes received * @param bytesReceived number of bytes received diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java similarity index 50% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index d28e359607..1e9cf02a3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -1,3 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FSDataset; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; +import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * This is a service provider interface for the underlying storage that + * stores replicas for a data node. + * The default implementation stores replicas on local drives. + */ +@InterfaceAudience.Private +public interface FsDatasetSpi extends FSDatasetMBean { + /** + * A factory for creating {@link FsDatasetSpi} objects. + */ + public static abstract class Factory> { + /** @return the configured factory. */ + public static Factory getFactory(Configuration conf) { + @SuppressWarnings("rawtypes") + final Class clazz = conf.getClass( + DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, + FSDataset.Factory.class, + Factory.class); + return ReflectionUtils.newInstance(clazz, conf); + } + + /** Create a new object. */ + public abstract D newInstance(DataNode datanode, DataStorage storage, + Configuration conf) throws IOException; + + /** Does the factory create simulated objects? */ + public boolean isSimulated() { + return false; + } + } + + /** + * Create rolling logs. + * + * @param prefix the prefix of the log names. + * @return rolling logs + */ + public RollingLogs createRollingLogs(String bpid, String prefix + ) throws IOException; + + /** @return a list of volumes. */ + public List getVolumes(); + + /** @return the volume that contains a replica of the block. */ + public V getVolume(ExtendedBlock b); + + /** @return a volume information map (name => info). */ + public Map getVolumeInfoMap(); + + /** @return a list of block pools. */ + public String[] getBlockPoolList(); + + /** @return a list of finalized blocks for the given block pool. */ + public List getFinalizedBlocks(String bpid); + + /** + * Check whether the in-memory block record matches the block on the disk, + * and, in case that they are not matched, update the record or mark it + * as corrupted. 
+ */ + public void checkAndUpdate(String bpid, long blockId, File diskFile, + File diskMetaFile, FsVolumeSpi vol); + + /** + * @param b - the block + * @return a stream if the meta-data of the block exists; + * otherwise, return null. + * @throws IOException + */ + public LengthInputStream getMetaDataInputStream(ExtendedBlock b + ) throws IOException; + + /** + * Returns the specified block's on-disk length (excluding metadata) + * @param b + * @return the specified block's on-disk length (excluding metadta) + * @throws IOException + */ + public long getLength(ExtendedBlock b) throws IOException; + + /** + * Get reference to the replica meta info in the replicasMap. + * To be called from methods that are synchronized on {@link FSDataset} + * @param blockId + * @return replica from the replicas map + */ + @Deprecated + public Replica getReplica(String bpid, long blockId); + + /** + * @return replica meta information + */ + public String getReplicaString(String bpid, long blockId); + + /** + * @return the generation stamp stored with the block. + */ + public Block getStoredBlock(String bpid, long blkid) throws IOException; + + /** + * Returns an input stream at specified offset of the specified block + * @param b + * @param seekOffset + * @return an input stream to read the contents of the specified block, + * starting at the offset + * @throws IOException + */ + public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) + throws IOException; + + /** + * Returns an input stream at specified offset of the specified block + * The block is still in the tmp directory and is not finalized + * @param b + * @param blkoff + * @param ckoff + * @return an input stream to read the contents of the specified block, + * starting at the offset + * @throws IOException + */ + public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + long ckoff) throws IOException; + + /** + * Creates a temporary replica and returns the meta information of the replica + * + * @param b block + * @return the meta info of the replica which is being written to + * @throws IOException if an error occurs + */ + public ReplicaInPipelineInterface createTemporary(ExtendedBlock b + ) throws IOException; + + /** + * Creates a RBW replica and returns the meta info of the replica + * + * @param b block + * @return the meta info of the replica which is being written to + * @throws IOException if an error occurs + */ + public ReplicaInPipelineInterface createRbw(ExtendedBlock b + ) throws IOException; + + /** + * Recovers a RBW replica and returns the meta info of the replica + * + * @param b block + * @param newGS the new generation stamp for the replica + * @param minBytesRcvd the minimum number of bytes that the replica could have + * @param maxBytesRcvd the maximum number of bytes that the replica could have + * @return the meta info of the replica which is being written to + * @throws IOException if an error occurs + */ + public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, + long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException; + + /** + * Covert a temporary replica to a RBW. 
+ * @param temporary the temporary replica being converted + * @return the result RBW + */ + public ReplicaInPipelineInterface convertTemporaryToRbw( + ExtendedBlock temporary) throws IOException; + + /** + * Append to a finalized replica and returns the meta info of the replica + * + * @param b block + * @param newGS the new generation stamp for the replica + * @param expectedBlockLen the number of bytes the replica is expected to have + * @return the meata info of the replica which is being written to + * @throws IOException + */ + public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, + long expectedBlockLen) throws IOException; + + /** + * Recover a failed append to a finalized replica + * and returns the meta info of the replica + * + * @param b block + * @param newGS the new generation stamp for the replica + * @param expectedBlockLen the number of bytes the replica is expected to have + * @return the meta info of the replica which is being written to + * @throws IOException + */ + public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, + long expectedBlockLen) throws IOException; + + /** + * Recover a failed pipeline close + * It bumps the replica's generation stamp and finalize it if RBW replica + * + * @param b block + * @param newGS the new generation stamp for the replica + * @param expectedBlockLen the number of bytes the replica is expected to have + * @throws IOException + */ + public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen + ) throws IOException; + + /** + * Finalizes the block previously opened for writing using writeToBlock. + * The block size is what is in the parameter b and it must match the amount + * of data written + * @param b + * @throws IOException + */ + public void finalizeBlock(ExtendedBlock b) throws IOException; + + /** + * Unfinalizes the block previously opened for writing using writeToBlock. + * The temporary file associated with this block is deleted. + * @param b + * @throws IOException + */ + public void unfinalizeBlock(ExtendedBlock b) throws IOException; + + /** + * Returns the block report - the full list of blocks stored under a + * block pool + * @param bpid Block Pool Id + * @return - the block report - the full list of blocks stored + */ + public BlockListAsLongs getBlockReport(String bpid); + + /** Does the dataset contain the block? */ + public boolean contains(ExtendedBlock block); + + /** + * Is the block valid? + * @param b + * @return - true if the specified block is valid + */ + public boolean isValidBlock(ExtendedBlock b); + + /** + * Is the block a valid RBW? 
+ * @param b + * @return - true if the specified block is a valid RBW + */ + public boolean isValidRbw(ExtendedBlock b); + + /** + * Invalidates the specified blocks + * @param bpid Block pool Id + * @param invalidBlks - the blocks to be invalidated + * @throws IOException + */ + public void invalidate(String bpid, Block invalidBlks[]) throws IOException; + + /** + * Check if all the data directories are healthy + * @throws DiskErrorException + */ + public void checkDataDir() throws DiskErrorException; + + /** + * Shutdown the FSDataset + */ + public void shutdown(); + + /** + * Sets the file pointer of the checksum stream so that the last checksum + * will be overwritten + * @param b block + * @param outs The streams for the data file and checksum file + * @param checksumSize number of bytes each checksum has + * @throws IOException + */ + public void adjustCrcChannelPosition(ExtendedBlock b, + ReplicaOutputStreams outs, int checksumSize) throws IOException; + + /** + * Checks how many valid storage volumes there are in the DataNode. + * @return true if more than the minimum number of valid volumes are left + * in the FSDataSet. + */ + public boolean hasEnoughResource(); + + /** + * Get visible length of the specified replica. + */ + long getReplicaVisibleLength(final ExtendedBlock block) throws IOException; + + /** + * Initialize a replica recovery. + * @return actual state of the replica on this data-node or + * null if data-node does not have the replica. + */ + public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock + ) throws IOException; + + /** + * Update replica's generation stamp and length and finalize it. + * @return the ID of storage that stores the block + */ + public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, + long recoveryId, long newLength) throws IOException; + + /** + * add new block pool ID + * @param bpid Block pool Id + * @param conf Configuration + */ + public void addBlockPool(String bpid, Configuration conf) throws IOException; + + /** + * Shutdown and remove the block pool from underlying storage. + * @param bpid Block pool Id to be removed + */ + public void shutdownBlockPool(String bpid) ; + + /** + * Deletes the block pool directories. If force is false, directories are + * deleted only if no block files exist for the block pool. If force + * is true entire directory for the blockpool is deleted along with its + * contents. + * @param bpid BlockPool Id to be deleted. + * @param force If force is false, directories are deleted only if no + * block files exist for the block pool, otherwise entire + * directory for the blockpool is deleted along with its contents. + * @throws IOException + */ + public void deleteBlockPool(String bpid, boolean force) throws IOException; + + /** + * Get {@link BlockLocalPathInfo} for the given block. + */ + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b + ) throws IOException; +} /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
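Note: the FsDatasetSpi interface above, together with its nested Factory class, is now the extension point for alternative dataset implementations. The DataNode resolves the factory from dfs.datanode.fsdataset.factory (DFS_DATANODE_FSDATASET_FACTORY_KEY) and calls newInstance(...) in place of the old createFSDatasetInterface(...). The sketch below only illustrates that wiring, using the simulated dataset from the test tree; it assumes the hadoop-hdfs test classes are on the classpath, and FsDatasetFactoryExample is an illustrative class name, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

public class FsDatasetFactoryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Select the simulated dataset, as DataNodeCluster's -simulated option
    // does; without this call the default FSDataset.Factory is used.
    SimulatedFSDataset.setFactory(conf);

    // Resolve the configured factory and report what was chosen.
    FsDatasetSpi.Factory<?> factory = FsDatasetSpi.Factory.getFactory(conf);
    System.out.println(factory.getClass().getName()
        + ", isSimulated = " + factory.isSimulated());
  }
}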
See the NOTICE file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java similarity index 51% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java index 002de6ad61..b3b52f2b97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java @@ -1,3 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +/** + * Rolling logs consist of a current log and a set of previous logs. + * + * The implementation should support a single appender and multiple readers. + */ +public interface RollingLogs { + /** + * To iterate the lines of the logs. + */ + public interface LineIterator extends Iterator, Closeable { + /** Is the iterator iterating the previous? */ + public boolean isPrevious(); + } + + /** + * To append text to the logs. + */ + public interface Appender extends Appendable, Closeable { + } + + /** + * Create an iterator to iterate the lines in the logs. + * + * @param skipPrevious Should it skip reading the previous log? + * @return a new iterator. + */ + public LineIterator iterator(boolean skipPrevious) throws IOException; + + /** + * @return the only appender to append text to the logs. + * The same object is returned if it is invoked multiple times. + */ + public Appender appender(); + + /** + * Roll current to previous. + * + * @return true if the rolling succeeded. + * When it returns false, it is not equivalent to an error. + * It means that the rolling cannot be performed at the moment, + * e.g. the logs are being read. + */ + public boolean roll() throws IOException; +} /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
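The RollingLogs interface above, now in the fsdataset package, defines a small contract: a single appender, any number of line iterators, and roll() to move the current log to the previous one. The following sketch only exercises that contract; appendAndDump is a hypothetical helper, and the RollingLogs instance it receives would normally come from FsDatasetSpi#createRollingLogs, with the backing storage left to the dataset implementation.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;

public class RollingLogsExample {
  /** Append one line, print every stored line, then try to roll the logs. */
  static void appendAndDump(RollingLogs logs, String line) throws IOException {
    final RollingLogs.Appender out = logs.appender();
    try {
      out.append(line).append('\n');
    } finally {
      out.close();
    }

    // Iterate previous and current logs (pass true to skip the previous log).
    final RollingLogs.LineIterator lines = logs.iterator(false);
    try {
      while (lines.hasNext()) {
        System.out.println((lines.isPrevious() ? "previous: " : "current: ")
            + lines.next());
      }
    } finally {
      lines.close();
    }

    // roll() returns false when rolling cannot be done right now,
    // e.g. while the logs are being read; that is not an error.
    if (!logs.roll()) {
      System.out.println("roll deferred");
    }
  }
}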
See the NOTICE file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java similarity index 50% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java index 228ebc06dc..d30ab8e868 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java @@ -1,3 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; + +/** + * Choose volumes in round-robin order. + */ +public class RoundRobinVolumeChoosingPolicy + implements VolumeChoosingPolicy { + + private int curVolume = 0; + + @Override + public synchronized V chooseVolume(final List volumes, final long blockSize + ) throws IOException { + if(volumes.size() < 1) { + throw new DiskOutOfSpaceException("No more available volumes"); + } + + // since volumes could've been removed because of the failure + // make sure we are not out of bounds + if(curVolume >= volumes.size()) { + curVolume = 0; + } + + int startVolume = curVolume; + long maxAvailable = 0; + + while (true) { + final V volume = volumes.get(curVolume); + curVolume = (curVolume + 1) % volumes.size(); + long availableVolumeSize = volume.getAvailable(); + if (availableVolumeSize > blockSize) { return volume; } + + if (availableVolumeSize > maxAvailable) { + maxAvailable = availableVolumeSize; + } + + if (curVolume == startVolume) { + throw new DiskOutOfSpaceException("Out of space: " + + "The volume with the most available space (=" + maxAvailable + + " B) is less than the block size (=" + blockSize + " B)."); + } + } + } +} /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java similarity index 55% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java index 28c1dc46d7..363cb51c68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java @@ -1,3 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * This interface specifies the policy for choosing volumes to store replicas. + */ +@InterfaceAudience.Private +public interface VolumeChoosingPolicy { + + /** + * Choose a volume to place a replica, + * given a list of volumes and the replica size sought for storage. + * + * The implementations of this interface must be thread-safe. + * + * @param volumes - a list of available volumes. + * @param replicaSize - the size of the replica for which a volume is sought. + * @return the chosen volume. + * @throws IOException when disks are unavailable or are full. + */ + public V chooseVolume(List volumes, long replicaSize) throws IOException; +} /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index df2e251a75..b9391449f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -428,15 +428,6 @@ - - dfs.datanode.block.volume.choice.policy - org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy - The policy class to use to determine into which of the - datanode's available volumes a block must be written to. Default is a simple - round-robin policy that chooses volumes in a cyclic order. 
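With the hdfs-default.xml entry above removed, the volume choosing policy is configured through dfs.datanode.fsdataset.volume.choosing.policy (DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY), and RoundRobinVolumeChoosingPolicy becomes the code-level default rather than a documented one. The standalone sketch below mirrors how the FSDataset constructor resolves the policy after this change; the commented-out custom policy class name is purely hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.util.ReflectionUtils;

public class VolumeChoosingPolicyExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // To plug in a different policy (class name is hypothetical):
    // conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
    //     "com.example.MyVolumeChoosingPolicy");

    // Same resolution logic as the FSDataset constructor after this patch:
    // no hdfs-default.xml entry, so the round-robin policy is the fallback.
    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeSpi> policy =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    System.out.println("volume choosing policy: " + policy.getClass().getName());
  }
}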
- - - dfs.heartbeat.interval 3 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java index 25198e3692..88e7bcf8f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java @@ -25,8 +25,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog; import org.apache.hadoop.net.DNS; @@ -125,7 +125,7 @@ public static void main(String[] args) { } else if (args[i].equals("-simulated")) { SimulatedFSDataset.setFactory(conf); } else if (args[i].equals("-inject")) { - if (!FSDatasetInterface.Factory.getFactory(conf).isSimulated()) { + if (!FsDatasetSpi.Factory.getFactory(conf).isSimulated()) { System.out.print("-inject is valid only for simulated"); printUsageExit(); } @@ -157,8 +157,7 @@ public static void main(String[] args) { System.out.println("No name node address and port in config"); System.exit(-1); } - boolean simulated = - FSDatasetInterface.Factory.getFactory(conf).isSimulated(); + boolean simulated = FsDatasetSpi.Factory.getFactory(conf).isSimulated(); System.out.println("Starting " + numDataNodes + (simulated ? " Simulated " : " ") + " Data Nodes that will connect to Name Node at " + nameNodeAdr); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 5de6b9ef6b..6ab3f0ce66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -17,6 +17,29 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HOSTS; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; import java.io.File; @@ -43,9 +66,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; - import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocolHelper; import org.apache.hadoop.ha.ServiceFailedException; @@ -57,21 +77,20 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataStorage; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSAdmin; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetUtils; @@ -1802,7 +1821,7 @@ public void injectBlocks(int dataNodeIndex, Iterable blocksToInject) thro throw new IndexOutOfBoundsException(); } final DataNode dn = dataNodes.get(dataNodeIndex).datanode; - final FSDatasetInterface dataSet = DataNodeTestUtils.getFSDataset(dn); + final FsDatasetSpi dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } @@ -1821,7 +1840,7 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex, throw new IndexOutOfBoundsException(); } final DataNode dn = dataNodes.get(dataNodeIndex).datanode; - final FSDatasetInterface dataSet = DataNodeTestUtils.getFSDataset(dn); + final FsDatasetSpi dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 35846c5c39..a9000ed5fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -60,8 +60,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.io.IOUtils; @@ -211,7 +211,7 @@ public void testFileCreation() throws IOException { // can't check capacities for real storage since the OS file system may be changing under us. if (simulatedStorage) { DataNode dn = cluster.getDataNodes().get(0); - FSDatasetInterface dataset = DataNodeTestUtils.getFSDataset(dn); + FsDatasetSpi dataset = DataNodeTestUtils.getFSDataset(dn); assertEquals(fileSize, dataset.getDfsUsed()); assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dataset.getRemaining()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index c9be8f9524..1a871dd35e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; /** @@ -48,7 +49,7 @@ public class DataNodeTestUtils { * * @return the fsdataset that stores the blocks */ - public static FSDatasetInterface getFSDataset(DataNode dn) { + public static FsDatasetSpi getFSDataset(DataNode dn) { return dn.getFSDataset(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 356b6b10ad..a12ac722a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -39,10 +39,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import 
org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -65,10 +67,10 @@ * * Note the synchronization is coarse grained - it is at each method. */ -public class SimulatedFSDataset implements FSDatasetInterface { - static class Factory extends FSDatasetInterface.Factory { +public class SimulatedFSDataset implements FsDatasetSpi { + static class Factory extends FsDatasetSpi.Factory { @Override - public SimulatedFSDataset createFSDatasetInterface(DataNode datanode, + public SimulatedFSDataset newInstance(DataNode datanode, DataStorage storage, Configuration conf) throws IOException { return new SimulatedFSDataset(datanode, storage, conf); } @@ -427,7 +429,7 @@ private Map getMap(String bpid) throws IOException { return map; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); @@ -437,7 +439,7 @@ public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes()); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) { if (isValidRbw(b)) { blockMap.remove(b.getLocalBlock()); @@ -483,7 +485,7 @@ public int getNumFailedVolumes() { return storage.getNumFailedVolumes(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized long getLength(ExtendedBlock b) throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); @@ -513,7 +515,7 @@ public synchronized String getReplicaString(String bpid, long blockId) { return r == null? "null": r.toString(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { final Map map = blockMap.get(bpid); if (map != null) { @@ -526,7 +528,7 @@ public Block getStoredBlock(String bpid, long blkid) throws IOException { return null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void invalidate(String bpid, Block[] invalidBlks) throws IOException { boolean error = false; @@ -557,12 +559,12 @@ private BInfo getBInfo(final ExtendedBlock b) { return map == null? 
null: map.get(b.getLocalBlock()); } - @Override // {@link FSDatasetInterface} + @Override // {@link FsDatasetSpi} public boolean contains(ExtendedBlock block) { return getBInfo(block) != null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized boolean isValidBlock(ExtendedBlock b) { final BInfo binfo = getBInfo(b); return binfo != null && binfo.isFinalized(); @@ -580,7 +582,7 @@ public String toString() { return getStorageInfo(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -593,7 +595,7 @@ public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, return binfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -611,7 +613,7 @@ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, return binfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -628,7 +630,7 @@ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) map.put(binfo.theBlock, binfo); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -647,13 +649,13 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, return binfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { return createTemporary(b); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { if (isValidBlock(b)) { @@ -681,7 +683,7 @@ synchronized InputStream getBlockInputStream(ExtendedBlock b return binfo.getIStream(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { InputStream result = getBlockInputStream(b); @@ -690,13 +692,13 @@ public synchronized InputStream getBlockInputStream(ExtendedBlock b, } /** Not supported */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException { throw new IOException("Not supported"); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b ) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -717,7 +719,7 @@ public void checkDataDir() throws DiskErrorException { // nothing to check for simulated data set } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream, int checksumSize) @@ -902,32 +904,32 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW); } - 
@Override // FSDatasetInterface + @Override // FsDatasetSpi public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newlength) { return storageId; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public long getReplicaVisibleLength(ExtendedBlock block) { return block.getNumBytes(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void addBlockPool(String bpid, Configuration conf) { Map map = new HashMap(); blockMap.put(bpid, map); storage.addBlockPool(bpid); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void shutdownBlockPool(String bpid) { blockMap.remove(bpid); storage.removeBlockPool(bpid); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void deleteBlockPool(String bpid, boolean force) { return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 4332202ed3..f14664fb90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -77,7 +78,7 @@ public class TestBPOfferService { private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2]; private int heartbeatCounts[] = new int[2]; private DataNode mockDn; - private FSDatasetInterface mockFSDataset; + private FsDatasetSpi mockFSDataset; @Before public void setupMocks() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index d7c254d0bb..b319dd9046 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.util.DataChecksum; @@ -88,12 +89,12 @@ int addSomeBlocks(SimulatedFSDataset fsdataset ) throws IOException { public void testFSDatasetFactory() { final Configuration conf = new Configuration(); - FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf); + FsDatasetSpi.Factory f = FsDatasetSpi.Factory.getFactory(conf); assertEquals(FSDataset.Factory.class, f.getClass()); assertFalse(f.isSimulated()); SimulatedFSDataset.setFactory(conf); - FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf); + FsDatasetSpi.Factory s = 
FsDatasetSpi.Factory.getFactory(conf); assertEquals(SimulatedFSDataset.Factory.class, s.getClass()); assertTrue(s.isSimulated()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java similarity index 50% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java index b1de62352d..9ddd0f9198 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java @@ -1,3 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; +import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestRoundRobinVolumeChoosingPolicy { + + // Test the Round-Robin block-volume choosing algorithm. + @Test + public void testRR() throws Exception { + final List volumes = new ArrayList(); + + // First volume, with 100 bytes of space. + volumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L); + + // Second volume, with 200 bytes of space. + volumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); + + @SuppressWarnings("unchecked") + final RoundRobinVolumeChoosingPolicy policy = + ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, null); + + // Test two rounds of round-robin choosing + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); + + // The first volume has only 100L space, so the policy should + // wisely choose the second one in case we ask for more. + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150)); + + // Fail if no volume can be chosen? + try { + policy.chooseVolume(volumes, Long.MAX_VALUE); + Assert.fail(); + } catch (IOException e) { + // Passed. 
+ } + } + + // ChooseVolume should throw DiskOutOfSpaceException + // with volume and block sizes in exception message. + @Test + public void testRRPolicyExceptionMessage() throws Exception { + final List volumes = new ArrayList(); + + // First volume, with 500 bytes of space. + volumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L); + + // Second volume, with 600 bytes of space. + volumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L); + + final RoundRobinVolumeChoosingPolicy policy + = new RoundRobinVolumeChoosingPolicy(); + int blockSize = 700; + try { + policy.chooseVolume(volumes, blockSize); + Assert.fail("expected to throw DiskOutOfSpaceException"); + } catch(DiskOutOfSpaceException e) { + Assert.assertEquals("Not returning the expected message", + "Out of space: The volume with the most available space (=" + 600 + + " B) is less than the block size (=" + blockSize + " B).", + e.getMessage()); + } + } + +} /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file
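
For readers wiring up the renamed classes, the following is a minimal, illustrative sketch of how a datanode-side component could load a volume choosing policy through the new dfs.datanode.fsdataset.volume.choosing.policy key, mirroring the ReflectionUtils-based instantiation used in the test above. The class name VolumeChoosingPolicyLoader is hypothetical and not part of this patch; the sketch assumes the standard Configuration.getClass and ReflectionUtils.newInstance APIs and the VolumeChoosingPolicy/RoundRobinVolumeChoosingPolicy types introduced by this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.util.ReflectionUtils;

/** Hypothetical helper; illustrates the new configuration key only. */
public class VolumeChoosingPolicyLoader {
  /**
   * Loads the configured volume choosing policy, falling back to the
   * renamed RoundRobinVolumeChoosingPolicy when the key is unset.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  static VolumeChoosingPolicy<FsVolumeSpi> load(Configuration conf) {
    final Class<? extends VolumeChoosingPolicy> clazz = conf.getClass(
        DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
        RoundRobinVolumeChoosingPolicy.class,
        VolumeChoosingPolicy.class);
    return ReflectionUtils.newInstance(clazz, conf);
  }
}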