From feb68cb5470dc3e6c16b6bc1549141613e360601 Mon Sep 17 00:00:00 2001
From: cnauroth
Date: Mon, 27 Apr 2015 16:48:13 -0700
Subject: [PATCH] HDFS-8232. Missing datanode counters when using Metrics2 sink
 interface. Contributed by Anu Engineer.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt    |   3 +
 .../fsdataset/impl/FsDatasetImpl.java          |  27 ++++
 .../metrics/DataNodeMetricHelper.java          |  79 ++++++++++
 .../datanode/metrics/FSDatasetMBean.java       |   3 +-
 .../server/datanode/SimulatedFSDataset.java    |  20 ++-
 .../datanode/TestDataNodeFSDataSetSink.java    | 136 ++++++++++++++++++
 .../extdataset/ExternalDatasetImpl.java        |  19 ++-
 7 files changed, 281 insertions(+), 6 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFSDataSetSink.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d56ea0ce1e..326de0be4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -574,6 +574,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8205. CommandFormat#parse() should not parse option as value of
     option. (Peter Shi and Xiaoyu Yao via Arpit Agarwal)
 
+    HDFS-8232. Missing datanode counters when using Metrics2 sink interface.
+    (Anu Engineer via cnauroth)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8869f5a378..b87daecefc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -71,6 +71,7 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
@@ -104,6 +105,9 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -316,6 +320,13 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     lazyWriter = new Daemon(new LazyWriter(conf));
     lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
+
+    // Add a Metrics2 Source Interface. This is same
+    // data as MXBean. We can remove the registerMBean call
+    // in a release where we can break backward compatibility.
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.register("FSDatasetState", "FSDatasetState", this);
+
     localFS = FileSystem.getLocal(conf);
     blockPinningEnabled = conf.getBoolean(
       DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
@@ -636,6 +647,22 @@ public long getNumBlocksFailedToUncache() {
     return cacheManager.getNumBlocksFailedToUncache();
   }
 
+  /**
+   * Get metrics from the metrics source.
+   *
+   * @param collector to contain the resulting metrics snapshot
+   * @param all if true, return all metrics even if unchanged.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    try {
+      DataNodeMetricHelper.getMetrics(collector, this, "FSDatasetState");
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while metric collection. Exception : " +
+          e.getMessage());
+    }
+  }
+
   @Override // FSDatasetMBean
   public long getNumBlocksCached() {
     return cacheManager.getNumBlocksCached();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java
new file mode 100644
index 0000000000..8bbe08bc05
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.lib.Interns;
+
+import java.io.IOException;
+
+public class DataNodeMetricHelper {
+
+  /**
+   * Helper for the metrics2 interface: publishes the values of an
+   * FSDatasetMBean as a metrics record, so that the bean can act as a
+   * metrics source.
+   *
+   * @param collector Metrics Collector that is passed in
+   * @param beanClass The bean that currently implements the metric functions
+   * @param context A string that identifies the context
+   *
+   * @throws IOException
+   */
+  public static void getMetrics(MetricsCollector collector,
+      FSDatasetMBean beanClass, String context)
+      throws IOException {
+
+    if (beanClass == null) {
+      throw new IOException("beanClass cannot be null");
+    }
+
+    String className = beanClass.getClass().getName();
+
+    collector.addRecord(className)
+        .setContext(context)
+        .addGauge(Interns.info("Capacity", "Total storage capacity"),
+            beanClass.getCapacity())
+        .addGauge(Interns.info("DfsUsed", "Total bytes used by dfs datanode"),
+            beanClass.getDfsUsed())
+        .addGauge(Interns.info("Remaining", "Total bytes of free storage"),
+            beanClass.getRemaining())
+        .add(new MetricsTag(Interns.info("StorageInfo", "Storage ID"),
+            beanClass.getStorageInfo()))
+        .addGauge(Interns.info("NumFailedVolumes", "Number of failed Volumes"
+            + " in the data Node"), beanClass.getNumFailedVolumes())
+        .addGauge(Interns.info("LastVolumeFailureDate", "Last Volume failure in"
+            + " milliseconds from epoch"), beanClass.getLastVolumeFailureDate())
+        .addGauge(Interns.info("EstimatedCapacityLostTotal", "Total capacity lost"
+            + " due to volume failure"), beanClass.getEstimatedCapacityLostTotal())
+        .addGauge(Interns.info("CacheUsed", "Datanode cache used in bytes"),
+            beanClass.getCacheUsed())
+        .addGauge(Interns.info("CacheCapacity", "Datanode cache capacity"),
+            beanClass.getCacheCapacity())
+        .addGauge(Interns.info("NumBlocksCached", "Datanode number"
+            + " of blocks cached"), beanClass.getNumBlocksCached())
+        .addGauge(Interns.info("NumBlocksFailedToCache", "Datanode number of "
+            + "blocks failed to cache"), beanClass.getNumBlocksFailedToCache())
+        .addGauge(Interns.info("NumBlocksFailedToUnCache", "Datanode number of"
+            + " blocks failed in cache eviction"),
+            beanClass.getNumBlocksFailedToUncache());
+
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
index 5f22540afd..c2f175b97d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSource;
 
 /**
  *
@@ -37,7 +38,7 @@
  *
  */
 @InterfaceAudience.Private
-public interface FSDatasetMBean {
+public interface FSDatasetMBean extends MetricsSource {
 
   /**
    * Returns the total space (in bytes) used by a block pool
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 060e055d24..bc2423765d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -23,8 +23,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -52,6 +50,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -60,9 +59,9 @@
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
  * This class implements a simulated FSDataset.
@@ -690,6 +689,21 @@ public long getNumBlocksFailedToUncache() {
     return 0l;
   }
 
+  /**
+   * Get metrics from the metrics source.
+   *
+   * @param collector to contain the resulting metrics snapshot
+   * @param all if true, return all metrics even if unchanged.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    try {
+      DataNodeMetricHelper.getMetrics(collector, this, "SimulatedFSDataset");
+    } catch (Exception e) {
+      // ignore exceptions
+    }
+  }
+
   @Override // FsDatasetSpi
   public synchronized long getLength(ExtendedBlock b) throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFSDataSetSink.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFSDataSetSink.java
new file mode 100644
index 0000000000..dbd6bb0b1c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFSDataSetSink.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestDataNodeFSDataSetSink {
+  private static final MetricsSystemImpl ms = new
+      MetricsSystemImpl("TestFSDataSet");
+
+  class FSDataSetSinkTest implements MetricsSink {
+    private Set<String> nameMap;
+    private int count;
+
+    /**
+     * Adds a metrics record to the sink.
+     *
+     * @param record the record to add
+     */
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      // let us do this only once, otherwise
+      // our count could go out of sync.
+      if (count == 0) {
+        for (AbstractMetric m : record.metrics()) {
+          if (nameMap.contains(m.name())) {
+            count++;
+          }
+        }
+
+        for (MetricsTag t : record.tags()) {
+          if (nameMap.contains(t.name())) {
+            count++;
+          }
+        }
+      }
+    }
+
+    /**
+     * Flush any buffered metrics.
+     */
+    @Override
+    public void flush() {
+
+    }
+
+    /**
+     * Initialize the plugin.
+     *
+     * @param conf the configuration object for the plugin
+     */
+    @Override
+    public void init(SubsetConfiguration conf) {
+      nameMap = new TreeSet<>();
+      nameMap.add("DfsUsed");
+      nameMap.add("Capacity");
+      nameMap.add("Remaining");
+      nameMap.add("StorageInfo");
+      nameMap.add("NumFailedVolumes");
+      nameMap.add("LastVolumeFailureDate");
+      nameMap.add("EstimatedCapacityLostTotal");
+      nameMap.add("CacheUsed");
+      nameMap.add("CacheCapacity");
+      nameMap.add("NumBlocksCached");
+      nameMap.add("NumBlocksFailedToCache");
+      nameMap.add("NumBlocksFailedToUnCache");
+      nameMap.add("Context");
+      nameMap.add("Hostname");
+    }
+
+    public int getMapCount() {
+      return nameMap.size();
+    }
+
+    public int getFoundKeyCount() {
+      return count;
+    }
+  }
+
+  @Test
+  /**
+   * This test creates a Source and then calls into the Sink that we
+   * have registered.
+   * That is, it calls into FSDataSetSinkTest.
+   */
+  public void testFSDataSetMetrics() throws InterruptedException {
+    Configuration conf = new HdfsConfiguration();
+    String bpid = "FSDatSetSink-Test";
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
+    fsdataset.addBlockPool(bpid, conf);
+    FSDataSetSinkTest sink = new FSDataSetSinkTest();
+    sink.init(null);
+    ms.init("Test");
+    ms.start();
+    ms.register("FSDataSetSource", "FSDataSetSource", fsdataset);
+    ms.register("FSDataSetSink", "FSDataSetSink", sink);
+    ms.startMetricsMBeans();
+    ms.publishMetricsNow();
+
+    Thread.sleep(4000);
+
+    ms.stopMetricsMBeans();
+    ms.shutdown();
+
+    // make sure we got all expected metrics in the callback
+    assertEquals(sink.getMapCount(), sink.getFoundKeyCount());
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 2c6d868de6..b7c20289d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -35,14 +35,14 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.metrics2.MetricsCollector;
 
 public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
 
@@ -420,6 +420,21 @@ public long getNumBlocksFailedToUncache() {
     return 0;
   }
 
+  /**
+   * Get metrics from the metrics source.
+   *
+   * @param collector to contain the resulting metrics snapshot
+   * @param all if true, return all metrics even if unchanged.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    try {
+      DataNodeMetricHelper.getMetrics(collector, this, "ExternalDataset");
+    } catch (Exception e) {
+      // ignore exceptions
+    }
+  }
+
   @Override
   public void setPinning(ExtendedBlock block) throws IOException {
   }
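
Note (illustrative, not part of the patch): once FsDatasetImpl registers the
"FSDatasetState" source with the default metrics system, any configured
Metrics2 sink receives the records built in DataNodeMetricHelper. A minimal
hadoop-metrics2.properties sketch using the stock FileSink is shown below;
the output file name is an arbitrary example, not something this patch defines.

  # Route DataNode metrics, including the new FSDatasetState record,
  # to a local file, snapshotting every 10 seconds.
  *.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
  *.period=10
  datanode.sink.file.filename=datanode-metrics.out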