HDDS-1070. Adding Node and Pipeline related metrics in SCM.

Contributed by Nandakumar.
This commit is contained in:
Anu Engineer 2019-02-24 15:39:41 -08:00
parent 3aa0a57ea0
commit 92b1fdcece
11 changed files with 663 additions and 4 deletions

View File

@ -91,10 +91,9 @@ public class SCMNodeManager implements NodeManager {
private final String clusterID;
private final VersionInfo version;
private final CommandQueue commandQueue;
private final SCMNodeMetrics metrics;
// Node manager MXBean
private ObjectName nmInfoBean;
// Node pool manager.
private final StorageContainerManager scmManager;
/**
@ -103,6 +102,7 @@ public class SCMNodeManager implements NodeManager {
public SCMNodeManager(OzoneConfiguration conf, String clusterID,
StorageContainerManager scmManager, EventPublisher eventPublisher)
throws IOException {
this.metrics = SCMNodeMetrics.create();
this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
this.clusterID = clusterID;
this.version = VersionInfo.getLatestVersion();
@ -185,6 +185,7 @@ public NodeState getNodeState(DatanodeDetails datanodeDetails) {
@Override
public void close() throws IOException {
unregisterMXBean();
metrics.unRegister();
}
/**
@ -257,7 +258,9 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
"DatanodeDetails.");
try {
nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
metrics.incNumHBProcessed();
} catch (NodeNotFoundException e) {
metrics.incNumHBProcessingFailed();
LOG.error("SCM trying to process heartbeat from an " +
"unregistered node {}. Ignoring the heartbeat.", datanodeDetails);
}
@ -287,8 +290,10 @@ public void processNodeReport(DatanodeDetails datanodeDetails,
DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
if (nodeReport != null) {
datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
metrics.incNumNodeReportProcessed();
}
} catch (NodeNotFoundException e) {
metrics.incNumNodeReportProcessingFailed();
LOG.warn("Got node report from unregistered datanode {}",
datanodeDetails);
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* This class maintains Node related metrics.
*/
@InterfaceAudience.Private
@Metrics(about = "SCM NodeManager Metrics", context = "ozone")
public final class SCMNodeMetrics {
private static final String SOURCE_NAME =
SCMNodeMetrics.class.getSimpleName();
private @Metric MutableCounterLong numHBProcessed;
private @Metric MutableCounterLong numHBProcessingFailed;
private @Metric MutableCounterLong numNodeReportProcessed;
private @Metric MutableCounterLong numNodeReportProcessingFailed;
/** Private constructor. */
private SCMNodeMetrics() { }
/**
* Create and returns SCMNodeMetrics instance.
*
* @return SCMNodeMetrics
*/
public static SCMNodeMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME, "SCM NodeManager Metrics",
new SCMNodeMetrics());
}
/**
* Unregister the metrics instance.
*/
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}
/**
* Increments number of heartbeat processed count.
*/
void incNumHBProcessed() {
numHBProcessed.incr();
}
/**
* Increments number of heartbeat processing failed count.
*/
void incNumHBProcessingFailed() {
numHBProcessingFailed.incr();
}
/**
* Increments number of node report processed count.
*/
void incNumNodeReportProcessed() {
numNodeReportProcessed.incr();
}
/**
* Increments number of node report processing failed count.
*/
void incNumNodeReportProcessingFailed() {
numNodeReportProcessingFailed.incr();
}
}

View File

