HDFS-14383. Compute datanode load based on StoragePolicy. Contributed by Ayush Saxena.

(cherry picked from commit 2e8cafac3b)
Ayush Saxena 2020-10-19 10:48:47 +05:30 committed by Wei-Chiu Chuang
parent 56ef16468a
commit 7f20fad419
8 changed files with 180 additions and 3 deletions


@@ -239,6 +239,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
true;
public static final String
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY =
"dfs.namenode.redundancy.considerLoadByStorageType";
public static final boolean
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT = false;
public static final String DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
"dfs.namenode.read.considerLoad";
public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =


@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.*;
@@ -92,7 +94,8 @@ private String getText() {
}
}
protected boolean considerLoad;
private boolean considerLoadByStorageType;
protected double considerLoadFactor;
private boolean preferLocalNode;
protected NetworkTopology clusterMap;
@@ -116,6 +119,9 @@ public void initialize(Configuration conf, FSClusterStats stats,
this.considerLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT);
this.considerLoadByStorageType = conf.getBoolean(
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT);
this.considerLoadFactor = conf.getDouble(
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
@@ -976,8 +982,9 @@ private static void logNodeIsNotChosen(DatanodeDescriptor node,
* @return Return true if the datanode should be excluded, otherwise false
*/
boolean excludeNodeByLoad(DatanodeDescriptor node){
- final double maxLoad = considerLoadFactor *
-     stats.getInServiceXceiverAverage();
+ double inServiceXceiverCount = getInServiceXceiverAverage(node);
+ final double maxLoad = considerLoadFactor * inServiceXceiverCount;
final int nodeLoad = node.getXceiverCount();
if ((nodeLoad > maxLoad) && (maxLoad > 0)) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
@@ -987,6 +994,48 @@ boolean excludeNodeByLoad(DatanodeDescriptor node){
return false;
}
/**
* Gets the in-service xceiver average count for the cluster. If
* considerLoadByStorageType is true, the load is calculated only for the
* storage types present on the datanode.
* @param node the datanode whose storage types are to be taken into account.
* @return the InServiceXceiverAverage count.
*/
private double getInServiceXceiverAverage(DatanodeDescriptor node) {
double inServiceXceiverCount;
if (considerLoadByStorageType) {
inServiceXceiverCount =
getInServiceXceiverAverageByStorageType(node.getStorageTypes());
} else {
inServiceXceiverCount = stats.getInServiceXceiverAverage();
}
return inServiceXceiverCount;
}
/**
* Gets the average xceiver count with respect to the storage types.
* @param storageTypes the storage types.
* @return the average xceiver count wrt the provided storage types.
*/
private double getInServiceXceiverAverageByStorageType(
Set<StorageType> storageTypes) {
double avgLoad = 0;
final Map<StorageType, StorageTypeStats> storageStats =
stats.getStorageTypeStats();
int numNodes = 0;
int numXceiver = 0;
for (StorageType s : storageTypes) {
StorageTypeStats storageTypeStats = storageStats.get(s);
numNodes += storageTypeStats.getNodesInService();
numXceiver += storageTypeStats.getNodesInServiceXceiverCount();
}
if (numNodes != 0) {
avgLoad = (double) numXceiver / numNodes;
}
return avgLoad;
}
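
To make the per-storage-type arithmetic concrete, here is a stand-alone sketch (the class name and all numbers are hypothetical, not part of the patch) contrasting the cluster-wide average used when the flag is off with the DISK-only average computed by getInServiceXceiverAverageByStorageType for a datanode that carries only DISK storage:

```java
// Hypothetical cluster: DISK has 10 in-service nodes carrying 500 xceivers in
// total, ARCHIVE has 90 nodes carrying 900. Numbers are illustrative only.
public class StorageTypeLoadExample {
  public static void main(String[] args) {
    int diskNodes = 10, diskXceivers = 500;
    int archiveNodes = 90, archiveXceivers = 900;

    // considerLoadByStorageType = false: a single cluster-wide average.
    double clusterAvg = (double) (diskXceivers + archiveXceivers)
        / (diskNodes + archiveNodes);                    // 1400 / 100 = 14.0

    // considerLoadByStorageType = true, DISK-only datanode: DISK nodes only.
    double diskAvg = (double) diskXceivers / diskNodes;  // 500 / 10 = 50.0

    // With the default considerLoadFactor of 2.0 the busy threshold for that
    // datanode moves from 28 to 100 xceivers.
    System.out.println("clusterAvg=" + clusterAvg + ", diskAvg=" + diskAvg);
  }
}
```
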
/**
* Determine if a datanode is good for placing block.
*


@@ -24,6 +24,7 @@
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
import org.apache.hadoop.fs.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1978,6 +1979,11 @@ public double getInServiceXceiverAverage() {
}
return avgLoad;
}
@Override
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return heartbeatManager.getStorageTypeStats();
}
};
}


@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import java.util.Map;
/**
* This interface is used for retrieving the load related statistics of
@@ -57,4 +60,10 @@ public interface FSClusterStats {
* writes that are currently occurring on the cluster.
*/
public double getInServiceXceiverAverage();
/**
* Indicates the storage statistics per storage type.
* @return storage statistics per storage type.
*/
Map<StorageType, StorageTypeStats> getStorageTypeStats();
}


@@ -20,6 +20,7 @@
import java.beans.ConstructorProperties;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
@@ -39,6 +40,15 @@ public class StorageTypeStats {
private int nodesInService = 0;
private StorageType storageType;
@VisibleForTesting
void setDataNodesInServiceXceiverCount(int avgXceiverPerDatanode,
int numNodesInService) {
this.nodesInService = numNodesInService;
this.nodesInServiceXceiverCount = numNodesInService * avgXceiverPerDatanode;
}
private int nodesInServiceXceiverCount;
@ConstructorProperties({"capacityTotal", "capacityUsed", "capacityNonDfsUsed",
"capacityRemaining", "blockPoolUsed", "nodesInService"})
public StorageTypeStats(
@@ -101,6 +111,10 @@ public int getNodesInService() {
return nodesInService;
}
public int getNodesInServiceXceiverCount() {
return nodesInServiceXceiverCount;
}
StorageTypeStats(StorageType storageType) {
this.storageType = storageType;
}
@@ -131,6 +145,7 @@ void addStorage(final DatanodeStorageInfo info,
void addNode(final DatanodeDescriptor node) {
if (node.isInService()) {
nodesInService++;
nodesInServiceXceiverCount += node.getXceiverCount();
}
}
@@ -151,6 +166,7 @@ void subtractStorage(final DatanodeStorageInfo info,
void subtractNode(final DatanodeDescriptor node) {
if (node.isInService()) {
nodesInService--;
nodesInServiceXceiverCount -= node.getXceiverCount();
}
}
}


@@ -313,6 +313,17 @@
</description>
</property>
<property>
<name>dfs.namenode.redundancy.considerLoadByStorageType</name>
<value>false</value>
<description>
Decides whether chooseTarget considers the target's load with respect to
the storage type. Typically used when datanodes contain homogeneous
storage types. Irrelevant if dfs.namenode.redundancy.considerLoad is
false.
</description>
</property>
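
For reference, a minimal sketch of turning the new behaviour on programmatically (the wrapper class below is hypothetical; the keys are the constants this commit adds to DFSConfigKeys, and the same effect can be had by setting the property above in hdfs-site.xml):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EnableLoadByStorageType {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // The per-storage-type check is only consulted when considerLoad is on.
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
        true);
    // Equivalent hdfs-site.xml entry:
    //   dfs.namenode.redundancy.considerLoadByStorageType = true
    System.out.println(conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
        false));
  }
}
```
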
<property>
<name>dfs.namenode.redundancy.considerLoad.factor</name>
<value>2.0</value>


@@ -33,13 +33,18 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -218,4 +223,51 @@ public void testStorageTypeStatsWhenStorageFailed() throws Exception {
storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE);
assertEquals(3, storageTypeStats.getNodesInService());
}
@Test
public void testStorageTypeLoad() throws Exception {
HeartbeatManager heartbeatManager =
cluster.getNamesystem().getBlockManager().getDatanodeManager()
.getHeartbeatManager();
Map<StorageType, StorageTypeStats> storageTypeStatsMap =
heartbeatManager.getStorageTypeStats();
DistributedFileSystem dfs = cluster.getFileSystem();
// Create a file with HOT storage policy.
Path hotSpDir = new Path("/HOT");
dfs.mkdir(hotSpDir, FsPermission.getDirDefault());
dfs.setStoragePolicy(hotSpDir, "HOT");
FSDataOutputStream hotSpFileStream =
dfs.create(new Path(hotSpDir, "hotFile"));
hotSpFileStream.write("Storage Policy Hot".getBytes());
hotSpFileStream.hflush();
// Create a file with COLD storage policy.
Path coldSpDir = new Path("/COLD");
dfs.mkdir(coldSpDir, FsPermission.getDirDefault());
dfs.setStoragePolicy(coldSpDir, "COLD");
FSDataOutputStream coldSpFileStream =
dfs.create(new Path(coldSpDir, "coldFile"));
coldSpFileStream.write("Writing to ARCHIVE storage type".getBytes());
coldSpFileStream.hflush();
// Trigger heartbeats manually to speed up the test.
cluster.triggerHeartbeats();
// The load would be 2*replication since both the
// write xceiver & packet responder threads are counted.
GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.DISK)
.getNodesInServiceXceiverCount() == 6, 100, 5000);
// The count for ARCHIVE should be independent of the value of DISK.
GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.ARCHIVE)
.getNodesInServiceXceiverCount() == 6, 100, 5000);
// The total count should stay unaffected, that is sum of load from all
// datanodes.
GenericTestUtils
.waitFor(() -> heartbeatManager.getInServiceXceiverCount() == 12, 100,
5000);
IOUtils.closeStreams(hotSpFileStream, coldSpFileStream);
}
}


@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -1618,5 +1619,33 @@ public void testMaxLoad() {
when(node.getXceiverCount()).thenReturn(10);
assertTrue(bppd.excludeNodeByLoad(node));
// Enable load check per storage type.
conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
true);
bppd.initialize(conf, statistics, null, null);
Map<StorageType, StorageTypeStats> storageStats = new HashMap<>();
StorageTypeStats diskStorageTypeStats =
new StorageTypeStats(StorageType.DISK);
// Set xceiver count as 500 for DISK.
diskStorageTypeStats.setDataNodesInServiceXceiverCount(50, 10);
storageStats.put(StorageType.DISK, diskStorageTypeStats);
// Set xceiver count as 900 for ARCHIVE.
StorageTypeStats archiveStorageTypeStats =
new StorageTypeStats(StorageType.ARCHIVE);
archiveStorageTypeStats.setDataNodesInServiceXceiverCount(10, 90);
storageStats.put(StorageType.ARCHIVE, archiveStorageTypeStats);
when(statistics.getStorageTypeStats()).thenReturn(storageStats);
when(node.getXceiverCount()).thenReturn(29);
when(node.getStorageTypes()).thenReturn(EnumSet.of(StorageType.DISK));
when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
// Added for sanity: there are 100 datanodes in total, so the cluster-wide
// average xceiver count would be (50*10 + 10*90)/100 = 14.
when(statistics.getInServiceXceiverAverage()).thenReturn(14.0);
when(node.getXceiverCount()).thenReturn(100);
assertFalse(bppd.excludeNodeByLoad(node));
}
}