YARN-5529. Create new DiskValidator class with metrics (yufeigu via rkanter)

This commit is contained in:
Robert Kanter 2017-01-03 12:13:32 -08:00
parent 7fcc73fc0d
commit 591fb15944
6 changed files with 456 additions and 1 deletions

View File

@ -144,6 +144,16 @@ public void stop() {
scheduledTask = null; scheduledTask = null;
} }
/**
* Get the quantile estimator.
*
* @return the quantile estimator
*/
@VisibleForTesting
public synchronized QuantileEstimator getEstimator() {
return estimator;
}
public synchronized void setEstimator(QuantileEstimator quantileEstimator) { public synchronized void setEstimator(QuantileEstimator quantileEstimator) {
this.estimator = quantileEstimator; this.estimator = quantileEstimator;
} }

View File

@ -62,7 +62,8 @@ private DiskValidatorFactory() {
/** /**
* Returns {@link DiskValidator} instance corresponding to its name. * Returns {@link DiskValidator} instance corresponding to its name.
* The diskValidator parameter can be "basic" for {@link BasicDiskValidator}. * The diskValidator parameter can be "basic" for {@link BasicDiskValidator}
* or "read-write" for {@link ReadWriteDiskValidator}.
* @param diskValidator canonical class name, for example, "basic" * @param diskValidator canonical class name, for example, "basic"
* @throws DiskErrorException if the class cannot be located * @throws DiskErrorException if the class cannot be located
*/ */
@ -74,6 +75,8 @@ public static DiskValidator getInstance(String diskValidator)
if (diskValidator.equalsIgnoreCase(BasicDiskValidator.NAME)) { if (diskValidator.equalsIgnoreCase(BasicDiskValidator.NAME)) {
clazz = BasicDiskValidator.class; clazz = BasicDiskValidator.class;
} else if (diskValidator.equalsIgnoreCase(ReadWriteDiskValidator.NAME)) {
clazz = ReadWriteDiskValidator.class;
} else { } else {
try { try {
clazz = Class.forName(diskValidator); clazz = Class.forName(diskValidator);

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* ReadWriteDiskValidator is the class to check a directory by to create a file,
* write some bytes into it, read back, and verify if they are identical.
* Read time and write time are recorded and put into an
* {@link ReadWriteDiskValidatorMetrics}.
*/
public class ReadWriteDiskValidator implements DiskValidator {
public static final String NAME = "read-write";
private static final Random RANDOM = new Random();
@Override
public void checkStatus(File dir) throws DiskErrorException {
ReadWriteDiskValidatorMetrics metric =
ReadWriteDiskValidatorMetrics.getMetric(dir.toString());
Path tmpFile = null;
try {
if (!dir.isDirectory()) {
metric.diskCheckFailed();
throw new DiskErrorException(dir + " is not a directory!");
}
// check the directory presence and permission.
DiskChecker.checkDir(dir);
// create a tmp file under the dir
tmpFile = Files.createTempFile(dir.toPath(), "test", "tmp");
// write 16 bytes into the tmp file
byte[] inputBytes = new byte[16];
RANDOM.nextBytes(inputBytes);
long startTime = System.nanoTime();
Files.write(tmpFile, inputBytes);
long writeLatency = TimeUnit.MICROSECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
metric.addWriteFileLatency(writeLatency);
// read back
startTime = System.nanoTime();
byte[] outputBytes = Files.readAllBytes(tmpFile);
long readLatency = TimeUnit.MICROSECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
metric.addReadFileLatency(readLatency);
// validation
if (!Arrays.equals(inputBytes, outputBytes)) {
metric.diskCheckFailed();
throw new DiskErrorException("Data in file has been corrupted.");
}
} catch (IOException e) {
metric.diskCheckFailed();
throw new DiskErrorException("Disk Check failed!", e);
} finally {
// delete the file
if (tmpFile != null) {
try {
Files.delete(tmpFile);
} catch (IOException e) {
metric.diskCheckFailed();
throw new DiskErrorException("File deletion failed!", e);
}
}
}
}
}

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.lib.*;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.metrics2.lib.Interns.info;
/**
* The metrics for a directory generated by {@link ReadWriteDiskValidator}.
*/
@InterfaceAudience.Private
public class ReadWriteDiskValidatorMetrics {
@Metric("# of disk failure") MutableCounterInt failureCount;
@Metric("Time of last failure") MutableGaugeLong lastFailureTime;
private final MetricsRegistry registry;
private static final MetricsInfo RECORD_INFO =
info("ReadWriteDiskValidatorMetrics", "Metrics for the DiskValidator");
private final int[] quantileIntervals = new int[] {
60 * 60, // 1h
24 * 60 * 60, //1 day
10 * 24 * 60 * 60 //10 day
};
private final MutableQuantiles[] fileReadQuantiles;
private final MutableQuantiles[] fileWriteQuantiles;
public ReadWriteDiskValidatorMetrics() {
registry = new MetricsRegistry(RECORD_INFO);
fileReadQuantiles = new MutableQuantiles[quantileIntervals.length];
for (int i = 0; i < fileReadQuantiles.length; i++) {
int interval = quantileIntervals[i];
fileReadQuantiles[i] = registry.newQuantiles(
"readLatency" + interval + "s",
"File read latency", "Ops", "latencyMicros", interval);
}
fileWriteQuantiles = new MutableQuantiles[quantileIntervals.length];
for (int i = 0; i < fileWriteQuantiles.length; i++) {
int interval = quantileIntervals[i];
fileWriteQuantiles[i] = registry.newQuantiles(
"writeLatency" + interval + "s",
"File write latency", "Ops", "latencyMicros", interval);
}
}
/**
* Simple metrics cache to help prevent re-registrations and help to access
* metrics.
*/
protected final static Map<String, ReadWriteDiskValidatorMetrics> DIR_METRICS
= new HashMap<>();
/**
* Get a metric by given directory name.
*
* @param dirName directory name
* @return the metric
*/
public synchronized static ReadWriteDiskValidatorMetrics getMetric(
String dirName) {
MetricsSystem ms = DefaultMetricsSystem.instance();
ReadWriteDiskValidatorMetrics metrics = DIR_METRICS.get(dirName);
if (metrics == null) {
metrics = new ReadWriteDiskValidatorMetrics();
// Register with the MetricsSystems
if (ms != null) {
metrics = ms.register(sourceName(dirName),
"Metrics for directory: " + dirName, metrics);
}
DIR_METRICS.put(dirName, metrics);
}
return metrics;
}
/**
* Add the file write latency to {@link MutableQuantiles} metrics.
*
* @param writeLatency file write latency in microseconds
*/
public void addWriteFileLatency(long writeLatency) {
if (fileWriteQuantiles != null) {
for (MutableQuantiles q : fileWriteQuantiles) {
q.add(writeLatency);
}
}
}
/**
* Add the file read latency to {@link MutableQuantiles} metrics.
*
* @param readLatency file read latency in microseconds
*/
public void addReadFileLatency(long readLatency) {
if (fileReadQuantiles!= null) {
for (MutableQuantiles q : fileReadQuantiles) {
q.add(readLatency);
}
}
}
/**
* Get a source name by given directory name.
*
* @param dirName directory name
* @return the source name
*/
protected static String sourceName(String dirName) {
StringBuilder sb = new StringBuilder(RECORD_INFO.name());
sb.append(",dir=").append(dirName);
return sb.toString();
}
/**
* Increase the failure count and update the last failure timestamp.
*/
public void diskCheckFailed() {
failureCount.incr();
lastFailureTime.set(System.nanoTime());
}
/**
* Get {@link MutableQuantiles} metrics for the file read time.
*
* @return {@link MutableQuantiles} metrics for the file read time
*/
@VisibleForTesting
protected MutableQuantiles[] getFileReadQuantiles() {
return fileReadQuantiles;
}
/**
* Get {@link MutableQuantiles} metrics for the file write time.
*
* @return {@link MutableQuantiles} metrics for the file write time
*/
@VisibleForTesting
protected MutableQuantiles[] getFileWriteQuantiles() {
return fileWriteQuantiles;
}
}

View File

@ -49,6 +49,22 @@ public static void assertMetric(MetricsRecord record,
assertEquals(expectedValue, resourceLimitMetric.value()); assertEquals(expectedValue, resourceLimitMetric.value());
} }
public static Number getMetricValueByName(MetricsRecord record,
String metricName) {
AbstractMetric resourceLimitMetric = getFirstMetricByName(
record, metricName);
assertNotNull(resourceLimitMetric);
return resourceLimitMetric.value();
}
public static void assertMetricNotNull(MetricsRecord record,
String metricName) {
AbstractMetric resourceLimitMetric = getFirstMetricByName(
record, metricName);
assertNotNull("Metric " + metricName + " doesn't exist",
resourceLimitMetric);
}
private static MetricsTag getFirstTagByName(MetricsRecord record, String name) { private static MetricsTag getFirstTagByName(MetricsRecord record, String name) {
return Iterables.getFirst(Iterables.filter(record.tags(), return Iterables.getFirst(Iterables.filter(record.tags(),
new MetricsTagPredicate(name)), null); new MetricsTagPredicate(name)), null);

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.impl.MetricsRecords;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.junit.Before;
import org.junit.Test;
import org.junit.Assert;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
* The class to test {@link ReadWriteDiskValidator} and
* {@link ReadWriteDiskValidatorMetrics}.
*/
public class TestReadWriteDiskValidator {
private MetricsSystem ms;
@Before
public void setUp() {
ms = DefaultMetricsSystem.instance();
}
@Test
public void testReadWriteDiskValidator()
throws DiskErrorException, InterruptedException {
int count = 100;
File testDir = new File(System.getProperty("test.build.data"));
ReadWriteDiskValidator readWriteDiskValidator =
(ReadWriteDiskValidator) DiskValidatorFactory.getInstance(
ReadWriteDiskValidator.NAME);
for (int i = 0; i < count; i++) {
readWriteDiskValidator.checkStatus(testDir);
}
ReadWriteDiskValidatorMetrics metric =
ReadWriteDiskValidatorMetrics.getMetric(testDir.toString());
Assert.assertEquals("The count number of estimator in MutableQuantiles"
+ "metrics of file read is not right",
metric.getFileReadQuantiles()[0].getEstimator().getCount(), count);
Assert.assertEquals("The count number of estimator in MutableQuantiles"
+ "metrics of file write is not right",
metric.getFileWriteQuantiles()[0].getEstimator().getCount(),
count);
MetricsSource source = ms.getSource(
ReadWriteDiskValidatorMetrics.sourceName(testDir.toString()));
MetricsCollectorImpl collector = new MetricsCollectorImpl();
source.getMetrics(collector, true);
MetricsRecords.assertMetric(collector.getRecords().get(0),
"FailureCount", 0);
MetricsRecords.assertMetric(collector.getRecords().get(0),
"LastFailureTime", (long)0);
// All MutableQuantiles haven't rolled over yet because the minimum
// interval is 1 hours, so we just test if these metrics exist.
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"WriteLatency3600sNumOps");
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"WriteLatency3600s50thPercentileLatencyMicros");
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"WriteLatency86400sNumOps");
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"WriteLatency864000sNumOps");
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"ReadLatency3600sNumOps");
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"ReadLatency3600s50thPercentileLatencyMicros");
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"ReadLatency86400sNumOps");
MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
"ReadLatency864000sNumOps");
}
@Test
public void testCheckFailures() throws Throwable {
ReadWriteDiskValidator readWriteDiskValidator =
(ReadWriteDiskValidator) DiskValidatorFactory.getInstance(
ReadWriteDiskValidator.NAME);
// create a temporary test directory under the system test directory
File testDir = Files.createTempDirectory(
Paths.get(System.getProperty("test.build.data")), "test").toFile();
try {
Shell.execCommand(Shell.getSetPermissionCommand("000", false,
testDir.getAbsolutePath()));
} catch (Exception e){
testDir.delete();
throw e;
}
try {
readWriteDiskValidator.checkStatus(testDir);
fail("Disk check should fail.");
} catch (DiskErrorException e) {
assertTrue(e.getMessage().equals("Disk Check failed!"));
}
MetricsSource source = ms.getSource(
ReadWriteDiskValidatorMetrics.sourceName(testDir.toString()));
MetricsCollectorImpl collector = new MetricsCollectorImpl();
source.getMetrics(collector, true);
try {
readWriteDiskValidator.checkStatus(testDir);
fail("Disk check should fail.");
} catch (DiskErrorException e) {
assertTrue(e.getMessage().equals("Disk Check failed!"));
}
source.getMetrics(collector, true);
// verify the first metrics record
MetricsRecords.assertMetric(collector.getRecords().get(0),
"FailureCount", 1);
Long lastFailureTime1 = (Long) MetricsRecords.getMetricValueByName(
collector.getRecords().get(0), "LastFailureTime");
// verify the second metrics record
MetricsRecords.assertMetric(collector.getRecords().get(1),
"FailureCount", 2);
Long lastFailureTime2 = (Long) MetricsRecords.getMetricValueByName(
collector.getRecords().get(1), "LastFailureTime");
assertTrue("The first failure time should be less than the second one",
lastFailureTime1 < lastFailureTime2);
testDir.delete();
}
}