HDDS-313. Add metrics to containerState Machine. Contributed by chencan.

This commit is contained in:
Márton Elek 2018-08-27 15:42:22 +02:00
parent 12b2f362cc
commit 744ce200d2
3 changed files with 332 additions and 0 deletions

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* This class is for maintaining Container State Machine statistics.
*/
@InterfaceAudience.Private
@Metrics(about="Container State Machine Metrics", context="dfs")
public class CSMMetrics {
public static final String SOURCE_NAME =
CSMMetrics.class.getSimpleName();
// ratis op metrics metrics
private @Metric MutableCounterLong numWriteStateMachineOps;
private @Metric MutableCounterLong numReadStateMachineOps;
private @Metric MutableCounterLong numApplyTransactionOps;
// Failure Metrics
private @Metric MutableCounterLong numWriteStateMachineFails;
private @Metric MutableCounterLong numReadStateMachineFails;
private @Metric MutableCounterLong numApplyTransactionFails;
public CSMMetrics() {
}
public static CSMMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME,
"Container State Machine",
new CSMMetrics());
}
public void incNumWriteStateMachineOps() {
numWriteStateMachineOps.incr();
}
public void incNumReadStateMachineOps() {
numReadStateMachineOps.incr();
}
public void incNumApplyTransactionsOps() {
numApplyTransactionOps.incr();
}
public void incNumWriteStateMachineFails() {
numWriteStateMachineFails.incr();
}
public void incNumReadStateMachineFails() {
numReadStateMachineFails.incr();
}
public void incNumApplyTransactionsFails() {
numApplyTransactionFails.incr();
}
@VisibleForTesting
public long getNumWriteStateMachineOps() {
return numWriteStateMachineOps.value();
}
@VisibleForTesting
public long getNumReadStateMachineOps() {
return numReadStateMachineOps.value();
}
@VisibleForTesting
public long getNumApplyTransactionsOps() {
return numApplyTransactionOps.value();
}
@VisibleForTesting
public long getNumWriteStateMachineFails() {
return numWriteStateMachineFails.value();
}
@VisibleForTesting
public long getNumReadStateMachineFails() {
return numReadStateMachineFails.value();
}
@VisibleForTesting
public long getNumApplyTransactionsFails() {
return numApplyTransactionFails.value();
}
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}
}

View File

