HDDS-1728. Add metrics for leader's latency in ContainerStateMachine. Contributed by Mukul Kumar Singh. (#1022)
This commit is contained in:
parent
9c90729486
commit
ce91d35b29
@ -19,11 +19,14 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
|
||||
/**
|
||||
@ -43,14 +46,28 @@ public class CSMMetrics {
|
||||
private @Metric MutableCounterLong numBytesWrittenCount;
|
||||
private @Metric MutableCounterLong numBytesCommittedCount;
|
||||
|
||||
private @Metric MutableRate transactionLatency;
|
||||
private MutableRate[] opsLatency;
|
||||
private MetricsRegistry registry = null;
|
||||
|
||||
// Failure Metrics
|
||||
private @Metric MutableCounterLong numWriteStateMachineFails;
|
||||
private @Metric MutableCounterLong numQueryStateMachineFails;
|
||||
private @Metric MutableCounterLong numApplyTransactionFails;
|
||||
private @Metric MutableCounterLong numReadStateMachineFails;
|
||||
private @Metric MutableCounterLong numReadStateMachineMissCount;
|
||||
private @Metric MutableCounterLong numStartTransactionVerifyFailures;
|
||||
private @Metric MutableCounterLong numContainerNotOpenVerifyFailures;
|
||||
|
||||
public CSMMetrics() {
|
||||
int numCmdTypes = ContainerProtos.Type.values().length;
|
||||
this.opsLatency = new MutableRate[numCmdTypes];
|
||||
this.registry = new MetricsRegistry(CSMMetrics.class.getName());
|
||||
for (int i = 0; i < numCmdTypes; i++) {
|
||||
opsLatency[i] = registry.newRate(
|
||||
ContainerProtos.Type.forNumber(i + 1).toString(),
|
||||
ContainerProtos.Type.forNumber(i + 1) + " op");
|
||||
}
|
||||
}
|
||||
|
||||
public static CSMMetrics create(RaftGroupId gid) {
|
||||
@ -154,6 +171,19 @@ public long getNumBytesCommittedCount() {
|
||||
return numBytesCommittedCount.value();
|
||||
}
|
||||
|
||||
public void incPipelineLatency(ContainerProtos.Type type, long latencyNanos) {
|
||||
opsLatency[type.ordinal()].add(latencyNanos);
|
||||
transactionLatency.add(latencyNanos);
|
||||
}
|
||||
|
||||
public void incNumStartTransactionVerifyFailures() {
|
||||
numStartTransactionVerifyFailures.incr();
|
||||
}
|
||||
|
||||
public void incNumContainerNotOpenVerifyFailures() {
|
||||
numContainerNotOpenVerifyFailures.incr();
|
||||
}
|
||||
|
||||
|
||||
public void unRegister() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
|
@ -25,8 +25,10 @@
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
import org.apache.ratis.server.RaftServer;
|
||||
@ -261,12 +263,19 @@ public long takeSnapshot() throws IOException {
|
||||
@Override
|
||||
public TransactionContext startTransaction(RaftClientRequest request)
|
||||
throws IOException {
|
||||
long startTime = Time.monotonicNowNanos();
|
||||
final ContainerCommandRequestProto proto =
|
||||
getContainerCommandRequestProto(request.getMessage().getContent());
|
||||
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
|
||||
try {
|
||||
dispatcher.validateContainerCommand(proto);
|
||||
} catch (IOException ioe) {
|
||||
if (ioe instanceof ContainerNotOpenException) {
|
||||
metrics.incNumContainerNotOpenVerifyFailures();
|
||||
} else {
|
||||
metrics.incNumStartTransactionVerifyFailures();
|
||||
LOG.error("startTransaction validation failed on leader", ioe);
|
||||
}
|
||||
TransactionContext ctxt = TransactionContext.newBuilder()
|
||||
.setClientRequest(request)
|
||||
.setStateMachine(this)
|
||||
@ -296,6 +305,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
|
||||
.setClientRequest(request)
|
||||
.setStateMachine(this)
|
||||
.setServerRole(RaftPeerRole.LEADER)
|
||||
.setStateMachineContext(startTime)
|
||||
.setStateMachineData(write.getData())
|
||||
.setLogData(commitContainerCommandProto.toByteString())
|
||||
.build();
|
||||
@ -304,6 +314,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
|
||||
.setClientRequest(request)
|
||||
.setStateMachine(this)
|
||||
.setServerRole(RaftPeerRole.LEADER)
|
||||
.setStateMachineContext(startTime)
|
||||
.setLogData(request.getMessage().getContent())
|
||||
.build();
|
||||
}
|
||||
@ -450,8 +461,10 @@ public CompletableFuture<Message> query(Message request) {
|
||||
}
|
||||
|
||||
private ByteString readStateMachineData(
|
||||
ContainerCommandRequestProto requestProto, long term, long index)
|
||||
throws IOException {
|
||||
ContainerCommandRequestProto requestProto, long term, long index) {
|
||||
// the stateMachine data is not present in the stateMachine cache,
|
||||
// increment the stateMachine cache miss count
|
||||
metrics.incNumReadStateMachineMissCount();
|
||||
WriteChunkRequestProto writeChunkRequestProto =
|
||||
requestProto.getWriteChunk();
|
||||
ContainerProtos.ChunkInfo chunkInfo = writeChunkRequestProto.getChunkData();
|
||||
@ -526,9 +539,6 @@ public CompletableFuture<ByteString> readStateMachineData(
|
||||
return CompletableFuture.completedFuture(ByteString.EMPTY);
|
||||
}
|
||||
try {
|
||||
// the stateMachine data is not present in the stateMachine cache,
|
||||
// increment the stateMachine cache miss count
|
||||
metrics.incNumReadStateMachineMissCount();
|
||||
final ContainerCommandRequestProto requestProto =
|
||||
getContainerCommandRequestProto(
|
||||
entry.getStateMachineLogEntry().getLogData());
|
||||
@ -621,6 +631,12 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
||||
getCommandExecutor(requestProto));
|
||||
|
||||
future.thenAccept(m -> {
|
||||
if (trx.getServerRole() == RaftPeerRole.LEADER) {
|
||||
long startTime = (long) trx.getStateMachineContext();
|
||||
metrics.incPipelineLatency(cmdType,
|
||||
Time.monotonicNowNanos() - startTime);
|
||||
}
|
||||
|
||||
final Long previous =
|
||||
applyTransactionCompletionMap
|
||||
.put(index, trx.getLogEntry().getTerm());
|
||||
|
@ -115,6 +115,9 @@ static void runContainerStateMachineMetrics(
|
||||
assertCounter("NumApplyTransactionOps", 0L, metric);
|
||||
assertCounter("NumBytesWrittenCount", 0L, metric);
|
||||
assertCounter("NumBytesCommittedCount", 0L, metric);
|
||||
assertCounter("NumStartTransactionVerifyFailures", 0L, metric);
|
||||
assertCounter("NumContainerNotOpenVerifyFailures", 0L, metric);
|
||||
assertCounter("WriteChunkNumOps", 0L, metric);
|
||||
|
||||
// Write Chunk
|
||||
BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
|
||||
@ -133,6 +136,9 @@ static void runContainerStateMachineMetrics(
|
||||
assertCounter("NumBytesWrittenCount", 1024L, metric);
|
||||
assertCounter("NumApplyTransactionOps", 1L, metric);
|
||||
assertCounter("NumBytesCommittedCount", 1024L, metric);
|
||||
assertCounter("NumStartTransactionVerifyFailures", 0L, metric);
|
||||
assertCounter("NumContainerNotOpenVerifyFailures", 0L, metric);
|
||||
assertCounter("WriteChunkNumOps", 1L, metric);
|
||||
|
||||
//Read Chunk
|
||||
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
|
||||
|
Loading…
Reference in New Issue
Block a user