HDFS-16396. Reconfig slow peer parameters for datanode (#3827)
Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
(cherry picked from commit 0c194f2157
)
This commit is contained in:
parent
4c57fb4d6b
commit
db67952f9f
@ -534,7 +534,7 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
|
|||||||
volumeFailureSummary.getFailedStorageLocations().length : 0;
|
volumeFailureSummary.getFailedStorageLocations().length : 0;
|
||||||
final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
|
final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
|
||||||
final SlowPeerReports slowPeers =
|
final SlowPeerReports slowPeers =
|
||||||
outliersReportDue && dn.getPeerMetrics() != null ?
|
outliersReportDue && dnConf.peerStatsEnabled && dn.getPeerMetrics() != null ?
|
||||||
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
|
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
|
||||||
SlowPeerReports.EMPTY_REPORT;
|
SlowPeerReports.EMPTY_REPORT;
|
||||||
final SlowDiskReports slowDisks =
|
final SlowDiskReports slowDisks =
|
||||||
|
@ -878,7 +878,7 @@ private int receivePacket() throws IOException {
|
|||||||
*/
|
*/
|
||||||
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
|
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
|
||||||
final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
|
final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
|
||||||
if (peerMetrics != null && isPenultimateNode) {
|
if (datanode.getDnConf().peerStatsEnabled && peerMetrics != null && isPenultimateNode) {
|
||||||
peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
|
peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1093,7 +1093,7 @@ private void initPerfMonitoring(DatanodeInfo[] downstreams) {
|
|||||||
if (downstreams != null && downstreams.length > 0) {
|
if (downstreams != null && downstreams.length > 0) {
|
||||||
downstreamDNs = downstreams;
|
downstreamDNs = downstreams;
|
||||||
isPenultimateNode = (downstreams.length == 1);
|
isPenultimateNode = (downstreams.length == 1);
|
||||||
if (isPenultimateNode && datanode.getPeerMetrics() != null) {
|
if (isPenultimateNode && datanode.getDnConf().peerStatsEnabled) {
|
||||||
mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
|
mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
|
||||||
downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
|
downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
|
||||||
LOG.debug("Will collect peer metrics for downstream node {}",
|
LOG.debug("Will collect peer metrics for downstream node {}",
|
||||||
|
@ -108,7 +108,7 @@ public class DNConf {
|
|||||||
private final long lifelineIntervalMs;
|
private final long lifelineIntervalMs;
|
||||||
volatile long blockReportInterval;
|
volatile long blockReportInterval;
|
||||||
volatile long blockReportSplitThreshold;
|
volatile long blockReportSplitThreshold;
|
||||||
final boolean peerStatsEnabled;
|
volatile boolean peerStatsEnabled;
|
||||||
final boolean diskStatsEnabled;
|
final boolean diskStatsEnabled;
|
||||||
final long outliersReportIntervalMs;
|
final long outliersReportIntervalMs;
|
||||||
final long ibrInterval;
|
final long ibrInterval;
|
||||||
@ -507,4 +507,8 @@ void setInitBRDelayMs(String delayMs) {
|
|||||||
dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
|
dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
|
||||||
initBlockReportDelay();
|
initBlockReportDelay();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setPeerStatsEnabled(boolean enablePeerStats) {
|
||||||
|
peerStatsEnabled = enablePeerStats;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,11 +44,19 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
@ -319,7 +327,11 @@ public class DataNode extends ReconfigurableBase
|
|||||||
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
|
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
|
||||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||||
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
|
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
|
||||||
DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
|
DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
|
||||||
|
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||||
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||||
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
|
||||||
|
|
||||||
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
||||||
|
|
||||||
@ -361,7 +373,7 @@ public static InetSocketAddress createSocketAddr(String target) {
|
|||||||
|
|
||||||
DataNodeMetrics metrics;
|
DataNodeMetrics metrics;
|
||||||
@Nullable
|
@Nullable
|
||||||
private DataNodePeerMetrics peerMetrics;
|
private volatile DataNodePeerMetrics peerMetrics;
|
||||||
private DataNodeDiskMetrics diskMetrics;
|
private DataNodeDiskMetrics diskMetrics;
|
||||||
private InetSocketAddress streamingAddr;
|
private InetSocketAddress streamingAddr;
|
||||||
|
|
||||||
@ -633,6 +645,11 @@ public String reconfigurePropertyImpl(String property, String newVal)
|
|||||||
return reconfDataXceiverParameters(property, newVal);
|
return reconfDataXceiverParameters(property, newVal);
|
||||||
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
|
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
|
||||||
return reconfCacheReportParameters(property, newVal);
|
return reconfCacheReportParameters(property, newVal);
|
||||||
|
case DFS_DATANODE_PEER_STATS_ENABLED_KEY:
|
||||||
|
case DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY:
|
||||||
|
case DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY:
|
||||||
|
case DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY:
|
||||||
|
return reconfSlowPeerParameters(property, newVal);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -712,6 +729,53 @@ private String reconfBlockReportParameters(String property, String newVal)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String reconfSlowPeerParameters(String property, String newVal)
|
||||||
|
throws ReconfigurationException {
|
||||||
|
String result = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||||
|
if (property.equals(DFS_DATANODE_PEER_STATS_ENABLED_KEY)) {
|
||||||
|
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
|
||||||
|
if (newVal != null && !newVal.equalsIgnoreCase("true")
|
||||||
|
&& !newVal.equalsIgnoreCase("false")) {
|
||||||
|
throw new IllegalArgumentException("Not a valid Boolean value for " + property +
|
||||||
|
" in reconfSlowPeerParameters");
|
||||||
|
}
|
||||||
|
boolean enable = (newVal == null ? DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT :
|
||||||
|
Boolean.parseBoolean(newVal));
|
||||||
|
result = Boolean.toString(enable);
|
||||||
|
dnConf.setPeerStatsEnabled(enable);
|
||||||
|
if (enable) {
|
||||||
|
// Create if it doesn't exist, overwrite if it does.
|
||||||
|
peerMetrics = DataNodePeerMetrics.create(getDisplayName(), getConf());
|
||||||
|
}
|
||||||
|
} else if (property.equals(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY)) {
|
||||||
|
Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
|
||||||
|
long minNodes = (newVal == null ? DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT :
|
||||||
|
Long.parseLong(newVal));
|
||||||
|
result = Long.toString(minNodes);
|
||||||
|
peerMetrics.setMinOutlierDetectionNodes(minNodes);
|
||||||
|
} else if (property.equals(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY)) {
|
||||||
|
Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
|
||||||
|
long threshold = (newVal == null ? DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT :
|
||||||
|
Long.parseLong(newVal));
|
||||||
|
result = Long.toString(threshold);
|
||||||
|
peerMetrics.setLowThresholdMs(threshold);
|
||||||
|
} else if (property.equals(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY)) {
|
||||||
|
Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
|
||||||
|
long minSamples = (newVal == null ?
|
||||||
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT :
|
||||||
|
Long.parseLong(newVal));
|
||||||
|
result = Long.toString(minSamples);
|
||||||
|
peerMetrics.setMinOutlierDetectionSamples(minSamples);
|
||||||
|
}
|
||||||
|
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||||
|
return result;
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a list of the keys of the re-configurable properties in configuration.
|
* Get a list of the keys of the re-configurable properties in configuration.
|
||||||
*/
|
*/
|
||||||
@ -3764,7 +3828,7 @@ void setBlockScanner(BlockScanner blockScanner) {
|
|||||||
|
|
||||||
@Override // DataNodeMXBean
|
@Override // DataNodeMXBean
|
||||||
public String getSendPacketDownstreamAvgInfo() {
|
public String getSendPacketDownstreamAvgInfo() {
|
||||||
return peerMetrics != null ?
|
return dnConf.peerStatsEnabled && peerMetrics != null ?
|
||||||
peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
|
peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,7 +341,7 @@ public void run() {
|
|||||||
* the thread dies away.
|
* the thread dies away.
|
||||||
*/
|
*/
|
||||||
private void collectThreadLocalStates() {
|
private void collectThreadLocalStates() {
|
||||||
if (datanode.getPeerMetrics() != null) {
|
if (datanode.getDnConf().peerStatsEnabled && datanode.getPeerMetrics() != null) {
|
||||||
datanode.getPeerMetrics().collectThreadLocalStates();
|
datanode.getPeerMetrics().collectThreadLocalStates();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,18 +21,23 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
|
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
|
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
|
||||||
|
import org.apache.hadoop.util.Preconditions;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
|
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
|
||||||
@ -57,15 +62,15 @@ public class DataNodePeerMetrics {
|
|||||||
* for outlier detection. If the number of samples is below this then
|
* for outlier detection. If the number of samples is below this then
|
||||||
* outlier detection is skipped.
|
* outlier detection is skipped.
|
||||||
*/
|
*/
|
||||||
private final long minOutlierDetectionSamples;
|
private volatile long minOutlierDetectionSamples;
|
||||||
/**
|
/**
|
||||||
* Threshold in milliseconds below which a DataNode is definitely not slow.
|
* Threshold in milliseconds below which a DataNode is definitely not slow.
|
||||||
*/
|
*/
|
||||||
private final long lowThresholdMs;
|
private volatile long lowThresholdMs;
|
||||||
/**
|
/**
|
||||||
* Minimum number of nodes to run outlier detection.
|
* Minimum number of nodes to run outlier detection.
|
||||||
*/
|
*/
|
||||||
private final long minOutlierDetectionNodes;
|
private volatile long minOutlierDetectionNodes;
|
||||||
|
|
||||||
public DataNodePeerMetrics(final String name, Configuration conf) {
|
public DataNodePeerMetrics(final String name, Configuration conf) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
@ -73,11 +78,11 @@ public DataNodePeerMetrics(final String name, Configuration conf) {
|
|||||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
|
||||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT);
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT);
|
||||||
lowThresholdMs =
|
lowThresholdMs =
|
||||||
conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
conf.getLong(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
||||||
minOutlierDetectionNodes =
|
minOutlierDetectionNodes =
|
||||||
conf.getLong(DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
conf.getLong(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
|
||||||
this.slowNodeDetector =
|
this.slowNodeDetector =
|
||||||
new OutlierDetector(minOutlierDetectionNodes, lowThresholdMs);
|
new OutlierDetector(minOutlierDetectionNodes, lowThresholdMs);
|
||||||
sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
|
sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
|
||||||
@ -87,7 +92,7 @@ public String name() {
|
|||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
long getMinOutlierDetectionSamples() {
|
public long getMinOutlierDetectionSamples() {
|
||||||
return minOutlierDetectionSamples;
|
return minOutlierDetectionSamples;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,4 +155,38 @@ public Map<String, Double> getOutliers() {
|
|||||||
public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
|
public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
|
||||||
return sendPacketDownstreamRollingAverages;
|
return sendPacketDownstreamRollingAverages;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMinOutlierDetectionNodes(long minNodes) {
|
||||||
|
Preconditions.checkArgument(minNodes > 0,
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY + " should be larger than 0");
|
||||||
|
minOutlierDetectionNodes = minNodes;
|
||||||
|
this.slowNodeDetector.setMinNumResources(minNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMinOutlierDetectionNodes() {
|
||||||
|
return minOutlierDetectionNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLowThresholdMs(long thresholdMs) {
|
||||||
|
Preconditions.checkArgument(thresholdMs > 0,
|
||||||
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY + " should be larger than 0");
|
||||||
|
lowThresholdMs = thresholdMs;
|
||||||
|
this.slowNodeDetector.setLowThresholdMs(thresholdMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLowThresholdMs() {
|
||||||
|
return lowThresholdMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMinOutlierDetectionSamples(long minSamples) {
|
||||||
|
Preconditions.checkArgument(minSamples > 0,
|
||||||
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY +
|
||||||
|
" should be larger than 0");
|
||||||
|
minOutlierDetectionSamples = minSamples;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public OutlierDetector getSlowNodeDetector() {
|
||||||
|
return this.slowNodeDetector;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -60,7 +60,7 @@ public class OutlierDetector {
|
|||||||
/**
|
/**
|
||||||
* Minimum number of resources to run outlier detection.
|
* Minimum number of resources to run outlier detection.
|
||||||
*/
|
*/
|
||||||
private final long minNumResources;
|
private volatile long minNumResources;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The multiplier is from Leys, C. et al.
|
* The multiplier is from Leys, C. et al.
|
||||||
@ -70,7 +70,7 @@ public class OutlierDetector {
|
|||||||
/**
|
/**
|
||||||
* Threshold in milliseconds below which a node/ disk is definitely not slow.
|
* Threshold in milliseconds below which a node/ disk is definitely not slow.
|
||||||
*/
|
*/
|
||||||
private final long lowThresholdMs;
|
private volatile long lowThresholdMs;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deviation multiplier. A sample is considered to be an outlier if it
|
* Deviation multiplier. A sample is considered to be an outlier if it
|
||||||
@ -180,4 +180,20 @@ public static Double computeMedian(List<Double> sortedValues) {
|
|||||||
}
|
}
|
||||||
return median;
|
return median;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMinNumResources(long minNodes) {
|
||||||
|
minNumResources = minNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMinOutlierDetectionNodes() {
|
||||||
|
return minNumResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLowThresholdMs(long thresholdMs) {
|
||||||
|
lowThresholdMs = thresholdMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLowThresholdMs() {
|
||||||
|
return lowThresholdMs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,7 +28,14 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
@ -45,6 +52,7 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -84,6 +92,7 @@ public void tearDown() throws Exception {
|
|||||||
private void startDFSCluster(int numNameNodes, int numDataNodes)
|
private void startDFSCluster(int numNameNodes, int numDataNodes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
|
||||||
|
|
||||||
MiniDFSNNTopology nnTopology = MiniDFSNNTopology
|
MiniDFSNNTopology nnTopology = MiniDFSNNTopology
|
||||||
.simpleFederatedTopology(numNameNodes);
|
.simpleFederatedTopology(numNameNodes);
|
||||||
@ -467,4 +476,82 @@ public void testCacheReportReconfiguration()
|
|||||||
dn.getConf().get(DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
|
dn.getConf().get(DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSlowPeerParameters() throws Exception {
|
||||||
|
String[] slowPeersParameters = {
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||||
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||||
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY};
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(i);
|
||||||
|
|
||||||
|
// Try invalid values.
|
||||||
|
LambdaTestUtils.intercept(ReconfigurationException.class,
|
||||||
|
"Could not change property dfs.datanode.peer.stats.enabled from 'true' to 'text'",
|
||||||
|
() -> dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "text"));
|
||||||
|
|
||||||
|
for (String parameter : slowPeersParameters) {
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(parameter, "text");
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting NumberFormatException",
|
||||||
|
expected.getCause() instanceof NumberFormatException);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(parameter, String.valueOf(-1));
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting IllegalArgumentException",
|
||||||
|
expected.getCause() instanceof IllegalArgumentException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change and verify properties.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false");
|
||||||
|
assertFalse(dn.getDnConf().peerStatsEnabled);
|
||||||
|
|
||||||
|
// Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
|
||||||
|
for (String parameter : slowPeersParameters) {
|
||||||
|
dn.reconfigureProperty(parameter, "123");
|
||||||
|
}
|
||||||
|
assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionNodes());
|
||||||
|
assertEquals(123, dn.getPeerMetrics().getLowThresholdMs());
|
||||||
|
assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionSamples());
|
||||||
|
assertEquals(123,
|
||||||
|
dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes());
|
||||||
|
assertEquals(123,
|
||||||
|
dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs());
|
||||||
|
|
||||||
|
// Revert to default and verify.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, null);
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_PEER_STATS_ENABLED_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY));
|
||||||
|
|
||||||
|
// Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
|
||||||
|
|
||||||
|
for (String parameter : slowPeersParameters) {
|
||||||
|
dn.reconfigureProperty(parameter, null);
|
||||||
|
}
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY));
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY));
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
|
||||||
|
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes(),
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
|
||||||
|
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs(),
|
||||||
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -330,7 +330,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
|
|||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("datanode", address, outs, errs);
|
getReconfigurableProperties("datanode", address, outs, errs);
|
||||||
assertEquals(8, outs.size());
|
assertEquals(12, outs.size());
|
||||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user