diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java index 5b12370536..cc32975513 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java @@ -144,6 +144,16 @@ public void stop() { scheduledTask = null; } + /** + * Get the quantile estimator. + * + * @return the quantile estimator + */ + @VisibleForTesting + public synchronized QuantileEstimator getEstimator() { + return estimator; + } + public synchronized void setEstimator(QuantileEstimator quantileEstimator) { this.estimator = quantileEstimator; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java index 29ab2ad510..7d04db23ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java @@ -62,7 +62,8 @@ private DiskValidatorFactory() { /** * Returns {@link DiskValidator} instance corresponding to its name. - * The diskValidator parameter can be "basic" for {@link BasicDiskValidator}. + * The diskValidator parameter can be "basic" for {@link BasicDiskValidator} + * or "read-write" for {@link ReadWriteDiskValidator}. * @param diskValidator canonical class name, for example, "basic" * @throws DiskErrorException if the class cannot be located */ @@ -74,6 +75,8 @@ public static DiskValidator getInstance(String diskValidator) if (diskValidator.equalsIgnoreCase(BasicDiskValidator.NAME)) { clazz = BasicDiskValidator.class; + } else if (diskValidator.equalsIgnoreCase(ReadWriteDiskValidator.NAME)) { + clazz = ReadWriteDiskValidator.class; } else { try { clazz = Class.forName(diskValidator); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidator.java new file mode 100644 index 0000000000..d80bb45d1e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidator.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.util.DiskChecker.DiskErrorException; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * ReadWriteDiskValidator is the class to check a directory by to create a file, + * write some bytes into it, read back, and verify if they are identical. + * Read time and write time are recorded and put into an + * {@link ReadWriteDiskValidatorMetrics}. + */ +public class ReadWriteDiskValidator implements DiskValidator { + + public static final String NAME = "read-write"; + private static final Random RANDOM = new Random(); + + @Override + public void checkStatus(File dir) throws DiskErrorException { + ReadWriteDiskValidatorMetrics metric = + ReadWriteDiskValidatorMetrics.getMetric(dir.toString()); + Path tmpFile = null; + try { + if (!dir.isDirectory()) { + metric.diskCheckFailed(); + throw new DiskErrorException(dir + " is not a directory!"); + } + + // check the directory presence and permission. + DiskChecker.checkDir(dir); + + // create a tmp file under the dir + tmpFile = Files.createTempFile(dir.toPath(), "test", "tmp"); + + // write 16 bytes into the tmp file + byte[] inputBytes = new byte[16]; + RANDOM.nextBytes(inputBytes); + long startTime = System.nanoTime(); + Files.write(tmpFile, inputBytes); + long writeLatency = TimeUnit.MICROSECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + metric.addWriteFileLatency(writeLatency); + + // read back + startTime = System.nanoTime(); + byte[] outputBytes = Files.readAllBytes(tmpFile); + long readLatency = TimeUnit.MICROSECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + metric.addReadFileLatency(readLatency); + + // validation + if (!Arrays.equals(inputBytes, outputBytes)) { + metric.diskCheckFailed(); + throw new DiskErrorException("Data in file has been corrupted."); + } + } catch (IOException e) { + metric.diskCheckFailed(); + throw new DiskErrorException("Disk Check failed!", e); + } finally { + // delete the file + if (tmpFile != null) { + try { + Files.delete(tmpFile); + } catch (IOException e) { + metric.diskCheckFailed(); + throw new DiskErrorException("File deletion failed!", e); + } + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidatorMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidatorMetrics.java new file mode 100644 index 0000000000..620186298d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidatorMetrics.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.lib.*; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * The metrics for a directory generated by {@link ReadWriteDiskValidator}. + */ +@InterfaceAudience.Private +public class ReadWriteDiskValidatorMetrics { + @Metric("# of disk failure") MutableCounterInt failureCount; + @Metric("Time of last failure") MutableGaugeLong lastFailureTime; + + private final MetricsRegistry registry; + private static final MetricsInfo RECORD_INFO = + info("ReadWriteDiskValidatorMetrics", "Metrics for the DiskValidator"); + + private final int[] quantileIntervals = new int[] { + 60 * 60, // 1h + 24 * 60 * 60, //1 day + 10 * 24 * 60 * 60 //10 day + }; + private final MutableQuantiles[] fileReadQuantiles; + private final MutableQuantiles[] fileWriteQuantiles; + + public ReadWriteDiskValidatorMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + + fileReadQuantiles = new MutableQuantiles[quantileIntervals.length]; + for (int i = 0; i < fileReadQuantiles.length; i++) { + int interval = quantileIntervals[i]; + fileReadQuantiles[i] = registry.newQuantiles( + "readLatency" + interval + "s", + "File read latency", "Ops", "latencyMicros", interval); + } + + fileWriteQuantiles = new MutableQuantiles[quantileIntervals.length]; + for (int i = 0; i < fileWriteQuantiles.length; i++) { + int interval = quantileIntervals[i]; + fileWriteQuantiles[i] = registry.newQuantiles( + "writeLatency" + interval + "s", + "File write latency", "Ops", "latencyMicros", interval); + } + } + + /** + * Simple metrics cache to help prevent re-registrations and help to access + * metrics. + */ + protected final static Map DIR_METRICS + = new HashMap<>(); + + /** + * Get a metric by given directory name. + * + * @param dirName directory name + * @return the metric + */ + public synchronized static ReadWriteDiskValidatorMetrics getMetric( + String dirName) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + ReadWriteDiskValidatorMetrics metrics = DIR_METRICS.get(dirName); + if (metrics == null) { + metrics = new ReadWriteDiskValidatorMetrics(); + + // Register with the MetricsSystems + if (ms != null) { + metrics = ms.register(sourceName(dirName), + "Metrics for directory: " + dirName, metrics); + } + DIR_METRICS.put(dirName, metrics); + } + + return metrics; + } + + /** + * Add the file write latency to {@link MutableQuantiles} metrics. + * + * @param writeLatency file write latency in microseconds + */ + public void addWriteFileLatency(long writeLatency) { + if (fileWriteQuantiles != null) { + for (MutableQuantiles q : fileWriteQuantiles) { + q.add(writeLatency); + } + } + } + + /** + * Add the file read latency to {@link MutableQuantiles} metrics. + * + * @param readLatency file read latency in microseconds + */ + public void addReadFileLatency(long readLatency) { + if (fileReadQuantiles!= null) { + for (MutableQuantiles q : fileReadQuantiles) { + q.add(readLatency); + } + } + } + + /** + * Get a source name by given directory name. + * + * @param dirName directory name + * @return the source name + */ + protected static String sourceName(String dirName) { + StringBuilder sb = new StringBuilder(RECORD_INFO.name()); + sb.append(",dir=").append(dirName); + return sb.toString(); + } + + /** + * Increase the failure count and update the last failure timestamp. + */ + public void diskCheckFailed() { + failureCount.incr(); + lastFailureTime.set(System.nanoTime()); + } + + /** + * Get {@link MutableQuantiles} metrics for the file read time. + * + * @return {@link MutableQuantiles} metrics for the file read time + */ + @VisibleForTesting + protected MutableQuantiles[] getFileReadQuantiles() { + return fileReadQuantiles; + } + + /** + * Get {@link MutableQuantiles} metrics for the file write time. + * + * @return {@link MutableQuantiles} metrics for the file write time + */ + @VisibleForTesting + protected MutableQuantiles[] getFileWriteQuantiles() { + return fileWriteQuantiles; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java index 3c0999ed5d..5d52cad66b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java @@ -49,6 +49,22 @@ public static void assertMetric(MetricsRecord record, assertEquals(expectedValue, resourceLimitMetric.value()); } + public static Number getMetricValueByName(MetricsRecord record, + String metricName) { + AbstractMetric resourceLimitMetric = getFirstMetricByName( + record, metricName); + assertNotNull(resourceLimitMetric); + return resourceLimitMetric.value(); + } + + public static void assertMetricNotNull(MetricsRecord record, + String metricName) { + AbstractMetric resourceLimitMetric = getFirstMetricByName( + record, metricName); + assertNotNull("Metric " + metricName + " doesn't exist", + resourceLimitMetric); + } + private static MetricsTag getFirstTagByName(MetricsRecord record, String name) { return Iterables.getFirst(Iterables.filter(record.tags(), new MetricsTagPredicate(name)), null); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestReadWriteDiskValidator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestReadWriteDiskValidator.java new file mode 100644 index 0000000000..46f40337f3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestReadWriteDiskValidator.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.metrics2.impl.MetricsRecords; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.junit.Before; +import org.junit.Test; +import org.junit.Assert; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * The class to test {@link ReadWriteDiskValidator} and + * {@link ReadWriteDiskValidatorMetrics}. + */ +public class TestReadWriteDiskValidator { + + private MetricsSystem ms; + + @Before + public void setUp() { + ms = DefaultMetricsSystem.instance(); + } + + @Test + public void testReadWriteDiskValidator() + throws DiskErrorException, InterruptedException { + int count = 100; + File testDir = new File(System.getProperty("test.build.data")); + ReadWriteDiskValidator readWriteDiskValidator = + (ReadWriteDiskValidator) DiskValidatorFactory.getInstance( + ReadWriteDiskValidator.NAME); + + for (int i = 0; i < count; i++) { + readWriteDiskValidator.checkStatus(testDir); + } + + ReadWriteDiskValidatorMetrics metric = + ReadWriteDiskValidatorMetrics.getMetric(testDir.toString()); + Assert.assertEquals("The count number of estimator in MutableQuantiles" + + "metrics of file read is not right", + metric.getFileReadQuantiles()[0].getEstimator().getCount(), count); + + Assert.assertEquals("The count number of estimator in MutableQuantiles" + + "metrics of file write is not right", + metric.getFileWriteQuantiles()[0].getEstimator().getCount(), + count); + + MetricsSource source = ms.getSource( + ReadWriteDiskValidatorMetrics.sourceName(testDir.toString())); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + source.getMetrics(collector, true); + + MetricsRecords.assertMetric(collector.getRecords().get(0), + "FailureCount", 0); + MetricsRecords.assertMetric(collector.getRecords().get(0), + "LastFailureTime", (long)0); + + // All MutableQuantiles haven't rolled over yet because the minimum + // interval is 1 hours, so we just test if these metrics exist. + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "WriteLatency3600sNumOps"); + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "WriteLatency3600s50thPercentileLatencyMicros"); + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "WriteLatency86400sNumOps"); + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "WriteLatency864000sNumOps"); + + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "ReadLatency3600sNumOps"); + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "ReadLatency3600s50thPercentileLatencyMicros"); + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "ReadLatency86400sNumOps"); + MetricsRecords.assertMetricNotNull(collector.getRecords().get(0), + "ReadLatency864000sNumOps"); + } + + @Test + public void testCheckFailures() throws Throwable { + ReadWriteDiskValidator readWriteDiskValidator = + (ReadWriteDiskValidator) DiskValidatorFactory.getInstance( + ReadWriteDiskValidator.NAME); + + // create a temporary test directory under the system test directory + File testDir = Files.createTempDirectory( + Paths.get(System.getProperty("test.build.data")), "test").toFile(); + + try { + Shell.execCommand(Shell.getSetPermissionCommand("000", false, + testDir.getAbsolutePath())); + } catch (Exception e){ + testDir.delete(); + throw e; + } + + try { + readWriteDiskValidator.checkStatus(testDir); + fail("Disk check should fail."); + } catch (DiskErrorException e) { + assertTrue(e.getMessage().equals("Disk Check failed!")); + } + + MetricsSource source = ms.getSource( + ReadWriteDiskValidatorMetrics.sourceName(testDir.toString())); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + source.getMetrics(collector, true); + + try { + readWriteDiskValidator.checkStatus(testDir); + fail("Disk check should fail."); + } catch (DiskErrorException e) { + assertTrue(e.getMessage().equals("Disk Check failed!")); + } + + source.getMetrics(collector, true); + + // verify the first metrics record + MetricsRecords.assertMetric(collector.getRecords().get(0), + "FailureCount", 1); + Long lastFailureTime1 = (Long) MetricsRecords.getMetricValueByName( + collector.getRecords().get(0), "LastFailureTime"); + + // verify the second metrics record + MetricsRecords.assertMetric(collector.getRecords().get(1), + "FailureCount", 2); + Long lastFailureTime2 = (Long) MetricsRecords.getMetricValueByName( + collector.getRecords().get(1), "LastFailureTime"); + assertTrue("The first failure time should be less than the second one", + lastFailureTime1 < lastFailureTime2); + + testDir.delete(); + } +}