HDFS-15255. Consider StorageType when DatanodeManager#sortLocatedBlock(). Contributed by Lisheng Sun.
This commit is contained in:
parent
ef41229f70
commit
433aaeefa4
@ -950,6 +950,7 @@ public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader,
|
||||
* <p>
|
||||
* As an additional twist, we also randomize the nodes at each network
|
||||
* distance. This helps with load balancing when there is data skew.
|
||||
* And it helps choose node with more fast storage type.
|
||||
*
|
||||
* @param reader Node where data will be read
|
||||
* @param nodes Available replicas with the requested data
|
||||
|
@ -158,7 +158,7 @@ public ExtendedBlock getBlock() {
|
||||
* {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
|
||||
* to update the cached Storage ID/Type arrays.
|
||||
*/
|
||||
public DatanodeInfo[] getLocations() {
|
||||
public DatanodeInfoWithStorage[] getLocations() {
|
||||
return locs;
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
@ -58,6 +59,7 @@
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
@ -537,7 +539,10 @@ private static LocatedBlock getMockLocatedBlock(final String nsId) {
|
||||
DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
|
||||
1111, 1112, 1113, 1114);
|
||||
DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
|
||||
when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
|
||||
DatanodeInfoWithStorage datanodeInfoWithStorage =
|
||||
new DatanodeInfoWithStorage(dnInfo, "storageID", StorageType.DEFAULT);
|
||||
when(lb.getLocations())
|
||||
.thenReturn(new DatanodeInfoWithStorage[] {datanodeInfoWithStorage});
|
||||
ExtendedBlock eb = mock(ExtendedBlock.class);
|
||||
when(eb.getBlockPoolId()).thenReturn(nsId);
|
||||
when(lb.getBlock()).thenReturn(eb);
|
||||
|
@ -243,6 +243,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
"dfs.namenode.read.considerLoad";
|
||||
public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
|
||||
false;
|
||||
public static final String DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY =
|
||||
"dfs.namenode.read.considerStorageType";
|
||||
public static final boolean DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT =
|
||||
false;
|
||||
public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR =
|
||||
"dfs.namenode.redundancy.considerLoad.factor";
|
||||
public static final double
|
||||
|
@ -137,6 +137,9 @@ public class DatanodeManager {
|
||||
/** Whether or not to consider lad for reading. */
|
||||
private final boolean readConsiderLoad;
|
||||
|
||||
/** Whether or not to consider storageType for reading. */
|
||||
private final boolean readConsiderStorageType;
|
||||
|
||||
/**
|
||||
* Whether or not to avoid using stale DataNodes for writing.
|
||||
* Note that, even if this is configured, the policy may be
|
||||
@ -320,6 +323,15 @@ public class DatanodeManager {
|
||||
this.readConsiderLoad = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
|
||||
this.readConsiderStorageType = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_DEFAULT);
|
||||
LOG.warn(
|
||||
"{} and {} are incompatible and only one can be enabled. "
|
||||
+ "Both are currently enabled.",
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY);
|
||||
|
||||
this.avoidStaleDataNodesForWrite = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
|
||||
@ -524,7 +536,7 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
|
||||
}
|
||||
}
|
||||
|
||||
DatanodeInfo[] di = lb.getLocations();
|
||||
DatanodeInfoWithStorage[] di = lb.getLocations();
|
||||
// Move decommissioned/stale datanodes to the bottom
|
||||
Arrays.sort(di, comparator);
|
||||
|
||||
@ -547,10 +559,15 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
|
||||
lb.updateCachedStorageInfo();
|
||||
}
|
||||
|
||||
private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
|
||||
Consumer<List<DatanodeInfo>> secondarySort = null;
|
||||
private Consumer<List<DatanodeInfoWithStorage>> createSecondaryNodeSorter() {
|
||||
Consumer<List<DatanodeInfoWithStorage>> secondarySort = null;
|
||||
if (readConsiderStorageType) {
|
||||
Comparator<DatanodeInfoWithStorage> comp =
|
||||
Comparator.comparing(DatanodeInfoWithStorage::getStorageType);
|
||||
secondarySort = list -> Collections.sort(list, comp);
|
||||
}
|
||||
if (readConsiderLoad) {
|
||||
Comparator<DatanodeInfo> comp =
|
||||
Comparator<DatanodeInfoWithStorage> comp =
|
||||
Comparator.comparingInt(DatanodeInfo::getXceiverCount);
|
||||
secondarySort = list -> Collections.sort(list, comp);
|
||||
}
|
||||
|
@ -43,6 +43,7 @@
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -935,6 +936,11 @@ public void setCachedLocations(LocatedBlocks locations) {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressFBWarnings(
|
||||
value="EC_UNRELATED_TYPES",
|
||||
justification="HDFS-15255 Asked Wei-Chiu and Pifta to review this" +
|
||||
" warning and we all agree the code is OK and the warning is not " +
|
||||
"needed")
|
||||
private void setCachedLocations(LocatedBlock block) {
|
||||
CachedBlock cachedBlock =
|
||||
new CachedBlock(block.getBlock().getBlockId(),
|
||||
|
@ -327,9 +327,23 @@
|
||||
<description>
|
||||
Decide if sort block locations considers the target's load or not when read.
|
||||
Turn off by default.
|
||||
It is not possible to enable this feature along with dfs.namenode.read.considerStorageType as only one sort can be
|
||||
enabled at a time.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.read.considerStorageType</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Decide if sort block locations considers the target's storage type or not when read. Any locations with the same
|
||||
network distance are sorted in order of the storage speed, fastest first (RAM, SSD, Disk, Archive). This is
|
||||
disabled by default, and the locations will be ordered randomly.
|
||||
It is not possible to enable this feature along with dfs.namenode.read.considerLoad as only one sort can be
|
||||
enabled at a time.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.httpserver.filter.handlers</name>
|
||||
<value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
|
||||
|
@ -35,9 +35,11 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
@ -273,7 +275,10 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
|
||||
DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
|
||||
1112, 1113, 1114);
|
||||
DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
|
||||
when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
|
||||
DatanodeInfoWithStorage dnInfoStorage =
|
||||
new DatanodeInfoWithStorage(dnInfo, "DISK", StorageType.DISK);
|
||||
when(lb.getLocations()).thenReturn(
|
||||
new DatanodeInfoWithStorage[] {dnInfoStorage});
|
||||
DatanodeInfo retDNInfo =
|
||||
dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
|
||||
assertEquals(dnInfo, retDNInfo);
|
||||
|
@ -668,6 +668,179 @@ public void testGetBlockLocationConsiderLoadWithNodesOfSameDistance()
|
||||
assertEquals(2, ipSet.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBlockLocationConsiderStorageType()
|
||||
throws IOException, URISyntaxException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
|
||||
true);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
||||
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
|
||||
URL shellScript = getClass()
|
||||
.getResource("/" + Shell.appendScriptExtension("topology-script"));
|
||||
Path resourcePath = Paths.get(shellScript.toURI());
|
||||
FileUtil.setExecutable(resourcePath.toFile(), true);
|
||||
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
|
||||
resourcePath.toString());
|
||||
DatanodeManager dm = mockDatanodeManager(fsn, conf);
|
||||
|
||||
int totalDNs = 5;
|
||||
// Register 5 datanodes and 2 nodes per rack with different load.
|
||||
DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
|
||||
String[] storageIDs = new String[totalDNs];
|
||||
List<StorageType> storageTypesList =
|
||||
new ArrayList<>(Arrays.asList(StorageType.ARCHIVE, StorageType.DISK,
|
||||
StorageType.SSD, StorageType.DEFAULT, StorageType.SSD));
|
||||
StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
|
||||
|
||||
for (int i = 0; i < totalDNs; i++) {
|
||||
// Register new datanode.
|
||||
String uuid = "UUID-" + i;
|
||||
String ip = "IP-" + i / 2 + "-" + i;
|
||||
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
|
||||
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
|
||||
Mockito.when(dr.getIpAddr()).thenReturn(ip);
|
||||
dm.registerDatanode(dr);
|
||||
|
||||
// Get location and storage information.
|
||||
locs[i] = dm.getDatanode(uuid);
|
||||
storageIDs[i] = "storageID-" + i;
|
||||
}
|
||||
|
||||
// Set node 0 decommissioned.
|
||||
locs[0].setDecommissioned();
|
||||
|
||||
// Create LocatedBlock with above locations.
|
||||
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
|
||||
LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
|
||||
List<LocatedBlock> blocks = new ArrayList<>();
|
||||
blocks.add(block);
|
||||
|
||||
// Test client located at locs[3] in cluster.
|
||||
final String targetIpInCluster = locs[3].getIpAddr();
|
||||
dm.sortLocatedBlocks(targetIpInCluster, blocks);
|
||||
DatanodeInfo[] sortedLocs = block.getLocations();
|
||||
assertEquals(totalDNs, sortedLocs.length);
|
||||
// Ensure the local node is first.
|
||||
assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
|
||||
// Ensure choose fast storage type node when distance is same.
|
||||
assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
|
||||
assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
|
||||
assertEquals(locs[4].getIpAddr(), sortedLocs[2].getIpAddr());
|
||||
assertEquals(locs[1].getIpAddr(), sortedLocs[3].getIpAddr());
|
||||
// Ensure the two decommissioned DNs were moved to the end.
|
||||
assertThat(sortedLocs[4].getAdminState(),
|
||||
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
|
||||
assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
|
||||
|
||||
// Test client not in cluster but same rack with locs[3].
|
||||
final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
|
||||
dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
|
||||
DatanodeInfo[] sortedLocs2 = block.getLocations();
|
||||
assertEquals(totalDNs, sortedLocs2.length);
|
||||
// Ensure the local rack is first and choose fast storage type node
|
||||
// when distance is same.
|
||||
assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
|
||||
assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
|
||||
assertEquals(locs[4].getIpAddr(), sortedLocs2[2].getIpAddr());
|
||||
assertEquals(locs[1].getIpAddr(), sortedLocs2[3].getIpAddr());
|
||||
// Ensure the two decommissioned DNs were moved to the end.
|
||||
assertThat(sortedLocs[4].getAdminState(),
|
||||
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
|
||||
assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBlockLocationConsiderStorageTypeAndLoad()
|
||||
throws IOException, URISyntaxException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
|
||||
true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
||||
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
|
||||
URL shellScript = getClass()
|
||||
.getResource("/" + Shell.appendScriptExtension("topology-script"));
|
||||
Path resourcePath = Paths.get(shellScript.toURI());
|
||||
FileUtil.setExecutable(resourcePath.toFile(), true);
|
||||
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
|
||||
resourcePath.toString());
|
||||
DatanodeManager dm = mockDatanodeManager(fsn, conf);
|
||||
|
||||
int totalDNs = 5;
|
||||
// Register 5 datanodes and 2 nodes per rack with different load.
|
||||
DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
|
||||
String[] storageIDs = new String[totalDNs];
|
||||
List<StorageType> storageTypesList =
|
||||
new ArrayList<>(Arrays.asList(StorageType.DISK, StorageType.DISK,
|
||||
StorageType.DEFAULT, StorageType.SSD, StorageType.SSD));
|
||||
StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
|
||||
|
||||
for (int i = 0; i < totalDNs; i++) {
|
||||
// Register new datanode.
|
||||
String uuid = "UUID-" + i;
|
||||
String ip = "IP-" + i / 2 + "-" + i;
|
||||
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
|
||||
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
|
||||
Mockito.when(dr.getIpAddr()).thenReturn(ip);
|
||||
dm.registerDatanode(dr);
|
||||
|
||||
// Get location and storage information.
|
||||
locs[i] = dm.getDatanode(uuid);
|
||||
storageIDs[i] = "storageID-" + i;
|
||||
|
||||
// Set load for datanodes.
|
||||
locs[i].setXceiverCount(i);
|
||||
}
|
||||
|
||||
// Set node 0 decommissioned.
|
||||
locs[0].setDecommissioned();
|
||||
|
||||
// Create LocatedBlock with above locations.
|
||||
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
|
||||
LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
|
||||
List<LocatedBlock> blocks = new ArrayList<>();
|
||||
blocks.add(block);
|
||||
|
||||
// Test client located at locs[3] in cluster.
|
||||
final String targetIpInCluster = locs[3].getIpAddr();
|
||||
dm.sortLocatedBlocks(targetIpInCluster, blocks);
|
||||
DatanodeInfo[] sortedLocs = block.getLocations();
|
||||
assertEquals(totalDNs, sortedLocs.length);
|
||||
// Ensure the local node is first.
|
||||
assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
|
||||
// Ensure choose the light weight node between light weight and fast storage
|
||||
// type node when distance is same.
|
||||
assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
|
||||
assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
|
||||
assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
|
||||
assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
|
||||
// Ensure the two decommissioned DNs were moved to the end.
|
||||
assertThat(sortedLocs[4].getAdminState(),
|
||||
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
|
||||
assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
|
||||
|
||||
// Test client not in cluster but same rack with locs[3].
|
||||
final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
|
||||
dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
|
||||
DatanodeInfo[] sortedLocs2 = block.getLocations();
|
||||
assertEquals(totalDNs, sortedLocs2.length);
|
||||
// Ensure the local rack is first and choose the light weight node between
|
||||
// light weight and fast storage type node when distance is same.
|
||||
assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
|
||||
assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
|
||||
assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
|
||||
assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
|
||||
// Ensure the two decommissioned DNs were moved to the end.
|
||||
assertThat(sortedLocs[4].getAdminState(),
|
||||
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
|
||||
assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether removing a host from the includes list without adding it to
|
||||
* the excludes list will exclude it from data node reports.
|
||||
|
Loading…
Reference in New Issue
Block a user