From f66a5226959f2fc78fcc4ac75499d703b6b11f79 Mon Sep 17 00:00:00 2001
From: Chen Liang
Date: Wed, 13 Sep 2017 13:00:26 -0700
Subject: [PATCH] HDFS-12268. Ozone: Add metrics for pending storage container
 requests. Contributed by Yiqun Lin.

---
 .../hadoop/scm/XceiverClientHandler.java       |  33 ++--
 .../hadoop/scm/XceiverClientManager.java       |  15 ++
 .../hadoop/scm/XceiverClientMetrics.java       |  92 +++++++++++
 .../src/site/markdown/OzoneMetrics.md          |  11 ++
 .../ozone/scm/TestXceiverClientMetrics.java    | 151 ++++++++++++++++++
 5 files changed, 293 insertions(+), 9 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
index 93d4438e99..38fc8f3b17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.scm;
 
 import com.google.common.base.Preconditions;
+import com.sun.tools.javac.util.Pair;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -27,6 +29,7 @@
     .ContainerCommandResponseProto;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +39,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-
 /**
  * Netty client handler.
  */
@@ -45,18 +47,21 @@ public class XceiverClientHandler extends
   static final Logger LOG =
       LoggerFactory.getLogger(XceiverClientHandler.class);
   private final ConcurrentMap<String,
-      CompletableFuture<ContainerCommandResponseProto>> responses =
+      Pair<CompletableFuture<ContainerCommandResponseProto>, Long>> responses =
       new ConcurrentHashMap<>();
   private final Pipeline pipeline;
   private volatile Channel channel;
 
+  private XceiverClientMetrics metrics;
+
   /**
    * Constructs a client that can communicate to a container server.
    */
   public XceiverClientHandler(Pipeline pipeline) {
     super(false);
     this.pipeline = pipeline;
+    this.metrics = XceiverClientManager.getXceiverClientMetrics();
   }
 
   /**
@@ -76,11 +81,18 @@ public void channelRead0(ChannelHandlerContext ctx,
       ContainerProtos.ContainerCommandResponseProto msg)
       throws Exception {
     Preconditions.checkNotNull(msg);
+    metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
+
     String key = msg.getTraceID();
-    CompletableFuture<ContainerCommandResponseProto> future =
+    Pair<CompletableFuture<ContainerCommandResponseProto>, Long> future =
         responses.remove(key);
+
     if (future != null) {
-      future.complete(msg);
+      future.fst.complete(msg);
+
+      long requestTime = future.snd;
+      metrics.addContainerOpsLatency(msg.getCmdType(),
+          Time.monotonicNowNanos() - requestTime);
     } else {
trace " + "ID: {}", msg.getTraceID()); @@ -130,11 +142,14 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if(StringUtils.isEmpty(request.getTraceID())) { throw new IllegalArgumentException("Invalid trace ID"); } + metrics.incrPendingContainerOpsMetrics(request.getCmdType()); - CompletableFuture response = - new CompletableFuture<>(); - - CompletableFuture previous = + CompletableFuture future + = new CompletableFuture<>(); + Pair, Long> response = + new Pair, + Long>(future, Time.monotonicNowNanos()); + Pair, Long> previous = responses.putIfAbsent(request.getTraceID(), response); if (previous != null) { @@ -147,6 +162,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } channel.writeAndFlush(request); - return response; + return response.fst; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index 508c004d71..8174e8471e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -62,6 +62,7 @@ public class XceiverClientManager implements Closeable { private final Cache clientCache; private final boolean useRatis; + private static XceiverClientMetrics metrics; /** * Creates a new XceiverClientManager. * @@ -164,6 +165,10 @@ public void close() { //closing is done through RemovalListener clientCache.invalidateAll(); clientCache.cleanUp(); + + if (metrics != null) { + metrics.unRegister(); + } } /** @@ -197,4 +202,14 @@ public OzoneProtos.ReplicationType getType() { return OzoneProtos.ReplicationType.STAND_ALONE; } + /** + * Get xceiver client metric. + */ + public synchronized static XceiverClientMetrics getXceiverClientMetrics() { + if (metrics == null) { + metrics = XceiverClientMetrics.create(); + } + + return metrics; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java new file mode 100644 index 0000000000..6359db1dbf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.scm;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * The client metrics for the Storage Container protocol.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Storage Container Client Metrics", context = "dfs")
+public class XceiverClientMetrics {
+  public static final String SOURCE_NAME = XceiverClientMetrics.class
+      .getSimpleName();
+
+  private @Metric MutableCounterLong pendingOps;
+  private MutableCounterLong[] pendingOpsArray;
+  private MutableRate[] containerOpsLatency;
+  private MetricsRegistry registry;
+
+  public XceiverClientMetrics() {
+    int numEnumEntries = ContainerProtos.Type.values().length;
+    this.registry = new MetricsRegistry(SOURCE_NAME);
+
+    this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
+    this.containerOpsLatency = new MutableRate[numEnumEntries];
+    for (int i = 0; i < numEnumEntries; i++) {
+      pendingOpsArray[i] = registry.newCounter(
+          "numPending" + ContainerProtos.Type.valueOf(i + 1),
+          "number of pending " + ContainerProtos.Type.valueOf(i + 1) + " ops",
+          (long) 0);
+
+      containerOpsLatency[i] = registry.newRate(
+          ContainerProtos.Type.valueOf(i + 1) + "Latency",
+          "latency of " + ContainerProtos.Type.valueOf(i + 1)
+              + " ops");
+    }
+  }
+
+  public static XceiverClientMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, "Storage Container Client Metrics",
+        new XceiverClientMetrics());
+  }
+
+  public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+    pendingOps.incr();
+    pendingOpsArray[type.ordinal()].incr();
+  }
+
+  public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+    pendingOps.incr(-1);
+    pendingOpsArray[type.ordinal()].incr(-1);
+  }
+
+  public void addContainerOpsLatency(ContainerProtos.Type type,
+      long latencyNanos) {
+    containerOpsLatency[type.ordinal()].add(latencyNanos);
+  }
+
+  public long getContainerOpsMetrics(ContainerProtos.Type type) {
+    return pendingOpsArray[type.ordinal()].value();
+  }
+
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
index 21e3474c3d..2548959bfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
@@ -44,6 +44,17 @@ many times a specific operation has been performed.
 Eg.`NumCreateContainer` tells us how many times create container has been
 invoked on this datanode.
 
+*Total number of pending operations* - This is an array which counts how
+many times a specific operation is waiting to be processed from the client
+point of view.
+Eg.`NumPendingCreateContainer` tells us how many create container requests
+are waiting to be processed.
+
+*Average latency of each pending operation in nanoseconds* - The average latency
+of the operation from the client point of view.
+Eg. `CreateContainerLatencyAvgTime` - This tells us the average latency of
+Create Container from the client point of view.
+
 *Number of bytes involved in a specific command* - This is an array that is
 maintained for all operations, but makes sense only for read and write
 operations.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
new file mode 100644
index 0000000000..7fd41a780f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientMetrics;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class tests the metrics of XceiverClient.
+ */
+public class TestXceiverClientMetrics {
+  private static OzoneConfiguration config;
+  private static MiniOzoneCluster cluster;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    cluster = new MiniOzoneCluster.Builder(config)
+        .numDataNodes(1)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testMetrics() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+    String containerName = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline = storageContainerLocationClient.allocateContainer(
+        clientManager.getType(), clientManager.getFactor(), containerName);
+    XceiverClientSpi client = clientManager.acquireClient(pipeline);
+
+    ContainerCommandRequestProto request = ContainerTestHelper
+        .getCreateContainerRequest(containerName);
+    client.sendCommand(request);
+
+    MetricsRecordBuilder containerMetrics = getMetrics(
+        XceiverClientMetrics.SOURCE_NAME);
+    // Above request command is in a synchronous way, so there will be no
+    // pending requests.
+    assertCounter("PendingOps", 0L, containerMetrics);
+    assertCounter("numPendingCreateContainer", 0L, containerMetrics);
+    // the counter value of average latency metric should be increased
+    assertCounter("CreateContainerLatencyNumOps", 1L, containerMetrics);
+
+    List<CompletableFuture<ContainerCommandResponseProto>> computeResults
+        = new ArrayList<>();
+    int numRequest = 10;
+    // start new thread to send async requests
+    Thread sendThread = new Thread(() -> {
+      while (true) {
+        try {
+          // use async interface for testing pending metrics
+          for (int i = 0; i < numRequest; i++) {
+            String keyName = OzoneUtils.getRequestID();
+            ContainerProtos.ContainerCommandRequestProto smallFileRequest;
+
+            smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
+                client.getPipeline(), containerName, keyName, 1024);
+            CompletableFuture<ContainerCommandResponseProto> response =
+                client.sendCommandAsync(smallFileRequest);
+            computeResults.add(response);
+          }
+
+          Thread.sleep(1000);
+        } catch (Exception ignored) {
+        }
+      }
+    });
+    sendThread.start();
+
+    GenericTestUtils.waitFor(() -> {
+      // check if pending metric count is increased
+      MetricsRecordBuilder metric =
+          getMetrics(XceiverClientMetrics.SOURCE_NAME);
+      long pendingOps = getLongCounter("PendingOps", metric);
+      long pendingPutSmallFileOps =
+          getLongCounter("numPendingPutSmallFile", metric);
+
+      return pendingOps > 0 && pendingPutSmallFileOps > 0;
+    }, 100, 60000);
+    sendThread.interrupt();
+
+    // Wait for all futures being done.
+    GenericTestUtils.waitFor(() -> {
+      for (CompletableFuture future : computeResults) {
+        if (!future.isDone()) {
+          return false;
+        }
+      }
+
+      return true;
+    }, 100, 60000);
+
+    // the counter value of pending metrics should be decreased to 0
+    containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
+    assertCounter("PendingOps", 0L, containerMetrics);
+    assertCounter("numPendingPutSmallFile", 0L, containerMetrics);
+
+    clientManager.close();
+  }
+}
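
Usage sketch (illustrative, not part of the patch): the snippet below restates the bookkeeping pattern this change adds to XceiverClientHandler: increment the per-type pending counter when a request is queued, then decrement it and record a latency sample once the response arrives. XceiverClientMetrics, XceiverClientManager.getXceiverClientMetrics() and Time.monotonicNowNanos() come from the patch above; the wrapper class and its track() method are hypothetical names used only for this example.

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientMetrics;
import org.apache.hadoop.util.Time;

/**
 * Hypothetical wrapper showing the metrics pattern this patch applies
 * inside XceiverClientHandler.
 */
public class PendingOpsTrackingSketch {

  private final XceiverClientMetrics metrics =
      XceiverClientManager.getXceiverClientMetrics();

  /**
   * Counts the request as pending while its response future is outstanding
   * and records the observed latency when the future completes.
   */
  public CompletableFuture<ContainerCommandResponseProto> track(
      ContainerCommandRequestProto request,
      CompletableFuture<ContainerCommandResponseProto> responseFuture) {
    ContainerProtos.Type type = request.getCmdType();
    long startNanos = Time.monotonicNowNanos();

    // Pending count goes up as soon as the request is queued.
    metrics.incrPendingContainerOpsMetrics(type);

    // When the future completes, the pending count comes back down and a
    // latency sample is recorded, similar to what channelRead0 does when
    // a reply arrives.
    return responseFuture.whenComplete((reply, error) -> {
      metrics.decrPendingContainerOpsMetrics(type);
      metrics.addContainerOpsLatency(type,
          Time.monotonicNowNanos() - startNanos);
    });
  }
}

The handler itself keeps the start timestamp next to the pending future (the Pair of CompletableFuture and Long in the responses map), so the latency can be computed on the response path without any extra lookup; the sketch mimics that by capturing the timestamp in the callback's closure.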