@ -117,6 +117,10 @@ public class ContainerStateMachine extends BaseStateMachine {
private final ConcurrentHashMap<Long, CompletableFuture<Message>> private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap; writeChunkFutureMap;
private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap; private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
/**
* CSM metrics.
*/
private final CSMMetrics metrics;
public ContainerStateMachine(ContainerDispatcher dispatcher, public ContainerStateMachine(ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor) { ThreadPoolExecutor chunkExecutor) {
@ -124,6 +128,7 @@ public ContainerStateMachine(ContainerDispatcher dispatcher,
this.chunkExecutor = chunkExecutor; this.chunkExecutor = chunkExecutor;
this.writeChunkFutureMap = new ConcurrentHashMap<>(); this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.stateMachineMap = new ConcurrentHashMap<>(); this.stateMachineMap = new ConcurrentHashMap<>();
metrics = CSMMetrics.create();
} }
@Override @Override
@ -131,6 +136,10 @@ public StateMachineStorage getStateMachineStorage() {
return storage; return storage;
} }
public CSMMetrics getMetrics() {
return metrics;
}
@Override @Override
public void initialize( public void initialize(
RaftServer server, RaftGroupId id, RaftStorage raftStorage) RaftServer server, RaftGroupId id, RaftStorage raftStorage)
@ -220,6 +229,7 @@ private Message runCommand(ContainerCommandRequestProto requestProto) {
@Override @Override
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) { public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
try { try {
metrics.incNumWriteStateMachineOps();
final ContainerCommandRequestProto requestProto = final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData()); getRequestProto(entry.getSmLogEntry().getStateMachineData());
Type cmdType = requestProto.getCmdType(); Type cmdType = requestProto.getCmdType();
@ -235,6 +245,7 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
} }
return stateMachineFuture; return stateMachineFuture;
} catch (IOException e) { } catch (IOException e) {
metrics.incNumWriteStateMachineFails();
return completeExceptionally(e); return completeExceptionally(e);
} }
} }
@ -242,10 +253,12 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
@Override @Override
public CompletableFuture<Message> query(Message request) { public CompletableFuture<Message> query(Message request) {
try { try {
metrics.incNumReadStateMachineOps();
final ContainerCommandRequestProto requestProto = final ContainerCommandRequestProto requestProto =
getRequestProto(request.getContent()); getRequestProto(request.getContent());
return CompletableFuture.completedFuture(runCommand(requestProto)); return CompletableFuture.completedFuture(runCommand(requestProto));
} catch (IOException e) { } catch (IOException e) {
metrics.incNumReadStateMachineFails();
return completeExceptionally(e); return completeExceptionally(e);
} }
} }
@ -347,6 +360,7 @@ public CompletableFuture<LogEntryProto> readStateMachineData(
@Override @Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) { public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try { try {
metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto = ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData()); getRequestProto(trx.getSMLogEntry().getData());
Preconditions.checkState(!HddsUtils.isReadOnly(requestProto)); Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
@ -357,6 +371,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
return stateMachineMap.get(requestProto.getContainerID()) return stateMachineMap.get(requestProto.getContainerID())
.executeContainerCommand(requestProto, index); .executeContainerCommand(requestProto, index);
} catch (IOException e) { } catch (IOException e) {
metrics.incNumApplyTransactionsFails();
return completeExceptionally(e); return completeExceptionally(e);
} }
} }

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.*;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.rpc.RpcType;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.CheckedBiConsumer;
import java.util.function.BiConsumer;
import org.junit.Test;
import org.junit.Assert;
/**
* This class tests the metrics of ContainerStateMachine.
*/
public class TestCSMMetrics {
static final String TEST_DIR
= GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
@FunctionalInterface
interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
OUT apply(LEFT left, RIGHT right) throws THROWABLE;
}
@Test
public void testContainerStateMachineMetrics() throws Exception {
runContainerStateMachineMetrics(1,
(pipeline, conf) -> RatisTestHelper.initRatisConf(GRPC, conf),
XceiverClientRatis::newXceiverClientRatis,
TestCSMMetrics::newXceiverServerRatis,
(dn, p) -> initXceiverServerRatis(GRPC, dn, p));
}
static void runContainerStateMachineMetrics(
int numDatanodes,
BiConsumer<Pipeline, OzoneConfiguration> initConf,
TestCSMMetrics.CheckedBiFunction<Pipeline, OzoneConfiguration,
XceiverClientSpi, IOException> createClient,
TestCSMMetrics.CheckedBiFunction<DatanodeDetails, OzoneConfiguration,
XceiverServerSpi, IOException> createServer,
CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
throws Exception {
final List<XceiverServerSpi> servers = new ArrayList<>();
XceiverClientSpi client = null;
String containerName = OzoneUtils.getRequestID();
try {
final Pipeline pipeline = ContainerTestHelper.createPipeline(
numDatanodes);
final OzoneConfiguration conf = new OzoneConfiguration();
initConf.accept(pipeline, conf);
for (DatanodeDetails dn : pipeline.getMachines()) {
final XceiverServerSpi s = createServer.apply(dn, conf);
servers.add(s);
s.start();
initServer.accept(dn, pipeline);
}
client = createClient.apply(pipeline, conf);
client.connect();
// Before Read Chunk/Write Chunk
MetricsRecordBuilder metric = getMetrics(CSMMetrics.SOURCE_NAME);
assertCounter("NumWriteStateMachineOps", 0L, metric);
assertCounter("NumReadStateMachineOps", 0L, metric);
assertCounter("NumApplyTransactionOps", 0L, metric);
// Write Chunk
BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
getTestContainerID());
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(
pipeline, blockID, 1024);
ContainerCommandResponseProto response =
client.sendCommand(writeChunkRequest);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
metric = getMetrics(CSMMetrics.SOURCE_NAME);
assertCounter("NumWriteStateMachineOps", 1L, metric);
assertCounter("NumApplyTransactionOps", 1L, metric);
//Read Chunk
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
.getWriteChunk());
response = client.sendCommand(readChunkRequest);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
metric = getMetrics(CSMMetrics.SOURCE_NAME);
assertCounter("NumReadStateMachineOps", 1L, metric);
assertCounter("NumApplyTransactionOps", 1L, metric);
} finally {
if (client != null) {
client.close();
}
servers.stream().forEach(XceiverServerSpi::stop);
}
}
static XceiverServerRatis newXceiverServerRatis(
DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
final String dir = TEST_DIR + dn.getUuid();
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
}
static void initXceiverServerRatis(
RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(dd);
final RaftClient client = RatisHelper.newRaftClient(rpc, p);
RaftGroupId groupId = pipeline.getId().getRaftGroupID();
client.reinitialize(RatisHelper.newRaftGroup(groupId,
pipeline.getMachines()), p.getId());
}
private static class TestContainerDispatcher implements ContainerDispatcher {
/**
* Dispatches commands to container layer.
*
* @param msg - Command Request
* @return Command Response
*/
@Override
public ContainerCommandResponseProto dispatch(
ContainerCommandRequestProto msg) {
return ContainerTestHelper.getCreateContainerResponse(msg);
}
@Override
public void init() {
}
@Override
public void shutdown() {
}
@Override
public Handler getHandler(ContainerProtos.ContainerType containerType) {
return null;
}
@Override
public void setScmId(String scmId) {
}
}
}