diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java index 38fc8f3b17..93d4438e99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.scm; import com.google.common.base.Preconditions; -import com.sun.tools.javac.util.Pair; - import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -29,7 +27,6 @@ .ContainerCommandResponseProto; import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +36,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + /** * Netty client handler. */ @@ -47,21 +45,18 @@ public class XceiverClientHandler extends static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class); private final ConcurrentMap, Long>> responses = + CompletableFuture> responses = new ConcurrentHashMap<>(); private final Pipeline pipeline; private volatile Channel channel; - private XceiverClientMetrics metrics; - /** * Constructs a client that can communicate to a container server. */ public XceiverClientHandler(Pipeline pipeline) { super(false); this.pipeline = pipeline; - this.metrics = XceiverClientManager.getXceiverClientMetrics(); } /** @@ -81,18 +76,11 @@ public void channelRead0(ChannelHandlerContext ctx, ContainerProtos.ContainerCommandResponseProto msg) throws Exception { Preconditions.checkNotNull(msg); - metrics.decrPendingContainerOpsMetrics(msg.getCmdType()); - String key = msg.getTraceID(); - Pair, Long> future = + CompletableFuture future = responses.remove(key); - if (future != null) { - future.fst.complete(msg); - - long requestTime = future.snd; - metrics.addContainerOpsLatency(msg.getCmdType(), - Time.monotonicNowNanos() - requestTime); + future.complete(msg); } else { LOG.error("A reply received for message that was not queued. trace " + "ID: {}", msg.getTraceID()); @@ -142,14 +130,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if(StringUtils.isEmpty(request.getTraceID())) { throw new IllegalArgumentException("Invalid trace ID"); } - metrics.incrPendingContainerOpsMetrics(request.getCmdType()); - CompletableFuture future - = new CompletableFuture<>(); - Pair, Long> response = - new Pair, - Long>(future, Time.monotonicNowNanos()); - Pair, Long> previous = + CompletableFuture response = + new CompletableFuture<>(); + + CompletableFuture previous = responses.putIfAbsent(request.getTraceID(), response); if (previous != null) { @@ -162,6 +147,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } channel.writeAndFlush(request); - return response.fst; + return response; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index 8174e8471e..508c004d71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -62,7 +62,6 @@ public class XceiverClientManager implements Closeable { private final Cache clientCache; private final boolean useRatis; - private static XceiverClientMetrics metrics; /** * Creates a new XceiverClientManager. * @@ -165,10 +164,6 @@ public void close() { //closing is done through RemovalListener clientCache.invalidateAll(); clientCache.cleanUp(); - - if (metrics != null) { - metrics.unRegister(); - } } /** @@ -202,14 +197,4 @@ public OzoneProtos.ReplicationType getType() { return OzoneProtos.ReplicationType.STAND_ALONE; } - /** - * Get xceiver client metric. - */ - public synchronized static XceiverClientMetrics getXceiverClientMetrics() { - if (metrics == null) { - metrics = XceiverClientMetrics.create(); - } - - return metrics; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java deleted file mode 100644 index 6359db1dbf..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableRate; - -/** - * The client metrics for the Storage Container protocol. - */ -@InterfaceAudience.Private -@Metrics(about = "Storage Container Client Metrics", context = "dfs") -public class XceiverClientMetrics { - public static final String SOURCE_NAME = XceiverClientMetrics.class - .getSimpleName(); - - private @Metric MutableCounterLong pendingOps; - private MutableCounterLong[] pendingOpsArray; - private MutableRate[] containerOpsLatency; - private MetricsRegistry registry; - - public XceiverClientMetrics() { - int numEnumEntries = ContainerProtos.Type.values().length; - this.registry = new MetricsRegistry(SOURCE_NAME); - - this.pendingOpsArray = new MutableCounterLong[numEnumEntries]; - this.containerOpsLatency = new MutableRate[numEnumEntries]; - for (int i = 0; i < numEnumEntries; i++) { - pendingOpsArray[i] = registry.newCounter( - "numPending" + ContainerProtos.Type.valueOf(i + 1), - "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops", - (long) 0); - - containerOpsLatency[i] = registry.newRate( - ContainerProtos.Type.valueOf(i + 1) + "Latency", - "latency of " + ContainerProtos.Type.valueOf(i + 1) - + " ops"); - } - } - - public static XceiverClientMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register(SOURCE_NAME, "Storage Container Client Metrics", - new XceiverClientMetrics()); - } - - public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) { - pendingOps.incr(); - pendingOpsArray[type.ordinal()].incr(); - } - - public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) { - pendingOps.incr(-1); - pendingOpsArray[type.ordinal()].incr(-1); - } - - public void addContainerOpsLatency(ContainerProtos.Type type, - long latencyNanos) { - containerOpsLatency[type.ordinal()].add(latencyNanos); - } - - public long getContainerOpsMetrics(ContainerProtos.Type type) { - return pendingOpsArray[type.ordinal()].value(); - } - - public void unRegister() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - ms.unregisterSource(SOURCE_NAME); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md index 2548959bfc..21e3474c3d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md @@ -44,17 +44,6 @@ many times a specific operation has been performed. Eg.`NumCreateContainer` tells us how many times create container has been invoked on this datanode. -*Total number of pending operation* - This is an array which counts how -many times a specific operation is waitting to be processed from the client -point of view. -Eg.`NumPendingCreateContainer` tells us how many create container requests that -waitting to be processed. - -*Average latency of each pending operation in nanoseconds* - The average latency -of the operation from the client point of view. -Eg. `CreateContainerLatencyAvgTime` - This tells us the average latency of -Create Container from the client point of view. - *Number of bytes involved in a specific command* - This is an array that is maintained for all operations, but makes sense only for read and write operations. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java deleted file mode 100644 index 7fd41a780f..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.scm; - -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientMetrics; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * This class tests the metrics of XceiverClient. - */ -public class TestXceiverClientMetrics { - private static OzoneConfiguration config; - private static MiniOzoneCluster cluster; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - - @BeforeClass - public static void init() throws IOException { - config = new OzoneConfiguration(); - cluster = new MiniOzoneCluster.Builder(config) - .numDataNodes(1) - .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); - storageContainerLocationClient = cluster - .createStorageContainerLocationClient(); - } - - @AfterClass - public static void shutdown() { - cluster.shutdown(); - } - - @Test - public void testMetrics() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - XceiverClientManager clientManager = new XceiverClientManager(conf); - - String containerName = "container" + RandomStringUtils.randomNumeric(10); - Pipeline pipeline = storageContainerLocationClient.allocateContainer( - clientManager.getType(), clientManager.getFactor(), containerName); - XceiverClientSpi client = clientManager.acquireClient(pipeline); - - ContainerCommandRequestProto request = ContainerTestHelper - .getCreateContainerRequest(containerName); - client.sendCommand(request); - - MetricsRecordBuilder containerMetrics = getMetrics( - XceiverClientMetrics.SOURCE_NAME); - // Above request command is in a synchronous way, so there will be no - // pending requests. - assertCounter("PendingOps", 0L, containerMetrics); - assertCounter("numPendingCreateContainer", 0L, containerMetrics); - // the counter value of average latency metric should be increased - assertCounter("CreateContainerLatencyNumOps", 1L, containerMetrics); - - List> computeResults - = new ArrayList<>(); - int numRequest = 10; - // start new thread to send async requests - Thread sendThread = new Thread(() -> { - while (true) { - try { - // use async interface for testing pending metrics - for (int i = 0; i < numRequest; i++) { - String keyName = OzoneUtils.getRequestID(); - ContainerProtos.ContainerCommandRequestProto smallFileRequest; - - smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest( - client.getPipeline(), containerName, keyName, 1024); - CompletableFuture - response = client.sendCommandAsync(smallFileRequest); - computeResults.add(response); - } - Thread.sleep(1000); - } catch (Exception ignored) { - } - } - }); - sendThread.start(); - - GenericTestUtils.waitFor(() -> { - // check if pending metric count is increased - MetricsRecordBuilder metric = - getMetrics(XceiverClientMetrics.SOURCE_NAME); - long pendingOps = getLongCounter("PendingOps", metric); - long pendingPutSmallFileOps = - getLongCounter("numPendingPutSmallFile", metric); - return pendingOps > 0 && pendingPutSmallFileOps > 0; - }, 100, 60000); - sendThread.interrupt(); - - // Wait for all futures being done. - GenericTestUtils.waitFor(() -> { - for (CompletableFuture future : computeResults) { - if (!future.isDone()) { - return false; - } - } - - return true; - }, 100, 60000); - - // the counter value of pending metrics should be decreased to 0 - containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME); - containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME); - assertCounter("PendingOps", 0L, containerMetrics); - assertCounter("numPendingPutSmallFile", 0L, containerMetrics); - - clientManager.close(); - } -}