HDDS-817. Create SCM metrics for disk from node report. Contributed by Bharat Viswanadham.
This commit is contained in:
parent
1a00b4e325
commit
d0cc679441
@ -35,4 +35,11 @@ public interface NodeManagerMXBean {
|
||||
* @return A state to number of nodes that in this state mapping
|
||||
*/
|
||||
Map<String, Integer> getNodeCount();
|
||||
|
||||
/**
 * Get the disk metrics like capacity, usage and remaining based on the
 * storage type.
 *
 * @return A metric name to metric value mapping
 */
|
||||
Map<String, Long> getNodeInfo();
|
||||
|
||||
}
|
||||
|
@ -273,6 +273,20 @@ public DatanodeInfo getNode(DatanodeDetails datanodeDetails)
|
||||
return nodeStateMap.getNodeInfo(datanodeDetails.getUuid());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get information about the node.
|
||||
*
|
||||
* @param datanodeUUID datanode UUID
|
||||
*
|
||||
* @return DatanodeInfo
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not present
|
||||
*/
|
||||
public DatanodeInfo getNode(UUID datanodeUUID)
|
||||
throws NodeNotFoundException {
|
||||
return nodeStateMap.getNodeInfo(datanodeUUID);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the last heartbeat time of the node.
|
||||
*
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
@ -58,6 +59,7 @@
|
||||
import javax.management.ObjectName;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -182,6 +184,16 @@ private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
|
||||
SCMNodeStat stat;
|
||||
try {
|
||||
stat = nodeStateManager.getNodeStat(dnId);
|
||||
|
||||
// Updating the storage report for the datanode.
|
||||
// I don't think we will get NotFound exception, as we are taking
|
||||
// nodeInfo from nodeStateMap, as I see it is not being removed from
|
||||
// the map, just we change the states. And during first time
|
||||
// registration we call this, after adding to nodeStateMap. And also
|
||||
// from eventhandler it is called only if it has node Report.
|
||||
DatanodeInfo datanodeInfo = nodeStateManager.getNode(dnId);
|
||||
datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
|
||||
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
|
||||
"dead datanode {}", dnId);
|
||||
@ -361,6 +373,52 @@ public Map<String, Integer> getNodeCount() {
|
||||
return nodeCountMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getNodeInfo() {
|
||||
long diskCapacity = 0L;
|
||||
long diskUsed = 0L;
|
||||
long diskRemaning = 0L;
|
||||
|
||||
long ssdCapacity = 0L;
|
||||
long ssdUsed = 0L;
|
||||
long ssdRemaining = 0L;
|
||||
|
||||
List<DatanodeDetails> healthyNodes = getNodes(NodeState.HEALTHY);
|
||||
List<DatanodeDetails> staleNodes = getNodes(NodeState.STALE);
|
||||
|
||||
List<DatanodeDetails> datanodes = new ArrayList<>(healthyNodes);
|
||||
datanodes.addAll(staleNodes);
|
||||
|
||||
for (DatanodeDetails datanodeDetails : datanodes) {
|
||||
DatanodeInfo dnInfo = (DatanodeInfo) datanodeDetails;
|
||||
List<StorageReportProto> storageReportProtos = dnInfo.getStorageReports();
|
||||
for (StorageReportProto reportProto : storageReportProtos) {
|
||||
if (reportProto.getStorageType() ==
|
||||
StorageContainerDatanodeProtocolProtos.StorageTypeProto.DISK) {
|
||||
diskCapacity += reportProto.getCapacity();
|
||||
diskRemaning += reportProto.getRemaining();
|
||||
diskUsed += reportProto.getScmUsed();
|
||||
} else if (reportProto.getStorageType() ==
|
||||
StorageContainerDatanodeProtocolProtos.StorageTypeProto.SSD) {
|
||||
ssdCapacity += reportProto.getCapacity();
|
||||
ssdRemaining += reportProto.getRemaining();
|
||||
ssdUsed += reportProto.getScmUsed();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Long> nodeInfo = new HashMap<>();
|
||||
nodeInfo.put("DISKCapacity", diskCapacity);
|
||||
nodeInfo.put("DISKUsed", diskUsed);
|
||||
nodeInfo.put("DISKRemaining", diskRemaning);
|
||||
|
||||
nodeInfo.put("SSDCapacity", ssdCapacity);
|
||||
nodeInfo.put("SSDUsed", ssdUsed);
|
||||
nodeInfo.put("SSDRemaining", ssdRemaining);
|
||||
return nodeInfo;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get set of pipelines a datanode is part of.
|
||||
* @param datanodeDetails - datanodeID
|
||||
@ -463,4 +521,6 @@ public void processDeadNode(UUID dnUuid) {
|
||||
public List<SCMCommand> getCommandQueue(UUID dnID) {
|
||||
return commandQueue.getCommand(dnID);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -396,6 +396,15 @@ public Map<String, Integer> getNodeCount() {
|
||||
return nodeCountMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getNodeInfo() {
|
||||
Map<String, Long> nodeInfo = new HashMap<>();
|
||||
nodeInfo.put("Capacity", aggregateStat.getCapacity().get());
|
||||
nodeInfo.put("Used", aggregateStat.getScmUsed().get());
|
||||
nodeInfo.put("Remaining", aggregateStat.getRemaining().get());
|
||||
return nodeInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes it easy to add a container.
|
||||
*
|
||||
|
@ -73,6 +73,11 @@ public Map<String, Integer> getNodeCount() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> getNodeInfo() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all Live Datanodes that is currently communicating with SCM.
|
||||
*
|
||||
|
@ -0,0 +1,112 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.scm;
|
||||
|
||||
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Class which tests the SCMNodeManagerInfo Bean.
|
||||
*/
|
||||
public class TestSCMNodeManagerMXBean {
|
||||
public static final Log LOG = LogFactory.getLog(TestSCMMXBean.class);
|
||||
private static int numOfDatanodes = 3;
|
||||
private static MiniOzoneCluster cluster;
|
||||
private static OzoneConfiguration conf;
|
||||
private static StorageContainerManager scm;
|
||||
private static MBeanServer mbs;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws IOException, TimeoutException,
|
||||
InterruptedException {
|
||||
conf = new OzoneConfiguration();
|
||||
conf.set(OZONE_SCM_STALENODE_INTERVAL, "60000ms");
|
||||
cluster = MiniOzoneCluster.newBuilder(conf)
|
||||
.setNumDatanodes(numOfDatanodes)
|
||||
.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
scm = cluster.getStorageContainerManager();
|
||||
mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiskUsage() throws Exception {
|
||||
ObjectName bean = new ObjectName(
|
||||
"Hadoop:service=SCMNodeManager,"
|
||||
+ "name=SCMNodeManagerInfo");
|
||||
|
||||
TabularData data = (TabularData) mbs.getAttribute(bean, "NodeInfo");
|
||||
Map<String, Long> datanodeInfo = scm.getScmNodeManager().getNodeInfo();
|
||||
verifyEquals(data, datanodeInfo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeCount() throws Exception {
|
||||
ObjectName bean = new ObjectName(
|
||||
"Hadoop:service=SCMNodeManager,"
|
||||
+ "name=SCMNodeManagerInfo");
|
||||
|
||||
TabularData data = (TabularData) mbs.getAttribute(bean, "NodeCount");
|
||||
Map<String, Integer> nodeCount = scm.getScmNodeManager().getNodeCount();
|
||||
Map<String, Long> nodeCountLong = new HashMap<>();
|
||||
nodeCount.forEach((k, v) -> nodeCountLong.put(k, new Long(v)));
|
||||
verifyEquals(data, nodeCountLong);
|
||||
}
|
||||
|
||||
private void verifyEquals(TabularData actualData, Map<String, Long>
|
||||
expectedData) {
|
||||
if (actualData == null || expectedData == null) {
|
||||
fail("Data should not be null.");
|
||||
}
|
||||
for (Object obj : actualData.values()) {
|
||||
assertTrue(obj instanceof CompositeData);
|
||||
CompositeData cds = (CompositeData) obj;
|
||||
assertEquals(2, cds.values().size());
|
||||
Iterator<?> it = cds.values().iterator();
|
||||
String key = it.next().toString();
|
||||
String value = it.next().toString();
|
||||
long num = Long.parseLong(value);
|
||||
assertTrue(expectedData.containsKey(key));
|
||||
assertEquals(expectedData.remove(key).longValue(), num);
|
||||
}
|
||||
assertTrue(expectedData.isEmpty());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user