HDFS-10959. Adding per disk IO statistics and metrics in DataNode. Contributed by Xiaoyu Yao.
This commit is contained in:
parent
575773a357
commit
fe4ff64a4a
@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* {@link FileIoEvents} that profiles the performance of the metadata and data
|
||||
* related operations on datanode volumes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ProfilingFileIoEvents implements FileIoEvents {
|
||||
|
||||
@Override
|
||||
public long beforeMetadataOp(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
return Time.monotonicNow();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterMetadataOp(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long begin) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
metrics.addMetadastaOperationLatency(Time.monotonicNow() - begin);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long beforeFileIo(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long len) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
return Time.monotonicNow();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterFileIo(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long begin, long len) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
long latency = Time.monotonicNow() - begin;
|
||||
metrics.addDataFileIoLatency(latency);
|
||||
switch (op) {
|
||||
case SYNC:
|
||||
metrics.addSyncIoLatency(latency);
|
||||
break;
|
||||
case FLUSH:
|
||||
metrics.addFlushIoLatency(latency);
|
||||
break;
|
||||
case READ:
|
||||
metrics.addReadIoLatency(latency);
|
||||
break;
|
||||
case WRITE:
|
||||
metrics.addWriteIoLatency(latency);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, Exception e, long begin) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
metrics.addFileIoError(Time.monotonicNow() - begin);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getStatistics() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private DataNodeVolumeMetrics getVolumeMetrics(final FsVolumeSpi volume) {
|
||||
if (volume != null) {
|
||||
return volume.getMetrics();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -0,0 +1,289 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
|
||||
* This class is for maintaining Datanode Volume IO related statistics and
|
||||
* publishing them through the metrics interfaces.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
@Metrics(name = "DataNodeVolume", about = "DataNode Volume metrics",
|
||||
context = "dfs")
|
||||
public class DataNodeVolumeMetrics {
|
||||
private final MetricsRegistry registry = new MetricsRegistry("FsVolume");
|
||||
|
||||
@Metric("number of metadata operations")
|
||||
private MutableCounterLong totalMetadataOperations;
|
||||
@Metric("metadata operation rate")
|
||||
private MutableRate metadataOperationRate;
|
||||
private MutableQuantiles[] metadataOperationLatencyQuantiles;
|
||||
|
||||
@Metric("number of data file io operations")
|
||||
private MutableCounterLong totalDataFileIos;
|
||||
@Metric("data file io operation rate")
|
||||
private MutableRate dataFileIoRate;
|
||||
private MutableQuantiles[] dataFileIoLatencyQuantiles;
|
||||
|
||||
@Metric("file io flush rate")
|
||||
private MutableRate flushIoRate;
|
||||
private MutableQuantiles[] flushIoLatencyQuantiles;
|
||||
|
||||
@Metric("file io sync rate")
|
||||
private MutableRate syncIoRate;
|
||||
private MutableQuantiles[] syncIoLatencyQuantiles;
|
||||
|
||||
@Metric("file io read rate")
|
||||
private MutableRate readIoRate;
|
||||
private MutableQuantiles[] readIoLatencyQuantiles;
|
||||
|
||||
@Metric("file io write rate")
|
||||
private MutableRate writeIoRate;
|
||||
private MutableQuantiles[] writeIoLatencyQuantiles;
|
||||
|
||||
@Metric("number of file io errors")
|
||||
private MutableCounterLong totalFileIoErrors;
|
||||
@Metric("file io error rate")
|
||||
private MutableRate fileIoErrorRate;
|
||||
|
||||
public long getTotalMetadataOperations() {
|
||||
return totalMetadataOperations.value();
|
||||
}
|
||||
|
||||
// Based on metadataOperationRate
|
||||
public long getMetadataOperationSampleCount() {
|
||||
return metadataOperationRate.lastStat().numSamples();
|
||||
}
|
||||
|
||||
public double getMetadataOperationMean() {
|
||||
return metadataOperationRate.lastStat().mean();
|
||||
}
|
||||
|
||||
public double getMetadataOperationStdDev() {
|
||||
return metadataOperationRate.lastStat().stddev();
|
||||
}
|
||||
|
||||
public long getTotalDataFileIos() {
|
||||
return totalDataFileIos.value();
|
||||
}
|
||||
|
||||
// Based on dataFileIoRate
|
||||
public long getDataFileIoSampleCount() {
|
||||
return dataFileIoRate.lastStat().numSamples();
|
||||
}
|
||||
|
||||
public double getDataFileIoMean() {
|
||||
return dataFileIoRate.lastStat().mean();
|
||||
}
|
||||
|
||||
public double getDataFileIoStdDev() {
|
||||
return dataFileIoRate.lastStat().stddev();
|
||||
}
|
||||
|
||||
// Based on flushIoRate
|
||||
public long getFlushIoSampleCount() {
|
||||
return flushIoRate.lastStat().numSamples();
|
||||
}
|
||||
|
||||
public double getFlushIoMean() {
|
||||
return flushIoRate.lastStat().mean();
|
||||
}
|
||||
|
||||
public double getFlushIoStdDev() {
|
||||
return flushIoRate.lastStat().stddev();
|
||||
}
|
||||
|
||||
// Based on syncIoRate
|
||||
public long getSyncIoSampleCount() {
|
||||
return syncIoRate.lastStat().numSamples();
|
||||
}
|
||||
|
||||
public double getSyncIoMean() {
|
||||
return syncIoRate.lastStat().mean();
|
||||
}
|
||||
|
||||
public double getSyncIoStdDev() {
|
||||
return syncIoRate.lastStat().stddev();
|
||||
}
|
||||
|
||||
// Based on readIoRate
|
||||
public long getReadIoSampleCount() {
|
||||
return readIoRate.lastStat().numSamples();
|
||||
}
|
||||
|
||||
public double getReadIoMean() {
|
||||
return readIoRate.lastStat().mean();
|
||||
}
|
||||
|
||||
public double getReadIoStdDev() {
|
||||
return readIoRate.lastStat().stddev();
|
||||
}
|
||||
|
||||
// Based on writeIoRate
|
||||
public long getWriteIoSampleCount() {
|
||||
return syncIoRate.lastStat().numSamples();
|
||||
}
|
||||
|
||||
public double getWriteIoMean() {
|
||||
return syncIoRate.lastStat().mean();
|
||||
}
|
||||
|
||||
public double getWriteIoStdDev() {
|
||||
return syncIoRate.lastStat().stddev();
|
||||
}
|
||||
|
||||
public long getTotalFileIoErrors() {
|
||||
return totalFileIoErrors.value();
|
||||
}
|
||||
|
||||
// Based on fileIoErrorRate
|
||||
public long getFileIoErrorSampleCount() {
|
||||
return fileIoErrorRate.lastStat().numSamples();
|
||||
}
|
||||
|
||||
public double getFileIoErrorMean() {
|
||||
return fileIoErrorRate.lastStat().mean();
|
||||
}
|
||||
|
||||
public double getFileIoErrorStdDev() {
|
||||
return fileIoErrorRate.lastStat().stddev();
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final MetricsSystem ms;
|
||||
|
||||
public DataNodeVolumeMetrics(final MetricsSystem metricsSystem,
|
||||
final String volumeName, final int[] intervals) {
|
||||
this.ms = metricsSystem;
|
||||
this.name = volumeName;
|
||||
final int len = intervals.length;
|
||||
metadataOperationLatencyQuantiles = new MutableQuantiles[len];
|
||||
dataFileIoLatencyQuantiles = new MutableQuantiles[len];
|
||||
flushIoLatencyQuantiles = new MutableQuantiles[len];
|
||||
syncIoLatencyQuantiles = new MutableQuantiles[len];
|
||||
readIoLatencyQuantiles = new MutableQuantiles[len];
|
||||
writeIoLatencyQuantiles = new MutableQuantiles[len];
|
||||
for (int i = 0; i < len; i++) {
|
||||
int interval = intervals[i];
|
||||
metadataOperationLatencyQuantiles[i] = registry.newQuantiles(
|
||||
"metadataOperationLatency" + interval + "s",
|
||||
"Meatadata Operation Latency in ms", "ops", "latency", interval);
|
||||
dataFileIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||
"dataFileIoLatency" + interval + "s",
|
||||
"Data File Io Latency in ms", "ops", "latency", interval);
|
||||
flushIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||
"flushIoLatency" + interval + "s",
|
||||
"Data flush Io Latency in ms", "ops", "latency", interval);
|
||||
syncIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||
"syncIoLatency" + interval + "s",
|
||||
"Data sync Io Latency in ms", "ops", "latency", interval);
|
||||
readIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||
"readIoLatency" + interval + "s",
|
||||
"Data read Io Latency in ms", "ops", "latency", interval);
|
||||
writeIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||
"writeIoLatency" + interval + "s",
|
||||
"Data write Io Latency in ms", "ops", "latency", interval);
|
||||
}
|
||||
}
|
||||
|
||||
public static DataNodeVolumeMetrics create(final Configuration conf,
|
||||
final String volumeName) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
String name = "DataNodeVolume-"+ (volumeName.isEmpty()
|
||||
? "UndefinedDataNodeVolume"+ ThreadLocalRandom.current().nextInt()
|
||||
: volumeName.replace(':', '-'));
|
||||
|
||||
// Percentile measurement is off by default, by watching no intervals
|
||||
int[] intervals =
|
||||
conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
|
||||
return ms.register(name, null, new DataNodeVolumeMetrics(ms, name,
|
||||
intervals));
|
||||
}
|
||||
|
||||
public String name() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void unRegister() {
|
||||
ms.unregisterSource(name);
|
||||
}
|
||||
|
||||
public void addMetadastaOperationLatency(final long latency) {
|
||||
totalMetadataOperations.incr();
|
||||
metadataOperationRate.add(latency);
|
||||
for (MutableQuantiles q : metadataOperationLatencyQuantiles) {
|
||||
q.add(latency);
|
||||
}
|
||||
}
|
||||
|
||||
public void addDataFileIoLatency(final long latency) {
|
||||
totalDataFileIos.incr();
|
||||
dataFileIoRate.add(latency);
|
||||
for (MutableQuantiles q : dataFileIoLatencyQuantiles) {
|
||||
q.add(latency);
|
||||
}
|
||||
}
|
||||
|
||||
public void addSyncIoLatency(final long latency) {
|
||||
syncIoRate.add(latency);
|
||||
for (MutableQuantiles q : syncIoLatencyQuantiles) {
|
||||
q.add(latency);
|
||||
}
|
||||
}
|
||||
|
||||
public void addFlushIoLatency(final long latency) {
|
||||
flushIoRate.add(latency);
|
||||
for (MutableQuantiles q : flushIoLatencyQuantiles) {
|
||||
q.add(latency);
|
||||
}
|
||||
}
|
||||
|
||||
public void addReadIoLatency(final long latency) {
|
||||
readIoRate.add(latency);
|
||||
for (MutableQuantiles q : readIoLatencyQuantiles) {
|
||||
q.add(latency);
|
||||
}
|
||||
}
|
||||
|
||||
public void addWriteIoLatency(final long latency) {
|
||||
writeIoRate.add(latency);
|
||||
for (MutableQuantiles q: writeIoLatencyQuantiles) {
|
||||
q.add(latency);
|
||||
}
|
||||
}
|
||||
|
||||
public void addFileIoError(final long latency) {
|
||||
totalFileIoErrors.incr();
|
||||
metadataOperationRate.add(latency);
|
||||
}
|
||||
}
|
@ -432,4 +432,6 @@ class VolumeCheckContext {
|
||||
}
|
||||
|
||||
FileIoProvider getFileIoProvider();
|
||||
|
||||
DataNodeVolumeMetrics getMetrics();
|
||||
}
|
||||
|
@ -48,6 +48,7 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
@ -130,6 +131,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||
// query from the filesystem.
|
||||
protected volatile long configuredCapacity;
|
||||
private final FileIoProvider fileIoProvider;
|
||||
private final DataNodeVolumeMetrics metrics;
|
||||
|
||||
/**
|
||||
* Per-volume worker pool that processes new blocks to cache.
|
||||
@ -163,6 +165,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||
this.conf = conf;
|
||||
this.fileIoProvider = fileIoProvider;
|
||||
cacheExecutor = initializeCacheExecutor(parent);
|
||||
this.metrics = DataNodeVolumeMetrics.create(conf, getBaseURI().getPath());
|
||||
}
|
||||
|
||||
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
|
||||
@ -1008,6 +1011,9 @@ void shutdown() {
|
||||
for (Entry<String, BlockPoolSlice> entry : set) {
|
||||
entry.getValue().shutdown(null);
|
||||
}
|
||||
if (metrics != null) {
|
||||
metrics.unRegister();
|
||||
}
|
||||
}
|
||||
|
||||
void addBlockPool(String bpid, Configuration c) throws IOException {
|
||||
@ -1305,6 +1311,11 @@ public FileIoProvider getFileIoProvider() {
|
||||
return fileIoProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataNodeVolumeMetrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
private LinkedList<ScanInfo> compileReport(File bpFinalizedDir,
|
||||
File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
|
||||
throws InterruptedException {
|
||||
|
@ -22,7 +22,11 @@
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.ReconfigurationException;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -34,6 +38,9 @@
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
||||
@ -43,6 +50,8 @@
|
||||
* dependencies to {@link MiniDFSCluster}.
|
||||
*/
|
||||
public class DataNodeTestUtils {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(DataNodeTestUtils.class);
|
||||
private static final String DIR_FAILURE_SUFFIX = ".origin";
|
||||
|
||||
public final static String TEST_CLUSTER_ID = "testClusterID";
|
||||
@ -203,4 +212,34 @@ public Object answer(InvocationOnMock invocation) throws IOException {
|
||||
}
|
||||
}).when(dn.data).getPinning(any(ExtendedBlock.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconfigure a DataNode by setting a new list of volumes.
|
||||
*
|
||||
* @param dn DataNode to reconfigure
|
||||
* @param newVols new volumes to configure
|
||||
* @throws Exception if there is any failure
|
||||
*/
|
||||
public static void reconfigureDataNode(DataNode dn, File... newVols)
|
||||
throws Exception {
|
||||
StringBuilder dnNewDataDirs = new StringBuilder();
|
||||
for (File newVol: newVols) {
|
||||
if (dnNewDataDirs.length() > 0) {
|
||||
dnNewDataDirs.append(',');
|
||||
}
|
||||
dnNewDataDirs.append(newVol.getAbsolutePath());
|
||||
}
|
||||
try {
|
||||
assertThat(
|
||||
dn.reconfigurePropertyImpl(
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||
dnNewDataDirs.toString()),
|
||||
is(dn.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
||||
} catch (ReconfigurationException e) {
|
||||
// This can be thrown if reconfiguration tries to use a failed volume.
|
||||
// We need to swallow the exception, because some of our tests want to
|
||||
// cover this case.
|
||||
LOG.warn("Could not reconfigure DataNode.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -50,6 +50,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
@ -476,11 +477,14 @@ false, getCapacity(), getUsed(), getFree(),
|
||||
static class SimulatedVolume implements FsVolumeSpi {
|
||||
private final SimulatedStorage storage;
|
||||
private final FileIoProvider fileIoProvider;
|
||||
private final DataNodeVolumeMetrics metrics;
|
||||
|
||||
SimulatedVolume(final SimulatedStorage storage,
|
||||
final FileIoProvider fileIoProvider) {
|
||||
final FileIoProvider fileIoProvider,
|
||||
final DataNodeVolumeMetrics metrics) {
|
||||
this.storage = storage;
|
||||
this.fileIoProvider = fileIoProvider;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -574,6 +578,11 @@ public FileIoProvider getFileIoProvider() {
|
||||
return fileIoProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataNodeVolumeMetrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VolumeCheckResult check(VolumeCheckContext context)
|
||||
throws Exception {
|
||||
@ -609,7 +618,12 @@ public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration
|
||||
this.storage = new SimulatedStorage(
|
||||
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
|
||||
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
|
||||
this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);
|
||||
|
||||
// TODO: per volume id or path
|
||||
DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf,
|
||||
datanodeUuid);
|
||||
this.volume = new SimulatedVolume(this.storage, this.fileIoProvider,
|
||||
volumeMetrics);
|
||||
this.datasetLock = new AutoCloseableLock();
|
||||
}
|
||||
|
||||
|
@ -20,12 +20,10 @@
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
@ -36,7 +34,6 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.ReconfigurationException;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
@ -61,7 +58,8 @@
|
||||
*/
|
||||
public class TestDataNodeVolumeFailureReporting {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestDataNodeVolumeFailureReporting.class);
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestDataNodeVolumeFailureReporting.class);
|
||||
{
|
||||
GenericTestUtils.setLogLevel(TestDataNodeVolumeFailureReporting.LOG,
|
||||
Level.ALL);
|
||||
@ -389,8 +387,8 @@ public void testDataNodeReconfigureWithVolumeFailures() throws Exception {
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// Reconfigure again to try to add back the failed volumes.
|
||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
DataNodeTestUtils.reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
@ -410,8 +408,8 @@ public void testDataNodeReconfigureWithVolumeFailures() throws Exception {
|
||||
|
||||
// Reconfigure a third time with the failed volumes. Afterwards, we expect
|
||||
// the same volume failures to be reported. (No double-counting.)
|
||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
DataNodeTestUtils.reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
@ -432,8 +430,8 @@ public void testDataNodeReconfigureWithVolumeFailures() throws Exception {
|
||||
// Replace failed volume with healthy volume and run reconfigure DataNode.
|
||||
// The failed volume information should be cleared.
|
||||
DataNodeTestUtils.restoreDataDirFromFailure(dn1Vol1, dn2Vol1);
|
||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
DataNodeTestUtils.reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
@ -670,34 +668,4 @@ private void initCluster(int numDataNodes, int storagesPerDatanode,
|
||||
cluster.getNamesystem().getBlockManager().getDatanodeManager(), 0);
|
||||
volumeCapacity = dnCapacity / cluster.getStoragesPerDatanode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconfigure a DataNode by setting a new list of volumes.
|
||||
*
|
||||
* @param dn DataNode to reconfigure
|
||||
* @param newVols new volumes to configure
|
||||
* @throws Exception if there is any failure
|
||||
*/
|
||||
private static void reconfigureDataNode(DataNode dn, File... newVols)
|
||||
throws Exception {
|
||||
StringBuilder dnNewDataDirs = new StringBuilder();
|
||||
for (File newVol: newVols) {
|
||||
if (dnNewDataDirs.length() > 0) {
|
||||
dnNewDataDirs.append(',');
|
||||
}
|
||||
dnNewDataDirs.append(newVol.getAbsolutePath());
|
||||
}
|
||||
try {
|
||||
assertThat(
|
||||
dn.reconfigurePropertyImpl(
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||
dnNewDataDirs.toString()),
|
||||
is(dn.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
||||
} catch (ReconfigurationException e) {
|
||||
// This can be thrown if reconfiguration tries to use a failed volume.
|
||||
// We need to swallow the exception, because some of our tests want to
|
||||
// cover this case.
|
||||
LOG.warn("Could not reconfigure DataNode.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,182 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
/**
|
||||
* Test class for DataNodeVolumeMetrics.
|
||||
*/
|
||||
public class TestDataNodeVolumeMetrics {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestDataNodeVolumeMetrics.class);
|
||||
|
||||
private static final int BLOCK_SIZE = 1024;
|
||||
private static final short REPL = 1;
|
||||
private static final int NUM_DATANODES = 1;
|
||||
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(300000);
|
||||
|
||||
@Test
|
||||
public void testVolumeMetrics() throws Exception {
|
||||
MiniDFSCluster cluster = setupClusterForVolumeMetrics();
|
||||
try {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
final Path fileName = new Path("/test.dat");
|
||||
final long fileLen = Integer.MAX_VALUE + 1L;
|
||||
DFSTestUtil.createFile(fs, fileName, false, BLOCK_SIZE, fileLen,
|
||||
fs.getDefaultBlockSize(fileName),
|
||||
REPL, 1L, true);
|
||||
|
||||
try (FSDataOutputStream out = fs.append(fileName)) {
|
||||
out.writeBytes("hello world");
|
||||
((DFSOutputStream) out.getWrappedStream()).hsync();
|
||||
}
|
||||
|
||||
verifyDataNodeVolumeMetrics(fs, cluster, fileName);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVolumeMetricsWithVolumeDepartureArrival() throws Exception {
|
||||
MiniDFSCluster cluster = setupClusterForVolumeMetrics();
|
||||
try {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
final Path fileName = new Path("/test.dat");
|
||||
final long fileLen = Integer.MAX_VALUE + 1L;
|
||||
DFSTestUtil.createFile(fs, fileName, false, BLOCK_SIZE, fileLen,
|
||||
fs.getDefaultBlockSize(fileName),
|
||||
REPL, 1L, true);
|
||||
|
||||
try (FSDataOutputStream out = fs.append(fileName)) {
|
||||
out.writeBytes("hello world");
|
||||
((DFSOutputStream) out.getWrappedStream()).hsync();
|
||||
}
|
||||
|
||||
ArrayList<DataNode> dns = cluster.getDataNodes();
|
||||
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
|
||||
|
||||
final String dataDir = cluster.getDataDirectory();
|
||||
final File dn1Vol2 = new File(dataDir, "data2");
|
||||
|
||||
DataNodeTestUtils.injectDataDirFailure(dn1Vol2);
|
||||
verifyDataNodeVolumeMetrics(fs, cluster, fileName);
|
||||
|
||||
DataNodeTestUtils.restoreDataDirFromFailure(dn1Vol2);
|
||||
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol2);
|
||||
verifyDataNodeVolumeMetrics(fs, cluster, fileName);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private MiniDFSCluster setupClusterForVolumeMetrics() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
|
||||
"org.apache.hadoop.hdfs.server.datanode.ProfilingFileIoEvents");
|
||||
SimulatedFSDataset.setFactory(conf);
|
||||
return new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(NUM_DATANODES)
|
||||
.storageTypes(new StorageType[]{StorageType.RAM_DISK, StorageType.DISK})
|
||||
.storagesPerDatanode(2)
|
||||
.build();
|
||||
}
|
||||
|
||||
private void verifyDataNodeVolumeMetrics(final FileSystem fs,
|
||||
final MiniDFSCluster cluster, final Path fileName) throws IOException {
|
||||
List<DataNode> datanodes = cluster.getDataNodes();
|
||||
DataNode datanode = datanodes.get(0);
|
||||
|
||||
final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
|
||||
final FsVolumeSpi volume = datanode.getFSDataset().getVolume(block);
|
||||
DataNodeVolumeMetrics metrics = volume.getMetrics();
|
||||
|
||||
MetricsRecordBuilder rb = getMetrics(volume.getMetrics().name());
|
||||
assertCounter("TotalDataFileIos", metrics.getTotalDataFileIos(), rb);
|
||||
|
||||
LOG.info("TotalMetadataOperations : " +
|
||||
metrics.getTotalMetadataOperations());
|
||||
LOG.info("TotalDataFileIos : " + metrics.getTotalDataFileIos());
|
||||
LOG.info("TotalFileIoErrors : " + metrics.getTotalFileIoErrors());
|
||||
|
||||
LOG.info("MetadataOperationSampleCount : " +
|
||||
metrics.getMetadataOperationSampleCount());
|
||||
LOG.info("MetadataOperationMean : " + metrics.getMetadataOperationMean());
|
||||
LOG.info("MetadataFileIoStdDev : " +
|
||||
metrics.getMetadataOperationStdDev());
|
||||
|
||||
LOG.info("DataFileIoSampleCount : " + metrics.getDataFileIoSampleCount());
|
||||
LOG.info("DataFileIoMean : " + metrics.getDataFileIoMean());
|
||||
LOG.info("DataFileIoStdDev : " + metrics.getDataFileIoStdDev());
|
||||
|
||||
LOG.info("flushIoSampleCount : " + metrics.getFlushIoSampleCount());
|
||||
LOG.info("flushIoMean : " + metrics.getFlushIoMean());
|
||||
LOG.info("flushIoStdDev : " + metrics.getFlushIoStdDev());
|
||||
|
||||
LOG.info("syncIoSampleCount : " + metrics.getSyncIoSampleCount());
|
||||
LOG.info("syncIoMean : " + metrics.getSyncIoMean());
|
||||
LOG.info("syncIoStdDev : " + metrics.getSyncIoStdDev());
|
||||
|
||||
LOG.info("readIoSampleCount : " + metrics.getReadIoMean());
|
||||
LOG.info("readIoMean : " + metrics.getReadIoMean());
|
||||
LOG.info("readIoStdDev : " + metrics.getReadIoStdDev());
|
||||
|
||||
LOG.info("writeIoSampleCount : " + metrics.getWriteIoSampleCount());
|
||||
LOG.info("writeIoMean : " + metrics.getWriteIoMean());
|
||||
LOG.info("writeIoStdDev : " + metrics.getWriteIoStdDev());
|
||||
|
||||
LOG.info("fileIoErrorSampleCount : "
|
||||
+ metrics.getFileIoErrorSampleCount());
|
||||
LOG.info("fileIoErrorMean : " + metrics.getFileIoErrorMean());
|
||||
LOG.info("fileIoErrorStdDev : " + metrics.getFileIoErrorStdDev());
|
||||
}
|
||||
}
|
@ -55,6 +55,7 @@
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
@ -916,6 +917,11 @@ public FileIoProvider getFileIoProvider() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataNodeVolumeMetrics getMetrics() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public VolumeCheckResult check(VolumeCheckContext context)
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
|
||||
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
@ -127,6 +128,11 @@ public FileIoProvider getFileIoProvider() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataNodeVolumeMetrics getMetrics() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VolumeCheckResult check(VolumeCheckContext context)
|
||||
throws Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user