diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 33493ce6e1..cd5490de28 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -950,6 +950,7 @@ public void sortByDistanceUsingNetworkLocation(Node reader,
    *
    * As an additional twist, we also randomize the nodes at each network
    * distance. This helps with load balancing when there is data skew.
+   * It also helps to choose a node with a faster storage type.
    *
    * @param reader    Node where data will be read
    * @param nodes     Available replicas with the requested data
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index c1b6a544b1..ed4d668806 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -158,7 +158,7 @@ public ExtendedBlock getBlock() {
    * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
    * to update the cached Storage ID/Type arrays.
    */
-  public DatanodeInfo[] getLocations() {
+  public DatanodeInfoWithStorage[] getLocations() {
     return locs;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index 699ea92abb..7c80ad64bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceStatus;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -537,7 +539,10 @@ private static LocatedBlock getMockLocatedBlock(final String nsId) {
     DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
         1111, 1112, 1113, 1114);
     DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
-    when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+    DatanodeInfoWithStorage datanodeInfoWithStorage =
+        new DatanodeInfoWithStorage(dnInfo, "storageID", StorageType.DEFAULT);
+    when(lb.getLocations())
+        .thenReturn(new DatanodeInfoWithStorage[] {datanodeInfoWithStorage});
     ExtendedBlock eb = mock(ExtendedBlock.class);
     when(eb.getBlockPoolId()).thenReturn(nsId);
     when(lb.getBlock()).thenReturn(eb);
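Note on the LocatedBlock change above: narrowing the return type of getLocations() to DatanodeInfoWithStorage is what lets the sorter (and any caller) see the storage type directly. A minimal sketch of what callers gain — the helper below is illustrative and not part of the patch:

    import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class ReplicaStorageSketch {
      // Hypothetical helper: print each replica with its storage type.
      // Before this change a caller had to correlate getLocations() with the
      // cached storage-type array by index; now the type travels with the node.
      static void printReplicas(LocatedBlock block) {
        for (DatanodeInfoWithStorage loc : block.getLocations()) {
          System.out.println(loc.getIpAddr() + " -> " + loc.getStorageType());
        }
      }
    }

Because DatanodeInfoWithStorage extends DatanodeInfo and Java arrays are covariant, existing callers that assign the result to DatanodeInfo[] (as the tests below do) keep compiling unchanged.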
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b2f8ad2a5a..2c943b6037 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -243,6 +243,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.read.considerLoad";
   public static final boolean
       DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT = false;
+  public static final String DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY =
+      "dfs.namenode.read.considerStorageType";
+  public static final boolean DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT =
+      false;
   public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR =
       "dfs.namenode.redundancy.considerLoad.factor";
   public static final double
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index c26dd90988..fbe132a066 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -137,6 +137,9 @@ public class DatanodeManager {
   /** Whether or not to consider load for reading. */
   private final boolean readConsiderLoad;
 
+  /** Whether or not to consider storageType for reading. */
+  private final boolean readConsiderStorageType;
+
   /**
    * Whether or not to avoid using stale DataNodes for writing.
    * Note that, even if this is configured, the policy may be
@@ -320,6 +323,15 @@ public class DatanodeManager {
     this.readConsiderLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
         DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
+    this.readConsiderStorageType = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT);
+    if (readConsiderLoad && readConsiderStorageType) {
+      LOG.warn("{} and {} are incompatible and only one can be enabled. "
+          + "Both are currently enabled.",
+          DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
+          DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY);
+    }
     this.avoidStaleDataNodesForWrite = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
@@ -524,7 +536,7 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
       }
     }
 
-    DatanodeInfo[] di = lb.getLocations();
+    DatanodeInfoWithStorage[] di = lb.getLocations();
     // Move decommissioned/stale datanodes to the bottom
     Arrays.sort(di, comparator);
 
@@ -547,10 +559,15 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
     lb.updateCachedStorageInfo();
   }
 
-  private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
-    Consumer<List<DatanodeInfo>> secondarySort = null;
+  private Consumer<List<DatanodeInfoWithStorage>> createSecondaryNodeSorter() {
+    Consumer<List<DatanodeInfoWithStorage>> secondarySort = null;
+    if (readConsiderStorageType) {
+      Comparator<DatanodeInfoWithStorage> comp =
+          Comparator.comparing(DatanodeInfoWithStorage::getStorageType);
+      secondarySort = list -> Collections.sort(list, comp);
+    }
     if (readConsiderLoad) {
-      Comparator<DatanodeInfo> comp =
+      Comparator<DatanodeInfoWithStorage> comp =
           Comparator.comparingInt(DatanodeInfo::getXceiverCount);
       secondarySort = list -> Collections.sort(list, comp);
     }
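The secondary sorter above leans on StorageType's natural (declaration) order, which lists faster media first — the hdfs-default.xml entry below documents it as RAM, SSD, Disk, Archive. A self-contained sketch of just that ordering, under the assumption that Comparator.comparing(DatanodeInfoWithStorage::getStorageType) reduces to the enum's natural order (the list contents are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.hadoop.fs.StorageType;

    public class StorageTypeOrderSketch {
      public static void main(String[] args) {
        List<StorageType> replicas = new ArrayList<>(Arrays.asList(
            StorageType.ARCHIVE, StorageType.DISK, StorageType.SSD));
        // Enums compare by declaration order, so faster media sort first.
        replicas.sort(Comparator.naturalOrder());
        System.out.println(replicas); // [SSD, DISK, ARCHIVE]
      }
    }

Note also that secondarySort is simply reassigned: if both flags are enabled, the considerLoad comparator replaces the storage-type one, which is exactly the situation the constructor's warning flags.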
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 66a8713f54..88d71143ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -43,6 +43,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.locks.ReentrantLock;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -935,6 +936,11 @@ public void setCachedLocations(LocatedBlocks locations) {
     }
   }
 
+  @SuppressFBWarnings(
+      value="EC_UNRELATED_TYPES",
+      justification="HDFS-15255 Asked Wei-Chiu and Pifta to review this"
+          + " warning and we all agree the code is OK and the warning is not "
+          + "needed")
   private void setCachedLocations(LocatedBlock block) {
     CachedBlock cachedBlock =
         new CachedBlock(block.getBlock().getBlockId(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bf18f3a0be..ccd20d5677 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -327,9 +327,23 @@
     Decide if sort block locations considers the target's load or not when read.
     Turn off by default.
+    It is not possible to enable this feature along with dfs.namenode.read.considerStorageType as only one sort can be
+    enabled at a time.
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.read.considerStorageType</name>
+  <value>false</value>
+  <description>
+    Decide if sort block locations considers the target's storage type or not when read. Any locations with the same
+    network distance are sorted in order of the storage speed, fastest first (RAM, SSD, Disk, Archive). This is
+    disabled by default, and the locations will be ordered randomly.
+    It is not possible to enable this feature along with dfs.namenode.read.considerLoad as only one sort can be
+    enabled at a time.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.httpserver.filter.handlers</name>
   <value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
index bdc342ad3d..f2d580576c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -35,9 +35,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.unix.DomainSocket;
@@ -273,7 +275,10 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
     DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
         1111, 1112, 1113, 1114);
     DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
-    when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+    DatanodeInfoWithStorage dnInfoStorage =
+        new DatanodeInfoWithStorage(dnInfo, "DISK", StorageType.DISK);
+    when(lb.getLocations()).thenReturn(
+        new DatanodeInfoWithStorage[] {dnInfoStorage});
     DatanodeInfo retDNInfo =
         dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
     assertEquals(dnInfo, retDNInfo);
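For completeness, here is how the new key might be enabled programmatically, e.g. in a test; the key constants come from this patch, while the surrounding class is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableStorageTypeSortSketch {
      static Configuration storageTypeSortConf() {
        Configuration conf = new Configuration();
        // Enable exactly one of the two secondary sorts; enabling both logs
        // a warning and the considerLoad sort takes effect.
        conf.setBoolean(
            DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY, true);
        conf.setBoolean(
            DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, false);
        return conf;
      }
    }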
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index ef9758aa88..cdce754e41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -668,6 +668,179 @@ public void testGetBlockLocationConsiderLoadWithNodesOfSameDistance()
     assertEquals(2, ipSet.size());
   }
 
+  @Test
+  public void testGetBlockLocationConsiderStorageType()
+      throws IOException, URISyntaxException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
+        true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    URL shellScript = getClass()
+        .getResource("/" + Shell.appendScriptExtension("topology-script"));
+    Path resourcePath = Paths.get(shellScript.toURI());
+    FileUtil.setExecutable(resourcePath.toFile(), true);
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+        resourcePath.toString());
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
+    int totalDNs = 5;
+    // Register 5 datanodes, 2 nodes per rack, with different storage types.
+    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+    String[] storageIDs = new String[totalDNs];
+    List<StorageType> storageTypesList =
+        new ArrayList<>(Arrays.asList(StorageType.ARCHIVE, StorageType.DISK,
+            StorageType.SSD, StorageType.DEFAULT, StorageType.SSD));
+    StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
+
+    for (int i = 0; i < totalDNs; i++) {
+      // Register new datanode.
+      String uuid = "UUID-" + i;
+      String ip = "IP-" + i / 2 + "-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      dm.registerDatanode(dr);
+
+      // Get location and storage information.
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-" + i;
+    }
+
+    // Set node 0 decommissioned.
+    locs[0].setDecommissioned();
+
+    // Create LocatedBlock with above locations.
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    // Test client located at locs[3] in cluster.
+    final String targetIpInCluster = locs[3].getIpAddr();
+    dm.sortLocatedBlocks(targetIpInCluster, blocks);
+    DatanodeInfo[] sortedLocs = block.getLocations();
+    assertEquals(totalDNs, sortedLocs.length);
+    // Ensure the local node is first.
+    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
+    // Ensure nodes with faster storage types are chosen when the network
+    // distance is the same.
+    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
+    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs[2].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
+
+    // Test a client that is not in the cluster but on the same rack as
+    // locs[3].
+    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
+    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
+    DatanodeInfo[] sortedLocs2 = block.getLocations();
+    assertEquals(totalDNs, sortedLocs2.length);
+    // Ensure the local rack comes first and nodes with faster storage types
+    // are chosen when the network distance is the same.
+    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
+    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs2[2].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs2[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs2[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
+  }
+
+  @Test
+  public void testGetBlockLocationConsiderStorageTypeAndLoad()
+      throws IOException, URISyntaxException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
+        true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    URL shellScript = getClass()
+        .getResource("/" + Shell.appendScriptExtension("topology-script"));
+    Path resourcePath = Paths.get(shellScript.toURI());
+    FileUtil.setExecutable(resourcePath.toFile(), true);
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+        resourcePath.toString());
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
+    int totalDNs = 5;
+    // Register 5 datanodes, 2 nodes per rack, with different load.
+    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+    String[] storageIDs = new String[totalDNs];
+    List<StorageType> storageTypesList =
+        new ArrayList<>(Arrays.asList(StorageType.DISK, StorageType.DISK,
+            StorageType.DEFAULT, StorageType.SSD, StorageType.SSD));
+    StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
+
+    for (int i = 0; i < totalDNs; i++) {
+      // Register new datanode.
+      String uuid = "UUID-" + i;
+      String ip = "IP-" + i / 2 + "-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      dm.registerDatanode(dr);
+
+      // Get location and storage information.
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-" + i;
+
+      // Set load for datanodes.
+      locs[i].setXceiverCount(i);
+    }
+
+    // Set node 0 decommissioned.
+    locs[0].setDecommissioned();
+
+    // Create LocatedBlock with above locations.
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    // Test client located at locs[3] in cluster.
+    final String targetIpInCluster = locs[3].getIpAddr();
+    dm.sortLocatedBlocks(targetIpInCluster, blocks);
+    DatanodeInfo[] sortedLocs = block.getLocations();
+    assertEquals(totalDNs, sortedLocs.length);
+    // Ensure the local node is first.
+    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
+    // Ensure the lightly loaded node is preferred over the faster-storage
+    // node when the network distance is the same (considerLoad is applied
+    // last, so it overrides considerStorageType).
+    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
+    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
+
+    // Test a client that is not in the cluster but on the same rack as
+    // locs[3].
+    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
+    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
+    DatanodeInfo[] sortedLocs2 = block.getLocations();
+    assertEquals(totalDNs, sortedLocs2.length);
+    // Ensure the local rack comes first and the lightly loaded node is
+    // preferred over the faster-storage node when the network distance is
+    // the same.
+    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
+    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs2[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
+  }
+
   /**
    * Test whether removing a host from the includes list without adding it to
    * the excludes list will exclude it from data node reports.