HDDS-1579. Create OMDoubleBuffer metrics. (#871)
This commit is contained in:
parent
5962a518bd
commit
d9a9e9913e
@ -23,15 +23,17 @@
|
|||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||||
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
|
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
|
||||||
|
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
|
||||||
import org.apache.hadoop.ozone.om.response.OMClientResponse;
|
import org.apache.hadoop.ozone.om.response.OMClientResponse;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.utils.db.BatchOperation;
|
import org.apache.hadoop.utils.db.BatchOperation;
|
||||||
|
|
||||||
import org.apache.ratis.util.ExitUtils;
|
import org.apache.ratis.util.ExitUtils;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class implements DoubleBuffer implementation of OMClientResponse's. In
|
* This class implements DoubleBuffer implementation of OMClientResponse's. In
|
||||||
@ -63,6 +65,8 @@ public class OzoneManagerDoubleBuffer {
|
|||||||
private final AtomicLong flushedTransactionCount = new AtomicLong(0);
|
private final AtomicLong flushedTransactionCount = new AtomicLong(0);
|
||||||
private final AtomicLong flushIterations = new AtomicLong(0);
|
private final AtomicLong flushIterations = new AtomicLong(0);
|
||||||
private volatile boolean isRunning;
|
private volatile boolean isRunning;
|
||||||
|
private OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics;
|
||||||
|
private long maxFlushedTransactionsInOneIteration;
|
||||||
|
|
||||||
private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;
|
private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;
|
||||||
|
|
||||||
@ -71,8 +75,9 @@ public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
|
|||||||
this.currentBuffer = new ConcurrentLinkedQueue<>();
|
this.currentBuffer = new ConcurrentLinkedQueue<>();
|
||||||
this.readyBuffer = new ConcurrentLinkedQueue<>();
|
this.readyBuffer = new ConcurrentLinkedQueue<>();
|
||||||
this.omMetadataManager = omMetadataManager;
|
this.omMetadataManager = omMetadataManager;
|
||||||
|
|
||||||
this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
|
this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
|
||||||
|
this.ozoneManagerDoubleBufferMetrics =
|
||||||
|
OzoneManagerDoubleBufferMetrics.create();
|
||||||
|
|
||||||
isRunning = true;
|
isRunning = true;
|
||||||
// Daemon thread which runs in back ground and flushes transactions to DB.
|
// Daemon thread which runs in back ground and flushes transactions to DB.
|
||||||
@ -80,7 +85,6 @@ public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
|
|||||||
daemon.setName("OMDoubleBufferFlushThread");
|
daemon.setName("OMDoubleBufferFlushThread");
|
||||||
daemon.start();
|
daemon.start();
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -120,6 +124,7 @@ private void flushTransactions() {
|
|||||||
.max(Long::compareTo).get();
|
.max(Long::compareTo).get();
|
||||||
|
|
||||||
readyBuffer.clear();
|
readyBuffer.clear();
|
||||||
|
|
||||||
// cleanup cache.
|
// cleanup cache.
|
||||||
cleanupCache(lastRatisTransactionIndex);
|
cleanupCache(lastRatisTransactionIndex);
|
||||||
|
|
||||||
@ -129,6 +134,9 @@ private void flushTransactions() {
|
|||||||
// update the last updated index in OzoneManagerStateMachine.
|
// update the last updated index in OzoneManagerStateMachine.
|
||||||
ozoneManagerRatisSnapShot.updateLastAppliedIndex(
|
ozoneManagerRatisSnapShot.updateLastAppliedIndex(
|
||||||
lastRatisTransactionIndex);
|
lastRatisTransactionIndex);
|
||||||
|
|
||||||
|
// set metrics.
|
||||||
|
updateMetrics(flushedTransactionsSize);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
@ -162,6 +170,23 @@ private void cleanupCache(long lastRatisTransactionIndex) {
|
|||||||
omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex);
|
omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update OzoneManagerDoubleBuffer metrics values.
|
||||||
|
* @param flushedTransactionsSize
|
||||||
|
*/
|
||||||
|
private void updateMetrics(
|
||||||
|
long flushedTransactionsSize) {
|
||||||
|
ozoneManagerDoubleBufferMetrics.incrTotalNumOfFlushOperations();
|
||||||
|
ozoneManagerDoubleBufferMetrics.incrTotalSizeOfFlushedTransactions(
|
||||||
|
flushedTransactionsSize);
|
||||||
|
if (maxFlushedTransactionsInOneIteration < flushedTransactionsSize) {
|
||||||
|
maxFlushedTransactionsInOneIteration = flushedTransactionsSize;
|
||||||
|
ozoneManagerDoubleBufferMetrics
|
||||||
|
.setMaxNumberOfTransactionsFlushedInOneIteration(
|
||||||
|
flushedTransactionsSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop OM DoubleBuffer flush thread.
|
* Stop OM DoubleBuffer flush thread.
|
||||||
*/
|
*/
|
||||||
@ -170,6 +195,9 @@ public synchronized void stop() {
|
|||||||
LOG.info("Stopping OMDoubleBuffer flush thread");
|
LOG.info("Stopping OMDoubleBuffer flush thread");
|
||||||
isRunning = false;
|
isRunning = false;
|
||||||
daemon.interrupt();
|
daemon.interrupt();
|
||||||
|
|
||||||
|
// stop metrics.
|
||||||
|
ozoneManagerDoubleBufferMetrics.unRegister();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("OMDoubleBuffer flush thread is not running.");
|
LOG.info("OMDoubleBuffer flush thread is not running.");
|
||||||
}
|
}
|
||||||
@ -236,5 +264,10 @@ private synchronized void setReadyBuffer() {
|
|||||||
readyBuffer = temp;
|
readyBuffer = temp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
|
||||||
|
return ozoneManagerDoubleBufferMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,89 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.om.ratis.metrics;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class which maintains metrics related to OzoneManager DoubleBuffer.
|
||||||
|
*/
|
||||||
|
public class OzoneManagerDoubleBufferMetrics {
|
||||||
|
|
||||||
|
private static final String SOURCE_NAME =
|
||||||
|
OzoneManagerDoubleBufferMetrics.class.getSimpleName();
|
||||||
|
|
||||||
|
@Metric(about = "Total Number of flush operations happened in " +
|
||||||
|
"OzoneManagerDoubleBuffer.")
|
||||||
|
private MutableCounterLong totalNumOfFlushOperations;
|
||||||
|
|
||||||
|
@Metric(about = "Total Number of flushed transactions happened in " +
|
||||||
|
"OzoneManagerDoubleBuffer.")
|
||||||
|
private MutableCounterLong totalNumOfFlushedTransactions;
|
||||||
|
|
||||||
|
@Metric(about = "Max Number of transactions flushed in a iteration in " +
|
||||||
|
"OzoneManagerDoubleBuffer. This will provide a value which is maximum " +
|
||||||
|
"number of transactions flushed in a single flush iteration till now.")
|
||||||
|
private MutableCounterLong maxNumberOfTransactionsFlushedInOneIteration;
|
||||||
|
|
||||||
|
|
||||||
|
public static OzoneManagerDoubleBufferMetrics create() {
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
return ms.register(SOURCE_NAME,
|
||||||
|
"OzoneManager DoubleBuffer Metrics",
|
||||||
|
new OzoneManagerDoubleBufferMetrics());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrTotalNumOfFlushOperations() {
|
||||||
|
this.totalNumOfFlushOperations.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrTotalSizeOfFlushedTransactions(
|
||||||
|
long flushedTransactions) {
|
||||||
|
this.totalNumOfFlushedTransactions.incr(flushedTransactions);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxNumberOfTransactionsFlushedInOneIteration(
|
||||||
|
long maxTransactions) {
|
||||||
|
// We should set the value with maxTransactions, so decrement old value
|
||||||
|
// first and then add the new value.
|
||||||
|
this.maxNumberOfTransactionsFlushedInOneIteration.incr(
|
||||||
|
Math.negateExact(getMaxNumberOfTransactionsFlushedInOneIteration())
|
||||||
|
+ maxTransactions);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotalNumOfFlushOperations() {
|
||||||
|
return totalNumOfFlushOperations.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotalNumOfFlushedTransactions() {
|
||||||
|
return totalNumOfFlushedTransactions.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxNumberOfTransactionsFlushedInOneIteration() {
|
||||||
|
return maxNumberOfTransactionsFlushedInOneIteration.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unRegister() {
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
ms.unregisterSource(SOURCE_NAME);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,21 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* package which contains metrics classes.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.om.ratis.metrics;
|
@ -22,11 +22,6 @@
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
||||||
.CreateBucketResponse;
|
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
||||||
.OMResponse;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -34,7 +29,12 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||||
|
.CreateBucketResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||||
|
.OMResponse;
|
||||||
|
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||||
@ -91,6 +91,18 @@ public void stop() {
|
|||||||
public void testDoubleBufferWithDummyResponse() throws Exception {
|
public void testDoubleBufferWithDummyResponse() throws Exception {
|
||||||
String volumeName = UUID.randomUUID().toString();
|
String volumeName = UUID.randomUUID().toString();
|
||||||
int bucketCount = 100;
|
int bucketCount = 100;
|
||||||
|
OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics =
|
||||||
|
doubleBuffer.getOzoneManagerDoubleBufferMetrics();
|
||||||
|
|
||||||
|
// As we have not flushed/added any transactions, all metrics should have
|
||||||
|
// value zero.
|
||||||
|
Assert.assertTrue(ozoneManagerDoubleBufferMetrics
|
||||||
|
.getTotalNumOfFlushOperations() == 0);
|
||||||
|
Assert.assertTrue(ozoneManagerDoubleBufferMetrics
|
||||||
|
.getTotalNumOfFlushedTransactions() == 0);
|
||||||
|
Assert.assertTrue(ozoneManagerDoubleBufferMetrics
|
||||||
|
.getMaxNumberOfTransactionsFlushedInOneIteration() == 0);
|
||||||
|
|
||||||
for (int i=0; i < bucketCount; i++) {
|
for (int i=0; i < bucketCount; i++) {
|
||||||
doubleBuffer.add(createDummyBucketResponse(volumeName,
|
doubleBuffer.add(createDummyBucketResponse(volumeName,
|
||||||
UUID.randomUUID().toString()), trxId.incrementAndGet());
|
UUID.randomUUID().toString()), trxId.incrementAndGet());
|
||||||
@ -98,6 +110,13 @@ public void testDoubleBufferWithDummyResponse() throws Exception {
|
|||||||
GenericTestUtils.waitFor(() ->
|
GenericTestUtils.waitFor(() ->
|
||||||
doubleBuffer.getFlushedTransactionCount() == bucketCount, 100,
|
doubleBuffer.getFlushedTransactionCount() == bucketCount, 100,
|
||||||
60000);
|
60000);
|
||||||
|
|
||||||
|
Assert.assertTrue(ozoneManagerDoubleBufferMetrics
|
||||||
|
.getTotalNumOfFlushOperations() > 0);
|
||||||
|
Assert.assertTrue(ozoneManagerDoubleBufferMetrics
|
||||||
|
.getTotalNumOfFlushedTransactions() == bucketCount);
|
||||||
|
Assert.assertTrue(ozoneManagerDoubleBufferMetrics
|
||||||
|
.getMaxNumberOfTransactionsFlushedInOneIteration() > 0);
|
||||||
Assert.assertTrue(omMetadataManager.countRowsInTable(
|
Assert.assertTrue(omMetadataManager.countRowsInTable(
|
||||||
omMetadataManager.getBucketTable()) == (bucketCount));
|
omMetadataManager.getBucketTable()) == (bucketCount));
|
||||||
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
|
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
|
||||||
|
Loading…
Reference in New Issue
Block a user