diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 897bf69471..5631838827 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +102,17 @@ DatanodeStorageInfo[] chooseTarget(String src,
         excludedNodes, blocksize, storagePolicy, flags);
   }
 
+  /**
+   * @param storageTypes storage types that should be used as targets.
+   */
+  public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+      Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
+      Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
+      EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
+    return chooseTarget(srcPath, numOfReplicas, writer, chosen,
+        returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
+  }
+
   /**
    * Verify if the block's placement meets requirement of placement policy,
    * i.e. replicas are placed on no less than minRacks racks in the system.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index d13782686e..6fed8a18f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -150,7 +150,16 @@ public DatanodeStorageInfo[] chooseTarget(String srcPath,
       final BlockStoragePolicy storagePolicy,
       EnumSet<AddBlockFlag> flags) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
-        excludedNodes, blocksize, storagePolicy, flags);
+        excludedNodes, blocksize, storagePolicy, flags, null);
+  }
+
+  @Override
+  public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+      Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
+      Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
+      EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
+    return chooseTarget(numOfReplicas, writer, chosen, returnChosenNodes,
+        excludedNodes, blocksize, storagePolicy, flags, storageTypes);
   }
 
   @Override
@@ -202,7 +211,8 @@ DatanodeStorageInfo[] chooseTarget(String src,
       DatanodeStorageInfo[] remainingTargets =
           chooseTarget(src, numOfReplicas, writer,
               new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
-              favoriteAndExcludedNodes, blocksize, storagePolicy, flags);
+              favoriteAndExcludedNodes, blocksize, storagePolicy, flags,
+              storageTypes);
       for (int i = 0; i < remainingTargets.length; i++) {
         results.add(remainingTargets[i]);
       }
@@ -252,7 +262,8 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                     Set<Node> excludedNodes,
                                     long blocksize,
                                     final BlockStoragePolicy storagePolicy,
-                                    EnumSet<AddBlockFlag> addBlockFlags) {
+                                    EnumSet<AddBlockFlag> addBlockFlags,
+                                    EnumMap<StorageType, Integer> sTypes) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return DatanodeStorageInfo.EMPTY_ARRAY;
     }
@@ -290,7 +301,7 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
       localNode = chooseTarget(numOfReplicas, writer,
           excludedNodeCopy, blocksize, maxNodesPerRack, results,
           avoidStaleNodes, storagePolicy,
-          EnumSet.noneOf(StorageType.class), results.isEmpty());
+          EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
       if (results.size() < numOfReplicas) {
         // not enough nodes; discard results and fall back
         results = null;
@@ -300,7 +311,8 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
       results = new ArrayList<>(chosenStorage);
       localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
           blocksize, maxNodesPerRack, results, avoidStaleNodes,
-          storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty());
+          storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
+          sTypes);
     }
 
     if (!returnChosenNodes) {
@@ -380,6 +392,7 @@ private EnumMap<StorageType, Integer> getRequiredStorageTypes(
    * @param maxNodesPerRack max nodes allowed per rack
    * @param results the target nodes already chosen
    * @param avoidStaleNodes avoid stale nodes in replica choosing
+   * @param storageTypes storage type to be considered for target
    * @return local node of writer (not chosen node)
    */
   private Node chooseTarget(int numOfReplicas,
@@ -391,7 +404,8 @@ private Node chooseTarget(int numOfReplicas,
                             final boolean avoidStaleNodes,
                             final BlockStoragePolicy storagePolicy,
                             final EnumSet<StorageType> unavailableStorages,
-                            final boolean newBlock) {
+                            final boolean newBlock,
+                            EnumMap<StorageType, Integer> storageTypes) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return (writer instanceof DatanodeDescriptor) ? writer : null;
     }
@@ -409,8 +423,9 @@ private Node chooseTarget(int numOfReplicas,
         .chooseStorageTypes((short) totalReplicasExpected,
             DatanodeStorageInfo.toStorageTypes(results),
             unavailableStorages, newBlock);
-    final EnumMap<StorageType, Integer> storageTypes =
-        getRequiredStorageTypes(requiredStorageTypes);
+    if (storageTypes == null) {
+      storageTypes = getRequiredStorageTypes(requiredStorageTypes);
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("storageTypes=" + storageTypes);
     }
@@ -453,7 +468,7 @@ private Node chooseTarget(int numOfReplicas,
         numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
-            newBlock);
+            newBlock, null);
       }
 
       boolean retry = false;
@@ -473,7 +488,7 @@ private Node chooseTarget(int numOfReplicas,
         numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
-            newBlock);
+            newBlock, null);
       }
     }
     return writer;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 8ad70852ae..d09ad0209c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -40,6 +40,7 @@
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -50,6 +51,7 @@
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
@@ -80,6 +82,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -1979,4 +1982,30 @@ public Object run() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testStorageFavouredNodes()
+      throws IOException, InterruptedException, TimeoutException {
+    Configuration conf = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .storageTypes(new StorageType[] {StorageType.SSD, StorageType.DISK})
+        .numDataNodes(3).storagesPerDatanode(2).build()) {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path file1 = new Path("/tmp/file1");
+      fs.mkdirs(new Path("/tmp"));
+      fs.setStoragePolicy(new Path("/tmp"), "ONE_SSD");
+      InetSocketAddress[] addrs =
+          {cluster.getDataNodes().get(0).getXferAddress()};
+      HdfsDataOutputStream stream = fs.create(file1, FsPermission.getDefault(),
+          false, 1024, (short) 3, 1024, null, addrs);
+      stream.write("Some Bytes".getBytes());
+      stream.close();
+      DFSTestUtil.waitReplication(fs, file1, (short) 3);
+      BlockLocation[] locations = fs.getClient()
+          .getBlockLocations(file1.toUri().getPath(), 0, Long.MAX_VALUE);
+      int numSSD = Collections.frequency(
+          Arrays.asList(locations[0].getStorageTypes()), StorageType.SSD);
+      assertEquals("Number of SSD should be 1 but was : " + numSSD, 1, numSSD);
+    }
+  }
 }
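
Note (not part of the patch): the new chooseTarget overload lets a caller hand in a pre-computed EnumMap<StorageType, Integer> of required storage types; BlockPlacementPolicyDefault only falls back to getRequiredStorageTypes() when that map is null, so callers can pin the storage-type mix for the targets up front. The sketch below is illustrative only, under the assumption of a caller inside the NameNode that already holds a BlockPlacementPolicy instance and the usual placement arguments; the class StorageTypeTargetSketch, its method, and its parameters are hypothetical and not code from this change.

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Set;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.net.Node;

// Hypothetical caller, for illustration only.
final class StorageTypeTargetSketch {

  // Chooses three targets for a new block, asking for one SSD and two DISK
  // replicas (the same ONE_SSD layout the test above verifies).
  static DatanodeStorageInfo[] chooseOneSsdTargets(BlockPlacementPolicy policy,
      String srcPath, Node writer, Set<Node> excludedNodes, long blockSize,
      BlockStoragePolicy storagePolicy) {
    EnumMap<StorageType, Integer> storageTypes =
        new EnumMap<>(StorageType.class);
    storageTypes.put(StorageType.SSD, 1);
    storageTypes.put(StorageType.DISK, 2);

    // The last argument is the storageTypes parameter added by this patch;
    // passing null instead would preserve the pre-patch behaviour of deriving
    // the types from the storage policy.
    return policy.chooseTarget(srcPath, 3, writer,
        new ArrayList<DatanodeStorageInfo>(), false, excludedNodes, blockSize,
        storagePolicy, EnumSet.noneOf(AddBlockFlag.class), storageTypes);
  }
}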