HDFS-8232. Missing datanode counters when using Metrics2 sink interface. Contributed by Anu Engineer.
This commit is contained in:
parent
db1b674b50
commit
feb68cb547
@ -574,6 +574,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HDFS-8205. CommandFormat#parse() should not parse option as
|
||||
value of option. (Peter Shi and Xiaoyu Yao via Arpit Agarwal)
|
||||
|
||||
HDFS-8232. Missing datanode counters when using Metrics2 sink interface.
|
||||
(Anu Engineer via cnauroth)
|
||||
|
||||
Release 2.7.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -71,6 +71,7 @@
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
@ -104,6 +105,9 @@
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
@ -316,6 +320,13 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
lazyWriter = new Daemon(new LazyWriter(conf));
|
||||
lazyWriter.start();
|
||||
registerMBean(datanode.getDatanodeUuid());
|
||||
|
||||
// Add a Metrics2 Source Interface. This is same
|
||||
// data as MXBean. We can remove the registerMbean call
|
||||
// in a release where we can break backward compatibility
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
ms.register("FSDatasetState", "FSDatasetState", this);
|
||||
|
||||
localFS = FileSystem.getLocal(conf);
|
||||
blockPinningEnabled = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
|
||||
@ -636,6 +647,22 @@ public long getNumBlocksFailedToUncache() {
|
||||
return cacheManager.getNumBlocksFailedToUncache();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get metrics from the metrics source
|
||||
*
|
||||
* @param collector to contain the resulting metrics snapshot
|
||||
* @param all if true, return all metrics even if unchanged.
|
||||
*/
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
try {
|
||||
DataNodeMetricHelper.getMetrics(collector, this, "FSDatasetState");
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception thrown while metric collection. Exception : "
|
||||
+ e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public long getNumBlocksCached() {
|
||||
return cacheManager.getNumBlocksCached();
|
||||
|
@ -0,0 +1,79 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.metrics;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsTag;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class DataNodeMetricHelper {
|
||||
|
||||
/**
|
||||
* Get metrics helper provides Helper function for
|
||||
* metrics2 interface to act as a Metric source
|
||||
*
|
||||
* @param collector Metrics Collector that is passed in
|
||||
* @param beanClass The Class that currently impliments the metric functions
|
||||
* @param context A string that idenitifies the context
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void getMetrics(MetricsCollector collector,
|
||||
FSDatasetMBean beanClass, String context)
|
||||
throws IOException {
|
||||
|
||||
if (beanClass == null) {
|
||||
throw new IOException("beanClass cannot be null");
|
||||
}
|
||||
|
||||
String className = beanClass.getClass().getName();
|
||||
|
||||
collector.addRecord(className)
|
||||
.setContext(context)
|
||||
.addGauge(Interns.info("Capacity", "Total storage capacity"),
|
||||
beanClass.getCapacity())
|
||||
.addGauge(Interns.info("DfsUsed", "Total bytes used by dfs datanode"),
|
||||
beanClass.getDfsUsed())
|
||||
.addGauge(Interns.info("Remaining", "Total bytes of free storage"),
|
||||
beanClass.getRemaining())
|
||||
.add(new MetricsTag(Interns.info("StorageInfo", "Storage ID"),
|
||||
beanClass.getStorageInfo()))
|
||||
.addGauge(Interns.info("NumFailedVolumes", "Number of failed Volumes" +
|
||||
" in the data Node"), beanClass.getNumFailedVolumes())
|
||||
.addGauge(Interns.info("LastVolumeFailureDate", "Last Volume failure in" +
|
||||
" milliseconds from epoch"), beanClass.getLastVolumeFailureDate())
|
||||
.addGauge(Interns.info("EstimatedCapacityLostTotal", "Total capacity lost"
|
||||
+ " due to volume failure"), beanClass.getEstimatedCapacityLostTotal())
|
||||
.addGauge(Interns.info("CacheUsed", "Datanode cache used in bytes"),
|
||||
beanClass.getCacheUsed())
|
||||
.addGauge(Interns.info("CacheCapacity", "Datanode cache capacity"),
|
||||
beanClass.getCacheCapacity())
|
||||
.addGauge(Interns.info("NumBlocksCached", "Datanode number" +
|
||||
" of blocks cached"), beanClass.getNumBlocksCached())
|
||||
.addGauge(Interns.info("NumBlocksFailedToCache", "Datanode number of " +
|
||||
"blocks failed to cache"), beanClass.getNumBlocksFailedToCache())
|
||||
.addGauge(Interns.info("NumBlocksFailedToUnCache", "Datanode number of" +
|
||||
" blocks failed in cache eviction"),
|
||||
beanClass.getNumBlocksFailedToUncache());
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -20,6 +20,7 @@
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -37,7 +38,7 @@
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface FSDatasetMBean {
|
||||
public interface FSDatasetMBean extends MetricsSource {
|
||||
|
||||
/**
|
||||
* Returns the total space (in bytes) used by a block pool
|
||||
|
@ -23,8 +23,6 @@
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
@ -52,6 +50,7 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
@ -60,9 +59,9 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
/**
|
||||
* This class implements a simulated FSDataset.
|
||||
@ -690,6 +689,21 @@ public long getNumBlocksFailedToUncache() {
|
||||
return 0l;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get metrics from the metrics source
|
||||
*
|
||||
* @param collector to contain the resulting metrics snapshot
|
||||
* @param all if true, return all metrics even if unchanged.
|
||||
*/
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
try {
|
||||
DataNodeMetricHelper.getMetrics(collector, this, "SimulatedFSDataset");
|
||||
} catch (Exception e){
|
||||
//ignore Exceptions
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized long getLength(ExtendedBlock b) throws IOException {
|
||||
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
|
||||
|
@ -0,0 +1,136 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import org.apache.commons.configuration.SubsetConfiguration;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.metrics2.AbstractMetric;
|
||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||
import org.apache.hadoop.metrics2.MetricsSink;
|
||||
import org.apache.hadoop.metrics2.MetricsTag;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestDataNodeFSDataSetSink {
|
||||
private static final MetricsSystemImpl ms = new
|
||||
MetricsSystemImpl("TestFSDataSet");
|
||||
|
||||
class FSDataSetSinkTest implements MetricsSink {
|
||||
private Set<String> nameMap;
|
||||
private int count;
|
||||
|
||||
/**
|
||||
* add a metrics record in the sink
|
||||
*
|
||||
* @param record the record to add
|
||||
*/
|
||||
@Override
|
||||
public void putMetrics(MetricsRecord record) {
|
||||
// let us do this only once, otherwise
|
||||
// our count could go out of sync.
|
||||
if (count == 0) {
|
||||
for (AbstractMetric m : record.metrics()) {
|
||||
if (nameMap.contains(m.name())) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
for (MetricsTag t : record.tags()) {
|
||||
if (nameMap.contains(t.name())) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush any buffered metrics
|
||||
*/
|
||||
@Override
|
||||
public void flush() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the plugin
|
||||
*
|
||||
* @param conf the configuration object for the plugin
|
||||
*/
|
||||
@Override
|
||||
public void init(SubsetConfiguration conf) {
|
||||
nameMap = new TreeSet<>();
|
||||
nameMap.add("DfsUsed");
|
||||
nameMap.add("Capacity");
|
||||
nameMap.add("Remaining");
|
||||
nameMap.add("StorageInfo");
|
||||
nameMap.add("NumFailedVolumes");
|
||||
nameMap.add("LastVolumeFailureDate");
|
||||
nameMap.add("EstimatedCapacityLostTotal");
|
||||
nameMap.add("CacheUsed");
|
||||
nameMap.add("CacheCapacity");
|
||||
nameMap.add("NumBlocksCached");
|
||||
nameMap.add("NumBlocksFailedToCache");
|
||||
nameMap.add("NumBlocksFailedToUnCache");
|
||||
nameMap.add("Context");
|
||||
nameMap.add("Hostname");
|
||||
}
|
||||
|
||||
public int getMapCount() {
|
||||
return nameMap.size();
|
||||
}
|
||||
|
||||
public int getFoundKeyCount() {
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
/**
|
||||
* This test creates a Source and then calls into the Sink that we
|
||||
* have registered. That is calls into FSDataSetSinkTest
|
||||
*/
|
||||
public void testFSDataSetMetrics() throws InterruptedException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
String bpid = "FSDatSetSink-Test";
|
||||
SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
|
||||
fsdataset.addBlockPool(bpid, conf);
|
||||
FSDataSetSinkTest sink = new FSDataSetSinkTest();
|
||||
sink.init(null);
|
||||
ms.init("Test");
|
||||
ms.start();
|
||||
ms.register("FSDataSetSource", "FSDataSetSource", fsdataset);
|
||||
ms.register("FSDataSetSink", "FSDataSetSink", sink);
|
||||
ms.startMetricsMBeans();
|
||||
ms.publishMetricsNow();
|
||||
|
||||
Thread.sleep(4000);
|
||||
|
||||
ms.stopMetricsMBeans();
|
||||
ms.shutdown();
|
||||
|
||||
// make sure we got all expected metric in the call back
|
||||
assertEquals(sink.getMapCount(), sink.getFoundKeyCount());
|
||||
|
||||
}
|
||||
}
|
@ -35,14 +35,14 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
|
||||
public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
||||
|
||||
@ -420,6 +420,21 @@ public long getNumBlocksFailedToUncache() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get metrics from the metrics source
|
||||
*
|
||||
* @param collector to contain the resulting metrics snapshot
|
||||
* @param all if true, return all metrics even if unchanged.
|
||||
*/
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
try {
|
||||
DataNodeMetricHelper.getMetrics(collector, this, "ExternalDataset");
|
||||
} catch (Exception e){
|
||||
//ignore exceptions
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPinning(ExtendedBlock block) throws IOException {
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user