HDDS-1750. Add block allocation metrics for pipelines in SCM. Contributed by Lokesh Jain. (#1047)
This commit is contained in:
parent
738c09349e
commit
b5d30e4914
@ -243,6 +243,7 @@ private AllocatedBlock newBlock(ContainerInfo containerInfo) {
|
|||||||
.setPipeline(pipeline);
|
.setPipeline(pipeline);
|
||||||
LOG.trace("New block allocated : {} Container ID: {}", localID,
|
LOG.trace("New block allocated : {} Container ID: {}", localID,
|
||||||
containerID);
|
containerID);
|
||||||
|
pipelineManager.incNumBlocksAllocatedMetric(pipeline.getId());
|
||||||
return abb.build();
|
return abb.build();
|
||||||
} catch (PipelineNotFoundException ex) {
|
} catch (PipelineNotFoundException ex) {
|
||||||
LOG.error("Pipeline Machine count is zero.", ex);
|
LOG.error("Pipeline Machine count is zero.", ex);
|
||||||
|
@ -75,4 +75,6 @@ void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
|
|||||||
void startPipelineCreator();
|
void startPipelineCreator();
|
||||||
|
|
||||||
void triggerPipelineCreation();
|
void triggerPipelineCreation();
|
||||||
|
|
||||||
|
void incNumBlocksAllocatedMetric(PipelineID id);
|
||||||
}
|
}
|
||||||
|
@ -152,6 +152,7 @@ public synchronized Pipeline createPipeline(
|
|||||||
stateManager.addPipeline(pipeline);
|
stateManager.addPipeline(pipeline);
|
||||||
nodeManager.addPipeline(pipeline);
|
nodeManager.addPipeline(pipeline);
|
||||||
metrics.incNumPipelineCreated();
|
metrics.incNumPipelineCreated();
|
||||||
|
metrics.createPerPipelineMetrics(pipeline);
|
||||||
return pipeline;
|
return pipeline;
|
||||||
} catch (InsufficientDatanodesException idEx) {
|
} catch (InsufficientDatanodesException idEx) {
|
||||||
throw idEx;
|
throw idEx;
|
||||||
@ -285,7 +286,8 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
|
|||||||
public void openPipeline(PipelineID pipelineId) throws IOException {
|
public void openPipeline(PipelineID pipelineId) throws IOException {
|
||||||
lock.writeLock().lock();
|
lock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
stateManager.openPipeline(pipelineId);
|
Pipeline pipeline = stateManager.openPipeline(pipelineId);
|
||||||
|
metrics.createPerPipelineMetrics(pipeline);
|
||||||
} finally {
|
} finally {
|
||||||
lock.writeLock().unlock();
|
lock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
@ -362,6 +364,7 @@ private void finalizePipeline(PipelineID pipelineId) throws IOException {
|
|||||||
for (ContainerID containerID : containerIDs) {
|
for (ContainerID containerID : containerIDs) {
|
||||||
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
|
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
|
||||||
}
|
}
|
||||||
|
metrics.removePipelineMetrics(pipelineId);
|
||||||
} finally {
|
} finally {
|
||||||
lock.writeLock().unlock();
|
lock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
@ -402,6 +405,11 @@ private void removePipeline(PipelineID pipelineId) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incNumBlocksAllocatedMetric(PipelineID id) {
|
||||||
|
metrics.incNumBlocksAllocated(id);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (scheduler != null) {
|
if (scheduler != null) {
|
||||||
|
@ -19,31 +19,46 @@
|
|||||||
package org.apache.hadoop.hdds.scm.pipeline;
|
package org.apache.hadoop.hdds.scm.pipeline;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.Interns;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class maintains Pipeline related metrics.
|
* This class maintains Pipeline related metrics.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Metrics(about = "SCM PipelineManager Metrics", context = "ozone")
|
@Metrics(about = "SCM PipelineManager Metrics", context = "ozone")
|
||||||
public final class SCMPipelineMetrics {
|
public final class SCMPipelineMetrics implements MetricsSource {
|
||||||
|
|
||||||
private static final String SOURCE_NAME =
|
private static final String SOURCE_NAME =
|
||||||
SCMPipelineMetrics.class.getSimpleName();
|
SCMPipelineMetrics.class.getSimpleName();
|
||||||
|
|
||||||
|
private MetricsRegistry registry;
|
||||||
|
|
||||||
private @Metric MutableCounterLong numPipelineCreated;
|
private @Metric MutableCounterLong numPipelineCreated;
|
||||||
private @Metric MutableCounterLong numPipelineCreationFailed;
|
private @Metric MutableCounterLong numPipelineCreationFailed;
|
||||||
private @Metric MutableCounterLong numPipelineDestroyed;
|
private @Metric MutableCounterLong numPipelineDestroyed;
|
||||||
private @Metric MutableCounterLong numPipelineDestroyFailed;
|
private @Metric MutableCounterLong numPipelineDestroyFailed;
|
||||||
private @Metric MutableCounterLong numPipelineReportProcessed;
|
private @Metric MutableCounterLong numPipelineReportProcessed;
|
||||||
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
|
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
|
||||||
|
private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
|
||||||
|
|
||||||
/** Private constructor. */
|
/** Private constructor. */
|
||||||
private SCMPipelineMetrics() { }
|
private SCMPipelineMetrics() {
|
||||||
|
this.registry = new MetricsRegistry(SOURCE_NAME);
|
||||||
|
numBlocksAllocated = new ConcurrentHashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create and returns SCMPipelineMetrics instance.
|
* Create and returns SCMPipelineMetrics instance.
|
||||||
@ -64,6 +79,43 @@ public void unRegister() {
|
|||||||
ms.unregisterSource(SOURCE_NAME);
|
ms.unregisterSource(SOURCE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("SuspiciousMethodCalls")
|
||||||
|
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||||
|
MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
|
||||||
|
numPipelineCreated.snapshot(recordBuilder, true);
|
||||||
|
numPipelineCreationFailed.snapshot(recordBuilder, true);
|
||||||
|
numPipelineDestroyed.snapshot(recordBuilder, true);
|
||||||
|
numPipelineDestroyFailed.snapshot(recordBuilder, true);
|
||||||
|
numPipelineReportProcessed.snapshot(recordBuilder, true);
|
||||||
|
numPipelineReportProcessingFailed.snapshot(recordBuilder, true);
|
||||||
|
numBlocksAllocated
|
||||||
|
.forEach((pid, metric) -> metric.snapshot(recordBuilder, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
void createPerPipelineMetrics(Pipeline pipeline) {
|
||||||
|
numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns
|
||||||
|
.info(getBlockAllocationMetricName(pipeline),
|
||||||
|
"Number of blocks allocated in pipeline " + pipeline.getId()), 0L));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getBlockAllocationMetricName(Pipeline pipeline) {
|
||||||
|
return "NumBlocksAllocated-" + pipeline.getType() + "-" + pipeline
|
||||||
|
.getFactor() + "-" + pipeline.getId().getId();
|
||||||
|
}
|
||||||
|
|
||||||
|
void removePipelineMetrics(PipelineID pipelineID) {
|
||||||
|
numBlocksAllocated.remove(pipelineID);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments number of blocks allocated for the pipeline.
|
||||||
|
*/
|
||||||
|
void incNumBlocksAllocated(PipelineID pipelineID) {
|
||||||
|
Optional.of(numBlocksAllocated.get(pipelineID)).ifPresent(
|
||||||
|
MutableCounterLong::incr);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increments number of successful pipeline creation count.
|
* Increments number of successful pipeline creation count.
|
||||||
*/
|
*/
|
||||||
|
@ -19,6 +19,9 @@
|
|||||||
package org.apache.hadoop.ozone.scm.pipeline;
|
package org.apache.hadoop.ozone.scm.pipeline;
|
||||||
|
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;
|
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;
|
||||||
@ -90,6 +93,38 @@ public void testPipelineDestroy() {
|
|||||||
assertCounter("NumPipelineDestroyed", 1L, metrics);
|
assertCounter("NumPipelineDestroyed", 1L, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumBlocksAllocated() throws IOException {
|
||||||
|
AllocatedBlock block =
|
||||||
|
cluster.getStorageContainerManager().getScmBlockManager()
|
||||||
|
.allocateBlock(5, HddsProtos.ReplicationType.RATIS,
|
||||||
|
HddsProtos.ReplicationFactor.ONE, "Test", new ExcludeList());
|
||||||
|
MetricsRecordBuilder metrics =
|
||||||
|
getMetrics(SCMPipelineMetrics.class.getSimpleName());
|
||||||
|
Pipeline pipeline = block.getPipeline();
|
||||||
|
long numBlocksAllocated = getLongCounter(
|
||||||
|
SCMPipelineMetrics.getBlockAllocationMetricName(pipeline), metrics);
|
||||||
|
Assert.assertEquals(numBlocksAllocated, 1);
|
||||||
|
|
||||||
|
// destroy the pipeline
|
||||||
|
try {
|
||||||
|
cluster.getStorageContainerManager().getClientProtocolServer()
|
||||||
|
.closePipeline(pipeline.getId().getProtobuf());
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
metrics = getMetrics(SCMPipelineMetrics.class.getSimpleName());
|
||||||
|
try {
|
||||||
|
getLongCounter(SCMPipelineMetrics.getBlockAllocationMetricName(pipeline),
|
||||||
|
metrics);
|
||||||
|
Assert.fail("Metric should not be present for closed pipeline.");
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
Assert.assertTrue(e.getMessage().contains(
|
||||||
|
"Expected exactly one metric for name " + SCMPipelineMetrics
|
||||||
|
.getBlockAllocationMetricName(block.getPipeline())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void teardown() {
|
public void teardown() {
|
||||||
|
Loading…
Reference in New Issue
Block a user