diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 3f156deb3e..71935f0aa2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -48,7 +48,7 @@ public class ContainerReportHandler implements private static final Logger LOG = LoggerFactory.getLogger(ContainerReportHandler.class); - private final Node2ContainerMap node2ContainerMap; + private final NodeManager nodeManager; private final Mapping containerMapping; @@ -57,14 +57,14 @@ public class ContainerReportHandler implements private ReplicationActivityStatus replicationStatus; public ContainerReportHandler(Mapping containerMapping, - Node2ContainerMap node2ContainerMap, + NodeManager nodeManager, ReplicationActivityStatus replicationActivityStatus) { Preconditions.checkNotNull(containerMapping); - Preconditions.checkNotNull(node2ContainerMap); + Preconditions.checkNotNull(nodeManager); Preconditions.checkNotNull(replicationActivityStatus); this.containerStateManager = containerMapping.getStateManager(); + this.nodeManager = nodeManager; this.containerMapping = containerMapping; - this.node2ContainerMap = node2ContainerMap; this.replicationStatus = replicationActivityStatus; } @@ -89,11 +89,11 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode, .map(ContainerID::new) .collect(Collectors.toSet()); - ReportResult reportResult = node2ContainerMap - .processReport(datanodeOrigin.getUuid(), containerIds); + ReportResult reportResult = nodeManager + .processContainerReport(datanodeOrigin.getUuid(), containerIds); //we have the report, so we can update the states for the next iteration. - node2ContainerMap + nodeManager .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds); for (ContainerID containerID : reportResult.getMissingEntries()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 253b3ec44a..17edf9ea17 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -38,8 +37,6 @@ */ public class DeadNodeHandler implements EventHandler { - private final Node2ContainerMap node2ContainerMap; - private final ContainerStateManager containerStateManager; private final NodeManager nodeManager; @@ -47,10 +44,8 @@ public class DeadNodeHandler implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger(DeadNodeHandler.class); - public DeadNodeHandler( - Node2ContainerMap node2ContainerMap, - ContainerStateManager containerStateManager, NodeManager nodeManager) { - this.node2ContainerMap = node2ContainerMap; + public DeadNodeHandler(NodeManager nodeManager, + ContainerStateManager containerStateManager) { this.containerStateManager = containerStateManager; this.nodeManager = nodeManager; } @@ -61,7 +56,7 @@ public void onMessage(DatanodeDetails datanodeDetails, nodeManager.processDeadNode(datanodeDetails.getUuid()); Set containers = - node2ContainerMap.getContainers(datanodeDetails.getUuid()); + nodeManager.getContainers(datanodeDetails.getUuid()); if (containers == null) { LOG.info("There's no containers in dead datanode {}, no replica will be" + " removed from the in-memory state.", datanodeDetails.getUuid()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java index 79b75a5af0..780aa2b9e7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java @@ -20,7 +20,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -31,17 +30,17 @@ */ public class NewNodeHandler implements EventHandler { - private final Node2ContainerMap node2ContainerMap; + private final NodeManager nodeManager; - public NewNodeHandler(Node2ContainerMap node2ContainerMap) { - this.node2ContainerMap = node2ContainerMap; + public NewNodeHandler(NodeManager nodeManager) { + this.nodeManager = nodeManager; } @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { try { - node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(), + nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(), Collections.emptySet()); } catch (SCMException e) { // TODO: log exception message. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index fad3ee35dc..0dc1a0c5d6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -18,11 +18,16 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -31,6 +36,7 @@ import java.io.Closeable; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; /** @@ -133,6 +139,61 @@ public interface NodeManager extends StorageContainerNodeProtocol, */ NodeState getNodeState(DatanodeDetails datanodeDetails); + /** + * Get set of pipelines a datanode is part of. + * @param dnId - datanodeID + * @return Set of PipelineID + */ + Set getPipelineByDnID(UUID dnId); + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + void addPipeline(Pipeline pipeline); + + /** + * Remove a pipeline information from the NodeManager. + * @param pipeline - Pipeline to be removed + */ + void removePipeline(Pipeline pipeline); + + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + void setContainersForDatanode(UUID uuid, Set containerIds) + throws SCMException; + + /** + * Process containerReport received from datanode. + * @param uuid - DataonodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + ReportResult processContainerReport(UUID uuid, + Set containerIds); + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + Set getContainers(UUID uuid); + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + void addDatanodeInContainerMap(UUID uuid, Set containerIDs) + throws SCMException; + /** * Add a {@link SCMCommand} to the command queue, which are * handled by HB thread asynchronously. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 1f99ffe12a..88f984b673 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -24,11 +24,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; -import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.node.states.NodeStateMap; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.states.*; +import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.common.statemachine @@ -86,6 +89,15 @@ private enum NodeLifeCycleEvent { * This is the map which maintains the current state of all datanodes. */ private final NodeStateMap nodeStateMap; + /** + * Maintains the mapping from node to pipelines a node is part of. + */ + private final Node2PipelineMap node2PipelineMap; + /** + * Maintains the map from node to ContainerIDs for the containers + * available on the node. + */ + private final Node2ContainerMap node2ContainerMap; /** * Used for publishing node state change events. */ @@ -118,6 +130,8 @@ private enum NodeLifeCycleEvent { */ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { this.nodeStateMap = new NodeStateMap(); + this.node2PipelineMap = new Node2PipelineMap(); + this.node2ContainerMap = new Node2ContainerMap(); this.eventPublisher = eventPublisher; this.state2EventMap = new HashMap<>(); initialiseState2EventMap(); @@ -242,6 +256,14 @@ public void addNode(DatanodeDetails datanodeDetails) eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails); } + /** + * Adds a pipeline in the node2PipelineMap. + * @param pipeline - Pipeline to be added + */ + public void addPipeline(Pipeline pipeline) { + node2PipelineMap.addPipeline(pipeline); + } + /** * Get information about the node. * @@ -352,6 +374,15 @@ public List getAllNodes() { return nodes; } + /** + * Gets set of pipelineID a datanode belongs to. + * @param dnId - Datanode ID + * @return Set of PipelineID + */ + public Set getPipelineByDnID(UUID dnId) { + return node2PipelineMap.getPipelines(dnId); + } + /** * Returns the count of healthy nodes. * @@ -456,6 +487,57 @@ public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException { return nodeStateMap.removeNodeStat(uuid); } + /** + * Removes a pipeline from the node2PipelineMap. + * @param pipeline - Pipeline to be removed + */ + public void removePipeline(Pipeline pipeline) { + node2PipelineMap.removePipeline(pipeline); + } + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + public void setContainersForDatanode(UUID uuid, Set containerIds) + throws SCMException { + node2ContainerMap.setContainersForDatanode(uuid, containerIds); + } + + /** + * Process containerReport received from datanode. + * @param uuid - DataonodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + public ReportResult processContainerReport(UUID uuid, + Set containerIds) { + return node2ContainerMap.processReport(uuid, containerIds); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + public Set getContainers(UUID uuid) { + return node2ContainerMap.getContainers(uuid); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + public void addDatanodeInContainerMap(UUID uuid, + Set containerIDs) throws SCMException { + node2ContainerMap.insertNewDatanode(uuid, containerIDs); + } + /** * Move Stale or Dead node to healthy if we got a heartbeat from them. * Move healthy nodes to stale nodes if it is needed. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 620c8169f8..36a6f154ad 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -21,8 +21,13 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -59,6 +64,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -471,6 +477,83 @@ public Map getNodeCount() { return nodeCountMap; } + /** + * Get set of pipelines a datanode is part of. + * @param dnId - datanodeID + * @return Set of PipelineID + */ + @Override + public Set getPipelineByDnID(UUID dnId) { + return nodeStateManager.getPipelineByDnID(dnId); + } + + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + @Override + public void addPipeline(Pipeline pipeline) { + nodeStateManager.addPipeline(pipeline); + } + + /** + * Remove a pipeline information from the NodeManager. + * @param pipeline - Pipeline to be removed + */ + @Override + public void removePipeline(Pipeline pipeline) { + nodeStateManager.removePipeline(pipeline); + } + + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + @Override + public void setContainersForDatanode(UUID uuid, + Set containerIds) throws SCMException { + nodeStateManager.setContainersForDatanode(uuid, containerIds); + } + + /** + * Process containerReport received from datanode. + * @param uuid - DataonodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + @Override + public ReportResult processContainerReport(UUID uuid, + Set containerIds) { + return nodeStateManager.processContainerReport(uuid, containerIds); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + @Override + public Set getContainers(UUID uuid) { + return nodeStateManager.getContainers(uuid); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + @Override + public void addDatanodeInContainerMap(UUID uuid, + Set containerIDs) throws SCMException { + nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs); + } + // TODO: // Since datanode commands are added through event queue, onMessage method // should take care of adding commands to command queue. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java index 6ea83dfff5..1b0e5b56e7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java @@ -147,7 +147,7 @@ private void unregisterMXBean() { * @param datanodeID - UUID of DN. * @param report - set of Storage Reports for the Datanode. * @throws SCMException - if we don't know about this datanode, for new DN - * use insertNewDatanode. + * use addDatanodeInContainerMap. */ public void updateDatanodeMap(UUID datanodeID, Set report) throws SCMException { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index ddbba82533..48939f1bae 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -19,25 +19,18 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Handles Stale node event. */ public class StaleNodeHandler implements EventHandler { - static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class); - private final Node2ContainerMap node2ContainerMap; private final PipelineSelector pipelineSelector; - public StaleNodeHandler(Node2ContainerMap node2ContainerMap, - PipelineSelector pipelineSelector) { - this.node2ContainerMap = node2ContainerMap; + public StaleNodeHandler(PipelineSelector pipelineSelector) { this.pipelineSelector = pipelineSelector; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java index 549080a25e..9625f81908 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java @@ -70,7 +70,7 @@ public void insertNewDatanode(UUID datanodeID, Set containerIDs) * @param datanodeID - UUID of DN. * @param containers - Set of Containers tht is present on DN. * @throws SCMException - if we don't know about this datanode, for new DN - * use insertNewDatanode. + * use addDatanodeInContainerMap. */ public void setContainersForDatanode(UUID datanodeID, Set containers) throws SCMException { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java similarity index 100% rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index 59d937ef7d..c8d22ff645 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl; import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -61,7 +60,6 @@ import java.util.Set; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes @@ -85,7 +83,7 @@ public class PipelineSelector { private final long containerSize; private final MetadataStore pipelineStore; private final PipelineStateManager stateManager; - private final Node2PipelineMap node2PipelineMap; + private final NodeManager nodeManager; private final Map> pipeline2ContainerMap; private final Map pipelineMap; private final LeaseManager pipelineLeaseManager; @@ -105,7 +103,6 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - node2PipelineMap = new Node2PipelineMap(); pipelineMap = new ConcurrentHashMap<>(); pipelineManagerMap = new HashMap<>(); @@ -124,6 +121,7 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf, pipelineLeaseManager.start(); stateManager = new PipelineStateManager(); + this.nodeManager = nodeManager; pipeline2ContainerMap = new HashMap<>(); // Write the container name to pipeline mapping. @@ -361,10 +359,6 @@ private void closeContainersByPipeline(Pipeline pipeline) { } } - public Set getPipelineByDnID(UUID dnId) { - return node2PipelineMap.getPipelines(dnId); - } - private void addExistingPipeline(Pipeline pipeline) throws IOException { LifeCycleState state = pipeline.getLifeCycleState(); switch (state) { @@ -379,7 +373,7 @@ private void addExistingPipeline(Pipeline pipeline) throws IOException { // when all the nodes have reported. pipelineMap.put(pipeline.getId(), pipeline); pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); - node2PipelineMap.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); // reset the datanodes in the pipeline // they will be reset on pipeline.resetPipeline(); @@ -393,7 +387,7 @@ private void addExistingPipeline(Pipeline pipeline) throws IOException { } public void handleStaleNode(DatanodeDetails dn) { - Set pipelineIDs = getPipelineByDnID(dn.getUuid()); + Set pipelineIDs = nodeManager.getPipelineByDnID(dn.getUuid()); for (PipelineID id : pipelineIDs) { LOG.info("closing pipeline {}.", id); eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id); @@ -436,7 +430,7 @@ public void updatePipelineState(Pipeline pipeline, case CREATE: pipelineMap.put(pipeline.getId(), pipeline); pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); - node2PipelineMap.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); // Acquire lease on pipeline Lease pipelineLease = pipelineLeaseManager.acquire(pipeline); // Register callback to be executed in case of timeout @@ -459,7 +453,7 @@ public void updatePipelineState(Pipeline pipeline, case TIMEOUT: closePipeline(pipeline); pipeline2ContainerMap.remove(pipeline.getId()); - node2PipelineMap.removePipeline(pipeline); + nodeManager.removePipeline(pipeline); pipelineMap.remove(pipeline.getId()); break; default: diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index bb72075305..a6a967c376 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -62,7 +62,6 @@ import org.apache.hadoop.hdds.scm.node.NodeReportHandler; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler; import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler; import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler; @@ -212,8 +211,6 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { scmBlockManager = new BlockManagerImpl( conf, getScmNodeManager(), scmContainerManager, eventQueue); - Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); - replicationStatus = new ReplicationActivityStatus(); CloseContainerEventHandler closeContainerHandler = @@ -226,18 +223,17 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { CommandStatusReportHandler cmdStatusReportHandler = new CommandStatusReportHandler(); - NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); + NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager); StaleNodeHandler staleNodeHandler = - new StaleNodeHandler(node2ContainerMap, - scmContainerManager.getPipelineSelector()); - DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap, - getScmContainerManager().getStateManager(), scmNodeManager); + new StaleNodeHandler(scmContainerManager.getPipelineSelector()); + DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, + getScmContainerManager().getStateManager()); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); PendingDeleteHandler pendingDeleteHandler = new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService()); ContainerReportHandler containerReportHandler = - new ContainerReportHandler(scmContainerManager, node2ContainerMap, + new ContainerReportHandler(scmContainerManager, scmNodeManager, replicationStatus); scmChillModeManager = new SCMChillModeManager(conf, getScmContainerManager().getStateManager().getAllContainers(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 0d90728baa..3221053573 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -19,9 +19,14 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -29,6 +34,7 @@ .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.VersionResponse; @@ -42,6 +48,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; @@ -73,12 +80,16 @@ public class MockNodeManager implements NodeManager { private final SCMNodeStat aggregateStat; private boolean chillmode; private final Map> commandMap; + private final Node2PipelineMap node2PipelineMap; + private final Node2ContainerMap node2ContainerMap; public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); this.staleNodes = new LinkedList<>(); this.deadNodes = new LinkedList<>(); this.nodeMetricMap = new HashMap<>(); + this.node2PipelineMap = new Node2PipelineMap(); + this.node2ContainerMap = new Node2ContainerMap(); aggregateStat = new SCMNodeStat(); if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { @@ -289,6 +300,34 @@ public HddsProtos.NodeState getNodeState(DatanodeDetails dd) { return null; } + /** + * Get set of pipelines a datanode is part of. + * @param dnId - datanodeID + * @return Set of PipelineID + */ + @Override + public Set getPipelineByDnID(UUID dnId) { + return node2PipelineMap.getPipelines(dnId); + } + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + @Override + public void addPipeline(Pipeline pipeline) { + node2PipelineMap.addPipeline(pipeline); + } + + /** + * Remove a pipeline information from the NodeManager. + * @param pipeline - Pipeline to be removed + */ + @Override + public void removePipeline(Pipeline pipeline) { + node2PipelineMap.removePipeline(pipeline); + } + @Override public void addDatanodeCommand(UUID dnId, SCMCommand command) { if(commandMap.containsKey(dnId)) { @@ -313,6 +352,54 @@ public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) { // do nothing } + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + @Override + public void setContainersForDatanode(UUID uuid, Set containerIds) + throws SCMException { + node2ContainerMap.setContainersForDatanode(uuid, containerIds); + } + + /** + * Process containerReport received from datanode. + * @param uuid - DataonodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + @Override + public ReportResult processContainerReport(UUID uuid, + Set containerIds) { + return node2ContainerMap.processReport(uuid, containerIds); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + @Override + public Set getContainers(UUID uuid) { + return node2ContainerMap.getContainers(uuid); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + @Override + public void addDatanodeInContainerMap(UUID uuid, + Set containerIDs) throws SCMException { + node2ContainerMap.insertNewDatanode(uuid, containerIDs); + } + // Returns the number of commands that is queued to this node manager. public int getCommandCount(DatanodeDetails dd) { List list = commandMap.get(dd.getUuid()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index a59179bdff..f79ae1e32d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.scm.container.replication .ReplicationActivityStatus; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; @@ -60,6 +60,7 @@ public class TestContainerReportHandler implements EventPublisher { private List publishedEvents = new ArrayList<>(); + private final NodeManager nodeManager = new MockNodeManager(true, 1); private static final Logger LOG = LoggerFactory.getLogger(TestContainerReportHandler.class); @@ -73,7 +74,6 @@ public void resetEventCollector() { public void test() throws IOException { //GIVEN OzoneConfiguration conf = new OzoneConfiguration(); - Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); Mapping mapping = Mockito.mock(Mapping.class); PipelineSelector selector = Mockito.mock(PipelineSelector.class); @@ -96,17 +96,17 @@ public void test() throws IOException { new ReplicationActivityStatus(); ContainerReportHandler reportHandler = - new ContainerReportHandler(mapping, node2ContainerMap, + new ContainerReportHandler(mapping, nodeManager, replicationActivityStatus); DatanodeDetails dn1 = TestUtils.randomDatanodeDetails(); DatanodeDetails dn2 = TestUtils.randomDatanodeDetails(); DatanodeDetails dn3 = TestUtils.randomDatanodeDetails(); DatanodeDetails dn4 = TestUtils.randomDatanodeDetails(); - node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>()); - node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>()); - node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>()); - node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn1.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn2.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn3.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn4.getUuid(), new HashSet<>()); PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class); Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 696632297a..7bba032145 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -60,7 +59,6 @@ public class TestDeadNodeHandler { private List sentEvents = new ArrayList<>(); private SCMNodeManager nodeManager; - private Node2ContainerMap node2ContainerMap; private ContainerStateManager containerStateManager; private NodeReportHandler nodeReportHandler; private DeadNodeHandler deadNodeHandler; @@ -70,14 +68,13 @@ public class TestDeadNodeHandler { @Before public void setup() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); - node2ContainerMap = new Node2ContainerMap(); containerStateManager = new ContainerStateManager(conf, Mockito.mock(Mapping.class), Mockito.mock(PipelineSelector.class)); eventQueue = new EventQueue(); nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue); - deadNodeHandler = new DeadNodeHandler(node2ContainerMap, - containerStateManager, nodeManager); + deadNodeHandler = new DeadNodeHandler(nodeManager, + containerStateManager); eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); publisher = Mockito.mock(EventPublisher.class); nodeReportHandler = new NodeReportHandler(nodeManager); @@ -96,8 +93,8 @@ public void testOnMessage() throws IOException { ContainerInfo container3 = TestUtils.allocateContainer(containerStateManager); - registerReplicas(node2ContainerMap, datanode1, container1, container2); - registerReplicas(node2ContainerMap, datanode2, container1, container3); + registerReplicas(datanode1, container1, container2); + registerReplicas(datanode2, container1, container3); registerReplicas(containerStateManager, container1, datanode1, datanode2); registerReplicas(containerStateManager, container2, datanode1); @@ -105,13 +102,8 @@ public void testOnMessage() throws IOException { TestUtils.closeContainer(containerStateManager, container1); - //WHEN datanode1 is dead deadNodeHandler.onMessage(datanode1, publisher); - //THEN - //node2ContainerMap has not been changed - Assert.assertEquals(2, node2ContainerMap.size()); - Set container1Replicas = containerStateManager.getContainerStateMap() .getContainerReplicas(new ContainerID(container1.getContainerID())); @@ -168,7 +160,7 @@ public void testStatisticsUpdate() throws Exception { ContainerInfo container1 = TestUtils.allocateContainer(containerStateManager); - registerReplicas(node2ContainerMap, datanode1, container1); + registerReplicas(datanode1, container1); SCMNodeStat stat = nodeManager.getStats(); Assert.assertTrue(stat.getCapacity().get() == 300); @@ -211,7 +203,7 @@ public void testOnMessageReplicaFailure() throws Exception { ContainerInfo container1 = TestUtils.allocateContainer(containerStateManager); - registerReplicas(node2ContainerMap, dn1, container1); + registerReplicas(dn1, container1); deadNodeHandler.onMessage(dn1, eventQueue); Assert.assertTrue(logCapturer.getOutput().contains( @@ -226,12 +218,11 @@ private void registerReplicas(ContainerStateManager csm, datanodes); } - private void registerReplicas(Node2ContainerMap node2ConMap, - DatanodeDetails datanode, + private void registerReplicas(DatanodeDetails datanode, ContainerInfo... containers) throws SCMException { - node2ConMap - .insertNewDatanode(datanode.getUuid(), + nodeManager + .addDatanodeInContainerMap(datanode.getUuid(), Arrays.stream(containers) .map(container -> new ContainerID(container.getContainerID())) .collect(Collectors.toSet())); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index a9a00ef9f4..74c3932eba 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -19,8 +19,12 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.CommandQueue; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -30,6 +34,7 @@ .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -39,6 +44,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; /** @@ -216,6 +222,82 @@ public NodeState getNodeState(DatanodeDetails dd) { return nodeStateMap.get(dd); } + /** + * Get set of pipelines a datanode is part of. + * @param dnId - datanodeID + * @return Set of PipelineID + */ + @Override + public Set getPipelineByDnID(UUID dnId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + @Override + public void addPipeline(Pipeline pipeline) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Remove a pipeline information from the NodeManager. + * @param pipeline - Pipeline to be removed + */ + @Override + public void removePipeline(Pipeline pipeline) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + @Override + public void setContainersForDatanode(UUID uuid, Set containerIds) + throws SCMException { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Process containerReport received from datanode. + * @param uuid - DataonodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + @Override + public ReportResult processContainerReport(UUID uuid, + Set containerIds) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + @Override + public Set getContainers(UUID uuid) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + @Override + public void addDatanodeInContainerMap(UUID uuid, + Set containerIDs) throws SCMException { + throw new UnsupportedOperationException("Not yet implemented"); + } + /** * Closes this stream and releases any system resources associated * with it. If the stream is already closed then invoking this diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index ad3798ea36..e61a90904c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -97,7 +97,7 @@ public void testPipelineMap() throws IOException { Assert.assertEquals(3, dns.size()); // get pipeline details by dnid - Set pipelines = mapping.getPipelineSelector() + Set pipelines = scm.getScmNodeManager() .getPipelineByDnID(dns.get(0).getUuid()); Assert.assertEquals(1, pipelines.size()); pipelines.forEach(p -> Assert.assertEquals(p, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 5eabfb9106..b02eae2abf 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -119,7 +119,7 @@ public void testPipelineCloseWithClosedContainer() throws IOException { HddsProtos.LifeCycleState.CLOSED); for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) { // Assert that the pipeline has been removed from Node2PipelineMap as well - Assert.assertEquals(pipelineSelector.getPipelineByDnID( + Assert.assertEquals(scm.getScmNodeManager().getPipelineByDnID( dn.getUuid()).size(), 0); } }