diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index 26b8b95b04..d06ea2a3b3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -23,6 +23,7 @@ .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.util.Time; +import java.util.Collections; import java.util.List; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -38,7 +39,6 @@ public class DatanodeInfo extends DatanodeDetails { private volatile long lastHeartbeatTime; private long lastStatsUpdatedTime; - // If required we can dissect StorageReportProto and store the raw data private List storageReports; /** @@ -48,8 +48,9 @@ public class DatanodeInfo extends DatanodeDetails { */ public DatanodeInfo(DatanodeDetails datanodeDetails) { super(datanodeDetails); - lock = new ReentrantReadWriteLock(); - lastHeartbeatTime = Time.monotonicNow(); + this.lock = new ReentrantReadWriteLock(); + this.lastHeartbeatTime = Time.monotonicNow(); + this.storageReports = Collections.emptyList(); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 8e713998b0..a75a51a60f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -58,7 +58,6 @@ public DeadNodeHandler(NodeManager nodeManager, @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - nodeManager.processDeadNode(datanodeDetails.getUuid()); // TODO: check if there are any pipeline on this node and fire close // pipeline event diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index d8865a861b..6b8d477c03 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -93,8 +93,7 @@ public interface NodeManager extends StorageContainerNodeProtocol, * Return a map of node stats. * @return a map of individual node stats (live/stale but not dead). */ - // TODO: try to change the return type to Map - Map getNodeStats(); + Map getNodeStats(); /** * Return the node stat of the specified datanode. @@ -159,17 +158,11 @@ Set getContainers(DatanodeDetails datanodeDetails) /** * Process node report. * - * @param dnUuid + * @param datanodeDetails * @param nodeReport */ - void processNodeReport(DatanodeDetails dnUuid, NodeReportProto nodeReport); - - /** - * Process a dead node event in this Node Manager. - * - * @param dnUuid datanode uuid. - */ - void processDeadNode(UUID dnUuid); + void processNodeReport(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport); /** * Get list of SCMCommands in the Command Queue for a particular Datanode. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 5ff9dfa33e..c54944bc35 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.*; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; @@ -275,20 +274,6 @@ public DatanodeInfo getNode(DatanodeDetails datanodeDetails) return nodeStateMap.getNodeInfo(datanodeDetails.getUuid()); } - /** - * Get information about the node. - * - * @param datanodeUUID datanode UUID - * - * @return DatanodeInfo - * - * @throws NodeNotFoundException if the node is not present - */ - public DatanodeInfo getNode(UUID datanodeUUID) - throws NodeNotFoundException { - return nodeStateMap.getNodeInfo(datanodeUUID); - } - /** * Updates the last heartbeat time of the node. * @@ -319,7 +304,7 @@ public NodeState getNodeState(DatanodeDetails datanodeDetails) * * @return list of healthy nodes */ - public List getHealthyNodes() { + public List getHealthyNodes() { return getNodes(NodeState.HEALTHY); } @@ -328,7 +313,7 @@ public List getHealthyNodes() { * * @return list of stale nodes */ - public List getStaleNodes() { + public List getStaleNodes() { return getNodes(NodeState.STALE); } @@ -337,7 +322,7 @@ public List getStaleNodes() { * * @return list of dead nodes */ - public List getDeadNodes() { + public List getDeadNodes() { return getNodes(NodeState.DEAD); } @@ -348,12 +333,12 @@ public List getDeadNodes() { * * @return list of nodes */ - public List getNodes(NodeState state) { - List nodes = new ArrayList<>(); + public List getNodes(NodeState state) { + List nodes = new ArrayList<>(); nodeStateMap.getNodes(state).forEach( uuid -> { try { - nodes.add(nodeStateMap.getNodeDetails(uuid)); + nodes.add(nodeStateMap.getNodeInfo(uuid)); } catch (NodeNotFoundException e) { // This should not happen unless someone else other than // NodeStateManager is directly modifying NodeStateMap and removed @@ -369,12 +354,12 @@ public List getNodes(NodeState state) { * * @return all the managed nodes */ - public List getAllNodes() { - List nodes = new ArrayList<>(); + public List getAllNodes() { + List nodes = new ArrayList<>(); nodeStateMap.getAllNodes().forEach( uuid -> { try { - nodes.add(nodeStateMap.getNodeDetails(uuid)); + nodes.add(nodeStateMap.getNodeInfo(uuid)); } catch (NodeNotFoundException e) { // This should not happen unless someone else other than // NodeStateManager is directly modifying NodeStateMap and removed @@ -441,38 +426,6 @@ public int getTotalNodeCount() { return nodeStateMap.getTotalNodeCount(); } - /** - * Returns the current stats of the node. - * - * @param uuid node id - * - * @return SCMNodeStat - * - * @throws NodeNotFoundException if the node is not present - */ - public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException { - return nodeStateMap.getNodeStat(uuid); - } - - /** - * Returns a unmodifiable copy of nodeStats. - * @return map with node stats. - */ - public Map getNodeStatsMap() { - return nodeStateMap.getNodeStats(); - } - - /** - * Set the stat for the node. - * - * @param uuid node id. - * - * @param newstat new stat that will set to the specify node. - */ - public void setNodeStat(UUID uuid, SCMNodeStat newstat) { - nodeStateMap.setNodeStat(uuid, newstat); - } - /** * Removes a pipeline from the node2PipelineMap. * @param pipeline - Pipeline to be removed diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 61ddb5bae2..3c5eaf8695 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -19,7 +19,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -65,6 +66,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; /** * Maintains information about the Datanodes on SCM side. @@ -80,18 +82,13 @@ * get functions in this file as a snap-shot of information that is inconsistent * as soon as you read it. */ -public class SCMNodeManager - implements NodeManager, StorageContainerNodeProtocol { +public class SCMNodeManager implements NodeManager { @VisibleForTesting static final Logger LOG = LoggerFactory.getLogger(SCMNodeManager.class); private final NodeStateManager nodeStateManager; - // Should we maintain aggregated stats? If this is not frequently used, we - // can always calculate it from nodeStats whenever required. - // Aggregated node stats - private SCMNodeStat scmStat; private final String clusterID; private final VersionInfo version; private final CommandQueue commandQueue; @@ -108,7 +105,6 @@ public SCMNodeManager(OzoneConfiguration conf, String clusterID, StorageContainerManager scmManager, EventPublisher eventPublisher) throws IOException { this.nodeStateManager = new NodeStateManager(conf, eventPublisher); - this.scmStat = new SCMNodeStat(); this.clusterID = clusterID; this.version = VersionInfo.getLatestVersion(); this.commandQueue = new CommandQueue(); @@ -131,7 +127,7 @@ private void unregisterMXBean() { /** - * Gets all datanodes that are in a certain state. This function works by + * Returns all datanode that are in the given state. This function works by * taking a snapshot of the current collection and then returning the list * from that collection. This means that real map might have changed by the * time we return this list. @@ -140,7 +136,8 @@ private void unregisterMXBean() { */ @Override public List getNodes(NodeState nodestate) { - return nodeStateManager.getNodes(nodestate); + return nodeStateManager.getNodes(nodestate).stream() + .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); } /** @@ -150,13 +147,14 @@ public List getNodes(NodeState nodestate) { */ @Override public List getAllNodes() { - return nodeStateManager.getAllNodes(); + return nodeStateManager.getAllNodes().stream() + .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); } /** * Returns the Number of Datanodes by State they are in. * - * @return int -- count + * @return count */ @Override public int getNodeCount(NodeState nodestate) { @@ -166,7 +164,7 @@ public int getNodeCount(NodeState nodestate) { /** * Returns the node state of a specific node. * - * @param datanodeDetails - Datanode Details + * @param datanodeDetails Datanode Details * @return Healthy/Stale/Dead/Unknown. */ @Override @@ -179,47 +177,6 @@ public NodeState getNodeState(DatanodeDetails datanodeDetails) { } } - - private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) { - SCMNodeStat stat; - try { - stat = nodeStateManager.getNodeStat(dnId); - - // Updating the storage report for the datanode. - // I dont think we will get NotFound exception, as we are taking - // nodeInfo from nodeStateMap, as I see it is not being removed from - // the map, just we change the states. And during first time - // registration we call this, after adding to nodeStateMap. And also - // from eventhandler it is called only if it has node Report. - DatanodeInfo datanodeInfo = nodeStateManager.getNode(dnId); - if (nodeReport != null) { - datanodeInfo.updateStorageReports(nodeReport.getStorageReportList()); - } - - } catch (NodeNotFoundException e) { - LOG.debug("SCM updateNodeStat based on heartbeat from previous " + - "dead datanode {}", dnId); - stat = new SCMNodeStat(); - } - - if (nodeReport != null && nodeReport.getStorageReportCount() > 0) { - long totalCapacity = 0; - long totalRemaining = 0; - long totalScmUsed = 0; - List storageReports = nodeReport - .getStorageReportList(); - for (StorageReportProto report : storageReports) { - totalCapacity += report.getCapacity(); - totalRemaining += report.getRemaining(); - totalScmUsed+= report.getScmUsed(); - } - scmStat.subtract(stat); - stat.set(totalCapacity, totalScmUsed, totalRemaining); - scmStat.add(stat); - } - nodeStateManager.setNodeStat(dnId, stat); - } - /** * Closes this stream and releases any system resources associated with it. If * the stream is already closed then invoking this method has no effect. @@ -275,7 +232,7 @@ public RegisteredCommand register( try { nodeStateManager.addNode(datanodeDetails); // Updating Node Report, as registration is successful - updateNodeStat(datanodeDetails.getUuid(), nodeReport); + processNodeReport(datanodeDetails, nodeReport); LOG.info("Registered Data node : {}", datanodeDetails); } catch (NodeAlreadyExistsException e) { LOG.trace("Datanode is already registered. Datanode: {}", @@ -321,13 +278,21 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { /** * Process node report. * - * @param dnUuid + * @param datanodeDetails * @param nodeReport */ @Override - public void processNodeReport(DatanodeDetails dnUuid, + public void processNodeReport(DatanodeDetails datanodeDetails, NodeReportProto nodeReport) { - this.updateNodeStat(dnUuid.getUuid(), nodeReport); + try { + DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails); + if (nodeReport != null) { + datanodeInfo.updateStorageReports(nodeReport.getStorageReportList()); + } + } catch (NodeNotFoundException e) { + LOG.warn("Got node report from unregistered datanode {}", + datanodeDetails); + } } /** @@ -336,7 +301,16 @@ public void processNodeReport(DatanodeDetails dnUuid, */ @Override public SCMNodeStat getStats() { - return new SCMNodeStat(this.scmStat); + long capacity = 0L; + long used = 0L; + long remaining = 0L; + + for (SCMNodeStat stat : getNodeStats().values()) { + capacity += stat.getCapacity().get(); + used += stat.getScmUsed().get(); + remaining += stat.getRemaining().get(); + } + return new SCMNodeStat(capacity, used, remaining); } /** @@ -344,8 +318,24 @@ public SCMNodeStat getStats() { * @return a map of individual node stats (live/stale but not dead). */ @Override - public Map getNodeStats() { - return nodeStateManager.getNodeStatsMap(); + public Map getNodeStats() { + + final Map nodeStats = new HashMap<>(); + + final List healthyNodes = nodeStateManager + .getNodes(NodeState.HEALTHY); + final List staleNodes = nodeStateManager + .getNodes(NodeState.STALE); + final List datanodes = new ArrayList<>(healthyNodes); + datanodes.addAll(staleNodes); + + for (DatanodeInfo dnInfo : datanodes) { + SCMNodeStat nodeStat = getNodeStatInternal(dnInfo); + if (nodeStat != null) { + nodeStats.put(dnInfo, nodeStat); + } + } + return nodeStats; } /** @@ -356,11 +346,28 @@ public Map getNodeStats() { */ @Override public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { + final SCMNodeStat nodeStat = getNodeStatInternal(datanodeDetails); + return nodeStat != null ? new SCMNodeMetric(nodeStat) : null; + } + + private SCMNodeStat getNodeStatInternal(DatanodeDetails datanodeDetails) { try { - return new SCMNodeMetric( - nodeStateManager.getNodeStat(datanodeDetails.getUuid())); + long capacity = 0L; + long used = 0L; + long remaining = 0L; + + final DatanodeInfo datanodeInfo = nodeStateManager + .getNode(datanodeDetails); + final List storageReportProtos = datanodeInfo + .getStorageReports(); + for (StorageReportProto reportProto : storageReportProtos) { + capacity += reportProto.getCapacity(); + used += reportProto.getScmUsed(); + remaining += reportProto.getRemaining(); + } + return new SCMNodeStat(capacity, used, remaining); } catch (NodeNotFoundException e) { - LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}", + LOG.warn("Cannot generate NodeStat, datanode {} not found.", datanodeDetails.getUuid()); return null; } @@ -375,6 +382,8 @@ public Map getNodeCount() { return nodeCountMap; } + // We should introduce DISK, SSD, etc., notion in + // SCMNodeStat and try to use it. @Override public Map getNodeInfo() { long diskCapacity = 0L; @@ -385,14 +394,15 @@ public Map getNodeInfo() { long ssdUsed = 0L; long ssdRemaining = 0L; - List healthyNodes = getNodes(NodeState.HEALTHY); - List staleNodes = getNodes(NodeState.STALE); + List healthyNodes = nodeStateManager + .getNodes(NodeState.HEALTHY); + List staleNodes = nodeStateManager + .getNodes(NodeState.STALE); - List datanodes = new ArrayList<>(healthyNodes); + List datanodes = new ArrayList<>(healthyNodes); datanodes.addAll(staleNodes); - for (DatanodeDetails datanodeDetails : datanodes) { - DatanodeInfo dnInfo = (DatanodeInfo) datanodeDetails; + for (DatanodeInfo dnInfo : datanodes) { List storageReportProtos = dnInfo.getStorageReports(); for (StorageReportProto reportProto : storageReportProtos) { if (reportProto.getStorageType() == @@ -498,27 +508,6 @@ public void onMessage(CommandForDatanode commandForDatanode, commandForDatanode.getCommand()); } - /** - * Update the node stats and cluster storage stats in this SCM Node Manager. - * - * @param dnUuid datanode uuid. - */ - @Override - // TODO: This should be removed. - public void processDeadNode(UUID dnUuid) { - try { - SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid); - if (stat != null) { - LOG.trace("Update stat values as Datanode {} is dead.", dnUuid); - scmStat.subtract(stat); - stat.set(0, 0, 0); - } - } catch (NodeNotFoundException e) { - LOG.warn("Can't update stats based on message of dead Datanode {}, it" - + " doesn't exist or decommissioned already.", dnUuid); - } - } - @Override public List getCommandQueue(UUID dnID) { return commandQueue.getCommand(dnID); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java index a68e2b52ec..fd87ccae68 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import java.util.*; @@ -45,10 +44,6 @@ public class NodeStateMap { * Represents the current state of node. */ private final ConcurrentHashMap> stateMap; - /** - * Represents the current stats of node. - */ - private final ConcurrentHashMap nodeStats; /** * Node to set of containers on the node. */ @@ -63,7 +58,6 @@ public NodeStateMap() { lock = new ReentrantReadWriteLock(); nodeMap = new ConcurrentHashMap<>(); stateMap = new ConcurrentHashMap<>(); - nodeStats = new ConcurrentHashMap<>(); nodeToContainer = new ConcurrentHashMap<>(); initStateMap(); } @@ -94,7 +88,6 @@ public void addNode(DatanodeDetails datanodeDetails, NodeState nodeState) throw new NodeAlreadyExistsException("Node UUID: " + id); } nodeMap.put(id, new DatanodeInfo(datanodeDetails)); - nodeStats.put(id, new SCMNodeStat()); nodeToContainer.put(id, Collections.emptySet()); stateMap.get(nodeState).add(id); } finally { @@ -126,20 +119,6 @@ public void updateNodeState(UUID nodeId, NodeState currentState, } } - /** - * Returns DatanodeDetails for the given node id. - * - * @param uuid Node Id - * - * @return DatanodeDetails of the node - * - * @throws NodeNotFoundException if the node is not present - */ - public DatanodeDetails getNodeDetails(UUID uuid) - throws NodeNotFoundException { - return getNodeInfo(uuid); - } - /** * Returns DatanodeInfo for the given node id. * @@ -245,43 +224,6 @@ public NodeState getNodeState(UUID uuid) throws NodeNotFoundException { } } - /** - * Returns the current stats of the node. - * - * @param uuid node id - * - * @return SCMNodeStat of the specify node. - * - * @throws NodeNotFoundException if the node is not found - */ - public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException { - SCMNodeStat stat = nodeStats.get(uuid); - if (stat == null) { - throw new NodeNotFoundException("Node UUID: " + uuid); - } - return stat; - } - - /** - * Returns a unmodifiable copy of nodeStats. - * - * @return map with node stats. - */ - public Map getNodeStats() { - return Collections.unmodifiableMap(nodeStats); - } - - /** - * Set the current stats of the node. - * - * @param uuid node id - * - * @param newstat stat that will set to the specify node. - */ - public void setNodeStat(UUID uuid, SCMNodeStat newstat) { - nodeStats.put(uuid, newstat); - } - public void setContainers(UUID uuid, Set containers) throws NodeNotFoundException{ if (!nodeToContainer.containsKey(uuid)) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 33fa1fac87..ffdea0e1ac 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -76,7 +76,7 @@ public class MockNodeManager implements NodeManager { private final List healthyNodes; private final List staleNodes; private final List deadNodes; - private final Map nodeMetricMap; + private final Map nodeMetricMap; private final SCMNodeStat aggregateStat; private boolean chillmode; private final Map> commandMap; @@ -114,7 +114,7 @@ private void populateNodeMetric(DatanodeDetails datanodeDetails, int x) { newStat.set( (NODES[x % NODES.length].capacity), (NODES[x % NODES.length].used), remaining); - this.nodeMetricMap.put(datanodeDetails.getUuid(), newStat); + this.nodeMetricMap.put(datanodeDetails, newStat); aggregateStat.add(newStat); if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) { @@ -201,7 +201,7 @@ public SCMNodeStat getStats() { * @return a list of individual node stats (live/stale but not dead). */ @Override - public Map getNodeStats() { + public Map getNodeStats() { return nodeMetricMap; } @@ -213,7 +213,7 @@ public Map getNodeStats() { */ @Override public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { - SCMNodeStat stat = nodeMetricMap.get(datanodeDetails.getUuid()); + SCMNodeStat stat = nodeMetricMap.get(datanodeDetails); if (stat == null) { return null; } @@ -413,12 +413,12 @@ public Map getNodeInfo() { * @param size number of bytes. */ public void addContainer(DatanodeDetails datanodeDetails, long size) { - SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails); if (stat != null) { aggregateStat.subtract(stat); stat.getCapacity().add(size); aggregateStat.add(stat); - nodeMetricMap.put(datanodeDetails.getUuid(), stat); + nodeMetricMap.put(datanodeDetails, stat); } } @@ -429,12 +429,12 @@ public void addContainer(DatanodeDetails datanodeDetails, long size) { * @param size number of bytes. */ public void delContainer(DatanodeDetails datanodeDetails, long size) { - SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails); if (stat != null) { aggregateStat.subtract(stat); stat.getCapacity().subtract(size); aggregateStat.add(stat); - nodeMetricMap.put(datanodeDetails.getUuid(), stat); + nodeMetricMap.put(datanodeDetails, stat); } } @@ -445,21 +445,6 @@ public void onMessage(CommandForDatanode commandForDatanode, commandForDatanode.getCommand()); } - /** - * Remove the node stats and update the storage stats - * in this Node Manager. - * - * @param dnUuid UUID of the datanode. - */ - @Override - public void processDeadNode(UUID dnUuid) { - SCMNodeStat stat = this.nodeMetricMap.get(dnUuid); - if (stat != null) { - aggregateStat.subtract(stat); - stat.set(0, 0, 0); - } - } - @Override public List getCommandQueue(UUID dnID) { return null; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 8dbf389b6c..37b28ebb81 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -81,6 +82,10 @@ public void setup() throws IOException, AuthenticationException { storageDir = GenericTestUtils.getTempPath( TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID()); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir); + conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "100ms"); + conf.set(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, "50ms"); + conf.set(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, "1s"); + conf.set(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL, "2s"); eventQueue = new EventQueue(); scm = HddsTestUtils.getScm(conf); nodeManager = (SCMNodeManager) scm.getScmNodeManager(); @@ -237,20 +242,21 @@ public void testStatisticsUpdate() throws Exception { Assert.assertTrue(nodeStat.get().getRemaining().get() == 90); Assert.assertTrue(nodeStat.get().getScmUsed().get() == 10); - //WHEN datanode1 is dead. - eventQueue.fireEvent(SCMEvents.DEAD_NODE, datanode1); - Thread.sleep(100); + //TODO: Support logic to mark a node as dead in NodeManager. + nodeManager.processHeartbeat(datanode2); + Thread.sleep(1000); + nodeManager.processHeartbeat(datanode2); + Thread.sleep(1000); + nodeManager.processHeartbeat(datanode2); + Thread.sleep(1000); + nodeManager.processHeartbeat(datanode2); //THEN statistics in SCM should changed. stat = nodeManager.getStats(); - Assert.assertTrue(stat.getCapacity().get() == 200); - Assert.assertTrue(stat.getRemaining().get() == 180); - Assert.assertTrue(stat.getScmUsed().get() == 20); - - nodeStat = nodeManager.getNodeStat(datanode1); - Assert.assertTrue(nodeStat.get().getCapacity().get() == 0); - Assert.assertTrue(nodeStat.get().getRemaining().get() == 0); - Assert.assertTrue(nodeStat.get().getScmUsed().get() == 0); + Assert.assertEquals(200L, stat.getCapacity().get().longValue()); + Assert.assertEquals(180L, + stat.getRemaining().get().longValue()); + Assert.assertEquals(20L, stat.getScmUsed().get().longValue()); } @Test diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java index fa163eb23a..1cb9bcdc96 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java @@ -63,8 +63,7 @@ public void testNodeReport() throws IOException { SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn); Assert.assertNull(nodeMetric); - nodeReportHandler.onMessage( - getNodeReport(dn, storageOne), this); + nodeManager.register(dn, getNodeReport(dn, storageOne).getReport(), null); nodeMetric = nodeManager.getNodeStat(dn); Assert.assertTrue(nodeMetric.get().getCapacity().get() == 100); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 3f31708ced..35cc1aa901 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -126,7 +126,7 @@ public SCMNodeStat getStats() { * @return a map of individual node stats (live/stale but not dead). */ @Override - public Map getNodeStats() { + public Map getNodeStats() { return null; } @@ -305,15 +305,6 @@ public void onMessage(CommandForDatanode commandForDatanode, // do nothing. } - /** - * Empty implementation for processDeadNode. - * @param dnUuid - */ - @Override - public void processDeadNode(UUID dnUuid) { - // do nothing. - } - @Override public List getCommandQueue(UUID dnID) { return null;