HDFS-12268. Ozone: Add metrics for pending storage container requests. Contributed by Yiqun Lin.

This commit is contained in:
Chen Liang 2017-09-27 13:49:31 -07:00 committed by Owen O'Malley
parent 94090acf3a
commit 9530153f33
5 changed files with 333 additions and 14 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.scm; package org.apache.hadoop.scm;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
@ -27,6 +28,7 @@
.ContainerCommandResponseProto; .ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -36,7 +38,6 @@
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
/** /**
* Netty client handler. * Netty client handler.
*/ */
@ -44,19 +45,21 @@ public class XceiverClientHandler extends
SimpleChannelInboundHandler<ContainerCommandResponseProto> { SimpleChannelInboundHandler<ContainerCommandResponseProto> {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class); static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
private final ConcurrentMap<String, private final ConcurrentMap<String, ResponseFuture> responses =
CompletableFuture<ContainerCommandResponseProto>> responses =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final Pipeline pipeline; private final Pipeline pipeline;
private volatile Channel channel; private volatile Channel channel;
private XceiverClientMetrics metrics;
/** /**
* Constructs a client that can communicate to a container server. * Constructs a client that can communicate to a container server.
*/ */
public XceiverClientHandler(Pipeline pipeline) { public XceiverClientHandler(Pipeline pipeline) {
super(false); super(false);
this.pipeline = pipeline; this.pipeline = pipeline;
this.metrics = XceiverClientManager.getXceiverClientMetrics();
} }
/** /**
@ -76,11 +79,17 @@ public void channelRead0(ChannelHandlerContext ctx,
ContainerProtos.ContainerCommandResponseProto msg) ContainerProtos.ContainerCommandResponseProto msg)
throws Exception { throws Exception {
Preconditions.checkNotNull(msg); Preconditions.checkNotNull(msg);
metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
String key = msg.getTraceID(); String key = msg.getTraceID();
CompletableFuture<ContainerCommandResponseProto> future = ResponseFuture response = responses.remove(key);
responses.remove(key);
if (future != null) { if (response != null) {
future.complete(msg); response.getFuture().complete(msg);
long requestTime = response.getRequestTime();
metrics.addContainerOpsLatency(msg.getCmdType(),
Time.monotonicNowNanos() - requestTime);
} else { } else {
LOG.error("A reply received for message that was not queued. trace " + LOG.error("A reply received for message that was not queued. trace " +
"ID: {}", msg.getTraceID()); "ID: {}", msg.getTraceID());
@ -130,13 +139,14 @@ public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
if (StringUtils.isEmpty(request.getTraceID())) { if (StringUtils.isEmpty(request.getTraceID())) {
throw new IllegalArgumentException("Invalid trace ID"); throw new IllegalArgumentException("Invalid trace ID");
} }
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
CompletableFuture<ContainerCommandResponseProto> response = CompletableFuture<ContainerCommandResponseProto> future
new CompletableFuture<>(); = new CompletableFuture<>();
ResponseFuture response = new ResponseFuture(future,
CompletableFuture<ContainerCommandResponseProto> previous = Time.monotonicNowNanos());
responses.putIfAbsent(request.getTraceID(), response); ResponseFuture previous = responses.putIfAbsent(
request.getTraceID(), response);
if (previous != null) { if (previous != null) {
LOG.error("Command with Trace already exists. Ignoring this command. " + LOG.error("Command with Trace already exists. Ignoring this command. " +
"{}. Previous Command: {}", request.getTraceID(), "{}. Previous Command: {}", request.getTraceID(),
@ -147,6 +157,28 @@ public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
} }
channel.writeAndFlush(request); channel.writeAndFlush(request);
return response; return response.getFuture();
}
/**
* Class wraps response future info.
*/
static class ResponseFuture {
private final long requestTime;
private final CompletableFuture<ContainerCommandResponseProto> future;
ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future,
long requestTime) {
this.future = future;
this.requestTime = requestTime;
}
public long getRequestTime() {
return requestTime;
}
public CompletableFuture<ContainerCommandResponseProto> getFuture() {
return future;
}
} }
} }

View File

