HDDS-1502. Add metrics for Ozone Ratis performance.Contributed by Shashikant Banerjee(#833).
This commit is contained in:
parent
2b303e9d5f
commit
18c1eebc08
@ -37,13 +37,18 @@ public class CSMMetrics {
|
|||||||
|
|
||||||
// ratis op metrics metrics
|
// ratis op metrics metrics
|
||||||
private @Metric MutableCounterLong numWriteStateMachineOps;
|
private @Metric MutableCounterLong numWriteStateMachineOps;
|
||||||
private @Metric MutableCounterLong numReadStateMachineOps;
|
private @Metric MutableCounterLong numQueryStateMachineOps;
|
||||||
private @Metric MutableCounterLong numApplyTransactionOps;
|
private @Metric MutableCounterLong numApplyTransactionOps;
|
||||||
|
private @Metric MutableCounterLong numReadStateMachineOps;
|
||||||
|
private @Metric MutableCounterLong numBytesWrittenCount;
|
||||||
|
private @Metric MutableCounterLong numBytesCommittedCount;
|
||||||
|
|
||||||
// Failure Metrics
|
// Failure Metrics
|
||||||
private @Metric MutableCounterLong numWriteStateMachineFails;
|
private @Metric MutableCounterLong numWriteStateMachineFails;
|
||||||
private @Metric MutableCounterLong numReadStateMachineFails;
|
private @Metric MutableCounterLong numQueryStateMachineFails;
|
||||||
private @Metric MutableCounterLong numApplyTransactionFails;
|
private @Metric MutableCounterLong numApplyTransactionFails;
|
||||||
|
private @Metric MutableCounterLong numReadStateMachineFails;
|
||||||
|
private @Metric MutableCounterLong numReadStateMachineMissCount;
|
||||||
|
|
||||||
public CSMMetrics() {
|
public CSMMetrics() {
|
||||||
}
|
}
|
||||||
@ -59,6 +64,10 @@ public void incNumWriteStateMachineOps() {
|
|||||||
numWriteStateMachineOps.incr();
|
numWriteStateMachineOps.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumQueryStateMachineOps() {
|
||||||
|
numQueryStateMachineOps.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumReadStateMachineOps() {
|
public void incNumReadStateMachineOps() {
|
||||||
numReadStateMachineOps.incr();
|
numReadStateMachineOps.incr();
|
||||||
}
|
}
|
||||||
@ -71,10 +80,26 @@ public void incNumWriteStateMachineFails() {
|
|||||||
numWriteStateMachineFails.incr();
|
numWriteStateMachineFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumQueryStateMachineFails() {
|
||||||
|
numQueryStateMachineFails.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumBytesWrittenCount(long value) {
|
||||||
|
numBytesWrittenCount.incr(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumBytesCommittedCount(long value) {
|
||||||
|
numBytesCommittedCount.incr(value);
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumReadStateMachineFails() {
|
public void incNumReadStateMachineFails() {
|
||||||
numReadStateMachineFails.incr();
|
numReadStateMachineFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumReadStateMachineMissCount() {
|
||||||
|
numReadStateMachineMissCount.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumApplyTransactionsFails() {
|
public void incNumApplyTransactionsFails() {
|
||||||
numApplyTransactionFails.incr();
|
numApplyTransactionFails.incr();
|
||||||
}
|
}
|
||||||
@ -85,8 +110,8 @@ public long getNumWriteStateMachineOps() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumReadStateMachineOps() {
|
public long getNumQueryStateMachineOps() {
|
||||||
return numReadStateMachineOps.value();
|
return numQueryStateMachineOps.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -100,8 +125,8 @@ public long getNumWriteStateMachineFails() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumReadStateMachineFails() {
|
public long getNumQueryStateMachineFails() {
|
||||||
return numReadStateMachineFails.value();
|
return numQueryStateMachineFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -109,6 +134,27 @@ public long getNumApplyTransactionsFails() {
|
|||||||
return numApplyTransactionFails.value();
|
return numApplyTransactionFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumReadStateMachineFails() {
|
||||||
|
return numReadStateMachineFails.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumReadStateMachineMissCount() {
|
||||||
|
return numReadStateMachineMissCount.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBytesWrittenCount() {
|
||||||
|
return numBytesWrittenCount.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBytesCommittedCount() {
|
||||||
|
return numBytesCommittedCount.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void unRegister() {
|
public void unRegister() {
|
||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
ms.unregisterSource(SOURCE_NAME);
|
ms.unregisterSource(SOURCE_NAME);
|
||||||
|
@ -391,6 +391,8 @@ private CompletableFuture<Message> handleWriteChunk(
|
|||||||
// Remove the future once it finishes execution from the
|
// Remove the future once it finishes execution from the
|
||||||
// writeChunkFutureMap.
|
// writeChunkFutureMap.
|
||||||
writeChunkFuture.thenApply(r -> {
|
writeChunkFuture.thenApply(r -> {
|
||||||
|
metrics.incNumBytesWrittenCount(
|
||||||
|
requestProto.getWriteChunk().getChunkData().getLen());
|
||||||
writeChunkFutureMap.remove(entryIndex);
|
writeChunkFutureMap.remove(entryIndex);
|
||||||
LOG.debug("writeChunk writeStateMachineData completed: blockId " + write
|
LOG.debug("writeChunk writeStateMachineData completed: blockId " + write
|
||||||
.getBlockID() + " logIndex " + entryIndex + " chunkName " + write
|
.getBlockID() + " logIndex " + entryIndex + " chunkName " + write
|
||||||
@ -438,12 +440,12 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
|||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Message> query(Message request) {
|
public CompletableFuture<Message> query(Message request) {
|
||||||
try {
|
try {
|
||||||
metrics.incNumReadStateMachineOps();
|
metrics.incNumQueryStateMachineOps();
|
||||||
final ContainerCommandRequestProto requestProto =
|
final ContainerCommandRequestProto requestProto =
|
||||||
getContainerCommandRequestProto(request.getContent());
|
getContainerCommandRequestProto(request.getContent());
|
||||||
return CompletableFuture.completedFuture(runCommand(requestProto, null));
|
return CompletableFuture.completedFuture(runCommand(requestProto, null));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
metrics.incNumReadStateMachineFails();
|
metrics.incNumQueryStateMachineFails();
|
||||||
return completeExceptionally(e);
|
return completeExceptionally(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -520,10 +522,14 @@ public CompletableFuture<Void> flushStateMachineData(long index) {
|
|||||||
public CompletableFuture<ByteString> readStateMachineData(
|
public CompletableFuture<ByteString> readStateMachineData(
|
||||||
LogEntryProto entry) {
|
LogEntryProto entry) {
|
||||||
StateMachineLogEntryProto smLogEntryProto = entry.getStateMachineLogEntry();
|
StateMachineLogEntryProto smLogEntryProto = entry.getStateMachineLogEntry();
|
||||||
|
metrics.incNumReadStateMachineOps();
|
||||||
if (!getStateMachineData(smLogEntryProto).isEmpty()) {
|
if (!getStateMachineData(smLogEntryProto).isEmpty()) {
|
||||||
return CompletableFuture.completedFuture(ByteString.EMPTY);
|
return CompletableFuture.completedFuture(ByteString.EMPTY);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
// the stateMachine data is not present in the stateMachine cache,
|
||||||
|
// increment the stateMachine cache miss count
|
||||||
|
metrics.incNumReadStateMachineMissCount();
|
||||||
final ContainerCommandRequestProto requestProto =
|
final ContainerCommandRequestProto requestProto =
|
||||||
getContainerCommandRequestProto(
|
getContainerCommandRequestProto(
|
||||||
entry.getStateMachineLogEntry().getLogData());
|
entry.getStateMachineLogEntry().getLogData());
|
||||||
@ -537,6 +543,7 @@ public CompletableFuture<ByteString> readStateMachineData(
|
|||||||
getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
|
getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
|
||||||
requestProto));
|
requestProto));
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
|
metrics.incNumReadStateMachineFails();
|
||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
}
|
}
|
||||||
return future;
|
return future;
|
||||||
@ -547,6 +554,7 @@ public CompletableFuture<ByteString> readStateMachineData(
|
|||||||
+ " cannot have state machine data");
|
+ " cannot have state machine data");
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
metrics.incNumReadStateMachineFails();
|
||||||
LOG.error("unable to read stateMachineData:" + e);
|
LOG.error("unable to read stateMachineData:" + e);
|
||||||
return completeExceptionally(e);
|
return completeExceptionally(e);
|
||||||
}
|
}
|
||||||
@ -618,6 +626,10 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
|||||||
applyTransactionCompletionMap
|
applyTransactionCompletionMap
|
||||||
.put(index, trx.getLogEntry().getTerm());
|
.put(index, trx.getLogEntry().getTerm());
|
||||||
Preconditions.checkState(previous == null);
|
Preconditions.checkState(previous == null);
|
||||||
|
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
|
||||||
|
metrics.incNumBytesCommittedCount(
|
||||||
|
requestProto.getWriteChunk().getChunkData().getLen());
|
||||||
|
}
|
||||||
updateLastApplied();
|
updateLastApplied();
|
||||||
});
|
});
|
||||||
return future;
|
return future;
|
||||||
|
@ -14,8 +14,10 @@
|
|||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
|
= GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
|
||||||
|
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
|
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
@ -29,9 +31,9 @@
|
|||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerCommandRequestProto;
|
.ContainerCommandRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerCommandResponseProto;
|
.ContainerCommandResponseProto;
|
||||||
import org.apache.hadoop.hdds.scm.*;
|
import org.apache.hadoop.hdds.scm.*;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||||
@ -42,7 +44,7 @@
|
|||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
|
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
|
||||||
import org.apache.hadoop.ozone.container.common.transport.server
|
import org.apache.hadoop.ozone.container.common.transport.server
|
||||||
.XceiverServerSpi;
|
.XceiverServerSpi;
|
||||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
@ -57,13 +59,11 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class tests the metrics of ContainerStateMachine.
|
* This class tests the metrics of ContainerStateMachine.
|
||||||
*/
|
*/
|
||||||
public class TestCSMMetrics {
|
public class TestCSMMetrics {
|
||||||
static final String TEST_DIR
|
static final String TEST_DIR
|
||||||
= GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
|
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
|
interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
|
||||||
OUT apply(LEFT left, RIGHT right) throws THROWABLE;
|
OUT apply(LEFT left, RIGHT right) throws THROWABLE;
|
||||||
@ -112,6 +112,8 @@ static void runContainerStateMachineMetrics(
|
|||||||
assertCounter("NumWriteStateMachineOps", 0L, metric);
|
assertCounter("NumWriteStateMachineOps", 0L, metric);
|
||||||
assertCounter("NumReadStateMachineOps", 0L, metric);
|
assertCounter("NumReadStateMachineOps", 0L, metric);
|
||||||
assertCounter("NumApplyTransactionOps", 0L, metric);
|
assertCounter("NumApplyTransactionOps", 0L, metric);
|
||||||
|
assertCounter("NumBytesWrittenCount", 0L, metric);
|
||||||
|
assertCounter("NumBytesCommittedCount", 0L, metric);
|
||||||
|
|
||||||
// Write Chunk
|
// Write Chunk
|
||||||
BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
|
BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
|
||||||
@ -127,7 +129,9 @@ static void runContainerStateMachineMetrics(
|
|||||||
metric = getMetrics(CSMMetrics.SOURCE_NAME +
|
metric = getMetrics(CSMMetrics.SOURCE_NAME +
|
||||||
RaftGroupId.valueOf(pipeline.getId().getId()).toString());
|
RaftGroupId.valueOf(pipeline.getId().getId()).toString());
|
||||||
assertCounter("NumWriteStateMachineOps", 1L, metric);
|
assertCounter("NumWriteStateMachineOps", 1L, metric);
|
||||||
|
assertCounter("NumBytesWrittenCount", 1024L, metric);
|
||||||
assertCounter("NumApplyTransactionOps", 1L, metric);
|
assertCounter("NumApplyTransactionOps", 1L, metric);
|
||||||
|
assertCounter("NumBytesCommittedCount", 1024L, metric);
|
||||||
|
|
||||||
//Read Chunk
|
//Read Chunk
|
||||||
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
|
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
|
||||||
@ -139,7 +143,7 @@ static void runContainerStateMachineMetrics(
|
|||||||
|
|
||||||
metric = getMetrics(CSMMetrics.SOURCE_NAME +
|
metric = getMetrics(CSMMetrics.SOURCE_NAME +
|
||||||
RaftGroupId.valueOf(pipeline.getId().getId()).toString());
|
RaftGroupId.valueOf(pipeline.getId().getId()).toString());
|
||||||
assertCounter("NumReadStateMachineOps", 1L, metric);
|
assertCounter("NumQueryStateMachineOps", 1L, metric);
|
||||||
assertCounter("NumApplyTransactionOps", 1L, metric);
|
assertCounter("NumApplyTransactionOps", 1L, metric);
|
||||||
} finally {
|
} finally {
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
|
Loading…
Reference in New Issue
Block a user