HDDS-561. Move Node2ContainerMap and Node2PipelineMap to NodeManager. Contributed by Lokesh Jain.

This commit is contained in:
Nanda kumar 2018-10-02 19:47:15 +05:30
parent 81072d5e3d
commit a39296260f
18 changed files with 445 additions and 82 deletions

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
@ -48,7 +48,7 @@ public class ContainerReportHandler implements
private static final Logger LOG =
LoggerFactory.getLogger(ContainerReportHandler.class);
private final Node2ContainerMap node2ContainerMap;
private final NodeManager nodeManager;
private final Mapping containerMapping;
@ -57,14 +57,14 @@ public class ContainerReportHandler implements
private ReplicationActivityStatus replicationStatus;
public ContainerReportHandler(Mapping containerMapping,
Node2ContainerMap node2ContainerMap,
NodeManager nodeManager,
ReplicationActivityStatus replicationActivityStatus) {
Preconditions.checkNotNull(containerMapping);
Preconditions.checkNotNull(node2ContainerMap);
Preconditions.checkNotNull(nodeManager);
Preconditions.checkNotNull(replicationActivityStatus);
this.containerStateManager = containerMapping.getStateManager();
this.nodeManager = nodeManager;
this.containerMapping = containerMapping;
this.node2ContainerMap = node2ContainerMap;
this.replicationStatus = replicationActivityStatus;
}
@ -89,11 +89,11 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
.map(ContainerID::new)
.collect(Collectors.toSet());
ReportResult<ContainerID> reportResult = node2ContainerMap
.processReport(datanodeOrigin.getUuid(), containerIds);
ReportResult<ContainerID> reportResult = nodeManager
.processContainerReport(datanodeOrigin.getUuid(), containerIds);
//we have the report, so we can update the states for the next iteration.
node2ContainerMap
nodeManager
.setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
for (ContainerID containerID : reportResult.getMissingEntries()) {

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@ -38,8 +37,6 @@
*/
public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
private final Node2ContainerMap node2ContainerMap;
private final ContainerStateManager containerStateManager;
private final NodeManager nodeManager;
@ -47,10 +44,8 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
private static final Logger LOG =
LoggerFactory.getLogger(DeadNodeHandler.class);
public DeadNodeHandler(
Node2ContainerMap node2ContainerMap,
ContainerStateManager containerStateManager, NodeManager nodeManager) {
this.node2ContainerMap = node2ContainerMap;
public DeadNodeHandler(NodeManager nodeManager,
ContainerStateManager containerStateManager) {
this.containerStateManager = containerStateManager;
this.nodeManager = nodeManager;
}
@ -61,7 +56,7 @@ public void onMessage(DatanodeDetails datanodeDetails,
nodeManager.processDeadNode(datanodeDetails.getUuid());
Set<ContainerID> containers =
node2ContainerMap.getContainers(datanodeDetails.getUuid());
nodeManager.getContainers(datanodeDetails.getUuid());
if (containers == null) {
LOG.info("There's no containers in dead datanode {}, no replica will be"
+ " removed from the in-memory state.", datanodeDetails.getUuid());

View File

@ -20,7 +20,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@ -31,17 +30,17 @@
*/
public class NewNodeHandler implements EventHandler<DatanodeDetails> {
private final Node2ContainerMap node2ContainerMap;
private final NodeManager nodeManager;
public NewNodeHandler(Node2ContainerMap node2ContainerMap) {
this.node2ContainerMap = node2ContainerMap;
public NewNodeHandler(NodeManager nodeManager) {
this.nodeManager = nodeManager;
}
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
try {
node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(),
Collections.emptySet());
} catch (SCMException e) {
// TODO: log exception message.

View File

@ -18,11 +18,16 @@
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@ -31,6 +36,7 @@
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
@ -133,6 +139,61 @@ public interface NodeManager extends StorageContainerNodeProtocol,
*/
NodeState getNodeState(DatanodeDetails datanodeDetails);
/**
* Get set of pipelines a datanode is part of.
* @param dnId - datanodeID
* @return Set of PipelineID
*/
Set<PipelineID> getPipelineByDnID(UUID dnId);
/**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
*/
void addPipeline(Pipeline pipeline);
/**
* Remove a pipeline information from the NodeManager.
* @param pipeline - Pipeline to be removed
*/
void removePipeline(Pipeline pipeline);
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
* @param containerIds - Set of containerIDs
* @throws SCMException - if datanode is not known. For new datanode use
* addDatanodeInContainerMap call.
*/
void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
throws SCMException;
/**
* Process containerReport received from datanode.
* @param uuid - DataonodeID
* @param containerIds - Set of containerIDs
* @return The result after processing containerReport
*/
ReportResult<ContainerID> processContainerReport(UUID uuid,
Set<ContainerID> containerIds);
/**
* Return set of containerIDs available on a datanode.
* @param uuid - DatanodeID
* @return - set of containerIDs
*/
Set<ContainerID> getContainers(UUID uuid);
/**
* Insert a new datanode with set of containerIDs for containers available
* on it.
* @param uuid - DatanodeID
* @param containerIDs - Set of ContainerIDs
* @throws SCMException - if datanode already exists
*/
void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs)
throws SCMException;
/**
* Add a {@link SCMCommand} to the command queue, which are
* handled by HB thread asynchronously.

View File

@ -24,11 +24,14 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.node.states.NodeStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.*;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine
@ -86,6 +89,15 @@ private enum NodeLifeCycleEvent {
* This is the map which maintains the current state of all datanodes.
*/
private final NodeStateMap nodeStateMap;
/**
* Maintains the mapping from node to pipelines a node is part of.
*/
private final Node2PipelineMap node2PipelineMap;
/**
* Maintains the map from node to ContainerIDs for the containers
* available on the node.
*/
private final Node2ContainerMap node2ContainerMap;
/**
* Used for publishing node state change events.
*/
@ -118,6 +130,8 @@ private enum NodeLifeCycleEvent {
*/
public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
this.nodeStateMap = new NodeStateMap();
this.node2PipelineMap = new Node2PipelineMap();
this.node2ContainerMap = new Node2ContainerMap();
this.eventPublisher = eventPublisher;
this.state2EventMap = new HashMap<>();
initialiseState2EventMap();
@ -242,6 +256,14 @@ public void addNode(DatanodeDetails datanodeDetails)
eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
}
/**
* Adds a pipeline in the node2PipelineMap.
* @param pipeline - Pipeline to be added
*/
public void addPipeline(Pipeline pipeline) {
node2PipelineMap.addPipeline(pipeline);
}
/**
* Get information about the node.
*
@ -352,6 +374,15 @@ public List<DatanodeDetails> getAllNodes() {
return nodes;
}
/**
* Gets set of pipelineID a datanode belongs to.
* @param dnId - Datanode ID
* @return Set of PipelineID
*/
public Set<PipelineID> getPipelineByDnID(UUID dnId) {
return node2PipelineMap.getPipelines(dnId);
}
/**
* Returns the count of healthy nodes.
*
@ -456,6 +487,57 @@ public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
return nodeStateMap.removeNodeStat(uuid);
}
/**
* Removes a pipeline from the node2PipelineMap.
* @param pipeline - Pipeline to be removed
*/
public void removePipeline(Pipeline pipeline) {
node2PipelineMap.removePipeline(pipeline);
}
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
* @param containerIds - Set of containerIDs
* @throws SCMException - if datanode is not known. For new datanode use
* addDatanodeInContainerMap call.
*/
public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
throws SCMException {
node2ContainerMap.setContainersForDatanode(uuid, containerIds);
}
/**
* Process containerReport received from datanode.
* @param uuid - DataonodeID
* @param containerIds - Set of containerIDs
* @return The result after processing containerReport
*/
public ReportResult<ContainerID> processContainerReport(UUID uuid,
Set<ContainerID> containerIds) {
return node2ContainerMap.processReport(uuid, containerIds);
}
/**
* Return set of containerIDs available on a datanode.
* @param uuid - DatanodeID
* @return - set of containerIDs
*/
public Set<ContainerID> getContainers(UUID uuid) {
return node2ContainerMap.getContainers(uuid);
}
/**
* Insert a new datanode with set of containerIDs for containers available
* on it.
* @param uuid - DatanodeID
* @param containerIDs - Set of ContainerIDs
* @throws SCMException - if datanode already exists
*/
public void addDatanodeInContainerMap(UUID uuid,
Set<ContainerID> containerIDs) throws SCMException {
node2ContainerMap.insertNewDatanode(uuid, containerIDs);
}
/**
* Move Stale or Dead node to healthy if we got a heartbeat from them.
* Move healthy nodes to stale nodes if it is needed.

View File

@ -21,8 +21,13 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@ -59,6 +64,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@ -471,6 +477,83 @@ public Map<String, Integer> getNodeCount() {
return nodeCountMap;
}
/**
* Get set of pipelines a datanode is part of.
* @param dnId - datanodeID
* @return Set of PipelineID
*/
@Override
public Set<PipelineID> getPipelineByDnID(UUID dnId) {
return nodeStateManager.getPipelineByDnID(dnId);
}
/**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
*/
@Override
public void addPipeline(Pipeline pipeline) {
nodeStateManager.addPipeline(pipeline);
}
/**
* Remove a pipeline information from the NodeManager.
* @param pipeline - Pipeline to be removed
*/
@Override
public void removePipeline(Pipeline pipeline) {
nodeStateManager.removePipeline(pipeline);
}
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
* @param containerIds - Set of containerIDs
* @throws SCMException - if datanode is not known. For new datanode use
* addDatanodeInContainerMap call.
*/
@Override
public void setContainersForDatanode(UUID uuid,
Set<ContainerID> containerIds) throws SCMException {
nodeStateManager.setContainersForDatanode(uuid, containerIds);
}
/**
* Process containerReport received from datanode.
* @param uuid - DataonodeID
* @param containerIds - Set of containerIDs
* @return The result after processing containerReport
*/
@Override
public ReportResult<ContainerID> processContainerReport(UUID uuid,
Set<ContainerID> containerIds) {
return nodeStateManager.processContainerReport(uuid, containerIds);
}
/**
* Return set of containerIDs available on a datanode.
* @param uuid - DatanodeID
* @return - set of containerIDs
*/
@Override
public Set<ContainerID> getContainers(UUID uuid) {
return nodeStateManager.getContainers(uuid);
}
/**
* Insert a new datanode with set of containerIDs for containers available
* on it.
* @param uuid - DatanodeID
* @param containerIDs - Set of ContainerIDs
* @throws SCMException - if datanode already exists
*/
@Override
public void addDatanodeInContainerMap(UUID uuid,
Set<ContainerID> containerIDs) throws SCMException {
nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs);
}
// TODO:
// Since datanode commands are added through event queue, onMessage method
// should take care of adding commands to command queue.

View File

@ -147,7 +147,7 @@ private void unregisterMXBean() {
* @param datanodeID - UUID of DN.
* @param report - set of Storage Reports for the Datanode.
* @throws SCMException - if we don't know about this datanode, for new DN
* use insertNewDatanode.
* use addDatanodeInContainerMap.
*/
public void updateDatanodeMap(UUID datanodeID,
Set<StorageLocationReport> report) throws SCMException {

View File

@ -19,25 +19,18 @@
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Handles Stale node event.
*/
public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class);
private final Node2ContainerMap node2ContainerMap;
private final PipelineSelector pipelineSelector;
public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
PipelineSelector pipelineSelector) {
this.node2ContainerMap = node2ContainerMap;
public StaleNodeHandler(PipelineSelector pipelineSelector) {
this.pipelineSelector = pipelineSelector;
}

View File

@ -70,7 +70,7 @@ public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
* @param datanodeID - UUID of DN.
* @param containers - Set of Containers tht is present on DN.
* @throws SCMException - if we don't know about this datanode, for new DN
* use insertNewDatanode.
* use addDatanodeInContainerMap.
*/
public void setContainersForDatanode(UUID datanodeID,
Set<ContainerID> containers) throws SCMException {

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -61,7 +60,6 @@
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
@ -85,7 +83,7 @@ public class PipelineSelector {
private final long containerSize;
private final MetadataStore pipelineStore;
private final PipelineStateManager stateManager;
private final Node2PipelineMap node2PipelineMap;
private final NodeManager nodeManager;
private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap;
private final Map<PipelineID, Pipeline> pipelineMap;
private final LeaseManager<Pipeline> pipelineLeaseManager;
@ -105,7 +103,6 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
node2PipelineMap = new Node2PipelineMap();
pipelineMap = new ConcurrentHashMap<>();
pipelineManagerMap = new HashMap<>();
@ -124,6 +121,7 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf,
pipelineLeaseManager.start();
stateManager = new PipelineStateManager();
this.nodeManager = nodeManager;
pipeline2ContainerMap = new HashMap<>();
// Write the container name to pipeline mapping.
@ -361,10 +359,6 @@ private void closeContainersByPipeline(Pipeline pipeline) {
}
}
public Set<PipelineID> getPipelineByDnID(UUID dnId) {
return node2PipelineMap.getPipelines(dnId);
}
private void addExistingPipeline(Pipeline pipeline) throws IOException {
LifeCycleState state = pipeline.getLifeCycleState();
switch (state) {
@ -379,7 +373,7 @@ private void addExistingPipeline(Pipeline pipeline) throws IOException {
// when all the nodes have reported.
pipelineMap.put(pipeline.getId(), pipeline);
pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
node2PipelineMap.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
// reset the datanodes in the pipeline
// they will be reset on
pipeline.resetPipeline();
@ -393,7 +387,7 @@ private void addExistingPipeline(Pipeline pipeline) throws IOException {
}
public void handleStaleNode(DatanodeDetails dn) {
Set<PipelineID> pipelineIDs = getPipelineByDnID(dn.getUuid());
Set<PipelineID> pipelineIDs = nodeManager.getPipelineByDnID(dn.getUuid());
for (PipelineID id : pipelineIDs) {
LOG.info("closing pipeline {}.", id);
eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
@ -436,7 +430,7 @@ public void updatePipelineState(Pipeline pipeline,
case CREATE:
pipelineMap.put(pipeline.getId(), pipeline);
pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
node2PipelineMap.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
// Acquire lease on pipeline
Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
// Register callback to be executed in case of timeout
@ -459,7 +453,7 @@ public void updatePipelineState(Pipeline pipeline,
case TIMEOUT:
closePipeline(pipeline);
pipeline2ContainerMap.remove(pipeline.getId());
node2PipelineMap.removePipeline(pipeline);
nodeManager.removePipeline(pipeline);
pipelineMap.remove(pipeline.getId());
break;
default:

View File

@ -62,7 +62,6 @@
import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
@ -212,8 +211,6 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
scmBlockManager = new BlockManagerImpl(
conf, getScmNodeManager(), scmContainerManager, eventQueue);
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
replicationStatus = new ReplicationActivityStatus();
CloseContainerEventHandler closeContainerHandler =
@ -226,18 +223,17 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
CommandStatusReportHandler cmdStatusReportHandler =
new CommandStatusReportHandler();
NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager);
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(node2ContainerMap,
scmContainerManager.getPipelineSelector());
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
getScmContainerManager().getStateManager(), scmNodeManager);
new StaleNodeHandler(scmContainerManager.getPipelineSelector());
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
getScmContainerManager().getStateManager());
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
PendingDeleteHandler pendingDeleteHandler =
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(scmContainerManager, node2ContainerMap,
new ContainerReportHandler(scmContainerManager, scmNodeManager,
replicationStatus);
scmChillModeManager = new SCMChillModeManager(conf,
getScmContainerManager().getStateManager().getAllContainers(),

View File

@ -19,9 +19,14 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -29,6 +34,7 @@
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.VersionResponse;
@ -42,6 +48,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
@ -73,12 +80,16 @@ public class MockNodeManager implements NodeManager {
private final SCMNodeStat aggregateStat;
private boolean chillmode;
private final Map<UUID, List<SCMCommand>> commandMap;
private final Node2PipelineMap node2PipelineMap;
private final Node2ContainerMap node2ContainerMap;
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this.healthyNodes = new LinkedList<>();
this.staleNodes = new LinkedList<>();
this.deadNodes = new LinkedList<>();
this.nodeMetricMap = new HashMap<>();
this.node2PipelineMap = new Node2PipelineMap();
this.node2ContainerMap = new Node2ContainerMap();
aggregateStat = new SCMNodeStat();
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
@ -289,6 +300,34 @@ public HddsProtos.NodeState getNodeState(DatanodeDetails dd) {
return null;
}
/**
* Get set of pipelines a datanode is part of.
* @param dnId - datanodeID
* @return Set of PipelineID
*/
@Override
public Set<PipelineID> getPipelineByDnID(UUID dnId) {
return node2PipelineMap.getPipelines(dnId);
}
/**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
*/
@Override
public void addPipeline(Pipeline pipeline) {
node2PipelineMap.addPipeline(pipeline);
}
/**
* Remove a pipeline information from the NodeManager.
* @param pipeline - Pipeline to be removed
*/
@Override
public void removePipeline(Pipeline pipeline) {
node2PipelineMap.removePipeline(pipeline);
}
@Override
public void addDatanodeCommand(UUID dnId, SCMCommand command) {
if(commandMap.containsKey(dnId)) {
@ -313,6 +352,54 @@ public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
// do nothing
}
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
* @param containerIds - Set of containerIDs
* @throws SCMException - if datanode is not known. For new datanode use
* addDatanodeInContainerMap call.
*/
@Override
public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
throws SCMException {
node2ContainerMap.setContainersForDatanode(uuid, containerIds);
}
/**
* Process containerReport received from datanode.
* @param uuid - DataonodeID
* @param containerIds - Set of containerIDs
* @return The result after processing containerReport
*/
@Override
public ReportResult<ContainerID> processContainerReport(UUID uuid,
Set<ContainerID> containerIds) {
return node2ContainerMap.processReport(uuid, containerIds);
}
/**
* Return set of containerIDs available on a datanode.
* @param uuid - DatanodeID
* @return - set of containerIDs
*/
@Override
public Set<ContainerID> getContainers(UUID uuid) {
return node2ContainerMap.getContainers(uuid);
}
/**
* Insert a new datanode with set of containerIDs for containers available
* on it.
* @param uuid - DatanodeID
* @param containerIDs - Set of ContainerIDs
* @throws SCMException - if datanode already exists
*/
@Override
public void addDatanodeInContainerMap(UUID uuid,
Set<ContainerID> containerIDs) throws SCMException {
node2ContainerMap.insertNewDatanode(uuid, containerIDs);
}
// Returns the number of commands that is queued to this node manager.
public int getCommandCount(DatanodeDetails dd) {
List<SCMCommand> list = commandMap.get(dd.getUuid());

View File

@ -37,7 +37,7 @@
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
@ -60,6 +60,7 @@
public class TestContainerReportHandler implements EventPublisher {
private List<Object> publishedEvents = new ArrayList<>();
private final NodeManager nodeManager = new MockNodeManager(true, 1);
private static final Logger LOG =
LoggerFactory.getLogger(TestContainerReportHandler.class);
@ -73,7 +74,6 @@ public void resetEventCollector() {
public void test() throws IOException {
//GIVEN
OzoneConfiguration conf = new OzoneConfiguration();
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
Mapping mapping = Mockito.mock(Mapping.class);
PipelineSelector selector = Mockito.mock(PipelineSelector.class);
@ -96,17 +96,17 @@ public void test() throws IOException {
new ReplicationActivityStatus();
ContainerReportHandler reportHandler =
new ContainerReportHandler(mapping, node2ContainerMap,
new ContainerReportHandler(mapping, nodeManager,
replicationActivityStatus);
DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>());
node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>());
node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>());
node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>());
nodeManager.addDatanodeInContainerMap(dn1.getUuid(), new HashSet<>());
nodeManager.addDatanodeInContainerMap(dn2.getUuid(), new HashSet<>());
nodeManager.addDatanodeInContainerMap(dn3.getUuid(), new HashSet<>());
nodeManager.addDatanodeInContainerMap(dn4.getUuid(), new HashSet<>());
PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,

View File

@ -39,7 +39,6 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@ -60,7 +59,6 @@ public class TestDeadNodeHandler {
private List<ReplicationRequest> sentEvents = new ArrayList<>();
private SCMNodeManager nodeManager;
private Node2ContainerMap node2ContainerMap;
private ContainerStateManager containerStateManager;
private NodeReportHandler nodeReportHandler;
private DeadNodeHandler deadNodeHandler;
@ -70,14 +68,13 @@ public class TestDeadNodeHandler {
@Before
public void setup() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
node2ContainerMap = new Node2ContainerMap();
containerStateManager = new ContainerStateManager(conf,
Mockito.mock(Mapping.class),
Mockito.mock(PipelineSelector.class));
eventQueue = new EventQueue();
nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue);
deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
containerStateManager, nodeManager);
deadNodeHandler = new DeadNodeHandler(nodeManager,
containerStateManager);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
publisher = Mockito.mock(EventPublisher.class);
nodeReportHandler = new NodeReportHandler(nodeManager);
@ -96,8 +93,8 @@ public void testOnMessage() throws IOException {
ContainerInfo container3 =
TestUtils.allocateContainer(containerStateManager);
registerReplicas(node2ContainerMap, datanode1, container1, container2);
registerReplicas(node2ContainerMap, datanode2, container1, container3);
registerReplicas(datanode1, container1, container2);
registerReplicas(datanode2, container1, container3);
registerReplicas(containerStateManager, container1, datanode1, datanode2);
registerReplicas(containerStateManager, container2, datanode1);
@ -105,13 +102,8 @@ public void testOnMessage() throws IOException {
TestUtils.closeContainer(containerStateManager, container1);
//WHEN datanode1 is dead
deadNodeHandler.onMessage(datanode1, publisher);
//THEN
//node2ContainerMap has not been changed
Assert.assertEquals(2, node2ContainerMap.size());
Set<DatanodeDetails> container1Replicas =
containerStateManager.getContainerStateMap()
.getContainerReplicas(new ContainerID(container1.getContainerID()));
@ -168,7 +160,7 @@ public void testStatisticsUpdate() throws Exception {
ContainerInfo container1 =
TestUtils.allocateContainer(containerStateManager);
registerReplicas(node2ContainerMap, datanode1, container1);
registerReplicas(datanode1, container1);
SCMNodeStat stat = nodeManager.getStats();
Assert.assertTrue(stat.getCapacity().get() == 300);
@ -211,7 +203,7 @@ public void testOnMessageReplicaFailure() throws Exception {
ContainerInfo container1 =
TestUtils.allocateContainer(containerStateManager);
registerReplicas(node2ContainerMap, dn1, container1);
registerReplicas(dn1, container1);
deadNodeHandler.onMessage(dn1, eventQueue);
Assert.assertTrue(logCapturer.getOutput().contains(
@ -226,12 +218,11 @@ private void registerReplicas(ContainerStateManager csm,
datanodes);
}
private void registerReplicas(Node2ContainerMap node2ConMap,
DatanodeDetails datanode,
private void registerReplicas(DatanodeDetails datanode,
ContainerInfo... containers)
throws SCMException {
node2ConMap
.insertNewDatanode(datanode.getUuid(),
nodeManager
.addDatanodeInContainerMap(datanode.getUuid(),
Arrays.stream(containers)
.map(container -> new ContainerID(container.getContainerID()))
.collect(Collectors.toSet()));

View File

@ -19,8 +19,12 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.CommandQueue;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@ -30,6 +34,7 @@
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@ -39,6 +44,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
@ -216,6 +222,82 @@ public NodeState getNodeState(DatanodeDetails dd) {
return nodeStateMap.get(dd);
}
/**
* Get set of pipelines a datanode is part of.
* @param dnId - datanodeID
* @return Set of PipelineID
*/
@Override
public Set<PipelineID> getPipelineByDnID(UUID dnId) {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
*/
@Override
public void addPipeline(Pipeline pipeline) {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Remove a pipeline information from the NodeManager.
* @param pipeline - Pipeline to be removed
*/
@Override
public void removePipeline(Pipeline pipeline) {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
* @param containerIds - Set of containerIDs
* @throws SCMException - if datanode is not known. For new datanode use
* addDatanodeInContainerMap call.
*/
@Override
public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
throws SCMException {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Process containerReport received from datanode.
* @param uuid - DataonodeID
* @param containerIds - Set of containerIDs
* @return The result after processing containerReport
*/
@Override
public ReportResult<ContainerID> processContainerReport(UUID uuid,
Set<ContainerID> containerIds) {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Return set of containerIDs available on a datanode.
* @param uuid - DatanodeID
* @return - set of containerIDs
*/
@Override
public Set<ContainerID> getContainers(UUID uuid) {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Insert a new datanode with set of containerIDs for containers available
* on it.
* @param uuid - DatanodeID
* @param containerIDs - Set of ContainerIDs
* @throws SCMException - if datanode already exists
*/
@Override
public void addDatanodeInContainerMap(UUID uuid,
Set<ContainerID> containerIDs) throws SCMException {
throw new UnsupportedOperationException("Not yet implemented");
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this

View File

@ -97,7 +97,7 @@ public void testPipelineMap() throws IOException {
Assert.assertEquals(3, dns.size());
// get pipeline details by dnid
Set<PipelineID> pipelines = mapping.getPipelineSelector()
Set<PipelineID> pipelines = scm.getScmNodeManager()
.getPipelineByDnID(dns.get(0).getUuid());
Assert.assertEquals(1, pipelines.size());
pipelines.forEach(p -> Assert.assertEquals(p,

View File

@ -119,7 +119,7 @@ public void testPipelineCloseWithClosedContainer() throws IOException {
HddsProtos.LifeCycleState.CLOSED);
for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
// Assert that the pipeline has been removed from Node2PipelineMap as well
Assert.assertEquals(pipelineSelector.getPipelineByDnID(
Assert.assertEquals(scm.getScmNodeManager().getPipelineByDnID(
dn.getUuid()).size(), 0);
}
}