From 846936698b2c8c50662e43534ac999df82066a8b Mon Sep 17 00:00:00 2001 From: Nanda kumar Date: Mon, 17 Sep 2018 21:51:54 +0530 Subject: [PATCH] HDDS-399. Persist open pipeline information across SCM restart. Contributed by Mukul Kumar Singh. --- .../container/common/helpers/Pipeline.java | 24 ++ .../org/apache/hadoop/ozone/OzoneConsts.java | 2 + .../hdds/scm/container/ContainerMapping.java | 24 +- .../scm/container/ContainerStateManager.java | 25 +- .../container/states/ContainerStateMap.java | 38 --- .../hdds/scm/pipelines/PipelineManager.java | 146 +++++------ .../hdds/scm/pipelines/PipelineSelector.java | 243 +++++++++--------- .../scm/pipelines/PipelineStateManager.java | 136 ++++++++++ .../scm/pipelines/ratis/RatisManagerImpl.java | 8 +- .../standalone/StandaloneManagerImpl.java | 8 +- .../container/TestContainerReportHandler.java | 3 +- .../container/TestContainerStateManager.java | 4 +- .../hdds/scm/node/TestDeadNodeHandler.java | 4 +- .../TestContainerStateManagerIntegration.java | 10 +- .../scm/pipeline/TestNode2PipelineMap.java | 22 +- .../hdds/scm/pipeline/TestPipelineClose.java | 15 +- .../hdds/scm/pipeline/TestSCMRestart.java | 101 ++++++++ .../apache/hadoop/ozone/MiniOzoneCluster.java | 5 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 8 +- 19 files changed, 506 insertions(+), 320 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java index 67572621ff..ef148e5550 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -86,6 +86,30 @@ public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState, datanodes = new TreeMap<>(); } + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Pipeline that = (Pipeline) o; + + return id.equals(that.id) + && factor.equals(that.factor) + && type.equals(that.type) + && lifeCycleState.equals(that.lifeCycleState) + && leaderID.equals(that.leaderID); + + } + /** * Gets pipeline object from protobuf. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index bf4508b99a..0a15ec8b6e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -90,7 +90,9 @@ public final class OzoneConsts { * level DB names used by SCM and data nodes. 
*/ public static final String CONTAINER_DB_SUFFIX = "container.db"; + public static final String PIPELINE_DB_SUFFIX = "pipeline.db"; public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX; + public static final String SCM_PIPELINE_DB = "scm-" + PIPELINE_DB_SUFFIX; public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX; public static final String DELETED_BLOCK_DB = "deletedBlock.db"; public static final String OM_DB_NAME = "om.db"; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 5678205a3e..11cc9eeee0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -130,12 +130,13 @@ public ContainerMapping( size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - this.containerStateManager = - new ContainerStateManager(conf, this); - LOG.trace("Container State Manager created."); this.pipelineSelector = new PipelineSelector(nodeManager, - containerStateManager, conf, eventPublisher); + conf, eventPublisher, cacheSizeMB); + + this.containerStateManager = + new ContainerStateManager(conf, this, pipelineSelector); + LOG.trace("Container State Manager created."); this.eventPublisher = eventPublisher; @@ -202,11 +203,6 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID) if (contInfo.isContainerOpen()) { // If pipeline with given pipeline Id already exist return it pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); - if (pipeline == null) { - pipeline = pipelineSelector - .getReplicationPipeline(contInfo.getReplicationType(), - contInfo.getReplicationFactor()); - } } else { // For close containers create pipeline from datanodes with replicas Set dnWithReplicas = containerStateManager @@ -392,9 +388,8 @@ public HddsProtos.LifeCycleState updateContainerState( ContainerInfo updatedContainer = containerStateManager .updateContainerState(containerInfo, event); if (!updatedContainer.isContainerOpen()) { - Pipeline pipeline = pipelineSelector - .getPipeline(containerInfo.getPipelineID()); - pipelineSelector.closePipelineIfNoOpenContainers(pipeline); + pipelineSelector.removeContainerFromPipeline( + containerInfo.getPipelineID(), containerID); } containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); return updatedContainer.getState(); @@ -474,11 +469,6 @@ public ContainerWithPipeline getMatchingContainerWithPipeline( } Pipeline pipeline = pipelineSelector .getPipeline(containerInfo.getPipelineID()); - if (pipeline == null) { - pipeline = pipelineSelector - .getReplicationPipeline(containerInfo.getReplicationType(), - containerInfo.getReplicationFactor()); - } return new ContainerWithPipeline(containerInfo, pipeline); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 7989c55717..930c098f0f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -27,7 +27,6 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.container.states.ContainerState; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; @@ -137,7 +136,7 @@ public class ContainerStateManager implements Closeable { */ @SuppressWarnings("unchecked") public ContainerStateManager(Configuration configuration, - Mapping containerMapping) { + Mapping containerMapping, PipelineSelector pipelineSelector) { // Initialize the container state machine. Set finalStates = new HashSet(); @@ -159,10 +158,11 @@ public ContainerStateManager(Configuration configuration, lastUsedMap = new ConcurrentHashMap<>(); containerCount = new AtomicLong(0); containers = new ContainerStateMap(); - loadExistingContainers(containerMapping); + loadExistingContainers(containerMapping, pipelineSelector); } - private void loadExistingContainers(Mapping containerMapping) { + private void loadExistingContainers(Mapping containerMapping, + PipelineSelector pipelineSelector) { List containerList; try { @@ -184,6 +184,8 @@ private void loadExistingContainers(Mapping containerMapping) { long maxID = 0; for (ContainerInfo container : containerList) { containers.addContainer(container); + pipelineSelector.addContainerToPipeline( + container.getPipelineID(), container.getContainerID()); if (maxID < container.getContainerID()) { maxID = container.getContainerID(); @@ -303,6 +305,7 @@ public ContainerWithPipeline allocateContainer(PipelineSelector selector, + "replication=%s couldn't be found for the new container. " + "Do you have enough nodes?", type, replicationFactor); + long containerID = containerCount.incrementAndGet(); ContainerInfo containerInfo = new ContainerInfo.Builder() .setState(HddsProtos.LifeCycleState.ALLOCATED) .setPipelineID(pipeline.getId()) @@ -313,11 +316,12 @@ public ContainerWithPipeline allocateContainer(PipelineSelector selector, .setNumberOfKeys(0) .setStateEnterTime(Time.monotonicNow()) .setOwner(owner) - .setContainerID(containerCount.incrementAndGet()) + .setContainerID(containerID) .setDeleteTransactionId(0) .setReplicationFactor(replicationFactor) .setReplicationType(pipeline.getType()) .build(); + selector.addContainerToPipeline(pipeline.getId(), containerID); Preconditions.checkNotNull(containerInfo); containers.addContainer(containerInfo); LOG.trace("New container allocated: {}", containerInfo); @@ -470,17 +474,6 @@ public NavigableSet getMatchingContainerIDs( factor, type); } - /** - * Returns a set of open ContainerIDs that reside on a pipeline. - * - * @param pipelineID PipelineID of the Containers. - * @return Set of containers that match the specific query parameters. - */ - public NavigableSet getMatchingContainerIDsByPipeline(PipelineID - pipelineID) { - return containers.getOpenContainerIDsByPipeline(pipelineID); - } - /** * Returns the containerInfo with pipeline for the given container id. * @param selector -- Pipeline selector class. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 9657594405..880a715f6b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -75,9 +74,6 @@ * Replica and THREE Replica. User can specify how many copies should be made * for a ozone key. *
- * 5.Pipeline - The pipeline constitute the set of Datanodes on which the - * open container resides physically. - *
* The most common access pattern of this class is to select a container based * on all these parameters, for example, when allocating a block we will * select a container that belongs to user1, with Ratis replication which can @@ -92,14 +88,6 @@ public class ContainerStateMap { private final ContainerAttribute ownerMap; private final ContainerAttribute factorMap; private final ContainerAttribute typeMap; - // This map constitutes the pipeline to open container mappings. - // This map will be queried for the list of open containers on a particular - // pipeline and issue a close on corresponding containers in case of - // following events: - //1. Dead datanode. - //2. Datanode out of space. - //3. Volume loss or volume out of space. - private final ContainerAttribute openPipelineMap; private final Map containerMap; // Map to hold replicas of given container. @@ -121,7 +109,6 @@ public ContainerStateMap() { ownerMap = new ContainerAttribute<>(); factorMap = new ContainerAttribute<>(); typeMap = new ContainerAttribute<>(); - openPipelineMap = new ContainerAttribute<>(); containerMap = new HashMap<>(); lock = new ReentrantReadWriteLock(); contReplicaMap = new HashMap<>(); @@ -158,9 +145,6 @@ public void addContainer(ContainerInfo info) ownerMap.insert(info.getOwner(), id); factorMap.insert(info.getReplicationFactor(), id); typeMap.insert(info.getReplicationType(), id); - if (info.isContainerOpen()) { - openPipelineMap.insert(info.getPipelineID(), id); - } // Flush the cache of this container type, will be added later when // get container queries are executed. @@ -391,11 +375,6 @@ public void updateState(ContainerInfo info, LifeCycleState currentState, throw new SCMException("Updating the container map failed.", ex, FAILED_TO_CHANGE_CONTAINER_STATE); } - // In case the container is set to closed state, it needs to be removed - // from the pipeline Map. - if (!info.isContainerOpen()) { - openPipelineMap.remove(info.getPipelineID(), id); - } } finally { lock.writeLock().unlock(); } @@ -433,23 +412,6 @@ NavigableSet getContainerIDsByType(ReplicationType type) { } } - /** - * Returns Open containers in the SCM by the Pipeline. - * - * @param pipelineID - Pipeline id. - * @return NavigableSet - */ - public NavigableSet getOpenContainerIDsByPipeline( - PipelineID pipelineID) { - Preconditions.checkNotNull(pipelineID); - lock.readLock().lock(); - try { - return openPipelineMap.getCollection(pipelineID); - } finally { - lock.readLock().unlock(); - } - } - /** * Returns Containers by replication factor. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index 102df8a9e8..07ff2b0918 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -16,11 +16,10 @@ */ package org.apache.hadoop.hdds.scm.pipelines; +import java.util.ArrayList; import java.util.LinkedList; -import java.util.Map; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; @@ -37,17 +36,53 @@ public abstract class PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(PipelineManager.class); - private final List activePipelines; - private final Map pipelineMap; - private final AtomicInteger pipelineIndex; - private final Node2PipelineMap node2PipelineMap; + private final ArrayList activePipelines; - public PipelineManager(Node2PipelineMap map, - Map pipelineMap) { - activePipelines = new LinkedList<>(); - pipelineIndex = new AtomicInteger(0); - this.pipelineMap = pipelineMap; - this.node2PipelineMap = map; + public PipelineManager() { + activePipelines = new ArrayList<>(); + for (ReplicationFactor factor : ReplicationFactor.values()) { + activePipelines.add(factor.ordinal(), new ActivePipelines()); + } + } + + private static class ActivePipelines { + private final List activePipelines; + private final AtomicInteger pipelineIndex; + + ActivePipelines() { + activePipelines = new LinkedList<>(); + pipelineIndex = new AtomicInteger(0); + } + + void addPipeline(PipelineID pipelineID) { + activePipelines.add(pipelineID); + } + + void removePipeline(PipelineID pipelineID) { + activePipelines.remove(pipelineID); + } + + /** + * Find a Pipeline that is operational. + * + * @return - Pipeline or null + */ + PipelineID findOpenPipeline() { + if (activePipelines.size() == 0) { + LOG.error("No Operational pipelines found. Returning null."); + return null; + } + return activePipelines.get(getNextIndex()); + } + + /** + * gets the next index of the Pipeline to get. + * + * @return index in the link list to get. + */ + private int getNextIndex() { + return pipelineIndex.incrementAndGet() % activePipelines.size(); + } } /** @@ -59,44 +94,30 @@ public PipelineManager(Node2PipelineMap map, * @param replicationFactor - Replication Factor * @return a Pipeline. */ - public synchronized final Pipeline getPipeline( + public synchronized final PipelineID getPipeline( ReplicationFactor replicationFactor, ReplicationType replicationType) { - Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor); - if (pipeline != null) { + PipelineID id = + activePipelines.get(replicationFactor.ordinal()).findOpenPipeline(); + if (id != null) { LOG.debug("re-used pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", - pipeline.getId(), replicationType, replicationFactor); + id, replicationType, replicationFactor); } - if (pipeline == null) { + if (id == null) { LOG.error("Get pipeline call failed. 
We are not able to find" + " operational pipeline."); return null; } else { - return pipeline; + return id; } } - /** - * This function to get pipeline with given pipeline name. - * - * @param id - * @return a Pipeline. - */ - public synchronized final Pipeline getPipeline(PipelineID id) { - Pipeline pipeline = null; - - // 1. Check if pipeline already exists - if (pipelineMap.containsKey(id)) { - pipeline = pipelineMap.get(id); - LOG.debug("Returning pipeline for pipelineName:{}", id); - return pipeline; - } else { - LOG.debug("Unable to find pipeline for pipelineName:{}", id); - } - return pipeline; + void addOpenPipeline(Pipeline pipeline) { + activePipelines.get(pipeline.getFactor().ordinal()) + .addPipeline(pipeline.getId()); } - protected int getReplicationCount(ReplicationFactor factor) { + protected static int getReplicationCount(ReplicationFactor factor) { switch (factor) { case ONE: return 1; @@ -116,46 +137,6 @@ public abstract Pipeline allocatePipeline( */ public abstract void initializePipeline(Pipeline pipeline) throws IOException; - /** - * Find a Pipeline that is operational. - * - * @return - Pipeline or null - */ - private Pipeline findOpenPipeline( - ReplicationType type, ReplicationFactor factor) { - Pipeline pipeline = null; - final int sentinal = -1; - if (activePipelines.size() == 0) { - LOG.error("No Operational pipelines found. Returning null."); - return null; - } - int startIndex = getNextIndex(); - int nextIndex = sentinal; - for (; startIndex != nextIndex; nextIndex = getNextIndex()) { - // Just walk the list in a circular way. - PipelineID id = - activePipelines - .get(nextIndex != sentinal ? nextIndex : startIndex); - Pipeline temp = pipelineMap.get(id); - // if we find an operational pipeline just return that. - if ((temp.getLifeCycleState() == LifeCycleState.OPEN) && - (temp.getFactor() == factor) && (temp.getType() == type)) { - pipeline = temp; - break; - } - } - return pipeline; - } - - /** - * gets the next index of the Pipeline to get. - * - * @return index in the link list to get. - */ - private int getNextIndex() { - return pipelineIndex.incrementAndGet() % activePipelines.size(); - } - /** * Creates a pipeline with a specified replication factor and type. * @param replicationFactor - Replication Factor. @@ -168,9 +149,6 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor, LOG.debug("created new pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", pipeline.getId(), replicationType, replicationFactor); - activePipelines.add(pipeline.getId()); - pipelineMap.put(pipeline.getId(), pipeline); - node2PipelineMap.addPipeline(pipeline); } return pipeline; } @@ -180,17 +158,15 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor, * @param pipeline pipeline to be finalized */ public synchronized void finalizePipeline(Pipeline pipeline) { - activePipelines.remove(pipeline.getId()); + activePipelines.get(pipeline.getFactor().ordinal()) + .removePipeline(pipeline.getId()); } /** * * @param pipeline */ - public void closePipeline(Pipeline pipeline) throws IOException { - pipelineMap.remove(pipeline.getId()); - node2PipelineMap.removePipeline(pipeline); - } + public abstract void closePipeline(Pipeline pipeline) throws IOException; /** * list members in the pipeline. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index 63afbaa933..c9f51f7a42 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerStateManager; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.algorithms @@ -39,30 +38,34 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.common.statemachine - .InvalidStateTransitionException; -import org.apache.hadoop.ozone.common.statemachine.StateMachine; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseException; import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.HashSet; import java.util.List; -import java.util.NavigableSet; +import java.util.HashMap; import java.util.Set; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes .FAILED_TO_CHANGE_PIPELINE_STATE; +import static org.apache.hadoop.hdds.server + .ServerUtils.getOzoneMetaDirPath; +import static org.apache.hadoop.ozone + .OzoneConsts.SCM_PIPELINE_DB; /** * Sends the request to the right pipeline manager. @@ -73,16 +76,16 @@ public class PipelineSelector { private final ContainerPlacementPolicy placementPolicy; private final NodeManager nodeManager; private final Configuration conf; - private final ContainerStateManager containerStateManager; private final EventPublisher eventPublisher; private final RatisManagerImpl ratisManager; private final StandaloneManagerImpl standaloneManager; private final long containerSize; + private final MetadataStore pipelineStore; + private final PipelineStateManager stateManager; private final Node2PipelineMap node2PipelineMap; + private final Map> pipeline2ContainerMap; private final Map pipelineMap; private final LeaseManager pipelineLeaseManager; - private final StateMachine stateMachine; /** * Constructs a pipeline Selector. 
@@ -90,9 +93,8 @@ public class PipelineSelector { * @param nodeManager - node manager * @param conf - Ozone Config */ - public PipelineSelector(NodeManager nodeManager, - ContainerStateManager containerStateManager, Configuration conf, - EventPublisher eventPublisher) { + public PipelineSelector(NodeManager nodeManager, Configuration conf, + EventPublisher eventPublisher, int cacheSizeMB) throws IOException { this.nodeManager = nodeManager; this.conf = conf; this.eventPublisher = eventPublisher; @@ -105,79 +107,66 @@ public PipelineSelector(NodeManager nodeManager, pipelineMap = new ConcurrentHashMap<>(); this.standaloneManager = new StandaloneManagerImpl(this.nodeManager, placementPolicy, - containerSize, node2PipelineMap, pipelineMap); + containerSize); this.ratisManager = new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, - conf, node2PipelineMap, pipelineMap); - // Initialize the container state machine. - Set finalStates = new HashSet(); + conf); long pipelineCreationLeaseTimeout = conf.getTimeDuration( ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT, ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - this.containerStateManager = containerStateManager; pipelineLeaseManager = new LeaseManager<>("PipelineCreation", pipelineCreationLeaseTimeout); pipelineLeaseManager.start(); - // These are the steady states of a container. - finalStates.add(HddsProtos.LifeCycleState.OPEN); - finalStates.add(HddsProtos.LifeCycleState.CLOSED); + stateManager = new PipelineStateManager(); + pipeline2ContainerMap = new HashMap<>(); - this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED, - finalStates); - initializeStateMachine(); + // Write the container name to pipeline mapping. + File metaDir = getOzoneMetaDirPath(conf); + File containerDBPath = new File(metaDir, SCM_PIPELINE_DB); + pipelineStore = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(containerDBPath) + .setCacheSize(cacheSizeMB * OzoneConsts.MB) + .build(); + + reloadExistingPipelines(); } - /** - * Event and State Transition Mapping. - * - * State: ALLOCATED ---------------> CREATING - * Event: CREATE - * - * State: CREATING ---------------> OPEN - * Event: CREATED - * - * State: OPEN ---------------> CLOSING - * Event: FINALIZE - * - * State: CLOSING ---------------> CLOSED - * Event: CLOSE - * - * State: CREATING ---------------> CLOSED - * Event: TIMEOUT - * - * - * Container State Flow: - * - * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING] - * (CREATE) | (CREATED) (FINALIZE) | - * | | - * | | - * |(TIMEOUT) |(CLOSE) - * | | - * +--------> [CLOSED] <--------+ - */ - private void initializeStateMachine() { - stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED, - HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleEvent.CREATE); + private void reloadExistingPipelines() throws IOException { + if (pipelineStore.isEmpty()) { + // Nothing to do just return + return; + } - stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleState.OPEN, - HddsProtos.LifeCycleEvent.CREATED); + List> range = + pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null); - stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN, - HddsProtos.LifeCycleState.CLOSING, - HddsProtos.LifeCycleEvent.FINALIZE); + // Transform the values into the pipelines. 
+ // TODO: filter by pipeline state + for (Map.Entry entry : range) { + Pipeline pipeline = Pipeline.getFromProtoBuf( + HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); + Preconditions.checkNotNull(pipeline); + addExistingPipeline(pipeline); + } + } - stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING, - HddsProtos.LifeCycleState.CLOSED, - HddsProtos.LifeCycleEvent.CLOSE); + public Set getOpenContainerIDsByPipeline(PipelineID pipelineID) { + return pipeline2ContainerMap.get(pipelineID); + } - stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleState.CLOSED, - HddsProtos.LifeCycleEvent.TIMEOUT); + public void addContainerToPipeline(PipelineID pipelineID, long containerID) { + pipeline2ContainerMap.get(pipelineID) + .add(ContainerID.valueof(containerID)); + } + + public void removeContainerFromPipeline(PipelineID pipelineID, + long containerID) throws IOException { + pipeline2ContainerMap.get(pipelineID) + .remove(ContainerID.valueof(containerID)); + closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID)); } /** @@ -294,8 +283,14 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType, manager.createPipeline(replicationFactor, replicationType); if (pipeline == null) { // try to return a pipeline from already allocated pipelines - pipeline = manager.getPipeline(replicationFactor, replicationType); + PipelineID pipelineId = + manager.getPipeline(replicationFactor, replicationType); + pipeline = pipelineMap.get(pipelineId); + Preconditions.checkArgument(pipeline.getLifeCycleState() == + LifeCycleState.OPEN); } else { + pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(), + pipeline.getProtobufMessage().toByteArray()); // if a new pipeline is created, initialize its state machine updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE); @@ -343,8 +338,8 @@ public void closePipelineIfNoOpenContainers(Pipeline pipeline) if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) { return; } - NavigableSet containerIDS = containerStateManager - .getMatchingContainerIDsByPipeline(pipeline.getId()); + HashSet containerIDS = + pipeline2ContainerMap.get(pipeline.getId()); if (containerIDS.size() == 0) { updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE); LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId()); @@ -358,56 +353,58 @@ private void closePipeline(Pipeline pipeline) throws IOException { PipelineManager manager = getPipelineManager(pipeline.getType()); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId()); - NavigableSet containers = - containerStateManager - .getMatchingContainerIDsByPipeline(pipeline.getId()); + HashSet containers = + pipeline2ContainerMap.get(pipeline.getId()); Preconditions.checkArgument(containers.size() == 0); manager.closePipeline(pipeline); } + /** + * Add to a given pipeline. + */ + private void addOpenPipeline(Pipeline pipeline) { + PipelineManager manager = getPipelineManager(pipeline.getType()); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Adding Open pipeline. 
pipelineID: {}", pipeline.getId()); + manager.addOpenPipeline(pipeline); + } + private void closeContainersByPipeline(Pipeline pipeline) { - NavigableSet containers = - containerStateManager - .getMatchingContainerIDsByPipeline(pipeline.getId()); + HashSet containers = + pipeline2ContainerMap.get(pipeline.getId()); for (ContainerID id : containers) { eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id); } } - /** - * list members in the pipeline . - */ - - public List getDatanodes(ReplicationType replicationType, - PipelineID pipelineID) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Getting data nodes from pipeline : {}", pipelineID); - return manager.getMembers(pipelineID); - } - - /** - * Update the datanodes in the list of the pipeline. - */ - - public void updateDatanodes(ReplicationType replicationType, PipelineID - pipelineID, List newDatanodes) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID, - newDatanodes.stream().map(DatanodeDetails::toString) - .collect(Collectors.joining(","))); - manager.updatePipeline(pipelineID, newDatanodes); - } - - public Node2PipelineMap getNode2PipelineMap() { - return node2PipelineMap; - } - public Set getPipelineId(UUID dnId) { return node2PipelineMap.getPipelines(dnId); } + private void addExistingPipeline(Pipeline pipeline) throws IOException { + LifeCycleState state = pipeline.getLifeCycleState(); + switch (state) { + case ALLOCATED: + // a pipeline in allocated state is only present in SCM and does not exist + // on datanode, on SCM restart, this pipeline can be ignored. + break; + case CREATING: + case OPEN: + case CLOSING: + //TODO: process pipeline report and move pipeline to active queue + // when all the nodes have reported. + pipelineMap.put(pipeline.getId(), pipeline); + pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); + node2PipelineMap.addPipeline(pipeline); + break; + case CLOSED: + // if the pipeline is in closed state, nothing to do. + break; + default: + throw new IOException("invalid pipeline state:" + state); + } + } + /** * Update the Pipeline State to the next state. * @@ -417,24 +414,12 @@ public Set getPipelineId(UUID dnId) { */ public void updatePipelineState(Pipeline pipeline, HddsProtos.LifeCycleEvent event) throws IOException { - HddsProtos.LifeCycleState newState; - try { - newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event); - } catch (InvalidStateTransitionException ex) { - String error = String.format("Failed to update pipeline state %s, " + - "reason: invalid state transition from state: %s upon " + - "event: %s.", - pipeline.getId(), pipeline.getLifeCycleState(), event); - LOG.error(error); - throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE); - } - - // This is a post condition after executing getNextState. 
- Preconditions.checkNotNull(newState); - Preconditions.checkNotNull(pipeline); try { switch (event) { case CREATE: + pipelineMap.put(pipeline.getId(), pipeline); + pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); + node2PipelineMap.addPipeline(pipeline); // Acquire lease on pipeline Lease pipelineLease = pipelineLeaseManager.acquire(pipeline); // Register callback to be executed in case of timeout @@ -446,6 +431,7 @@ public void updatePipelineState(Pipeline pipeline, case CREATED: // Release the lease on pipeline pipelineLeaseManager.release(pipeline); + addOpenPipeline(pipeline); break; case FINALIZE: @@ -455,21 +441,30 @@ public void updatePipelineState(Pipeline pipeline, case CLOSE: case TIMEOUT: closePipeline(pipeline); + pipeline2ContainerMap.remove(pipeline.getId()); + node2PipelineMap.removePipeline(pipeline); + pipelineMap.remove(pipeline.getId()); break; default: throw new SCMException("Unsupported pipeline LifeCycleEvent.", FAILED_TO_CHANGE_PIPELINE_STATE); } - pipeline.setLifeCycleState(newState); + stateManager.updatePipelineState(pipeline, event); + pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(), + pipeline.getProtobufMessage().toByteArray()); } catch (LeaseException e) { throw new IOException("Lease Exception.", e); } } - public void shutdown() { + public void shutdown() throws IOException { if (pipelineLeaseManager != null) { pipelineLeaseManager.shutdown(); } + + if (pipelineStore != null) { + pipelineStore.close(); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java new file mode 100644 index 0000000000..6054f16544 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.pipelines; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.common.statemachine + .InvalidStateTransitionException; +import org.apache.hadoop.ozone.common.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_CHANGE_PIPELINE_STATE; + +/** + * Manages Pipeline states. + */ +public class PipelineStateManager { + private static final Logger LOG = + LoggerFactory.getLogger(PipelineStateManager.class); + + private final StateMachine stateMachine; + + PipelineStateManager() { + // Initialize the container state machine. + Set finalStates = new HashSet<>(); + // These are the steady states of a container. + finalStates.add(HddsProtos.LifeCycleState.OPEN); + finalStates.add(HddsProtos.LifeCycleState.CLOSED); + + this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED, + finalStates); + initializeStateMachine(); + } + + /** + * Event and State Transition Mapping. + * + * State: ALLOCATED ---------------> CREATING + * Event: CREATE + * + * State: CREATING ---------------> OPEN + * Event: CREATED + * + * State: OPEN ---------------> CLOSING + * Event: FINALIZE + * + * State: CLOSING ---------------> CLOSED + * Event: CLOSE + * + * State: CREATING ---------------> CLOSED + * Event: TIMEOUT + * + * + * Container State Flow: + * + * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING] + * (CREATE) | (CREATED) (FINALIZE) | + * | | + * | | + * |(TIMEOUT) |(CLOSE) + * | | + * +--------> [CLOSED] <--------+ + */ + private void initializeStateMachine() { + stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED, + HddsProtos.LifeCycleState.CREATING, + HddsProtos.LifeCycleEvent.CREATE); + + stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, + HddsProtos.LifeCycleState.OPEN, + HddsProtos.LifeCycleEvent.CREATED); + + stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN, + HddsProtos.LifeCycleState.CLOSING, + HddsProtos.LifeCycleEvent.FINALIZE); + + stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING, + HddsProtos.LifeCycleState.CLOSED, + HddsProtos.LifeCycleEvent.CLOSE); + + stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, + HddsProtos.LifeCycleState.CLOSED, + HddsProtos.LifeCycleEvent.TIMEOUT); + } + + + /** + * Update the Pipeline State to the next state. + * + * @param pipeline - Pipeline + * @param event - LifeCycle Event + * @throws SCMException on Failure. 
+ */ + public void updatePipelineState(Pipeline pipeline, + HddsProtos.LifeCycleEvent event) throws IOException { + HddsProtos.LifeCycleState newState; + try { + newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event); + } catch (InvalidStateTransitionException ex) { + String error = String.format("Failed to update pipeline state %s, " + + "reason: invalid state transition from state: %s upon " + + "event: %s.", + pipeline.getId(), pipeline.getLifeCycleState(), event); + LOG.error(error); + throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE); + } + + // This is a post condition after executing getNextState. + Preconditions.checkNotNull(newState); + Preconditions.checkNotNull(pipeline); + pipeline.setLifeCycleState(newState); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java index 0342e18c55..d3cec882bb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -39,7 +38,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.Map; /** * Implementation of {@link PipelineManager}. @@ -59,9 +57,8 @@ public class RatisManagerImpl extends PipelineManager { * @param nodeManager */ public RatisManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long size, Configuration conf, - Node2PipelineMap map, Map pipelineMap) { - super(map, pipelineMap); + ContainerPlacementPolicy placementPolicy, long size, Configuration conf) { + super(); this.conf = conf; this.nodeManager = nodeManager; ratisMembers = new HashSet<>(); @@ -114,7 +111,6 @@ public void closePipeline(Pipeline pipeline) throws IOException { XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { client.destroyPipeline(); } - super.closePipeline(pipeline); for (DatanodeDetails node : pipeline.getMachines()) { // A node should always be the in ratis members list. 
Preconditions.checkArgument(ratisMembers.remove(node)); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java index 2573b9c480..ed2fc2fe68 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -37,7 +36,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.Map; /** * Standalone Manager Impl to prove that pluggable interface @@ -58,9 +56,8 @@ public class StandaloneManagerImpl extends PipelineManager { * @param containerSize - Container Size. */ public StandaloneManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long containerSize, - Node2PipelineMap map, Map pipelineMap) { - super(map, pipelineMap); + ContainerPlacementPolicy placementPolicy, long containerSize) { + super(); this.nodeManager = nodeManager; this.placementPolicy = placementPolicy; this.containerSize = containerSize; @@ -105,7 +102,6 @@ public void initializePipeline(Pipeline pipeline) { * Close the pipeline. */ public void closePipeline(Pipeline pipeline) throws IOException { - super.closePipeline(pipeline); for (DatanodeDetails node : pipeline.getMachines()) { // A node should always be the in standalone members list. 
Preconditions.checkArgument(standAloneMembers.remove(node)); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index d74a32f611..a59179bdff 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -75,6 +75,7 @@ public void test() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); Mapping mapping = Mockito.mock(Mapping.class); + PipelineSelector selector = Mockito.mock(PipelineSelector.class); when(mapping.getContainer(anyLong())) .thenAnswer( @@ -87,7 +88,7 @@ public void test() throws IOException { ); ContainerStateManager containerStateManager = - new ContainerStateManager(conf, mapping); + new ContainerStateManager(conf, mapping, selector); when(mapping.getStateManager()).thenReturn(containerStateManager); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java index fe92ee5f1e..b857740a5f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,7 +42,8 @@ public class TestContainerStateManager { public void init() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); Mapping mapping = Mockito.mock(Mapping.class); - containerStateManager = new ContainerStateManager(conf, mapping); + PipelineSelector selector = Mockito.mock(PipelineSelector.class); + containerStateManager = new ContainerStateManager(conf, mapping, selector); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 0b69f5f8f4..5ca9cb769a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.junit.Assert; @@ -60,7 +61,8 @@ public void testOnMessage() throws IOException { Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); ContainerStateManager containerStateManager = new ContainerStateManager( new OzoneConfiguration(), - Mockito.mock(Mapping.class) + Mockito.mock(Mapping.class), + Mockito.mock(PipelineSelector.class) ); ContainerInfo container1 = diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java index c6e819bc6e..422a7debe7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -54,8 +55,9 @@ public class TestContainerStateManagerIntegration { private MiniOzoneCluster cluster; private XceiverClientManager xceiverClientManager; private StorageContainerManager scm; - private Mapping scmContainerMapping; + private ContainerMapping scmContainerMapping; private ContainerStateManager containerStateManager; + private PipelineSelector selector; private String containerOwner = "OZONE"; @@ -66,8 +68,9 @@ public void setup() throws Exception { cluster.waitForClusterToBeReady(); xceiverClientManager = new XceiverClientManager(conf); scm = cluster.getStorageContainerManager(); - scmContainerMapping = scm.getScmContainerManager(); + scmContainerMapping = (ContainerMapping) scm.getScmContainerManager(); containerStateManager = scmContainerMapping.getStateManager(); + selector = scmContainerMapping.getPipelineSelector(); } @After @@ -133,8 +136,7 @@ public void testContainerStateManagerRestart() throws IOException { // New instance of ContainerStateManager should load all the containers in // container store. 
ContainerStateManager stateManager = - new ContainerStateManager(conf, scmContainerMapping - ); + new ContainerStateManager(conf, scmContainerMapping, selector); int matchCount = stateManager .getMatchingContainerIDs(containerOwner, xceiverClientManager.getType(), xceiverClientManager.getFactor(), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index b8cb99701b..aefa6b0c5a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -30,14 +30,13 @@ import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; -import java.util.NavigableSet; import java.util.Set; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos @@ -60,8 +59,8 @@ public class TestNode2PipelineMap { * * @throws IOException */ - @BeforeClass - public static void init() throws Exception { + @Before + public void init() throws Exception { conf = new OzoneConfiguration(); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); cluster.waitForClusterToBeReady(); @@ -75,8 +74,8 @@ public static void init() throws Exception { /** * Shutdown MiniDFSCluster. */ - @AfterClass - public static void shutdown() { + @After + public void shutdown() { if (cluster != null) { cluster.shutdown(); } @@ -86,19 +85,20 @@ public static void shutdown() { @Test public void testPipelineMap() throws IOException { - NavigableSet set = stateMap.getOpenContainerIDsByPipeline( + Set set = pipelineSelector.getOpenContainerIDsByPipeline( ratisContainer.getPipeline().getId()); long cId = ratisContainer.getContainerInfo().getContainerID(); Assert.assertEquals(1, set.size()); - Assert.assertEquals(cId, set.first().getId()); + set.forEach(containerID -> + Assert.assertEquals(containerID, ContainerID.valueof(cId))); List dns = ratisContainer.getPipeline().getMachines(); Assert.assertEquals(3, dns.size()); // get pipeline details by dnid Set pipelines = mapping.getPipelineSelector() - .getNode2PipelineMap().getPipelines(dns.get(0).getUuid()); + .getPipelineId(dns.get(0).getUuid()); Assert.assertEquals(1, pipelines.size()); pipelines.forEach(p -> Assert.assertEquals(p, ratisContainer.getPipeline().getId())); @@ -114,7 +114,7 @@ public void testPipelineMap() throws IOException { .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE); mapping .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE); - NavigableSet set2 = stateMap.getOpenContainerIDsByPipeline( + Set set2 = pipelineSelector.getOpenContainerIDsByPipeline( ratisContainer.getPipeline().getId()); Assert.assertEquals(0, set2.size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 0f8f925475..a5828e117a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -35,7 +35,7 @@ import org.junit.Test; import java.io.IOException; -import java.util.NavigableSet; +import java.util.Set; import java.util.concurrent.TimeoutException; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos @@ -88,12 +88,13 @@ public static void shutdown() { @Test public void testPipelineCloseWithClosedContainer() throws IOException { - NavigableSet set = stateMap.getOpenContainerIDsByPipeline( + Set set = pipelineSelector.getOpenContainerIDsByPipeline( ratisContainer1.getPipeline().getId()); long cId = ratisContainer1.getContainerInfo().getContainerID(); Assert.assertEquals(1, set.size()); - Assert.assertEquals(cId, set.first().getId()); + set.forEach(containerID -> + Assert.assertEquals(containerID, ContainerID.valueof(cId))); // Now close the container and it should not show up while fetching // containers by pipeline @@ -106,7 +107,7 @@ public void testPipelineCloseWithClosedContainer() throws IOException { mapping .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE); - NavigableSet setClosed = stateMap.getOpenContainerIDsByPipeline( + Set setClosed = pipelineSelector.getOpenContainerIDsByPipeline( ratisContainer1.getPipeline().getId()); Assert.assertEquals(0, setClosed.size()); @@ -118,15 +119,15 @@ public void testPipelineCloseWithClosedContainer() throws IOException { HddsProtos.LifeCycleState.CLOSED); for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) { // Assert that the pipeline has been removed from Node2PipelineMap as well - Assert.assertEquals(pipelineSelector.getNode2PipelineMap() - .getPipelines(dn.getUuid()).size(), 0); + Assert.assertEquals(pipelineSelector.getPipelineId( + dn.getUuid()).size(), 0); } } @Test public void testPipelineCloseWithOpenContainer() throws IOException, TimeoutException, InterruptedException { - NavigableSet setOpen = stateMap.getOpenContainerIDsByPipeline( + Set setOpen = pipelineSelector.getOpenContainerIDsByPipeline( ratisContainer2.getPipeline().getId()); Assert.assertEquals(1, setOpen.size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java new file mode 100644 index 0000000000..3999d76658 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ * + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.protocol.proto + .HddsProtos.ReplicationFactor.THREE; +import static org.apache.hadoop.hdds.protocol.proto + .HddsProtos.ReplicationType.RATIS; + +/** + * Test SCM restart and recovery wrt pipelines. + */ +public class TestSCMRestart { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static Pipeline ratisPipeline1; + private static Pipeline ratisPipeline2; + private static ContainerMapping mapping; + private static ContainerMapping newMapping; + + /** + * Create a MiniDFSCluster for testing. + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(6) + .setHbInterval(1000) + .setHbProcessorInterval(1000) + .build(); + cluster.waitForClusterToBeReady(); + StorageContainerManager scm = cluster.getStorageContainerManager(); + mapping = (ContainerMapping)scm.getScmContainerManager(); + ratisPipeline1 = + mapping.allocateContainer(RATIS, THREE, "Owner1").getPipeline(); + ratisPipeline2 = + mapping.allocateContainer(RATIS, ONE, "Owner2").getPipeline(); + // At this stage, there should be 2 pipeline one with 1 open container + // each. Try restarting the SCM and then discover that pipeline are in + // correct state. + cluster.restartStorageContainerManager(); + newMapping = (ContainerMapping)(cluster.getStorageContainerManager() + .getScmContainerManager()); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testPipelineWithScmRestart() { + // After restart make sure that the pipeline are still present + Pipeline ratisPipeline1AfterRestart = newMapping.getPipelineSelector() + .getPipeline(ratisPipeline1.getId()); + Pipeline ratisPipeline2AfterRestart = newMapping.getPipelineSelector() + .getPipeline(ratisPipeline2.getId()); + Assert.assertNotSame(ratisPipeline1AfterRestart, ratisPipeline1); + Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2); + Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1); + Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2); + } +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 3cba83905e..d13efb4343 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -138,8 +138,11 @@ static Builder newBuilder(OzoneConfiguration conf) { * Restarts StorageContainerManager instance. 
* * @throws IOException + * @throws TimeoutException + * @throws InterruptedException */ - void restartStorageContainerManager() throws IOException; + void restartStorageContainerManager() throws InterruptedException, + TimeoutException, IOException; /** * Restarts OzoneManager instance. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index c2169a351a..b34a7d11bc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -85,7 +85,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { LoggerFactory.getLogger(MiniOzoneClusterImpl.class); private final OzoneConfiguration conf; - private final StorageContainerManager scm; + private StorageContainerManager scm; private final OzoneManager ozoneManager; private final List hddsDatanodes; @@ -215,9 +215,13 @@ public OzoneClient getRestClient() throws IOException { } @Override - public void restartStorageContainerManager() throws IOException { + public void restartStorageContainerManager() + throws TimeoutException, InterruptedException, IOException { scm.stop(); + scm.join(); + scm = StorageContainerManager.createSCM(null, conf); scm.start(); + waitForClusterToBeReady(); } @Override
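
---
Reviewer note: a condensed sketch of the persistence cycle this patch adds to PipelineSelector, for readers skimming the diff. Pipelines are written to a dedicated scm-pipeline.db MetadataStore keyed by the PipelineID protobuf and reloaded into the in-memory maps on SCM startup. The snippet below is abridged from the hunks above (locking, lease handling and exception paths omitted); it is illustrative only, not a drop-in replacement for the patch.

    // Open the pipeline store next to the other SCM metadata (scm-pipeline.db).
    MetadataStore pipelineStore = MetadataStoreBuilder.newBuilder()
        .setConf(conf)
        .setDbFile(new File(getOzoneMetaDirPath(conf), SCM_PIPELINE_DB))
        .setCacheSize(cacheSizeMB * OzoneConsts.MB)
        .build();

    // On every pipeline state change, persist the pipeline keyed by its id.
    pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
        pipeline.getProtobufMessage().toByteArray());

    // On SCM restart, read everything back and rebuild pipelineMap,
    // pipeline2ContainerMap and node2PipelineMap; ALLOCATED and CLOSED
    // pipelines are skipped by addExistingPipeline().
    for (Map.Entry<byte[], byte[]> entry :
        pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null)) {
      Pipeline p = Pipeline.getFromProtoBuf(
          HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
      addExistingPipeline(p);
    }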