@ -31,7 +31,7 @@
/**
* Interface which exposes the api for pipeline management.
*/
public interface PipelineManager extends Closeable {
public interface PipelineManager extends Closeable, PipelineManagerMXBean {
Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException;

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Map;
/**
* This is the JMX management interface for information related to
* PipelineManager.
*/
@InterfaceAudience.Private
public interface PipelineManagerMXBean {
/**
* Returns the number of pipelines in different state.
* @return state to number of pipeline map
*/
Map<String, Integer> getPipelineInfo();
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
@ -36,8 +37,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
@ -68,6 +71,9 @@ public class SCMPipelineManager implements PipelineManager {
private final EventPublisher eventPublisher;
private final NodeManager nodeManager;
private final SCMPipelineMetrics metrics;
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher) throws IOException {
@ -87,6 +93,9 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
.build();
this.eventPublisher = eventPublisher;
this.nodeManager = nodeManager;
this.metrics = SCMPipelineMetrics.create();
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
initializePipelineState();
}
@ -115,12 +124,16 @@ public synchronized Pipeline createPipeline(
ReplicationType type, ReplicationFactor factor) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
Pipeline pipeline = pipelineFactory.create(type, factor);
pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
metrics.incNumPipelineCreated();
return pipeline;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
lock.writeLock().unlock();
}
@ -130,6 +143,7 @@ public synchronized Pipeline createPipeline(
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipeline for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
try {
return pipelineFactory.create(type, factor, nodes);
@ -260,11 +274,27 @@ public void removePipeline(PipelineID pipelineID) throws IOException {
pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
Pipeline pipeline = stateManager.removePipeline(pipelineID);
nodeManager.removePipeline(pipeline);
metrics.incNumPipelineDestroyed();
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
lock.writeLock().unlock();
}
}
@Override
public Map<String, Integer> getPipelineInfo() {
final Map<String, Integer> pipelineInfo = new HashMap<>();
for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
pipelineInfo.put(state.toString(), 0);
}
stateManager.getPipelines().forEach(pipeline ->
pipelineInfo.computeIfPresent(
pipeline.getPipelineState().toString(), (k, v) -> v + 1));
return pipelineInfo;
}
@Override
public void close() throws IOException {
if (pipelineFactory != null) {
@ -274,5 +304,12 @@ public void close() throws IOException {
if (pipelineStore != null) {
pipelineStore.close();
}
if(pmInfoBean != null) {
MBeans.unregister(this.pmInfoBean);
pmInfoBean = null;
}
if(metrics != null) {
metrics.unRegister();
}
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* This class maintains Pipeline related metrics.
*/
@InterfaceAudience.Private
@Metrics(about = "SCM PipelineManager Metrics", context = "ozone")
public final class SCMPipelineMetrics {
private static final String SOURCE_NAME =
SCMPipelineMetrics.class.getSimpleName();
private @Metric MutableCounterLong numPipelineCreated;
private @Metric MutableCounterLong numPipelineCreationFailed;
private @Metric MutableCounterLong numPipelineDestroyed;
private @Metric MutableCounterLong numPipelineDestroyFailed;
private @Metric MutableCounterLong numPipelineReportProcessed;
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
/** Private constructor. */
private SCMPipelineMetrics() { }
/**
* Create and returns SCMPipelineMetrics instance.
*
* @return SCMPipelineMetrics
*/
public static SCMPipelineMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME, "SCM PipelineManager Metrics",
new SCMPipelineMetrics());
}
/**
* Unregister the metrics instance.
*/
public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}
/**
* Increments number of successful pipeline creation count.
*/
void incNumPipelineCreated() {
numPipelineCreated.incr();
}
/**
* Increments number of failed pipeline creation count.
*/
void incNumPipelineCreationFailed() {
numPipelineCreationFailed.incr();
}
/**
* Increments number of successful pipeline destroy count.
*/
void incNumPipelineDestroyed() {
numPipelineDestroyed.incr();
}
/**
* Increments number of failed pipeline destroy count.
*/
void incNumPipelineDestroyFailed() {
numPipelineDestroyFailed.incr();
}
/**
* Increments number of pipeline report processed count.
*/
void incNumPipelineReportProcessed() {
numPipelineReportProcessed.incr();
}
/**
* Increments number of pipeline report processing failed count.
*/
void incNumPipelineReportProcessingFailed() {
numPipelineReportProcessingFailed.incr();
}
}

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
/**
* Test cases to verify the metrics exposed by SCMNodeManager.
*/
public class TestSCMNodeMetrics {
private MiniOzoneCluster cluster;
@Before
public void setup() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).build();
cluster.waitForClusterToBeReady();
}
/**
* Verifies heartbeat processing count.
*
* @throws InterruptedException
*/
@Test
public void testHBProcessing() throws InterruptedException {
MetricsRecordBuilder metrics = getMetrics(
SCMNodeMetrics.class.getSimpleName());
long hbProcessed = getLongCounter("NumHBProcessed", metrics);
cluster.getHddsDatanodes().get(0)
.getDatanodeStateMachine().triggerHeartbeat();
// Give some time so that SCM receives and processes the heartbeat.
Thread.sleep(100L);
assertCounter("NumHBProcessed", hbProcessed + 1,
getMetrics(SCMNodeMetrics.class.getSimpleName()));
}
/**
* Verifies heartbeat processing failure count.
*/
@Test
public void testHBProcessingFailure() {
MetricsRecordBuilder metrics = getMetrics(
SCMNodeMetrics.class.getSimpleName());
long hbProcessedFailed = getLongCounter("NumHBProcessingFailed", metrics);
cluster.getStorageContainerManager().getScmNodeManager()
.processHeartbeat(TestUtils.randomDatanodeDetails());
assertCounter("NumHBProcessingFailed", hbProcessedFailed + 1,
getMetrics(SCMNodeMetrics.class.getSimpleName()));
}
/**
* Verifies node report processing count.
*
* @throws InterruptedException
*/
@Test
public void testNodeReportProcessing() throws InterruptedException {
MetricsRecordBuilder metrics = getMetrics(
SCMNodeMetrics.class.getSimpleName());
long nrProcessed = getLongCounter("NumNodeReportProcessed", metrics);
HddsDatanodeService datanode = cluster.getHddsDatanodes().get(0);
StorageReportProto storageReport = TestUtils.createStorageReport(
datanode.getDatanodeDetails().getUuid(), "/tmp", 100, 10, 90, null);
NodeReportProto nodeReport = NodeReportProto.newBuilder()
.addStorageReport(storageReport).build();
datanode.getDatanodeStateMachine().getContext().addReport(nodeReport);
datanode.getDatanodeStateMachine().triggerHeartbeat();
// Give some time so that SCM receives and processes the heartbeat.
Thread.sleep(100L);
assertCounter("NumNodeReportProcessed", nrProcessed + 1,
getMetrics(SCMNodeMetrics.class.getSimpleName()));
}
/**
* Verifies node report processing failure count.
*/
@Test
public void testNodeReportProcessingFailure() {
MetricsRecordBuilder metrics = getMetrics(
SCMNodeMetrics.class.getSimpleName());
long nrProcessed = getLongCounter("NumNodeReportProcessingFailed",
metrics);
DatanodeDetails datanode = TestUtils.randomDatanodeDetails();
StorageReportProto storageReport = TestUtils.createStorageReport(
datanode.getUuid(), "/tmp", 100, 10, 90, null);
NodeReportProto nodeReport = NodeReportProto.newBuilder()
.addStorageReport(storageReport).build();
cluster.getStorageContainerManager().getScmNodeManager()
.processNodeReport(datanode, nodeReport);
assertCounter("NumNodeReportProcessingFailed", nrProcessed + 1,
getMetrics(SCMNodeMetrics.class.getSimpleName()));
}
@After
public void teardown() {
cluster.shutdown();
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* <p>
* Utility classes to encode/decode DTO objects to/from byte array.
*/
/**
* Unit tests for Node related functions in SCM.
*/
package org.apache.hadoop.ozone.scm.node;

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test cases to verify the metrics exposed by SCMPipelineManager via MXBean.
*/
public class TestPipelineManagerMXBean {
private MiniOzoneCluster cluster;
private static MBeanServer mbs;
@Before
public void init()
throws IOException, TimeoutException, InterruptedException {
OzoneConfiguration conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).build();
cluster.waitForClusterToBeReady();
mbs = ManagementFactory.getPlatformMBeanServer();
}
/**
* Verifies SCMPipelineManagerInfo metrics.
*
* @throws Exception
*/
@Test
public void testPipelineInfo() throws Exception {
ObjectName bean = new ObjectName(
"Hadoop:service=SCMPipelineManager,name=SCMPipelineManagerInfo");
TabularData data = (TabularData) mbs.getAttribute(bean, "PipelineInfo");
Map<String, Integer> datanodeInfo = cluster.getStorageContainerManager()
.getPipelineManager().getPipelineInfo();
verifyEquals(data, datanodeInfo);
}
private void verifyEquals(TabularData actualData, Map<String, Integer>
expectedData) {
if (actualData == null || expectedData == null) {
fail("Data should not be null.");
}
for (Object obj : actualData.values()) {
assertTrue(obj instanceof CompositeData);
CompositeData cds = (CompositeData) obj;
assertEquals(2, cds.values().size());
Iterator<?> it = cds.values().iterator();
String key = it.next().toString();
String value = it.next().toString();
long num = Long.parseLong(value);
assertTrue(expectedData.containsKey(key));
assertEquals(expectedData.remove(key).longValue(), num);
}
assertTrue(expectedData.isEmpty());
}
@After
public void teardown() {
cluster.shutdown();
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Optional;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
/**
* Test cases to verify the metrics exposed by SCMPipelineManager.
*/
public class TestSCMPipelineMetrics {
private MiniOzoneCluster cluster;
@Before
public void setup() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
}
/**
* Verifies pipeline creation metric.
*/
@Test
public void testPipelineCreation() {
MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
long numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
// Pipelines are created in background when the cluster starts.
Assert.assertTrue(numPipelineCreated > 0);
}
/**
* Verifies pipeline destroy metric.
*/
@Test
public void testPipelineDestroy() {
PipelineManager pipelineManager = cluster
.getStorageContainerManager().getPipelineManager();
Optional<Pipeline> pipeline = pipelineManager
.getPipelines().stream().findFirst();
Assert.assertTrue(pipeline.isPresent());
pipeline.ifPresent(pipeline1 -> {
try {
cluster.getStorageContainerManager()
.getClientProtocolServer().closePipeline(
pipeline.get().getId().getProtobuf());
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
});
MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName());
assertCounter("NumPipelineDestroyed", 1L, metrics);
}
@After
public void teardown() {
cluster.shutdown();
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* <p>
* Utility classes to encode/decode DTO objects to/from byte array.
*/
/**
* Unit tests for Pipeline related functions in SCM.
*/
package org.apache.hadoop.ozone.scm.pipeline;