HDFS-15879. Exclude slow nodes when choose targets for blocks (#2748)
Reviewed-by: Dinesh Chitlangia <dineshc@apache.org>
Reviewed-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
parent
f5c1557288
commit
72037a63b1
@ -1023,6 +1023,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
"dfs.datanode.outliers.report.interval";
|
||||
public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT =
|
||||
"30m";
|
||||
public static final String DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY =
|
||||
"dfs.namenode.max.slowpeer.collect.nodes";
|
||||
public static final int DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT =
|
||||
5;
|
||||
public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY =
|
||||
"dfs.namenode.slowpeer.collect.interval";
|
||||
public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT =
|
||||
"30m";
|
||||
|
||||
// property for fsimage compression
|
||||
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
|
||||
@ -1176,6 +1184,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final String DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY =
|
||||
"dfs.namenode.block-placement-policy.default.prefer-local-node";
|
||||
public static final boolean DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT = true;
|
||||
public static final String
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY =
|
||||
"dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled";
|
||||
public static final boolean
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT =
|
||||
false;
|
||||
|
||||
public static final String DFS_NAMENODE_GC_TIME_MONITOR_ENABLE =
|
||||
"dfs.namenode.gc.time.monitor.enable";
|
||||
public static final boolean DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT =
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
@ -82,7 +84,8 @@ private enum NodeNotChosenReason {
|
||||
NODE_TOO_BUSY("the node is too busy"),
|
||||
TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
|
||||
NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
|
||||
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable");
|
||||
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
|
||||
NODE_SLOW("the node is too slow");
|
||||
|
||||
private final String text;
|
||||
|
||||
@ -99,6 +102,8 @@ private String getText() {
|
||||
private boolean considerLoadByStorageType;
|
||||
protected double considerLoadFactor;
|
||||
private boolean preferLocalNode;
|
||||
private boolean dataNodePeerStatsEnabled;
|
||||
private boolean excludeSlowNodesEnabled;
|
||||
protected NetworkTopology clusterMap;
|
||||
protected Host2NodesMap host2datanodeMap;
|
||||
private FSClusterStats stats;
|
||||
@ -144,6 +149,12 @@ public void initialize(Configuration conf, FSClusterStats stats,
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY,
|
||||
DFSConfigKeys.
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT);
|
||||
this.dataNodePeerStatsEnabled = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
|
||||
this.excludeSlowNodesEnabled = conf.getBoolean(
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1091,6 +1102,15 @@ boolean isGoodDatanode(DatanodeDescriptor node,
|
||||
return false;
|
||||
}
|
||||
|
||||
// check if the target is a slow node
|
||||
if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
|
||||
Set<Node> nodes = DatanodeManager.getSlowNodes();
|
||||
if (nodes.contains(node)) {
|
||||
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -18,8 +18,12 @@
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
|
||||
@ -53,6 +57,7 @@
|
||||
import org.apache.hadoop.net.*;
|
||||
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
||||
@ -201,8 +206,16 @@ public class DatanodeManager {
|
||||
*/
|
||||
private final boolean useDfsNetworkTopology;
|
||||
|
||||
private static final String IP_PORT_SEPARATOR = ":";
|
||||
|
||||
@Nullable
|
||||
private final SlowPeerTracker slowPeerTracker;
|
||||
private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
|
||||
private Daemon slowPeerCollectorDaemon;
|
||||
private final long slowPeerCollectionInterval;
|
||||
private final int maxSlowPeerReportNodes;
|
||||
private boolean excludeSlowNodesEnabled;
|
||||
|
||||
@Nullable
|
||||
private final SlowDiskTracker slowDiskTracker;
|
||||
|
||||
@ -242,11 +255,22 @@ public class DatanodeManager {
|
||||
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
|
||||
DFSConfigKeys.
|
||||
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT));
|
||||
|
||||
final Timer timer = new Timer();
|
||||
this.slowPeerTracker = dataNodePeerStatsEnabled ?
|
||||
new SlowPeerTracker(conf, timer) : null;
|
||||
|
||||
this.excludeSlowNodesEnabled = conf.getBoolean(
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
|
||||
this.maxSlowPeerReportNodes = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
|
||||
this.slowPeerCollectionInterval = conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
if (slowPeerTracker != null && excludeSlowNodesEnabled) {
|
||||
startSlowPeerCollector();
|
||||
}
|
||||
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
|
||||
new SlowDiskTracker(conf, timer) : null;
|
||||
|
||||
@ -356,6 +380,44 @@ public class DatanodeManager {
|
||||
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
|
||||
}
|
||||
|
||||
private void startSlowPeerCollector() {
|
||||
if (slowPeerCollectorDaemon != null) {
|
||||
return;
|
||||
}
|
||||
slowPeerCollectorDaemon = new Daemon(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
slowNodesSet = getSlowPeers();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to collect slow peers", e);
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(slowPeerCollectionInterval);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Slow peers collection thread interrupted", e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
slowPeerCollectorDaemon.start();
|
||||
}
|
||||
|
||||
public void stopSlowPeerCollector() {
|
||||
if (slowPeerCollectorDaemon == null) {
|
||||
return;
|
||||
}
|
||||
slowPeerCollectorDaemon.interrupt();
|
||||
try {
|
||||
slowPeerCollectorDaemon.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Slow peers collection thread did not shutdown", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static long getStaleIntervalFromConf(Configuration conf,
|
||||
long heartbeatExpireInterval) {
|
||||
long staleInterval = conf.getLong(
|
||||
@ -401,6 +463,7 @@ void activate(final Configuration conf) {
|
||||
void close() {
|
||||
datanodeAdminManager.close();
|
||||
heartbeatManager.close();
|
||||
stopSlowPeerCollector();
|
||||
}
|
||||
|
||||
/** @return the network topology. */
|
||||
@ -2019,6 +2082,48 @@ public String getSlowPeersReport() {
|
||||
return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all tracking slow peers.
|
||||
* @return
|
||||
*/
|
||||
public Set<Node> getSlowPeers() {
|
||||
Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
|
||||
if (slowPeerTracker == null) {
|
||||
return slowPeersSet;
|
||||
}
|
||||
ArrayList<String> slowNodes =
|
||||
slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
|
||||
for (String slowNode : slowNodes) {
|
||||
if (StringUtils.isBlank(slowNode)
|
||||
|| !slowNode.contains(IP_PORT_SEPARATOR)) {
|
||||
continue;
|
||||
}
|
||||
String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
|
||||
DatanodeDescriptor datanodeByHost =
|
||||
host2DatanodeMap.getDatanodeByHost(ipAddr);
|
||||
if (datanodeByHost != null) {
|
||||
slowPeersSet.add(datanodeByHost);
|
||||
}
|
||||
}
|
||||
return slowPeersSet;
|
||||
}
|
||||
|
||||
/**
 * Returns the cached set of slow nodes, as last refreshed by the slow
 * peer collector daemon.
 *
 * @return the current set of known slow nodes; empty if collection is
 *         disabled or has not yet run.
 */
public static Set<Node> getSlowNodes() {
  return slowNodesSet;
}
|
||||
|
||||
/**
 * Use only for testing.
 *
 * @return the slow peer tracker, or null when datanode peer stats are
 *         disabled.
 */
@VisibleForTesting
public SlowPeerTracker getSlowPeerTracker() {
  return slowPeerTracker;
}
|
||||
|
||||
/**
|
||||
* Use only for testing.
|
||||
*/
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
@ -233,6 +234,23 @@ public SortedSet<String> getReportingNodes() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all tracking slow peers.
|
||||
* @param numNodes
|
||||
* @return
|
||||
*/
|
||||
public ArrayList<String> getSlowNodes(int numNodes) {
|
||||
Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
|
||||
ArrayList<String> slowNodes = new ArrayList<>();
|
||||
for (ReportForJson jsonReport : jsonReports) {
|
||||
slowNodes.add(jsonReport.getSlowNode());
|
||||
}
|
||||
if (!slowNodes.isEmpty()) {
|
||||
LOG.warn("Slow nodes list: " + slowNodes);
|
||||
}
|
||||
return slowNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve reports in a structure for generating JSON, limiting the
|
||||
* output to the top numNodes nodes i.e nodes with the most reports.
|
||||
|
@ -2368,6 +2368,36 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If this is set to true, we will filter out slow nodes
|
||||
when choosing targets for blocks.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.max.slowpeer.collect.nodes</name>
|
||||
<value>5</value>
|
||||
<description>
|
||||
How many slow nodes we will collect for filtering out
|
||||
when choosing targets for blocks.
|
||||
|
||||
It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.slowpeer.collect.interval</name>
|
||||
<value>30m</value>
|
||||
<description>
|
||||
Interval at which the slow peer trackers runs in the background to collect slow peers.
|
||||
|
||||
It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.fileio.profiling.sampling.percentage</name>
|
||||
<value>0</value>
|
||||
|
@ -0,0 +1,131 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(Parameterized.class)
public class TestReplicationPolicyExcludeSlowNodes
    extends BaseReplicationPolicyTest {

  // Parameterized over the placement policy class under test.
  public TestReplicationPolicyExcludeSlowNodes(String blockPlacementPolicy) {
    this.blockPlacementPolicy = blockPlacementPolicy;
  }

  // Every placement policy that must honor slow-node exclusion.
  @Parameterized.Parameters
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
        {BlockPlacementPolicyDefault.class.getName()},
        {BlockPlacementPolicyWithUpgradeDomain.class.getName()},
        {AvailableSpaceBlockPlacementPolicy.class.getName()},
        {BlockPlacementPolicyRackFaultTolerant.class.getName()},
        {AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class.getName()},
    });
  }

  @Override
  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
    // Enable peer stats so the SlowPeerTracker is instantiated.
    conf.setBoolean(DFSConfigKeys
            .DFS_DATANODE_PEER_STATS_ENABLED_KEY,
        true);
    // Short collection interval so the collector daemon refreshes the
    // slow-node cache quickly during the test.
    conf.setStrings(DFSConfigKeys
            .DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
        "1s");
    // Turn on the feature under test: exclude slow nodes from targets.
    conf.setBoolean(DFSConfigKeys
            .DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
        true);
    // One datanode per rack so rack policies have room to choose.
    final String[] racks = {
        "/rack1",
        "/rack2",
        "/rack3",
        "/rack4",
        "/rack5",
        "/rack6"};
    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
    return DFSTestUtil.toDatanodeDescriptor(storages);
  }

  /**
   * Tests that chooseTarget when excludeSlowNodesEnabled set to true.
   */
  @Test
  public void testChooseTargetExcludeSlowNodes() throws Exception {
    namenode.getNamesystem().writeLock();
    try {
      // add nodes
      for (int i = 0; i < dataNodes.length; i++) {
        dnManager.addDatanode(dataNodes[i]);
      }

      // mock slow nodes: nodes 0..2 each get two slow reports so they
      // become the top-reported (slow) peers.
      SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());

      // waiting for slow nodes collector run (interval is 1s; 3s gives
      // the daemon time to refresh the cache at least once)
      Thread.sleep(3000);

      // fetch slow nodes
      Set<Node> slowPeers = dnManager.getSlowPeers();

      // assert slow nodes: exactly nodes 0..2 are reported slow
      assertEquals(3, slowPeers.size());
      for (int i = 0; i < slowPeers.size(); i++) {
        assertTrue(slowPeers.contains(dataNodes[i]));
      }

      // mock writer
      DatanodeDescriptor writerDn = dataNodes[0];

      // call chooseTarget()
      DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
          .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
              writerDn, new ArrayList<DatanodeStorageInfo>(), false, null,
              1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);

      // assert targets: none of the chosen targets may be a slow node
      assertEquals(3, targets.length);
      for (int i = 0; i < targets.length; i++) {
        assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()));
      }
    } finally {
      namenode.getNamesystem().writeUnlock();
    }
    NameNode.LOG.info("Done working on it");
  }

}
|
Loading…
Reference in New Issue
Block a user