From 744ce200d20a8f33b1dff1ad561843410c722501 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Mon, 27 Aug 2018 15:42:22 +0200 Subject: [PATCH] HDDS-313. Add metrics to containerState Machine. Contributed by chencan. --- .../transport/server/ratis/CSMMetrics.java | 115 ++++++++++ .../server/ratis/ContainerStateMachine.java | 15 ++ .../server/ratis/TestCSMMetrics.java | 202 ++++++++++++++++++ 3 files changed, 332 insertions(+) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java new file mode 100644 index 0000000000..b6aed605a6 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * This class is for maintaining Container State Machine statistics. + */ +@InterfaceAudience.Private +@Metrics(about="Container State Machine Metrics", context="dfs") +public class CSMMetrics { + public static final String SOURCE_NAME = + CSMMetrics.class.getSimpleName(); + + // ratis op metrics metrics + private @Metric MutableCounterLong numWriteStateMachineOps; + private @Metric MutableCounterLong numReadStateMachineOps; + private @Metric MutableCounterLong numApplyTransactionOps; + + // Failure Metrics + private @Metric MutableCounterLong numWriteStateMachineFails; + private @Metric MutableCounterLong numReadStateMachineFails; + private @Metric MutableCounterLong numApplyTransactionFails; + + public CSMMetrics() { + } + + public static CSMMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + "Container State Machine", + new CSMMetrics()); + } + + public void incNumWriteStateMachineOps() { + numWriteStateMachineOps.incr(); + } + + public void incNumReadStateMachineOps() { + numReadStateMachineOps.incr(); + } + + public void incNumApplyTransactionsOps() { + numApplyTransactionOps.incr(); + } + + public void incNumWriteStateMachineFails() { + numWriteStateMachineFails.incr(); + } + + public void incNumReadStateMachineFails() { + numReadStateMachineFails.incr(); + } + + public void incNumApplyTransactionsFails() { + numApplyTransactionFails.incr(); + } + + @VisibleForTesting + public long getNumWriteStateMachineOps() { + return numWriteStateMachineOps.value(); + } + + @VisibleForTesting + public long getNumReadStateMachineOps() { + return numReadStateMachineOps.value(); + } + + @VisibleForTesting + public long getNumApplyTransactionsOps() { + return numApplyTransactionOps.value(); + } + + @VisibleForTesting + public long getNumWriteStateMachineFails() { + return numWriteStateMachineFails.value(); + } + + @VisibleForTesting + public long getNumReadStateMachineFails() { + return numReadStateMachineFails.value(); + } + + @VisibleForTesting + public long getNumApplyTransactionsFails() { + return numApplyTransactionFails.value(); + } + + public void unRegister() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 52ea3aa094..ede87f4840 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -117,6 +117,10 @@ public class ContainerStateMachine extends BaseStateMachine { private final ConcurrentHashMap> writeChunkFutureMap; private final ConcurrentHashMap stateMachineMap; + /** + * CSM metrics. + */ + private final CSMMetrics metrics; public ContainerStateMachine(ContainerDispatcher dispatcher, ThreadPoolExecutor chunkExecutor) { @@ -124,6 +128,7 @@ public ContainerStateMachine(ContainerDispatcher dispatcher, this.chunkExecutor = chunkExecutor; this.writeChunkFutureMap = new ConcurrentHashMap<>(); this.stateMachineMap = new ConcurrentHashMap<>(); + metrics = CSMMetrics.create(); } @Override @@ -131,6 +136,10 @@ public StateMachineStorage getStateMachineStorage() { return storage; } + public CSMMetrics getMetrics() { + return metrics; + } + @Override public void initialize( RaftServer server, RaftGroupId id, RaftStorage raftStorage) @@ -220,6 +229,7 @@ private Message runCommand(ContainerCommandRequestProto requestProto) { @Override public CompletableFuture writeStateMachineData(LogEntryProto entry) { try { + metrics.incNumWriteStateMachineOps(); final ContainerCommandRequestProto requestProto = getRequestProto(entry.getSmLogEntry().getStateMachineData()); Type cmdType = requestProto.getCmdType(); @@ -235,6 +245,7 @@ public CompletableFuture writeStateMachineData(LogEntryProto entry) { } return stateMachineFuture; } catch (IOException e) { + metrics.incNumWriteStateMachineFails(); return completeExceptionally(e); } } @@ -242,10 +253,12 @@ public CompletableFuture writeStateMachineData(LogEntryProto entry) { @Override public CompletableFuture query(Message request) { try { + metrics.incNumReadStateMachineOps(); final ContainerCommandRequestProto requestProto = getRequestProto(request.getContent()); return CompletableFuture.completedFuture(runCommand(requestProto)); } catch (IOException e) { + metrics.incNumReadStateMachineFails(); return completeExceptionally(e); } } @@ -347,6 +360,7 @@ public CompletableFuture readStateMachineData( @Override public CompletableFuture applyTransaction(TransactionContext trx) { try { + metrics.incNumApplyTransactionsOps(); ContainerCommandRequestProto requestProto = getRequestProto(trx.getSMLogEntry().getData()); Preconditions.checkState(!HddsUtils.isReadOnly(requestProto)); @@ -357,6 +371,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { return stateMachineMap.get(requestProto.getContainerID()) .executeContainerCommand(requestProto, index); } catch (IOException e) { + metrics.incNumApplyTransactionsFails(); return completeExceptionally(e); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java new file mode 100644 index 0000000000..8b324b5341 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.scm.*; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.RatisTestHelper; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerSpi; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +import org.apache.ratis.RatisHelper; +import org.apache.ratis.rpc.RpcType; +import static org.apache.ratis.rpc.SupportedRpcType.GRPC; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.util.CheckedBiConsumer; + +import java.util.function.BiConsumer; + +import org.junit.Test; +import org.junit.Assert; + +/** + * This class tests the metrics of ContainerStateMachine. + */ +public class TestCSMMetrics { + static final String TEST_DIR + = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator; + + @FunctionalInterface + interface CheckedBiFunction { + OUT apply(LEFT left, RIGHT right) throws THROWABLE; + } + + @Test + public void testContainerStateMachineMetrics() throws Exception { + runContainerStateMachineMetrics(1, + (pipeline, conf) -> RatisTestHelper.initRatisConf(GRPC, conf), + XceiverClientRatis::newXceiverClientRatis, + TestCSMMetrics::newXceiverServerRatis, + (dn, p) -> initXceiverServerRatis(GRPC, dn, p)); + } + + static void runContainerStateMachineMetrics( + int numDatanodes, + BiConsumer initConf, + TestCSMMetrics.CheckedBiFunction createClient, + TestCSMMetrics.CheckedBiFunction createServer, + CheckedBiConsumer initServer) + throws Exception { + final List servers = new ArrayList<>(); + XceiverClientSpi client = null; + String containerName = OzoneUtils.getRequestID(); + try { + final Pipeline pipeline = ContainerTestHelper.createPipeline( + numDatanodes); + final OzoneConfiguration conf = new OzoneConfiguration(); + initConf.accept(pipeline, conf); + + for (DatanodeDetails dn : pipeline.getMachines()) { + final XceiverServerSpi s = createServer.apply(dn, conf); + servers.add(s); + s.start(); + initServer.accept(dn, pipeline); + } + + client = createClient.apply(pipeline, conf); + client.connect(); + + // Before Read Chunk/Write Chunk + MetricsRecordBuilder metric = getMetrics(CSMMetrics.SOURCE_NAME); + assertCounter("NumWriteStateMachineOps", 0L, metric); + assertCounter("NumReadStateMachineOps", 0L, metric); + assertCounter("NumApplyTransactionOps", 0L, metric); + + // Write Chunk + BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper. + getTestContainerID()); + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper.getWriteChunkRequest( + pipeline, blockID, 1024); + ContainerCommandResponseProto response = + client.sendCommand(writeChunkRequest); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, + response.getResult()); + + metric = getMetrics(CSMMetrics.SOURCE_NAME); + assertCounter("NumWriteStateMachineOps", 1L, metric); + assertCounter("NumApplyTransactionOps", 1L, metric); + + //Read Chunk + ContainerProtos.ContainerCommandRequestProto readChunkRequest = + ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest + .getWriteChunk()); + response = client.sendCommand(readChunkRequest); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, + response.getResult()); + + metric = getMetrics(CSMMetrics.SOURCE_NAME); + assertCounter("NumReadStateMachineOps", 1L, metric); + assertCounter("NumApplyTransactionOps", 1L, metric); + } finally { + if (client != null) { + client.close(); + } + servers.stream().forEach(XceiverServerSpi::stop); + } + } + + static XceiverServerRatis newXceiverServerRatis( + DatanodeDetails dn, OzoneConfiguration conf) throws IOException { + conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, + dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue()); + final String dir = TEST_DIR + dn.getUuid(); + conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); + + final ContainerDispatcher dispatcher = new TestContainerDispatcher(); + return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher); + } + + static void initXceiverServerRatis( + RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException { + final RaftPeer p = RatisHelper.toRaftPeer(dd); + final RaftClient client = RatisHelper.newRaftClient(rpc, p); + RaftGroupId groupId = pipeline.getId().getRaftGroupID(); + client.reinitialize(RatisHelper.newRaftGroup(groupId, + pipeline.getMachines()), p.getId()); + } + + private static class TestContainerDispatcher implements ContainerDispatcher { + /** + * Dispatches commands to container layer. + * + * @param msg - Command Request + * @return Command Response + */ + @Override + public ContainerCommandResponseProto dispatch( + ContainerCommandRequestProto msg) { + return ContainerTestHelper.getCreateContainerResponse(msg); + } + + @Override + public void init() { + } + + @Override + public void shutdown() { + } + + @Override + public Handler getHandler(ContainerProtos.ContainerType containerType) { + return null; + } + + @Override + public void setScmId(String scmId) { + + } + } +}