Revert "HDFS-12268. Ozone: Add metrics for pending storage container requests. Contributed by Yiqun Lin." as it uses class not part of JRE.
This reverts commit c29aff44cfc78a2839f15e8cde3b80f0a31c80dd.
parent eaf3732217
commit b127ecd1ba

XceiverClientHandler.java:

@@ -18,8 +18,6 @@
 package org.apache.hadoop.scm;
 
 import com.google.common.base.Preconditions;
-import com.sun.tools.javac.util.Pair;
-
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -29,7 +27,6 @@
     .ContainerCommandResponseProto;
 
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +36,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+
 /**
  * Netty client handler.
  */
@@ -47,21 +45,18 @@ public class XceiverClientHandler extends
 
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
   private final ConcurrentMap<String,
-      Pair<CompletableFuture<ContainerCommandResponseProto>, Long>> responses =
+      CompletableFuture<ContainerCommandResponseProto>> responses =
       new ConcurrentHashMap<>();
 
   private final Pipeline pipeline;
   private volatile Channel channel;
-
-  private XceiverClientMetrics metrics;
 
   /**
    * Constructs a client that can communicate to a container server.
    */
   public XceiverClientHandler(Pipeline pipeline) {
     super(false);
     this.pipeline = pipeline;
-    this.metrics = XceiverClientManager.getXceiverClientMetrics();
   }
 
   /**
@@ -81,18 +76,11 @@ public void channelRead0(ChannelHandlerContext ctx,
       ContainerProtos.ContainerCommandResponseProto msg)
       throws Exception {
     Preconditions.checkNotNull(msg);
-    metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
-
     String key = msg.getTraceID();
-    Pair<CompletableFuture<ContainerCommandResponseProto>, Long> future =
+    CompletableFuture<ContainerCommandResponseProto> future =
         responses.remove(key);
-
     if (future != null) {
-      future.fst.complete(msg);
-
-      long requestTime = future.snd;
-      metrics.addContainerOpsLatency(msg.getCmdType(),
-          Time.monotonicNowNanos() - requestTime);
+      future.complete(msg);
     } else {
       LOG.error("A reply received for message that was not queued. trace " +
           "ID: {}", msg.getTraceID());
@@ -142,14 +130,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     if(StringUtils.isEmpty(request.getTraceID())) {
       throw new IllegalArgumentException("Invalid trace ID");
     }
-    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
 
-    CompletableFuture<ContainerCommandResponseProto> future
-        = new CompletableFuture<>();
-    Pair<CompletableFuture<ContainerCommandResponseProto>, Long> response =
-        new Pair<CompletableFuture<ContainerCommandResponseProto>,
-            Long>(future, Time.monotonicNowNanos());
-    Pair<CompletableFuture<ContainerCommandResponseProto>, Long> previous =
+    CompletableFuture<ContainerCommandResponseProto> response =
+        new CompletableFuture<>();
+
+    CompletableFuture<ContainerCommandResponseProto> previous =
         responses.putIfAbsent(request.getTraceID(), response);
 
     if (previous != null) {
@@ -162,6 +147,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     }
 
     channel.writeAndFlush(request);
-    return response.fst;
+    return response;
   }
 }
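
For context on the stated reason for the revert: the reverted handler change paired each pending CompletableFuture with its request start time through com.sun.tools.javac.util.Pair, a javac-internal class rather than part of the JRE. Below is a minimal sketch of the same bookkeeping using only JRE classes; the RequestEntry holder, the demo class, and System.nanoTime (standing in for Hadoop's Time.monotonicNowNanos) are illustrative assumptions, not the committed code.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RequestEntryDemo {

  /** JRE-only holder replacing Pair<CompletableFuture<T>, Long>. */
  static final class RequestEntry<T> {
    final CompletableFuture<T> future;
    final long startTimeNanos;

    RequestEntry(CompletableFuture<T> future, long startTimeNanos) {
      this.future = future;
      this.startTimeNanos = startTimeNanos;
    }
  }

  // Pending requests keyed by trace ID, analogous to the handler's map;
  // String stands in for ContainerCommandResponseProto here.
  private final ConcurrentMap<String, RequestEntry<String>> responses =
      new ConcurrentHashMap<>();

  void sendAsync(String traceId) {
    // Record the future together with the send timestamp.
    responses.putIfAbsent(traceId,
        new RequestEntry<>(new CompletableFuture<>(), System.nanoTime()));
  }

  void onResponse(String traceId, String msg) {
    RequestEntry<String> entry = responses.remove(traceId);
    if (entry != null) {
      entry.future.complete(msg);
      // Latency measured from the recorded start time.
      long latencyNanos = System.nanoTime() - entry.startTimeNanos;
      System.out.println(traceId + " latency(ns)=" + latencyNanos);
    }
  }

  public static void main(String[] args) {
    RequestEntryDemo demo = new RequestEntryDemo();
    demo.sendAsync("trace-1");
    demo.onResponse("trace-1", "ok");
  }
}

java.util.AbstractMap.SimpleImmutableEntry could serve the same purpose, but a small named holder keeps the roles of the two fields explicit.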

XceiverClientManager.java:

@@ -62,7 +62,6 @@ public class XceiverClientManager implements Closeable {
   private final Cache<String, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
-  private static XceiverClientMetrics metrics;
   /**
    * Creates a new XceiverClientManager.
    *
@@ -165,10 +164,6 @@ public void close() {
     //closing is done through RemovalListener
     clientCache.invalidateAll();
     clientCache.cleanUp();
-
-    if (metrics != null) {
-      metrics.unRegister();
-    }
   }
 
   /**
@@ -202,14 +197,4 @@ public OzoneProtos.ReplicationType getType() {
     return OzoneProtos.ReplicationType.STAND_ALONE;
   }
 
-  /**
-   * Get xceiver client metric.
-   */
-  public synchronized static XceiverClientMetrics getXceiverClientMetrics() {
-    if (metrics == null) {
-      metrics = XceiverClientMetrics.create();
-    }
-
-    return metrics;
-  }
 }
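
The accessor removed above creates a single shared XceiverClientMetrics lazily behind a synchronized static method. As a side sketch only, with hypothetical names and no claim about the committed code, the initialization-on-demand holder idiom gives the same thread-safe lazy creation without synchronizing every call:

public class MetricsHolderDemo {
  // Stand-in for XceiverClientMetrics; the real class registers with metrics2.
  static final class ClientMetrics {
    void unRegister() { }
  }

  // Holder idiom: the instance is created exactly once, when the holder class
  // is first loaded, with thread safety guaranteed by class initialization.
  private static final class Holder {
    static final ClientMetrics INSTANCE = new ClientMetrics();
  }

  public static ClientMetrics getMetrics() {
    return Holder.INSTANCE;
  }

  public static void main(String[] args) {
    System.out.println(getMetrics() == getMetrics()); // true: same instance
  }
}

Note that the reverted code also null-checks and unRegister()s the instance in close(), so it is meant to be torn down; a final holder field does not support that, and the sketch covers only the thread-safety aspect.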

XceiverClientMetrics.java (deleted file):

@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.scm;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
-
-/**
- * The client metrics for the Storage Container protocol.
- */
-@InterfaceAudience.Private
-@Metrics(about = "Storage Container Client Metrics", context = "dfs")
-public class XceiverClientMetrics {
-  public static final String SOURCE_NAME = XceiverClientMetrics.class
-      .getSimpleName();
-
-  private @Metric MutableCounterLong pendingOps;
-  private MutableCounterLong[] pendingOpsArray;
-  private MutableRate[] containerOpsLatency;
-  private MetricsRegistry registry;
-
-  public XceiverClientMetrics() {
-    int numEnumEntries = ContainerProtos.Type.values().length;
-    this.registry = new MetricsRegistry(SOURCE_NAME);
-
-    this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
-    this.containerOpsLatency = new MutableRate[numEnumEntries];
-    for (int i = 0; i < numEnumEntries; i++) {
-      pendingOpsArray[i] = registry.newCounter(
-          "numPending" + ContainerProtos.Type.valueOf(i + 1),
-          "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
-          (long) 0);
-
-      containerOpsLatency[i] = registry.newRate(
-          ContainerProtos.Type.valueOf(i + 1) + "Latency",
-          "latency of " + ContainerProtos.Type.valueOf(i + 1)
-              + " ops");
-    }
-  }
-
-  public static XceiverClientMetrics create() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register(SOURCE_NAME, "Storage Container Client Metrics",
-        new XceiverClientMetrics());
-  }
-
-  public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
-    pendingOps.incr();
-    pendingOpsArray[type.ordinal()].incr();
-  }
-
-  public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) {
-    pendingOps.incr(-1);
-    pendingOpsArray[type.ordinal()].incr(-1);
-  }
-
-  public void addContainerOpsLatency(ContainerProtos.Type type,
-      long latencyNanos) {
-    containerOpsLatency[type.ordinal()].add(latencyNanos);
-  }
-
-  public long getContainerOpsMetrics(ContainerProtos.Type type) {
-    return pendingOpsArray[type.ordinal()].value();
-  }
-
-  public void unRegister() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    ms.unregisterSource(SOURCE_NAME);
-  }
-}
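
A note on the deleted class above: the per-operation metrics are plain arrays indexed by the protobuf enum, and the metric names are built by string concatenation ("numPending" + type, type + "Latency"), which is why the deleted test further below asserts names such as numPendingCreateContainer and numPendingPutSmallFile. The following self-contained sketch shows that indexing scheme with a stand-in enum and AtomicLong counters in place of ContainerProtos.Type and the metrics2 counter types; all names in it are illustrative assumptions.

import java.util.concurrent.atomic.AtomicLong;

public class PerTypeCounterDemo {
  // Stand-in for ContainerProtos.Type; the real code uses the protobuf enum.
  enum OpType { CreateContainer, ReadChunk, WriteChunk, PutSmallFile }

  private final AtomicLong pendingOps = new AtomicLong();
  private final AtomicLong[] pendingOpsArray =
      new AtomicLong[OpType.values().length];

  PerTypeCounterDemo() {
    for (int i = 0; i < pendingOpsArray.length; i++) {
      // Mirrors the naming pattern registry.newCounter("numPending" + type, ...)
      pendingOpsArray[i] = new AtomicLong();
      System.out.println("registered counter: numPending" + OpType.values()[i]);
    }
  }

  void incr(OpType type) {
    pendingOps.incrementAndGet();
    pendingOpsArray[type.ordinal()].incrementAndGet();
  }

  void decr(OpType type) {
    pendingOps.decrementAndGet();
    pendingOpsArray[type.ordinal()].decrementAndGet();
  }

  public static void main(String[] args) {
    PerTypeCounterDemo demo = new PerTypeCounterDemo();
    demo.incr(OpType.PutSmallFile);
    demo.decr(OpType.PutSmallFile);
    System.out.println("pendingOps=" + demo.pendingOps.get()); // 0
  }
}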

Metrics documentation (markdown):

@@ -44,17 +44,6 @@ many times a specific operation has been performed.
 Eg.`NumCreateContainer` tells us how many times create container has been
 invoked on this datanode.
 
-*Total number of pending operation* - This is an array which counts how
-many times a specific operation is waitting to be processed from the client
-point of view.
-Eg.`NumPendingCreateContainer` tells us how many create container requests that
-waitting to be processed.
-
-*Average latency of each pending operation in nanoseconds* - The average latency
-of the operation from the client point of view.
-Eg. `CreateContainerLatencyAvgTime` - This tells us the average latency of
-Create Container from the client point of view.
-
 *Number of bytes involved in a specific command* - This is an array that is
 maintained for all operations, but makes sense only for read and write
 operations.

TestXceiverClientMetrics.java (deleted file):

@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.scm;
-
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientMetrics;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * This class tests the metrics of XceiverClient.
- */
-public class TestXceiverClientMetrics {
-  private static OzoneConfiguration config;
-  private static MiniOzoneCluster cluster;
-  private static StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-
-  @BeforeClass
-  public static void init() throws IOException {
-    config = new OzoneConfiguration();
-    cluster = new MiniOzoneCluster.Builder(config)
-        .numDataNodes(1)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-    storageContainerLocationClient = cluster
-        .createStorageContainerLocationClient();
-  }
-
-  @AfterClass
-  public static void shutdown() {
-    cluster.shutdown();
-  }
-
-  @Test
-  public void testMetrics() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    String containerName = "container" + RandomStringUtils.randomNumeric(10);
-    Pipeline pipeline = storageContainerLocationClient.allocateContainer(
-        clientManager.getType(), clientManager.getFactor(), containerName);
-    XceiverClientSpi client = clientManager.acquireClient(pipeline);
-
-    ContainerCommandRequestProto request = ContainerTestHelper
-        .getCreateContainerRequest(containerName);
-    client.sendCommand(request);
-
-    MetricsRecordBuilder containerMetrics = getMetrics(
-        XceiverClientMetrics.SOURCE_NAME);
-    // Above request command is in a synchronous way, so there will be no
-    // pending requests.
-    assertCounter("PendingOps", 0L, containerMetrics);
-    assertCounter("numPendingCreateContainer", 0L, containerMetrics);
-    // the counter value of average latency metric should be increased
-    assertCounter("CreateContainerLatencyNumOps", 1L, containerMetrics);
-
-    List<CompletableFuture<ContainerCommandResponseProto>> computeResults
-        = new ArrayList<>();
-    int numRequest = 10;
-    // start new thread to send async requests
-    Thread sendThread = new Thread(() -> {
-      while (true) {
-        try {
-          // use async interface for testing pending metrics
-          for (int i = 0; i < numRequest; i++) {
-            String keyName = OzoneUtils.getRequestID();
-            ContainerProtos.ContainerCommandRequestProto smallFileRequest;
-
-            smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
-                client.getPipeline(), containerName, keyName, 1024);
-            CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-                response = client.sendCommandAsync(smallFileRequest);
-            computeResults.add(response);
-          }
-          Thread.sleep(1000);
-        } catch (Exception ignored) {
-        }
-      }
-    });
-    sendThread.start();
-
-    GenericTestUtils.waitFor(() -> {
-      // check if pending metric count is increased
-      MetricsRecordBuilder metric =
-          getMetrics(XceiverClientMetrics.SOURCE_NAME);
-      long pendingOps = getLongCounter("PendingOps", metric);
-      long pendingPutSmallFileOps =
-          getLongCounter("numPendingPutSmallFile", metric);
-      return pendingOps > 0 && pendingPutSmallFileOps > 0;
-    }, 100, 60000);
-    sendThread.interrupt();
-
-    // Wait for all futures being done.
-    GenericTestUtils.waitFor(() -> {
-      for (CompletableFuture future : computeResults) {
-        if (!future.isDone()) {
-          return false;
-        }
-      }
-
-      return true;
-    }, 100, 60000);
-
-    // the counter value of pending metrics should be decreased to 0
-    containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
-    containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
-    assertCounter("PendingOps", 0L, containerMetrics);
-    assertCounter("numPendingPutSmallFile", 0L, containerMetrics);
-
-    clientManager.close();
-  }
-}
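
One small observation on the deleted test: its second GenericTestUtils.waitFor block polls isDone() on every collected future. Assuming it is acceptable for the test to block with a timeout, a JRE-only alternative is to join on CompletableFuture.allOf, sketched here with placeholder pre-completed futures (the demo class is an assumption for illustration):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AwaitAllDemo {
  public static void main(String[] args) throws Exception {
    List<CompletableFuture<String>> computeResults = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      computeResults.add(CompletableFuture.completedFuture("response-" + i));
    }
    // allOf completes when every future in the array is done.
    CompletableFuture
        .allOf(computeResults.toArray(new CompletableFuture[0]))
        .get(60, TimeUnit.SECONDS);
    System.out.println("all " + computeResults.size() + " responses done");
  }
}

The 60-second ceiling mirrors the 60000 ms timeout used by the waitFor calls in the deleted test.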