HDFS-16528. Reconfigure slow peer enable for Namenode (#4251)

This commit is contained in:
Viraj Jasani 2022-05-01 17:03:02 -07:00 committed by GitHub
parent cc204c9611
commit ee450bbbc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 212 additions and 40 deletions

View File

@ -193,10 +193,6 @@ public class DatanodeManager {
private final HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<>(4, 0.75f);
/**
* True if we should process latency metrics from downstream peers.
*/
private final boolean dataNodePeerStatsEnabled;
/**
* True if we should process latency metrics from individual DN disks.
*/
@ -210,7 +206,7 @@ public class DatanodeManager {
private static final String IP_PORT_SEPARATOR = ":";
@Nullable
private final SlowPeerTracker slowPeerTracker;
private SlowPeerTracker slowPeerTracker;
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
private Daemon slowPeerCollectorDaemon;
private final long slowPeerCollectionInterval;
@ -247,16 +243,15 @@ public class DatanodeManager {
this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
blockManager, heartbeatManager);
this.fsClusterStats = newFSClusterStats();
this.dataNodePeerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
this.dataNodeDiskStatsEnabled = Util.isDiskStatsEnabled(conf.getInt(
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
DFSConfigKeys.
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT));
final Timer timer = new Timer();
this.slowPeerTracker = dataNodePeerStatsEnabled ?
new SlowPeerTracker(conf, timer) : null;
final boolean dataNodePeerStatsEnabledVal =
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
initSlowPeerTracker(conf, timer, dataNodePeerStatsEnabledVal);
this.maxSlowPeerReportNodes = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
@ -264,7 +259,7 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
if (slowPeerTracker != null) {
if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
startSlowPeerCollector();
}
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
@ -366,6 +361,21 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
}
/**
* Determines whether slow peer tracker should be enabled. If dataNodePeerStatsEnabledVal is
* true, slow peer tracker is initialized.
*
* @param conf The configuration to use while initializing slowPeerTracker.
* @param timer Timer object for slowPeerTracker.
* @param dataNodePeerStatsEnabled To determine whether slow peer tracking should be enabled.
*/
public void initSlowPeerTracker(Configuration conf, Timer timer,
boolean dataNodePeerStatsEnabled) {
this.slowPeerTracker = dataNodePeerStatsEnabled ?
new SlowPeerTracker(conf, timer) :
new SlowPeerDisabledTracker(conf, timer);
}
private void startSlowPeerCollector() {
if (slowPeerCollectorDaemon != null) {
return;
@ -1856,12 +1866,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
nodeinfo.setBalancerBandwidth(0);
}
if (slowPeerTracker != null) {
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
slowPeersMap);
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
}
for (String slowNodeId : slowPeersMap.keySet()) {
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
@ -2109,7 +2120,8 @@ public void setBlockInvalidateLimit(int configuredBlockInvalidateLimit) {
* @return
*/
public String getSlowPeersReport() {
return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
return slowPeerTracker.getJson();
}
/**
@ -2118,11 +2130,9 @@ public String getSlowPeersReport() {
*/
public Set<String> getSlowPeersUuidSet() {
Set<String> slowPeersUuidSet = Sets.newConcurrentHashSet();
if (slowPeerTracker == null) {
return slowPeersUuidSet;
}
ArrayList<String> slowNodes =
slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
List<String> slowNodes;
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
for (String slowNode : slowNodes) {
if (StringUtils.isBlank(slowNode)
|| !slowNode.contains(IP_PORT_SEPARATOR)) {

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Timer;
/**
* Disabled tracker for slow peers. To be used when dfs.datanode.peer.stats.enabled is disabled.
*/
@InterfaceAudience.Private
public class SlowPeerDisabledTracker extends SlowPeerTracker {
private static final Logger LOG = LoggerFactory.getLogger(SlowPeerDisabledTracker.class);
public SlowPeerDisabledTracker(Configuration conf, Timer timer) {
super(conf, timer);
final boolean dataNodePeerStatsEnabledVal =
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
Preconditions.checkArgument(!dataNodePeerStatsEnabledVal,
"SlowPeerDisabledTracker should only be used for disabled slow peer stats.");
}
@Override
public boolean isSlowPeerTrackerEnabled() {
return false;
}
@Override
public void addReport(String slowNode, String reportingNode) {
LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
}
@Override
public Set<String> getReportsForNode(String slowNode) {
LOG.trace("Retrieval of slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
return ImmutableSet.of();
}
@Override
public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
LOG.trace("Retrieval of slow peer report for all nodes is disabled. "
+ "To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
return ImmutableMap.of();
}
@Override
public String getJson() {
LOG.trace("Retrieval of slow peer reports as json string is disabled. "
+ "To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
return null;
}
@Override
public List<String> getSlowNodes(int numNodes) {
return ImmutableList.of();
}
}

View File

@ -39,6 +39,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
@ -109,6 +110,15 @@ public SlowPeerTracker(Configuration conf, Timer timer) {
DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT);
}
/**
* If SlowPeerTracker is enabled, return true, else returns false.
*
* @return true if slow peer tracking is enabled, else false.
*/
public boolean isSlowPeerTrackerEnabled() {
return true;
}
/**
* Add a new report. DatanodeIds can be the DataNodeIds or addresses
* We don't care as long as the caller is consistent.
@ -239,7 +249,7 @@ public SortedSet<String> getReportingNodes() {
* @param numNodes
* @return
*/
public ArrayList<String> getSlowNodes(int numNodes) {
public List<String> getSlowNodes(int numNodes) {
Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
ArrayList<String> slowNodes = new ArrayList<>();
for (ReportForJson jsonReport : jsonReports) {

View File

@ -97,6 +97,8 @@
import org.apache.hadoop.util.GcTimeMonitor;
import org.apache.hadoop.util.GcTimeMonitor.Builder;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -121,6 +123,8 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
@ -338,7 +342,8 @@ public enum OperationCategory {
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
DFS_BLOCK_INVALIDATE_LIMIT_KEY));
DFS_BLOCK_INVALIDATE_LIMIT_KEY,
DFS_DATANODE_PEER_STATS_ENABLED_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@ -2204,9 +2209,10 @@ protected String reconfigurePropertyImpl(String property, String newVal)
return newVal;
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
return reconfigureParallelLoad(newVal);
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)
|| (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))
|| (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY))) {
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY) || (property.equals(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) || (property.equals(
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) || (property.equals(
DFS_DATANODE_PEER_STATS_ENABLED_KEY))) {
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
} else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
@ -2402,27 +2408,48 @@ String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
namesystem.writeLock();
String result;
try {
if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) {
boolean enable = (newVal == null ? DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT :
switch (property) {
case DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY: {
boolean enable = (newVal == null ?
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT :
Boolean.parseBoolean(newVal));
result = Boolean.toString(enable);
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
} else if (property.equals(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) {
break;
}
case DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY: {
boolean enable = (newVal == null ?
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT :
Boolean.parseBoolean(newVal));
result = Boolean.toString(enable);
bm.setExcludeSlowNodesEnabled(enable);
} else if (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) {
break;
}
case DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY: {
int maxSlowpeerCollectNodes = (newVal == null ?
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT :
Integer.parseInt(newVal));
result = Integer.toString(maxSlowpeerCollectNodes);
datanodeManager.setMaxSlowpeerCollectNodes(maxSlowpeerCollectNodes);
} else {
throw new IllegalArgumentException("Unexpected property " +
property + " in reconfigureSlowNodesParameters");
break;
}
case DFS_DATANODE_PEER_STATS_ENABLED_KEY: {
Timer timer = new Timer();
if (newVal != null && !newVal.equalsIgnoreCase("true") && !newVal.equalsIgnoreCase(
"false")) {
throw new IllegalArgumentException(newVal + " is not boolean value");
}
final boolean peerStatsEnabled = newVal == null ?
DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT :
Boolean.parseBoolean(newVal);
result = Boolean.toString(peerStatsEnabled);
datanodeManager.initSlowPeerTracker(getConf(), timer, peerStatsEnabled);
break;
}
default: {
throw new IllegalArgumentException(
"Unexpected property " + property + " in reconfigureSlowNodesParameters");
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;

View File

@ -46,6 +46,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
@ -480,6 +481,34 @@ public void testBlockInvalidateLimit() throws ReconfigurationException {
datanodeManager.getBlockInvalidateLimit());
}
@Test
public void testSlowPeerTrackerEnabled() throws Exception {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem.getBlockManager()
.getDatanodeManager();
assertFalse("SlowNode tracker is already enabled. It should be disabled by default",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
try {
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "non-boolean");
fail("should not reach here");
} catch (ReconfigurationException e) {
assertEquals(
"Could not change property dfs.datanode.peer.stats.enabled from 'false' to 'non-boolean'",
e.getMessage());
}
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "True");
assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, null);
assertFalse("SlowNode tracker is still enabled. Reconfiguration could not be successful",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
}
@After
public void shutDown() throws IOException {
if (cluster != null) {

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
@ -426,17 +427,18 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(17, outs.size());
assertEquals(18, outs.size());
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(3));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(4));
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(5));
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(6));
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(7));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(8));
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(9));
assertEquals(DFS_DATANODE_PEER_STATS_ENABLED_KEY, outs.get(4));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(5));
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(6));
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(7));
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(8));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(9));
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(10));
assertEquals(errs.size(), 0);
}