HDDS-448. Move NodeStat to NodeStatemanager from SCMNodeManager. Contributed by LiXin Ge.
This commit is contained in:
parent
d1c1dde309
commit
3c798c1e3c
@ -150,8 +150,8 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
|
||||
private boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
|
||||
long sizeRequired) {
|
||||
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
|
||||
return (nodeMetric != null) && nodeMetric.get().getRemaining()
|
||||
.hasResources(sizeRequired);
|
||||
return (nodeMetric != null) && (nodeMetric.get() != null)
|
||||
&& nodeMetric.get().getRemaining().hasResources(sizeRequired);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -55,6 +55,7 @@ public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long> {
|
||||
*/
|
||||
@Override
|
||||
public boolean isGreater(SCMNodeStat o) {
|
||||
Preconditions.checkNotNull(this.stat, "Argument cannot be null");
|
||||
Preconditions.checkNotNull(o, "Argument cannot be null");
|
||||
|
||||
// if zero, replace with 1 for the division to work.
|
||||
|
@ -121,7 +121,8 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
||||
/**
|
||||
* Return the node stat of the specified datanode.
|
||||
* @param datanodeDetails DatanodeDetails.
|
||||
* @return node stat if it is live/stale, null if it is dead or does't exist.
|
||||
* @return node stat if it is live/stale, null if it is decommissioned or
|
||||
* doesn't exist.
|
||||
*/
|
||||
SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails);
|
||||
|
||||
|
@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
|
||||
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
|
||||
@ -39,13 +40,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
@ -416,6 +411,51 @@ public class NodeStateManager implements Runnable, Closeable {
|
||||
nodeStateMap.removeNode(datanodeDetails.getUuid());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current stats of the node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @return SCMNodeStat
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not present
|
||||
*/
|
||||
public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
|
||||
return nodeStateMap.getNodeStat(uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a unmodifiable copy of nodeStats.
|
||||
* @return map with node stats.
|
||||
*/
|
||||
public Map<UUID, SCMNodeStat> getNodeStatsMap() {
|
||||
return nodeStateMap.getNodeStats();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the stat for the node.
|
||||
*
|
||||
* @param uuid node id.
|
||||
*
|
||||
* @param newstat new stat that will set to the specify node.
|
||||
*/
|
||||
public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
|
||||
nodeStateMap.setNodeStat(uuid, newstat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the current stats of the specify node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @return SCMNodeStat the stat removed from the node.
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not present.
|
||||
*/
|
||||
public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
|
||||
return nodeStateMap.removeNodeStat(uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Move Stale or Dead node to healthy if we got a heartbeat from them.
|
||||
* Move healthy nodes to stale nodes if it is needed.
|
||||
|
@ -56,12 +56,10 @@ import org.slf4j.LoggerFactory;
|
||||
import javax.management.ObjectName;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
@ -85,11 +83,7 @@ public class SCMNodeManager
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(SCMNodeManager.class);
|
||||
|
||||
|
||||
private final NodeStateManager nodeStateManager;
|
||||
// Individual live node stats
|
||||
// TODO: NodeStat should be moved to NodeStatemanager (NodeStateMap)
|
||||
private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
|
||||
// Should we maintain aggregated stats? If this is not frequently used, we
|
||||
// can always calculate it from nodeStats whenever required.
|
||||
// Aggregated node stats
|
||||
@ -124,7 +118,6 @@ public class SCMNodeManager
|
||||
StorageContainerManager scmManager, EventPublisher eventPublisher)
|
||||
throws IOException {
|
||||
this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
|
||||
this.nodeStats = new ConcurrentHashMap<>();
|
||||
this.scmStat = new SCMNodeStat();
|
||||
this.clusterID = clusterID;
|
||||
this.version = VersionInfo.getLatestVersion();
|
||||
@ -297,8 +290,10 @@ public class SCMNodeManager
|
||||
|
||||
|
||||
private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
|
||||
SCMNodeStat stat = nodeStats.get(dnId);
|
||||
if (stat == null) {
|
||||
SCMNodeStat stat;
|
||||
try {
|
||||
stat = nodeStateManager.getNodeStat(dnId);
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
|
||||
"dead datanode {}", dnId);
|
||||
stat = new SCMNodeStat();
|
||||
@ -317,9 +312,9 @@ public class SCMNodeManager
|
||||
}
|
||||
scmStat.subtract(stat);
|
||||
stat.set(totalCapacity, totalScmUsed, totalRemaining);
|
||||
nodeStats.put(dnId, stat);
|
||||
scmStat.add(stat);
|
||||
}
|
||||
nodeStateManager.setNodeStat(dnId, stat);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -377,7 +372,7 @@ public class SCMNodeManager
|
||||
UUID dnId = datanodeDetails.getUuid();
|
||||
try {
|
||||
nodeStateManager.addNode(datanodeDetails);
|
||||
nodeStats.put(dnId, new SCMNodeStat());
|
||||
nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
|
||||
if(inStartupChillMode.get() &&
|
||||
nodeStateManager.getTotalNodeCount() >= getMinimumChillModeNodes()) {
|
||||
inStartupChillMode.getAndSet(false);
|
||||
@ -446,17 +441,25 @@ public class SCMNodeManager
|
||||
*/
|
||||
@Override
|
||||
public Map<UUID, SCMNodeStat> getNodeStats() {
|
||||
return Collections.unmodifiableMap(nodeStats);
|
||||
return nodeStateManager.getNodeStatsMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the node stat of the specified datanode.
|
||||
* @param datanodeDetails - datanode ID.
|
||||
* @return node stat if it is live/stale, null if it is dead or does't exist.
|
||||
* @return node stat if it is live/stale, null if it is decommissioned or
|
||||
* doesn't exist.
|
||||
*/
|
||||
@Override
|
||||
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
|
||||
return new SCMNodeMetric(nodeStats.get(datanodeDetails.getUuid()));
|
||||
try {
|
||||
return new SCMNodeMetric(
|
||||
nodeStateManager.getNodeStat(datanodeDetails.getUuid()));
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}",
|
||||
datanodeDetails.getUuid());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -492,18 +495,22 @@ public class SCMNodeManager
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the node stats and update the storage stats
|
||||
* in this SCM Node Manager.
|
||||
* Update the node stats and cluster storage stats in this SCM Node Manager.
|
||||
*
|
||||
* @param dnUuid datanode uuid.
|
||||
*/
|
||||
@Override
|
||||
public void processDeadNode(UUID dnUuid) {
|
||||
SCMNodeStat stat = nodeStats.get(dnUuid);
|
||||
LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
|
||||
if (stat != null) {
|
||||
scmStat.subtract(stat);
|
||||
stat.set(0, 0, 0);
|
||||
try {
|
||||
SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid);
|
||||
if (stat != null) {
|
||||
LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
|
||||
scmStat.subtract(stat);
|
||||
stat.set(0, 0, 0);
|
||||
}
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.warn("Can't update stats based on message of dead Datanode {}, it"
|
||||
+ " doesn't exist or decommissioned already.", dnUuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,14 +20,10 @@ package org.apache.hadoop.hdds.scm.node.states;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
||||
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
@ -48,6 +44,11 @@ public class NodeStateMap {
|
||||
* Represents the current state of node.
|
||||
*/
|
||||
private final ConcurrentHashMap<NodeState, Set<UUID>> stateMap;
|
||||
/**
|
||||
* Represents the current stats of node.
|
||||
*/
|
||||
private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
|
||||
|
||||
private final ReadWriteLock lock;
|
||||
|
||||
/**
|
||||
@ -57,6 +58,7 @@ public class NodeStateMap {
|
||||
lock = new ReentrantReadWriteLock();
|
||||
nodeMap = new ConcurrentHashMap<>();
|
||||
stateMap = new ConcurrentHashMap<>();
|
||||
nodeStats = new ConcurrentHashMap<>();
|
||||
initStateMap();
|
||||
}
|
||||
|
||||
@ -259,6 +261,60 @@ public class NodeStateMap {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current stats of the node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @return SCMNodeStat of the specify node.
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not found
|
||||
*/
|
||||
public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
|
||||
SCMNodeStat stat = nodeStats.get(uuid);
|
||||
if (stat == null) {
|
||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
||||
}
|
||||
return stat;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a unmodifiable copy of nodeStats.
|
||||
*
|
||||
* @return map with node stats.
|
||||
*/
|
||||
public Map<UUID, SCMNodeStat> getNodeStats() {
|
||||
return Collections.unmodifiableMap(nodeStats);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the current stats of the node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @param newstat stat that will set to the specify node.
|
||||
*/
|
||||
public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
|
||||
nodeStats.put(uuid, newstat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the current stats of the specify node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @return SCMNodeStat the stat removed from the node.
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not found
|
||||
*/
|
||||
public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
|
||||
SCMNodeStat stat = nodeStats.remove(uuid);
|
||||
if (stat == null) {
|
||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
||||
}
|
||||
return stat;
|
||||
}
|
||||
|
||||
/**
|
||||
* Since we don't hold a global lock while constructing this string,
|
||||
* the result might be inconsistent. If someone has changed the state of node
|
||||
|
@ -266,11 +266,16 @@ public class MockNodeManager implements NodeManager {
|
||||
/**
|
||||
* Return the node stat of the specified datanode.
|
||||
* @param datanodeDetails - datanode details.
|
||||
* @return node stat if it is live/stale, null if it is dead or does't exist.
|
||||
* @return node stat if it is live/stale, null if it is decommissioned or
|
||||
* doesn't exist.
|
||||
*/
|
||||
@Override
|
||||
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
|
||||
return new SCMNodeMetric(nodeMetricMap.get(datanodeDetails.getUuid()));
|
||||
SCMNodeStat stat = nodeMetricMap.get(datanodeDetails.getUuid());
|
||||
if (stat == null) {
|
||||
return null;
|
||||
}
|
||||
return new SCMNodeMetric(stat);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -57,9 +57,12 @@ public class TestNodeReportHandler implements EventPublisher {
|
||||
StorageReportProto storageOne = TestUtils
|
||||
.createStorageReport(dn.getUuid(), storagePath, 100, 10, 90, null);
|
||||
|
||||
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
|
||||
Assert.assertNull(nodeMetric);
|
||||
|
||||
nodeReportHandler.onMessage(
|
||||
getNodeReport(dn, storageOne), this);
|
||||
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
|
||||
nodeMetric = nodeManager.getNodeStat(dn);
|
||||
|
||||
Assert.assertTrue(nodeMetric.get().getCapacity().get() == 100);
|
||||
Assert.assertTrue(nodeMetric.get().getRemaining().get() == 90);
|
||||
|
@ -196,7 +196,8 @@ public class ReplicationNodeManagerMock implements NodeManager {
|
||||
* Return the node stat of the specified datanode.
|
||||
*
|
||||
* @param dd - datanode details.
|
||||
* @return node stat if it is live/stale, null if it is dead or does't exist.
|
||||
* @return node stat if it is live/stale, null if it is decommissioned or
|
||||
* doesn't exist.
|
||||
*/
|
||||
@Override
|
||||
public SCMNodeMetric getNodeStat(DatanodeDetails dd) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user