HDFS-15255. Consider StorageType when DatanodeManager#sortLocatedBlock(). Contributed by Lisheng Sun.

S O'Donnell 2020-05-12 14:34:14 +01:00
parent 928b81a533
commit 29dddb8a14
9 changed files with 232 additions and 7 deletions

View File

@@ -949,6 +949,7 @@ public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader,
* <p>
* As an additional twist, we also randomize the nodes at each network
* distance. This helps with load balancing when there is data skew.
* It also helps to choose a node with a faster storage type.
*
* @param reader Node where data will be read
* @param nodes Available replicas with the requested data
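Since the Javadoc above only hints at the mechanics, here is a minimal, self-contained sketch of the intended two-level ordering; the class, method, and node names below are hypothetical stand-ins, not Hadoop code:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the two-level ordering: nodes at the same network distance are
// shuffled for load balancing, then an optional caller-supplied secondary
// sort (e.g. "faster storage type first") refines the order within the group.
public class SecondarySortSketch {

  static void sortSameDistanceGroup(List<String> group,
      Consumer<List<String>> secondarySort) {
    Collections.shuffle(group); // randomize equal-distance nodes
    if (secondarySort != null) {
      secondarySort.accept(group); // refine the randomized order
    }
  }

  public static void main(String[] args) {
    // Hypothetical node names; the tie-break (shorter name first) merely
    // stands in for "faster storage type first".
    List<String> group = new ArrayList<>(List.of("dn-archive", "dn-ssd"));
    sortSameDistanceGroup(group,
        list -> list.sort(Comparator.comparingInt(String::length)));
    System.out.println(group); // always [dn-ssd, dn-archive]
  }
}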

View File

@@ -158,7 +158,7 @@ public ExtendedBlock getBlock() {
* {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
* to update the cached Storage ID/Type arrays.
*/
public DatanodeInfo[] getLocations() {
public DatanodeInfoWithStorage[] getLocations() {
return locs;
}
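With the narrowed return type, every location now carries its storage ID and StorageType directly. A hedged sketch of a caller taking advantage of this (the wrapper class and method are illustrative, not part of the patch):

import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

// Illustrative only: walks the locations of an already-populated LocatedBlock
// and prints each replica's transfer address and storage type, with no
// parallel storage-ID/type array lookup needed.
final class LocationInspector {
  static void printStorageTypes(LocatedBlock lb) {
    for (DatanodeInfoWithStorage loc : lb.getLocations()) {
      System.out.println(loc.getXferAddr() + " -> " + loc.getStorageType());
    }
  }
}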

View File

@@ -46,6 +46,7 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
@@ -58,6 +59,7 @@
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -537,7 +539,10 @@ private static LocatedBlock getMockLocatedBlock(final String nsId) {
DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
1111, 1112, 1113, 1114);
DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
DatanodeInfoWithStorage datanodeInfoWithStorage =
new DatanodeInfoWithStorage(dnInfo, "storageID", StorageType.DEFAULT);
when(lb.getLocations())
.thenReturn(new DatanodeInfoWithStorage[] {datanodeInfoWithStorage});
ExtendedBlock eb = mock(ExtendedBlock.class);
when(eb.getBlockPoolId()).thenReturn(nsId);
when(lb.getBlock()).thenReturn(eb);

View File

@@ -243,6 +243,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.read.considerLoad";
public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
false;
public static final String DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY =
"dfs.namenode.read.considerStorageType";
public static final boolean DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT =
false;
public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR =
"dfs.namenode.redundancy.considerLoad.factor";
public static final double

View File

@@ -137,6 +137,9 @@ public class DatanodeManager {
/** Whether or not to consider load for reading. */
private final boolean readConsiderLoad;
/** Whether or not to consider storageType for reading. */
private final boolean readConsiderStorageType;
/**
* Whether or not to avoid using stale DataNodes for writing.
* Note that, even if this is configured, the policy may be
@@ -320,6 +323,15 @@ public class DatanodeManager {
this.readConsiderLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
this.readConsiderStorageType = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT);
if (readConsiderLoad && readConsiderStorageType) {
LOG.warn(
"{} and {} are incompatible and only one can be enabled. "
+ "Both are currently enabled.",
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY);
}
this.avoidStaleDataNodesForWrite = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
@@ -524,7 +536,7 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
}
}
DatanodeInfo[] di = lb.getLocations();
DatanodeInfoWithStorage[] di = lb.getLocations();
// Move decommissioned/stale datanodes to the bottom
Arrays.sort(di, comparator);
@@ -547,10 +559,15 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
lb.updateCachedStorageInfo();
}
private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
Consumer<List<DatanodeInfo>> secondarySort = null;
private Consumer<List<DatanodeInfoWithStorage>> createSecondaryNodeSorter() {
Consumer<List<DatanodeInfoWithStorage>> secondarySort = null;
if (readConsiderStorageType) {
Comparator<DatanodeInfoWithStorage> comp =
Comparator.comparing(DatanodeInfoWithStorage::getStorageType);
secondarySort = list -> Collections.sort(list, comp);
}
if (readConsiderLoad) {
Comparator<DatanodeInfo> comp =
Comparator<DatanodeInfoWithStorage> comp =
Comparator.comparingInt(DatanodeInfo::getXceiverCount);
secondarySort = list -> Collections.sort(list, comp);
}
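The storage-type comparator above relies on the natural ordering of org.apache.hadoop.fs.StorageType, whose constants are declared fastest-first. A self-contained sketch with a stand-in enum mirroring that declaration order:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StorageTypeOrderSketch {
  // Stand-in for org.apache.hadoop.fs.StorageType: the real enum declares its
  // constants fastest-first, so the natural enum ordering sorts faster media
  // ahead of slower ones.
  enum StorageType { RAM_DISK, SSD, DISK, ARCHIVE }

  public static void main(String[] args) {
    List<StorageType> types = new ArrayList<>(List.of(
        StorageType.ARCHIVE, StorageType.SSD, StorageType.DISK));
    types.sort(Comparator.naturalOrder());
    System.out.println(types); // [SSD, DISK, ARCHIVE]
  }
}

Note that when both flags are enabled, the load comparator assigned in the second branch overwrites the storage-type sorter, which is why the constructor warns that the two keys are incompatible.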

View File

@@ -43,6 +43,7 @@
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -935,6 +936,11 @@ public void setCachedLocations(LocatedBlocks locations) {
}
}
@SuppressFBWarnings(
value="EC_UNRELATED_TYPES",
justification="HDFS-15255 Asked Wei-Chiu and Pifta to review this" +
" warning and we all agree the code is OK and the warning is not " +
"needed")
private void setCachedLocations(LocatedBlock block) {
CachedBlock cachedBlock =
new CachedBlock(block.getBlock().getBlockId(),

View File

@@ -327,9 +327,23 @@
<description>
Decides whether sorting of block locations considers the target's load when read.
Disabled by default.
It is not possible to enable this feature together with dfs.namenode.read.considerStorageType, as only one
sort can be enabled at a time.
</description>
</property>
<property>
<name>dfs.namenode.read.considerStorageType</name>
<value>false</value>
<description>
Decides whether sorting of block locations considers the target's storage type when read. Any locations with the
same network distance are sorted in order of storage speed, fastest first (RAM, SSD, Disk, Archive). This is
disabled by default, in which case locations at the same distance are ordered randomly.
It is not possible to enable this feature together with dfs.namenode.read.considerLoad, as only one sort can be
enabled at a time.
</description>
</property>
<property>
<name>dfs.datanode.httpserver.filter.handlers</name>
<value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
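For completeness, the new key can also be set programmatically; a hedged sketch (the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Illustrative only: enables the storage-type read sort on a Configuration.
// dfs.namenode.read.considerLoad is left at its default (false), since only
// one of the two sorts may be enabled at a time.
public class EnableStorageTypeSort {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY, true);
    System.out.println(conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY, false));
  }
}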

View File

@@ -35,9 +35,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.unix.DomainSocket;
@@ -273,7 +275,10 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
1112, 1113, 1114);
DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
DatanodeInfoWithStorage dnInfoStorage =
new DatanodeInfoWithStorage(dnInfo, "DISK", StorageType.DISK);
when(lb.getLocations()).thenReturn(
new DatanodeInfoWithStorage[] {dnInfoStorage});
DatanodeInfo retDNInfo =
dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
assertEquals(dnInfo, retDNInfo);

View File

@@ -668,6 +668,179 @@ public void testGetBlockLocationConsiderLoadWithNodesOfSameDistance()
assertEquals(2, ipSet.size());
}
@Test
public void testGetBlockLocationConsiderStorageType()
throws IOException, URISyntaxException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
true);
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
URL shellScript = getClass()
.getResource("/" + Shell.appendScriptExtension("topology-script"));
Path resourcePath = Paths.get(shellScript.toURI());
FileUtil.setExecutable(resourcePath.toFile(), true);
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
resourcePath.toString());
DatanodeManager dm = mockDatanodeManager(fsn, conf);
int totalDNs = 5;
// Register 5 datanodes, 2 per rack, with different storage types.
DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
String[] storageIDs = new String[totalDNs];
List<StorageType> storageTypesList =
new ArrayList<>(Arrays.asList(StorageType.ARCHIVE, StorageType.DISK,
StorageType.SSD, StorageType.DEFAULT, StorageType.SSD));
StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
for (int i = 0; i < totalDNs; i++) {
// Register new datanode.
String uuid = "UUID-" + i;
String ip = "IP-" + i / 2 + "-" + i;
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
Mockito.when(dr.getIpAddr()).thenReturn(ip);
dm.registerDatanode(dr);
// Get location and storage information.
locs[i] = dm.getDatanode(uuid);
storageIDs[i] = "storageID-" + i;
}
// Set node 0 decommissioned.
locs[0].setDecommissioned();
// Create LocatedBlock with above locations.
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
List<LocatedBlock> blocks = new ArrayList<>();
blocks.add(block);
// Test client located at locs[3] in cluster.
final String targetIpInCluster = locs[3].getIpAddr();
dm.sortLocatedBlocks(targetIpInCluster, blocks);
DatanodeInfo[] sortedLocs = block.getLocations();
assertEquals(totalDNs, sortedLocs.length);
// Ensure the local node is first.
assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
// Ensure the node with the faster storage type is chosen when the distance is the same.
assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
assertEquals(locs[4].getIpAddr(), sortedLocs[2].getIpAddr());
assertEquals(locs[1].getIpAddr(), sortedLocs[3].getIpAddr());
// Ensure the decommissioned DN was moved to the end.
assertThat(sortedLocs[4].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
// Test a client that is not in the cluster but is on the same rack as locs[3].
final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
DatanodeInfo[] sortedLocs2 = block.getLocations();
assertEquals(totalDNs, sortedLocs2.length);
// Ensure the local rack is first, and the node with the faster storage
// type is chosen when the distance is the same.
assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
assertEquals(locs[4].getIpAddr(), sortedLocs2[2].getIpAddr());
assertEquals(locs[1].getIpAddr(), sortedLocs2[3].getIpAddr());
// Ensure the decommissioned DN was moved to the end.
assertThat(sortedLocs2[4].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
}
@Test
public void testGetBlockLocationConsiderStorageTypeAndLoad()
throws IOException, URISyntaxException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
URL shellScript = getClass()
.getResource("/" + Shell.appendScriptExtension("topology-script"));
Path resourcePath = Paths.get(shellScript.toURI());
FileUtil.setExecutable(resourcePath.toFile(), true);
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
resourcePath.toString());
DatanodeManager dm = mockDatanodeManager(fsn, conf);
int totalDNs = 5;
// Register 5 datanodes, 2 per rack, with different loads and storage types.
DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
String[] storageIDs = new String[totalDNs];
List<StorageType> storageTypesList =
new ArrayList<>(Arrays.asList(StorageType.DISK, StorageType.DISK,
StorageType.DEFAULT, StorageType.SSD, StorageType.SSD));
StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
for (int i = 0; i < totalDNs; i++) {
// Register new datanode.
String uuid = "UUID-" + i;
String ip = "IP-" + i / 2 + "-" + i;
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
Mockito.when(dr.getIpAddr()).thenReturn(ip);
dm.registerDatanode(dr);
// Get location and storage information.
locs[i] = dm.getDatanode(uuid);
storageIDs[i] = "storageID-" + i;
// Set load for datanodes.
locs[i].setXceiverCount(i);
}
// Set node 0 decommissioned.
locs[0].setDecommissioned();
// Create LocatedBlock with above locations.
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
List<LocatedBlock> blocks = new ArrayList<>();
blocks.add(block);
// Test client located at locs[3] in cluster.
final String targetIpInCluster = locs[3].getIpAddr();
dm.sortLocatedBlocks(targetIpInCluster, blocks);
DatanodeInfo[] sortedLocs = block.getLocations();
assertEquals(totalDNs, sortedLocs.length);
// Ensure the local node is first.
assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
// Ensure the less loaded node is chosen ahead of the node with faster
// storage when the distance is the same.
assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
// Ensure the decommissioned DN was moved to the end.
assertThat(sortedLocs[4].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
// Test a client that is not in the cluster but is on the same rack as locs[3].
final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
DatanodeInfo[] sortedLocs2 = block.getLocations();
assertEquals(totalDNs, sortedLocs2.length);
// Ensure the local rack is first, and the less loaded node is chosen
// ahead of the node with faster storage when the distance is the same.
assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
// Ensure the decommissioned DN was moved to the end.
assertThat(sortedLocs2[4].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
}
/**
* Test whether removing a host from the includes list without adding it to
* the excludes list will exclude it from data node reports.