diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7026879d0d..70c9089d68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -263,6 +263,9 @@ Release 0.23.3 - UNRELEASED HDFS-3071. haadmin failover command does not provide enough detail when target NN is not ready to be active. (todd) + HDFS-3089. Move FSDatasetInterface and the related classes to a package. + (szetszwo) + OPTIMIZATIONS HDFS-3024. Improve performance of stringification in addStoredBlock (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index db38bdf122..aad8c8fd56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -237,9 +237,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0; public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; - public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy"; - public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY_DEFAULT = - "org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy"; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3; public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval"; @@ -305,6 +302,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { //Keys with no defaults public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins"; public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory"; + public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy"; public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout"; public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup"; public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index fdc78c9b9a..4a6ccfe497 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -44,7 +44,9 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -72,7 +74,7 @@ class BlockPoolSliceScanner { private final AtomicLong lastScanTime = new AtomicLong(); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FsDatasetSpi dataset; private final SortedSet blockInfoSet = new TreeSet(); @@ -134,8 +136,7 @@ public int compareTo(BlockScanInfo other) { } BlockPoolSliceScanner(String bpid, DataNode datanode, - FSDatasetInterface dataset, - Configuration conf) { + FsDatasetSpi dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; this.blockPoolId = bpid; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java index 63b2464e72..72aae099dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /** @@ -43,7 +44,7 @@ public class DataBlockScanner implements Runnable { public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FsDatasetSpi dataset; private final Configuration conf; /** @@ -55,7 +56,7 @@ public class DataBlockScanner implements Runnable { Thread blockScannerThread = null; DataBlockScanner(DataNode datanode, - FSDatasetInterface dataset, + FsDatasetSpi dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 9de306e178..f7dd2a5ac3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -122,6 +122,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; @@ -231,7 +232,7 @@ public static InetSocketAddress createSocketAddr(String target) { volatile boolean shouldRun = true; private BlockPoolManager blockPoolManager; - volatile FSDatasetInterface data = null; + volatile FsDatasetSpi data = null; private String clusterId = null; public final static String EMPTY_DEL_HINT = ""; @@ -809,8 +810,8 @@ int getBpOsCount() { * handshake with the the first namenode is completed. */ private void initStorage(final NamespaceInfo nsInfo) throws IOException { - final FSDatasetInterface.Factory> factory - = FSDatasetInterface.Factory.getFactory(conf); + final FsDatasetSpi.Factory> factory + = FsDatasetSpi.Factory.getFactory(conf); if (!factory.isSimulated()) { final StartupOption startOpt = getStartupOption(conf); @@ -828,7 +829,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException { synchronized(this) { if (data == null) { - data = factory.createFSDatasetInterface(this, storage, conf); + data = factory.newInstance(this, storage, conf); } } } @@ -1695,7 +1696,7 @@ public void scheduleAllBlockReport(long delay) { * * @return the fsdataset that stores the blocks */ - FSDatasetInterface getFSDataset() { + FsDatasetSpi getFSDataset() { return data; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 91ce4092d7..eac68d970b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.Daemon; @@ -55,7 +56,7 @@ public class DirectoryScanner implements Runnable { private static final Log LOG = LogFactory.getLog(DirectoryScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FsDatasetSpi dataset; private final ExecutorService reportCompileThreadPool; private final ScheduledExecutorService masterThread; private final long scanPeriodMsecs; @@ -219,7 +220,7 @@ public long getGenStamp() { } } - DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) { + DirectoryScanner(DataNode dn, FsDatasetSpi dataset, Configuration conf) { this.datanode = dn; this.dataset = dataset; int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, @@ -411,7 +412,7 @@ private void addDifference(LinkedList diffRecord, } /** Is the given volume still valid in the dataset? */ - private static boolean isValid(final FSDatasetInterface dataset, + private static boolean isValid(final FsDatasetSpi dataset, final FsVolumeSpi volume) { for (FsVolumeSpi vol : dataset.getVolumes()) { if (vol == volume) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index 10d8cbbc7c..3a4a4b612e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -61,10 +61,14 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -82,13 +86,13 @@ * ***************************************************/ @InterfaceAudience.Private -class FSDataset implements FSDatasetInterface { +public class FSDataset implements FsDatasetSpi { /** * A factory for creating FSDataset objects. */ - static class Factory extends FSDatasetInterface.Factory { + public static class Factory extends FsDatasetSpi.Factory { @Override - public FSDataset createFSDatasetInterface(DataNode datanode, + public FSDataset newInstance(DataNode datanode, DataStorage storage, Configuration conf) throws IOException { return new FSDataset(datanode, storage, conf); } @@ -823,11 +827,11 @@ static class FSVolumeSet { */ private volatile List volumes = null; - BlockVolumeChoosingPolicy blockChooser; + final VolumeChoosingPolicy blockChooser; int numFailedVolumes; FSVolumeSet(List volumes, int failedVols, - BlockVolumeChoosingPolicy blockChooser) { + VolumeChoosingPolicy blockChooser) { this.volumes = Collections.unmodifiableList(volumes); this.blockChooser = blockChooser; this.numFailedVolumes = failedVols; @@ -1018,7 +1022,7 @@ private static long parseGenerationStamp(File blockFile, File metaFile } } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public List getVolumes() { return volumes.volumes; } @@ -1029,7 +1033,7 @@ public synchronized FSVolume getVolume(final ExtendedBlock b) { return r != null? (FSVolume)r.getVolume(): null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized Block getStoredBlock(String bpid, long blkid) throws IOException { File blockfile = getFile(bpid, blkid); @@ -1066,7 +1070,7 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { return null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { final File meta = getMetaFile(b); @@ -1125,11 +1129,11 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf volumeMap = new ReplicasMap(this); @SuppressWarnings("unchecked") - final BlockVolumeChoosingPolicy blockChooserImpl = + final VolumeChoosingPolicy blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( - DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY, - RoundRobinVolumesPolicy.class, - BlockVolumeChoosingPolicy.class), conf); + DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, + RoundRobinVolumeChoosingPolicy.class, + VolumeChoosingPolicy.class), conf); volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl); volumes.getVolumeMap(volumeMap); @@ -1164,7 +1168,7 @@ public long getBlockPoolUsed(String bpid) throws IOException { /** * Return true - if there are still valid volumes on the DataNode. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public boolean hasEnoughResource() { return getVolumes().size() >= validVolsRequired; } @@ -1199,7 +1203,7 @@ public int getNumFailedVolumes() { /** * Find the block's on-disk length */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public long getLength(ExtendedBlock b) throws IOException { return getBlockFile(b).length(); } @@ -1243,7 +1247,7 @@ private File getBlockFileNoExistsCheck(ExtendedBlock b) return f; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { File blockFile = getBlockFileNoExistsCheck(b); @@ -1301,7 +1305,7 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid) /** * Returns handles to the block file and its metadata file */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); @@ -1406,7 +1410,7 @@ static private void truncateBlock(File blockFile, File metaFile, } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets @@ -1547,7 +1551,7 @@ private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, return replicaInfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { DataNode.LOG.info("Recover failed append to " + b); @@ -1564,7 +1568,7 @@ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, } } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { DataNode.LOG.info("Recover failed close " + b); @@ -1606,7 +1610,7 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo, } } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), @@ -1626,7 +1630,7 @@ public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) return newReplicaInfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { @@ -1671,7 +1675,7 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, return rbw; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface convertTemporaryToRbw( final ExtendedBlock b) throws IOException { final long blockId = b.getBlockId(); @@ -1732,7 +1736,7 @@ public synchronized ReplicaInPipelineInterface convertTemporaryToRbw( return rbw; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); @@ -1756,7 +1760,7 @@ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) * Sets the offset in the meta file so that the * last checksum will be overwritten. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, int checksumSize) throws IOException { FileOutputStream file = (FileOutputStream) streams.getChecksumOut(); @@ -1781,7 +1785,7 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea /** * Complete the block write! */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo.getState() == ReplicaState.FINALIZED) { @@ -1818,7 +1822,7 @@ private synchronized FinalizedReplica finalizeReplica(String bpid, /** * Remove the temporary block file (if any) */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); @@ -1863,7 +1867,7 @@ private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) { /** * Generates a block report from the in-memory block map. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public BlockListAsLongs getBlockReport(String bpid) { int size = volumeMap.size(bpid); ArrayList finalized = new ArrayList(size); @@ -1914,7 +1918,7 @@ public synchronized List getFinalizedBlocks(String bpid) { * Check whether the given block is a valid one. * valid means finalized */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public boolean isValidBlock(ExtendedBlock b) { return isValid(b, ReplicaState.FINALIZED); } @@ -1922,7 +1926,7 @@ public boolean isValidBlock(ExtendedBlock b) { /** * Check whether the given block is a valid RBW. */ - @Override // {@link FSDatasetInterface} + @Override // {@link FsDatasetSpi} public boolean isValidRbw(final ExtendedBlock b) { return isValid(b, ReplicaState.RBW); } @@ -1987,7 +1991,7 @@ static void checkReplicaFiles(final ReplicaInfo r) throws IOException { * could lazily garbage-collect the block, but why bother? * just get rid of it. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void invalidate(String bpid, Block invalidBlks[]) throws IOException { boolean error = false; for (int i = 0; i < invalidBlks.length; i++) { @@ -2053,7 +2057,7 @@ public void notifyNamenodeDeletedBlock(ExtendedBlock block){ datanode.notifyNamenodeDeletedBlock(block); } - @Override // {@link FSDatasetInterface} + @Override // {@link FsDatasetSpi} public synchronized boolean contains(final ExtendedBlock block) { final long blockId = block.getLocalBlock().getBlockId(); return getFile(block.getBlockPoolId(), blockId) != null; @@ -2078,7 +2082,7 @@ File getFile(final String bpid, final long blockId) { * to these volumes * @throws DiskErrorException */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void checkDataDir() throws DiskErrorException { long totalBlocks=0, removedBlocks=0; List failedVols = volumes.checkDirs(); @@ -2122,7 +2126,7 @@ public void checkDataDir() throws DiskErrorException { } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public String toString() { return "FSDataset{dirpath='"+volumes+"'}"; } @@ -2153,7 +2157,7 @@ void registerMBean(final String storageId) { DataNode.LOG.info("Registered FSDatasetState MBean"); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void shutdown() { if (mbeanName != null) MBeans.unregister(mbeanName); @@ -2334,7 +2338,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile, /** * @deprecated use {@link #fetchReplicaInfo(String, long)} instead. */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi @Deprecated public ReplicaInfo getReplica(String bpid, long blockId) { return volumeMap.get(bpid, blockId); @@ -2346,7 +2350,7 @@ public synchronized String getReplicaString(String bpid, long blockId) { return r == null? "null": r.toString(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaRecoveryInfo initReplicaRecovery( RecoveringBlock rBlock) throws IOException { return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), @@ -2419,7 +2423,7 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, return rur.createInfo(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized String updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, @@ -2501,7 +2505,7 @@ private FinalizedReplica updateReplicaUnderRecovery( return finalizeReplica(bpid, rur); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { final Replica replica = getReplicaInfo(block.getBlockPoolId(), @@ -2584,7 +2588,7 @@ public Map getVolumeInfoMap() { return info; } - @Override //FSDatasetInterface + @Override //FsDatasetSpi public synchronized void deleteBlockPool(String bpid, boolean force) throws IOException { if (!force) { @@ -2602,7 +2606,7 @@ public synchronized void deleteBlockPool(String bpid, boolean force) } } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { File datafile = getBlockFile(block); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java index 235b4f6c42..acc3113af5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java @@ -25,7 +25,7 @@ /** * This defines the interface of a replica in Pipeline that's being written to */ -interface ReplicaInPipelineInterface extends Replica { +public interface ReplicaInPipelineInterface extends Replica { /** * Set the number of bytes received * @param bytesReceived number of bytes received diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java similarity index 86% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index d28e359607..0e3242e49b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.datanode; +package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.File; @@ -31,10 +31,11 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FSDataset; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -42,19 +43,16 @@ import org.apache.hadoop.util.ReflectionUtils; /** - * This is an interface for the underlying storage that stores blocks for - * a data node. - * Examples are the FSDataset (which stores blocks on dirs) and - * SimulatedFSDataset (which simulates data). - * + * This is a service provider interface for the underlying storage that + * stores replicas for a data node. + * The default implementation stores replicas on local drives. */ @InterfaceAudience.Private -public interface FSDatasetInterface - extends FSDatasetMBean { +public interface FsDatasetSpi extends FSDatasetMBean { /** - * A factory for creating FSDatasetInterface objects. + * A factory for creating {@link FsDatasetSpi} objects. */ - public abstract class Factory> { + public static abstract class Factory> { /** @return the configured factory. */ public static Factory getFactory(Configuration conf) { @SuppressWarnings("rawtypes") @@ -65,10 +63,9 @@ public static Factory getFactory(Configuration conf) { return ReflectionUtils.newInstance(clazz, conf); } - /** Create a FSDatasetInterface object. */ - public abstract D createFSDatasetInterface( - DataNode datanode, DataStorage storage, Configuration conf - ) throws IOException; + /** Create a new object. */ + public abstract D newInstance(DataNode datanode, DataStorage storage, + Configuration conf) throws IOException; /** Does the factory create simulated objects? */ public boolean isSimulated() { @@ -82,7 +79,8 @@ public boolean isSimulated() { * @param prefix the prefix of the log names. * @return rolling logs */ - public RollingLogs createRollingLogs(String bpid, String prefix) throws IOException; + public RollingLogs createRollingLogs(String bpid, String prefix + ) throws IOException; /** @return a list of volumes. */ public List getVolumes(); @@ -167,15 +165,15 @@ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException; - /** + /** * Creates a temporary replica and returns the meta information of the replica * * @param b block * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createTemporary(ExtendedBlock b) - throws IOException; + public ReplicaInPipelineInterface createTemporary(ExtendedBlock b + ) throws IOException; /** * Creates a RBW replica and returns the meta info of the replica @@ -184,7 +182,8 @@ public ReplicaInPipelineInterface createTemporary(ExtendedBlock b) * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException; + public ReplicaInPipelineInterface createRbw(ExtendedBlock b + ) throws IOException; /** * Recovers a RBW replica and returns the meta info of the replica @@ -197,8 +196,7 @@ public ReplicaInPipelineInterface createTemporary(ExtendedBlock b) * @throws IOException if an error occurs */ public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, - long newGS, long minBytesRcvd, long maxBytesRcvd) - throws IOException; + long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException; /** * Covert a temporary replica to a RBW. @@ -217,8 +215,8 @@ public ReplicaInPipelineInterface convertTemporaryToRbw( * @return the meata info of the replica which is being written to * @throws IOException */ - public ReplicaInPipelineInterface append(ExtendedBlock b, - long newGS, long expectedBlockLen) throws IOException; + public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, + long expectedBlockLen) throws IOException; /** * Recover a failed append to a finalized replica @@ -230,8 +228,8 @@ public ReplicaInPipelineInterface append(ExtendedBlock b, * @return the meta info of the replica which is being written to * @throws IOException */ - public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, - long newGS, long expectedBlockLen) throws IOException; + public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, + long expectedBlockLen) throws IOException; /** * Recover a failed pipeline close @@ -242,8 +240,8 @@ public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, * @param expectedBlockLen the number of bytes the replica is expected to have * @throws IOException */ - public void recoverClose(ExtendedBlock b, - long newGS, long expectedBlockLen) throws IOException; + public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen + ) throws IOException; /** * Finalizes the block previously opened for writing using writeToBlock. @@ -300,7 +298,7 @@ public void recoverClose(ExtendedBlock b, * @throws DiskErrorException */ public void checkDataDir() throws DiskErrorException; - + /** * Shutdown the FSDataset */ @@ -310,12 +308,12 @@ public void recoverClose(ExtendedBlock b, * Sets the file pointer of the checksum stream so that the last checksum * will be overwritten * @param b block - * @param stream The stream for the data file and checksum file + * @param outs The streams for the data file and checksum file * @param checksumSize number of bytes each checksum has * @throws IOException */ - public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream, - int checksumSize) throws IOException; + public void adjustCrcChannelPosition(ExtendedBlock b, + ReplicaOutputStreams outs, int checksumSize) throws IOException; /** * Checks how many valid storage volumes there are in the DataNode. @@ -334,8 +332,8 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea * @return actual state of the replica on this data-node or * null if data-node does not have the replica. */ - public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) - throws IOException; + public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock + ) throws IOException; /** * Update replica's generation stamp and length and finalize it. @@ -372,6 +370,7 @@ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, /** * Get {@link BlockLocalPathInfo} for the given block. - **/ - public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) throws IOException; + */ + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b + ) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java similarity index 88% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java index 002de6ad61..b34bfa5c94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RollingLogs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java @@ -15,16 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.datanode; +package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; import java.io.IOException; import java.util.Iterator; /** - * Rolling logs consist of a current log and a previous log. - * When the roll operation is invoked, current is rolled to previous - * and previous is deleted. + * Rolling logs consist of a current log and a set of previous logs. + * * The implementation should support a single appender and multiple readers. */ public interface RollingLogs { @@ -57,7 +56,7 @@ public interface Appender extends Appendable, Closeable { public Appender appender(); /** - * Roll current to previous and delete the previous. + * Roll current to previous. * * @return true if the rolling succeeded. * When it returns false, it is not equivalent to an error. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java similarity index 79% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java index 228ebc06dc..7f4bdaeb63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java @@ -15,16 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.datanode; +package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.IOException; import java.util.List; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -public class RoundRobinVolumesPolicy - implements BlockVolumeChoosingPolicy { +/** + * Choose volumes in round-robin order. + */ +public class RoundRobinVolumeChoosingPolicy + implements VolumeChoosingPolicy { private int curVolume = 0; @@ -55,13 +57,10 @@ public synchronized V chooseVolume(final List volumes, final long blockSize } if (curVolume == startVolume) { - throw new DiskOutOfSpaceException( - "Insufficient space for an additional block. Volume with the most available space has " - + maxAvailable - + " bytes free, configured block size is " - + blockSize); + throw new DiskOutOfSpaceException("Out of space: " + + "The volume with the most available space (=" + maxAvailable + + " B) is less than the block size (=" + blockSize + " B)."); } } } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java similarity index 50% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java index 28c1dc46d7..62b1e759ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java @@ -15,37 +15,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.datanode; +package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.IOException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -/************************************************** - * BlockVolumeChoosingPolicy allows a DataNode to - * specify what policy is to be used while choosing - * a volume for a block request. - * - * Note: This is an evolving i/f and is only for - * advanced use. - * - ***************************************************/ +/** + * This interface specifies the policy for choosing volumes to store replicas. + */ @InterfaceAudience.Private -public interface BlockVolumeChoosingPolicy { +public interface VolumeChoosingPolicy { /** - * Returns a specific FSVolume after applying a suitable choice algorithm - * to place a given block, given a list of FSVolumes and the block - * size sought for storage. + * Choose a volume to place a replica, + * given a list of volumes and the replica size sought for storage. * - * (Policies that maintain state must be thread-safe.) + * The implementations of this interface must be thread-safe. * - * @param volumes - the array of FSVolumes that are available. - * @param blockSize - the size of the block for which a volume is sought. - * @return the chosen volume to store the block. + * @param volumes - a list of available volumes. + * @param replicaSize - the size of the replica for which a volume is sought. + * @return the chosen volume. * @throws IOException when disks are unavailable or are full. */ - public V chooseVolume(List volumes, long blockSize) throws IOException; -} + public V chooseVolume(List volumes, long replicaSize) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index df2e251a75..b9391449f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -428,15 +428,6 @@ - - dfs.datanode.block.volume.choice.policy - org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy - The policy class to use to determine into which of the - datanode's available volumes a block must be written to. Default is a simple - round-robin policy that chooses volumes in a cyclic order. - - - dfs.heartbeat.interval 3 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java index 25198e3692..88e7bcf8f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java @@ -25,8 +25,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog; import org.apache.hadoop.net.DNS; @@ -125,7 +125,7 @@ public static void main(String[] args) { } else if (args[i].equals("-simulated")) { SimulatedFSDataset.setFactory(conf); } else if (args[i].equals("-inject")) { - if (!FSDatasetInterface.Factory.getFactory(conf).isSimulated()) { + if (!FsDatasetSpi.Factory.getFactory(conf).isSimulated()) { System.out.print("-inject is valid only for simulated"); printUsageExit(); } @@ -157,8 +157,7 @@ public static void main(String[] args) { System.out.println("No name node address and port in config"); System.exit(-1); } - boolean simulated = - FSDatasetInterface.Factory.getFactory(conf).isSimulated(); + boolean simulated = FsDatasetSpi.Factory.getFactory(conf).isSimulated(); System.out.println("Starting " + numDataNodes + (simulated ? " Simulated " : " ") + " Data Nodes that will connect to Name Node at " + nameNodeAdr); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 5de6b9ef6b..6ab3f0ce66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -17,6 +17,29 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HOSTS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; import java.io.File; @@ -43,9 +66,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; - import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocolHelper; import org.apache.hadoop.ha.ServiceFailedException; @@ -57,21 +77,20 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataStorage; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSAdmin; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetUtils; @@ -1802,7 +1821,7 @@ public void injectBlocks(int dataNodeIndex, Iterable blocksToInject) thro throw new IndexOutOfBoundsException(); } final DataNode dn = dataNodes.get(dataNodeIndex).datanode; - final FSDatasetInterface dataSet = DataNodeTestUtils.getFSDataset(dn); + final FsDatasetSpi dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } @@ -1821,7 +1840,7 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex, throw new IndexOutOfBoundsException(); } final DataNode dn = dataNodes.get(dataNodeIndex).datanode; - final FSDatasetInterface dataSet = DataNodeTestUtils.getFSDataset(dn); + final FsDatasetSpi dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 35846c5c39..a9000ed5fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -60,8 +60,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.io.IOUtils; @@ -211,7 +211,7 @@ public void testFileCreation() throws IOException { // can't check capacities for real storage since the OS file system may be changing under us. if (simulatedStorage) { DataNode dn = cluster.getDataNodes().get(0); - FSDatasetInterface dataset = DataNodeTestUtils.getFSDataset(dn); + FsDatasetSpi dataset = DataNodeTestUtils.getFSDataset(dn); assertEquals(fileSize, dataset.getDfsUsed()); assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dataset.getRemaining()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index c9be8f9524..1a871dd35e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; /** @@ -48,7 +49,7 @@ public class DataNodeTestUtils { * * @return the fsdataset that stores the blocks */ - public static FSDatasetInterface getFSDataset(DataNode dn) { + public static FsDatasetSpi getFSDataset(DataNode dn) { return dn.getFSDataset(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 356b6b10ad..a12ac722a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -39,10 +39,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -65,10 +67,10 @@ * * Note the synchronization is coarse grained - it is at each method. */ -public class SimulatedFSDataset implements FSDatasetInterface { - static class Factory extends FSDatasetInterface.Factory { +public class SimulatedFSDataset implements FsDatasetSpi { + static class Factory extends FsDatasetSpi.Factory { @Override - public SimulatedFSDataset createFSDatasetInterface(DataNode datanode, + public SimulatedFSDataset newInstance(DataNode datanode, DataStorage storage, Configuration conf) throws IOException { return new SimulatedFSDataset(datanode, storage, conf); } @@ -427,7 +429,7 @@ private Map getMap(String bpid) throws IOException { return map; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); @@ -437,7 +439,7 @@ public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes()); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) { if (isValidRbw(b)) { blockMap.remove(b.getLocalBlock()); @@ -483,7 +485,7 @@ public int getNumFailedVolumes() { return storage.getNumFailedVolumes(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized long getLength(ExtendedBlock b) throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); @@ -513,7 +515,7 @@ public synchronized String getReplicaString(String bpid, long blockId) { return r == null? "null": r.toString(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { final Map map = blockMap.get(bpid); if (map != null) { @@ -526,7 +528,7 @@ public Block getStoredBlock(String bpid, long blkid) throws IOException { return null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void invalidate(String bpid, Block[] invalidBlks) throws IOException { boolean error = false; @@ -557,12 +559,12 @@ private BInfo getBInfo(final ExtendedBlock b) { return map == null? null: map.get(b.getLocalBlock()); } - @Override // {@link FSDatasetInterface} + @Override // {@link FsDatasetSpi} public boolean contains(ExtendedBlock block) { return getBInfo(block) != null; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized boolean isValidBlock(ExtendedBlock b) { final BInfo binfo = getBInfo(b); return binfo != null && binfo.isFinalized(); @@ -580,7 +582,7 @@ public String toString() { return getStorageInfo(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -593,7 +595,7 @@ public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, return binfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -611,7 +613,7 @@ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, return binfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -628,7 +630,7 @@ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) map.put(binfo.theBlock, binfo); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -647,13 +649,13 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, return binfo; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException { return createTemporary(b); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) throws IOException { if (isValidBlock(b)) { @@ -681,7 +683,7 @@ synchronized InputStream getBlockInputStream(ExtendedBlock b return binfo.getIStream(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { InputStream result = getBlockInputStream(b); @@ -690,13 +692,13 @@ public synchronized InputStream getBlockInputStream(ExtendedBlock b, } /** Not supported */ - @Override // FSDatasetInterface + @Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException { throw new IOException("Not supported"); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b ) throws IOException { final Map map = getMap(b.getBlockPoolId()); @@ -717,7 +719,7 @@ public void checkDataDir() throws DiskErrorException { // nothing to check for simulated data set } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public synchronized void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream, int checksumSize) @@ -902,32 +904,32 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newlength) { return storageId; } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public long getReplicaVisibleLength(ExtendedBlock block) { return block.getNumBytes(); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void addBlockPool(String bpid, Configuration conf) { Map map = new HashMap(); blockMap.put(bpid, map); storage.addBlockPool(bpid); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void shutdownBlockPool(String bpid) { blockMap.remove(bpid); storage.removeBlockPool(bpid); } - @Override // FSDatasetInterface + @Override // FsDatasetSpi public void deleteBlockPool(String bpid, boolean force) { return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 4332202ed3..f14664fb90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -77,7 +78,7 @@ public class TestBPOfferService { private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2]; private int heartbeatCounts[] = new int[2]; private DataNode mockDn; - private FSDatasetInterface mockFSDataset; + private FsDatasetSpi mockFSDataset; @Before public void setupMocks() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index d7c254d0bb..b319dd9046 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.util.DataChecksum; @@ -88,12 +89,12 @@ int addSomeBlocks(SimulatedFSDataset fsdataset ) throws IOException { public void testFSDatasetFactory() { final Configuration conf = new Configuration(); - FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf); + FsDatasetSpi.Factory f = FsDatasetSpi.Factory.getFactory(conf); assertEquals(FSDataset.Factory.class, f.getClass()); assertFalse(f.isSimulated()); SimulatedFSDataset.setFactory(conf); - FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf); + FsDatasetSpi.Factory s = FsDatasetSpi.Factory.getFactory(conf); assertEquals(SimulatedFSDataset.Factory.class, s.getClass()); assertTrue(s.isSimulated()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java similarity index 75% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java index b1de62352d..f8f3cd85c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java @@ -15,20 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.datanode; +package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.ReflectionUtils; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -public class TestRoundRobinVolumesPolicy { +public class TestRoundRobinVolumeChoosingPolicy { // Test the Round-Robin block-volume choosing algorithm. @Test @@ -44,9 +43,8 @@ public void testRR() throws Exception { Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); @SuppressWarnings("unchecked") - final RoundRobinVolumesPolicy policy = - (RoundRobinVolumesPolicy)ReflectionUtils.newInstance( - RoundRobinVolumesPolicy.class, null); + final RoundRobinVolumeChoosingPolicy policy = + ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, null); // Test two rounds of round-robin choosing Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); @@ -67,10 +65,10 @@ public void testRR() throws Exception { } } - // ChooseVolume should throw DiskOutOfSpaceException with volume and block sizes in exception message. + // ChooseVolume should throw DiskOutOfSpaceException + // with volume and block sizes in exception message. @Test - public void testRRPolicyExceptionMessage() - throws Exception { + public void testRRPolicyExceptionMessage() throws Exception { final List volumes = new ArrayList(); // First volume, with 500 bytes of space. @@ -81,18 +79,17 @@ public void testRRPolicyExceptionMessage() volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L); - final RoundRobinVolumesPolicy policy - = new RoundRobinVolumesPolicy(); + final RoundRobinVolumeChoosingPolicy policy + = new RoundRobinVolumeChoosingPolicy(); int blockSize = 700; try { policy.chooseVolume(volumes, blockSize); Assert.fail("expected to throw DiskOutOfSpaceException"); - } catch (DiskOutOfSpaceException e) { - Assert - .assertEquals( - "Not returnig the expected message", - "Insufficient space for an additional block. Volume with the most available space has 600 bytes free, configured block size is " + blockSize, e - .getMessage()); + } catch(DiskOutOfSpaceException e) { + Assert.assertEquals("Not returnig the expected message", + "Out of space: The volume with the most available space (=" + 600 + + " B) is less than the block size (=" + blockSize + " B).", + e.getMessage()); } }