@ -62,6 +62,7 @@ public class XceiverClientManager implements Closeable {
private final Cache<String, XceiverClientSpi> clientCache; private final Cache<String, XceiverClientSpi> clientCache;
private final boolean useRatis; private final boolean useRatis;
private static XceiverClientMetrics metrics;
/** /**
* Creates a new XceiverClientManager. * Creates a new XceiverClientManager.
* *
@ -164,6 +165,10 @@ public void close() {
//closing is done through RemovalListener //closing is done through RemovalListener
clientCache.invalidateAll(); clientCache.invalidateAll();
clientCache.cleanUp(); clientCache.cleanUp();
if (metrics != null) {
metrics.unRegister();
}
} }
/** /**
@ -197,4 +202,14 @@ public OzoneProtos.ReplicationType getType() {
return OzoneProtos.ReplicationType.STAND_ALONE; return OzoneProtos.ReplicationType.STAND_ALONE;
} }
/**
* Get xceiver client metric.
*/
public synchronized static XceiverClientMetrics getXceiverClientMetrics() {
if (metrics == null) {
metrics = XceiverClientMetrics.create();
}
return metrics;
}
} }

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
/**
* The client metrics for the Storage Container protocol.
*/
@InterfaceAudience.Private
@Metrics(about = "Storage Container Client Metrics", context = "dfs")
public class XceiverClientMetrics {
public static final String SOURCE_NAME = XceiverClientMetrics.class
.getSimpleName();
private @Metric MutableCounterLong pendingOps;
private MutableCounterLong[] pendingOpsArray;
private MutableRate[] containerOpsLatency;
private MetricsRegistry registry;
public XceiverClientMetrics() {
int numEnumEntries = ContainerProtos.Type.values().length;
this.registry = new MetricsRegistry(SOURCE_NAME);
this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
this.containerOpsLatency = new MutableRate[numEnumEntries];
for (int i = 0; i < numEnumEntries; i++) {
pendingOpsArray[i] = registry.newCounter(
"numPending" + ContainerProtos.Type.valueOf(i + 1),
"number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
(long) 0);
containerOpsLatency[i] = registry.newRate(
ContainerProtos.Type.valueOf(i + 1) + "Latency",
"latency of " + ContainerProtos.Type.valueOf(i + 1)
+ " ops");
}
}
public static XceiverClientMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME, "Storage Container Client Metrics",
new XceiverClientMetrics());
}
public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
pendingOps.incr();
pendingOpsArray[type.ordinal()].incr();
}
public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) {
pendingOps.incr(-1);
pendingOpsArray[type.ordinal()].incr(-1);
}
public void addContainerOpsLatency(ContainerProtos.Type type,
long latencyNanos) {
containerOpsLatency[type.ordinal()].add(latencyNanos);
}
public long getContainerOpsMetrics(ContainerProtos.Type type) {
return pendingOpsArray[type.ordinal()].value();
}
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}
}

View File

@ -44,6 +44,17 @@ many times a specific operation has been performed.
Eg.`NumCreateContainer` tells us how many times create container has been Eg.`NumCreateContainer` tells us how many times create container has been
invoked on this datanode. invoked on this datanode.
*Total number of pending operation* - This is an array which counts how
many times a specific operation is waitting to be processed from the client
point of view.
Eg.`NumPendingCreateContainer` tells us how many create container requests that
waitting to be processed.
*Average latency of each pending operation in nanoseconds* - The average latency
of the operation from the client point of view.
Eg. `CreateContainerLatencyAvgTime` - This tells us the average latency of
Create Container from the client point of view.
*Number of bytes involved in a specific command* - This is an array that is *Number of bytes involved in a specific command* - This is an array that is
maintained for all operations, but makes sense only for read and write maintained for all operations, but makes sense only for read and write
operations. operations.

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientMetrics;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* This class tests the metrics of XceiverClient.
*/
public class TestXceiverClientMetrics {
// only for testing
private volatile boolean breakFlag;
private CountDownLatch latch;
private static OzoneConfiguration config;
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@BeforeClass
public static void init() throws IOException {
config = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(config)
.numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster
.createStorageContainerLocationClient();
}
@AfterClass
public static void shutdown() {
cluster.shutdown();
}
@Test
public void testMetrics() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
XceiverClientManager clientManager = new XceiverClientManager(conf);
String containerName = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline = storageContainerLocationClient.allocateContainer(
clientManager.getType(), clientManager.getFactor(), containerName);
XceiverClientSpi client = clientManager.acquireClient(pipeline);
ContainerCommandRequestProto request = ContainerTestHelper
.getCreateContainerRequest(containerName);
client.sendCommand(request);
MetricsRecordBuilder containerMetrics = getMetrics(
XceiverClientMetrics.SOURCE_NAME);
// Above request command is in a synchronous way, so there will be no
// pending requests.
assertCounter("PendingOps", 0L, containerMetrics);
assertCounter("numPendingCreateContainer", 0L, containerMetrics);
// the counter value of average latency metric should be increased
assertCounter("CreateContainerLatencyNumOps", 1L, containerMetrics);
breakFlag = false;
latch = new CountDownLatch(1);
int numRequest = 10;
List<CompletableFuture<ContainerCommandResponseProto>> computeResults
= new ArrayList<>();
// start new thread to send async requests
Thread sendThread = new Thread(() -> {
while (!breakFlag) {
try {
// use async interface for testing pending metrics
for (int i = 0; i < numRequest; i++) {
String keyName = OzoneUtils.getRequestID();
ContainerProtos.ContainerCommandRequestProto smallFileRequest;
smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
client.getPipeline(), containerName, keyName, 1024);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
response = client.sendCommandAsync(smallFileRequest);
computeResults.add(response);
}
Thread.sleep(1000);
} catch (Exception ignored) {
}
}
latch.countDown();
});
sendThread.start();
GenericTestUtils.waitFor(() -> {
// check if pending metric count is increased
MetricsRecordBuilder metric =
getMetrics(XceiverClientMetrics.SOURCE_NAME);
long pendingOps = getLongCounter("PendingOps", metric);
long pendingPutSmallFileOps =
getLongCounter("numPendingPutSmallFile", metric);
if (pendingOps > 0 && pendingPutSmallFileOps > 0) {
// reset break flag
breakFlag = true;
return true;
} else {
return false;
}
}, 100, 60000);
// blocking until we stop sending async requests
latch.await();
// Wait for all futures being done.
GenericTestUtils.waitFor(() -> {
for (CompletableFuture future : computeResults) {
if (!future.isDone()) {
return false;
}
}
return true;
}, 100, 60000);
// the counter value of pending metrics should be decreased to 0
containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
assertCounter("PendingOps", 0L, containerMetrics);
assertCounter("numPendingPutSmallFile", 0L, containerMetrics);
clientManager.close();
}
}