diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 71184cf890..6e940adbc3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -236,6 +236,11 @@ public final class ScmConfigKeys {
public static final String
OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
+ public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
+ "ozone.scm.pipeline.creation.lease.timeout";
+
+ public static final String
+ OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
"ozone.scm.block.deletion.max.retry";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index c5794f4c03..534c9fd541 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -213,6 +213,13 @@ public HddsProtos.LifeCycleState getLifeCycleState() {
return lifeCycleState;
}
+ /**
+ * Update the State of the pipeline.
+ */
+ public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
+ lifeCycleState = nextState;
+ }
+
/**
* Gets the pipeline Name.
*
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 5a1d26a8dd..69a382a013 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1085,5 +1085,17 @@
executed since last report. Unit could be defined with
postfix (ns,ms,s,m,h,d)
+
+ ozone.scm.pipeline.creation.lease.timeout
+ 60s
+ OZONE, SCM, PIPELINE
+
+ Pipeline creation timeout in milliseconds to be used by SCM. When
+ BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to
+ CREATING state, SCM will now wait for the configured amount of time
+ to get COMPLETE_CREATE event if it doesn't receive it will move the
+ pipeline to DELETING.
+
+
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 26f4d868f7..f07d22baaf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -658,6 +658,10 @@ public void close() throws IOException {
if (containerStore != null) {
containerStore.close();
}
+
+ if (pipelineSelector != null) {
+ pipelineSelector.shutdown();
+ }
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index d7d70ef98c..00855426ea 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -107,6 +107,7 @@ public enum ResultCodes {
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
FAILED_TO_CHANGE_CONTAINER_STATE,
+ FAILED_TO_CHANGE_PIPELINE_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SPACE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index a041973f93..77d8211699 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -59,41 +59,16 @@ public PipelineManager(Node2PipelineMap map) {
* @return a Pipeline.
*/
public synchronized final Pipeline getPipeline(
- ReplicationFactor replicationFactor, ReplicationType replicationType)
- throws IOException {
- /**
- * In the Ozone world, we have a very simple policy.
- *
- * 1. Try to create a pipeline if there are enough free nodes.
- *
- * 2. This allows all nodes to part of a pipeline quickly.
- *
- * 3. if there are not enough free nodes, return pipeline in a
- * round-robin fashion.
- *
- * TODO: Might have to come up with a better algorithm than this.
- * Create a new placement policy that returns pipelines in round robin
- * fashion.
- */
- Pipeline pipeline = allocatePipeline(replicationFactor);
+ ReplicationFactor replicationFactor, ReplicationType replicationType) {
+ Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) {
- LOG.debug("created new pipeline:{} for container with " +
+ LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
- activePipelines.add(pipeline);
- activePipelineMap.put(pipeline.getPipelineName(), pipeline);
- node2PipelineMap.addPipeline(pipeline);
- } else {
- pipeline = findOpenPipeline(replicationType, replicationFactor);
- if (pipeline != null) {
- LOG.debug("re-used pipeline:{} for container with " +
- "replicationType:{} replicationFactor:{}",
- pipeline.getPipelineName(), replicationType, replicationFactor);
- }
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
- "free nodes or operational pipeline.");
+ " operational pipeline.");
return null;
} else {
return pipeline;
@@ -109,7 +84,7 @@ public synchronized final Pipeline getPipeline(
public synchronized final Pipeline getPipeline(String pipelineName) {
Pipeline pipeline = null;
- // 1. Check if pipeline channel already exists
+ // 1. Check if pipeline already exists
if (activePipelineMap.containsKey(pipelineName)) {
pipeline = activePipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@@ -132,7 +107,13 @@ protected int getReplicationCount(ReplicationFactor factor) {
}
public abstract Pipeline allocatePipeline(
- ReplicationFactor replicationFactor) throws IOException;
+ ReplicationFactor replicationFactor);
+
+ /**
+ * Initialize the pipeline
+ * TODO: move the initialization to Ozone Client later
+ */
+ public abstract void initializePipeline(Pipeline pipeline) throws IOException;
public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
@@ -179,12 +160,23 @@ private int getNextIndex() {
}
/**
- * Creates a pipeline from a specified set of Nodes.
- * @param pipelineID - Name of the pipeline
- * @param datanodes - The list of datanodes that make this pipeline.
+ * Creates a pipeline with a specified replication factor and type.
+ * @param replicationFactor - Replication Factor.
+ * @param replicationType - Replication Type.
*/
- public abstract void createPipeline(String pipelineID,
- List datanodes) throws IOException;
+ public Pipeline createPipeline(ReplicationFactor replicationFactor,
+ ReplicationType replicationType) throws IOException {
+ Pipeline pipeline = allocatePipeline(replicationFactor);
+ if (pipeline != null) {
+ LOG.debug("created new pipeline:{} for container with "
+ + "replicationType:{} replicationFactor:{}",
+ pipeline.getPipelineName(), replicationType, replicationFactor);
+ activePipelines.add(pipeline);
+ activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+ node2PipelineMap.addPipeline(pipeline);
+ }
+ return pipeline;
+ }
/**
* Close the pipeline with the given clusterId.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 2955af58fb..08710e7937 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -24,6 +24,7 @@
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@@ -33,17 +34,28 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.statemachine
+ .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_PIPELINE_STATE;
+
/**
* Sends the request to the right pipeline manager.
*/
@@ -57,6 +69,10 @@ public class PipelineSelector {
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
private final Node2PipelineMap node2PipelineMap;
+ private final LeaseManager pipelineLeaseManager;
+ private final StateMachine stateMachine;
+
/**
* Constructs a pipeline Selector.
*
@@ -77,6 +93,74 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
conf, node2PipelineMap);
+ // Initialize the container state machine.
+ Set finalStates = new HashSet();
+ long pipelineCreationLeaseTimeout = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ LOG.trace("Starting Pipeline Lease Manager.");
+ pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
+ pipelineLeaseManager.start();
+
+ // These are the steady states of a container.
+ finalStates.add(HddsProtos.LifeCycleState.OPEN);
+ finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+
+ this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
+ finalStates);
+ initializeStateMachine();
+ }
+
+ /**
+ * Event and State Transition Mapping:
+ *
+ * State: ALLOCATED ---------------> CREATING
+ * Event: CREATE
+ *
+ * State: CREATING ---------------> OPEN
+ * Event: CREATED
+ *
+ * State: OPEN ---------------> CLOSING
+ * Event: FINALIZE
+ *
+ * State: CLOSING ---------------> CLOSED
+ * Event: CLOSE
+ *
+ * State: CREATING ---------------> CLOSED
+ * Event: TIMEOUT
+ *
+ *
+ * Container State Flow:
+ *
+ * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+ * (CREATE) | (CREATED) (FINALIZE) |
+ * | |
+ * | |
+ * |(TIMEOUT) |(CLOSE)
+ * | |
+ * +--------> [CLOSED] <--------+
+ */
+ private void initializeStateMachine() {
+ stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
+ HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleEvent.CREATE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleState.OPEN,
+ HddsProtos.LifeCycleEvent.CREATED);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
+ HddsProtos.LifeCycleState.CLOSING,
+ HddsProtos.LifeCycleEvent.FINALIZE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
+ HddsProtos.LifeCycleState.CLOSED,
+ HddsProtos.LifeCycleEvent.CLOSE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleState.CLOSED,
+ HddsProtos.LifeCycleEvent.TIMEOUT);
}
/**
@@ -88,15 +172,14 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
* @return pipeline corresponding to nodes
*/
public static Pipeline newPipelineFromNodes(
- List nodes, LifeCycleState state,
- ReplicationType replicationType, ReplicationFactor replicationFactor,
- String name) {
+ List nodes, ReplicationType replicationType,
+ ReplicationFactor replicationFactor, String name) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getUuidString();
- Pipeline
- pipeline = new Pipeline(leaderId, state, replicationType,
- replicationFactor, name);
+ // A new pipeline always starts in allocated state
+ Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
+ replicationType, replicationFactor, name);
for (DatanodeDetails node : nodes) {
pipeline.addMember(node);
}
@@ -175,8 +258,35 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType,
LOG.debug("Getting replication pipeline forReplicationType {} :" +
" ReplicationFactor {}", replicationType.toString(),
replicationFactor.toString());
- return manager.
- getPipeline(replicationFactor, replicationType);
+
+ /**
+ * In the Ozone world, we have a very simple policy.
+ *
+ * 1. Try to create a pipeline if there are enough free nodes.
+ *
+ * 2. This allows all nodes to part of a pipeline quickly.
+ *
+ * 3. if there are not enough free nodes, return already allocated pipeline
+ * in a round-robin fashion.
+ *
+ * TODO: Might have to come up with a better algorithm than this.
+ * Create a new placement policy that returns pipelines in round robin
+ * fashion.
+ */
+ Pipeline pipeline =
+ manager.createPipeline(replicationFactor, replicationType);
+ if (pipeline == null) {
+ // try to return a pipeline from already allocated pipelines
+ pipeline = manager.getPipeline(replicationFactor, replicationType);
+ } else {
+ // if a new pipeline is created, initialize its state machine
+ updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
+
+ //TODO: move the initialization of pipeline to Ozone Client
+ manager.initializePipeline(pipeline);
+ updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
+ }
+ return pipeline;
}
/**
@@ -194,19 +304,6 @@ public Pipeline getPipeline(String pipelineName,
" pipelineName:{}", replicationType, pipelineName);
return manager.getPipeline(pipelineName);
}
- /**
- * Creates a pipeline from a specified set of Nodes.
- */
-
- public void createPipeline(ReplicationType replicationType, String
- pipelineID, List datanodes) throws IOException {
- PipelineManager manager = getPipelineManager(replicationType);
- Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
- datanodes.stream().map(DatanodeDetails::toString)
- .collect(Collectors.joining(",")));
- manager.createPipeline(pipelineID, datanodes);
- }
/**
* Close the pipeline with the given clusterId.
@@ -251,12 +348,77 @@ public Node2PipelineMap getNode2PipelineMap() {
}
public void removePipeline(UUID dnId) {
- Set pipelineChannelSet =
+ Set pipelineSet =
node2PipelineMap.getPipelines(dnId);
- for (Pipeline pipelineChannel : pipelineChannelSet) {
- getPipelineManager(pipelineChannel.getType())
- .removePipeline(pipelineChannel);
+ for (Pipeline pipeline : pipelineSet) {
+ getPipelineManager(pipeline.getType())
+ .removePipeline(pipeline);
}
node2PipelineMap.removeDatanode(dnId);
}
+
+ /**
+ * Update the Pipeline State to the next state.
+ *
+ * @param pipeline - Pipeline
+ * @param event - LifeCycle Event
+ * @throws SCMException on Failure.
+ */
+ public void updatePipelineState(Pipeline pipeline,
+ HddsProtos.LifeCycleEvent event) throws IOException {
+ HddsProtos.LifeCycleState newState;
+ try {
+ newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+ } catch (InvalidStateTransitionException ex) {
+ String error = String.format("Failed to update pipeline state %s, " +
+ "reason: invalid state transition from state: %s upon " +
+ "event: %s.",
+ pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
+ LOG.error(error);
+ throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+ }
+
+ // This is a post condition after executing getNextState.
+ Preconditions.checkNotNull(newState);
+ Preconditions.checkNotNull(pipeline);
+ try {
+ switch (event) {
+ case CREATE:
+ // Acquire lease on pipeline
+ Lease pipelineLease = pipelineLeaseManager.acquire(pipeline);
+ // Register callback to be executed in case of timeout
+ pipelineLease.registerCallBack(() -> {
+ updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
+ return null;
+ });
+ break;
+ case CREATED:
+ // Release the lease on pipeline
+ pipelineLeaseManager.release(pipeline);
+ break;
+
+ case FINALIZE:
+ //TODO: cleanup pipeline by closing all the containers on the pipeline
+ break;
+
+ case CLOSE:
+ case TIMEOUT:
+ // TODO: Release the nodes here when pipelines are destroyed
+ break;
+ default:
+ throw new SCMException("Unsupported pipeline LifeCycleEvent.",
+ FAILED_TO_CHANGE_PIPELINE_STATE);
+ }
+
+ pipeline.setLifeCycleState(newState);
+ } catch (LeaseException e) {
+ throw new IOException("Lease Exception.", e);
+ }
+ }
+
+ public void shutdown() {
+ if (pipelineLeaseManager != null) {
+ pipelineLeaseManager.shutdown();
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index a8f8b206df..c726ef6fb4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -72,7 +71,7 @@ public RatisManagerImpl(NodeManager nodeManager,
* Allocates a new ratis Pipeline from the free nodes.
*
* @param factor - One or Three
- * @return PipelineChannel.
+ * @return Pipeline.
*/
public Pipeline allocatePipeline(ReplicationFactor factor) {
List newNodesList = new LinkedList<>();
@@ -89,35 +88,23 @@ public Pipeline allocatePipeline(ReplicationFactor factor) {
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new ratis pipeline of size: {}", count);
- // Start all channel names with "Ratis", easy to grep the logs.
+ // Start all pipeline names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
- Pipeline pipeline=
- PipelineSelector.newPipelineFromNodes(newNodesList,
- LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
- try (XceiverClientRatis client =
- XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
- client.createPipeline(pipeline.getPipelineName(), newNodesList);
- } catch (IOException e) {
- return null;
- }
- return pipeline;
+ return PipelineSelector.newPipelineFromNodes(newNodesList,
+ ReplicationType.RATIS, factor, pipelineName);
}
}
}
return null;
}
- /**
- * Creates a pipeline from a specified set of Nodes.
- *
- * @param pipelineID - Name of the pipeline
- * @param datanodes - The list of datanodes that make this pipeline.
- */
- @Override
- public void createPipeline(String pipelineID,
- List datanodes) {
-
+ public void initializePipeline(Pipeline pipeline) throws IOException {
+ //TODO:move the initialization from SCM to client
+ try (XceiverClientRatis client =
+ XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+ client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
+ }
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index cf691bfb20..bb4951f22c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -25,7 +25,6 @@
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -86,29 +85,19 @@ public Pipeline allocatePipeline(ReplicationFactor factor) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
- LOG.info("Allocating a new standalone pipeline channel of size: {}",
- count);
- String channelName =
+ LOG.info("Allocating a new standalone pipeline of size: {}", count);
+ String pipelineName =
"SA-" + UUID.randomUUID().toString().substring(3);
return PipelineSelector.newPipelineFromNodes(newNodesList,
- LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
- ReplicationFactor.ONE, channelName);
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
}
}
}
return null;
}
- /**
- * Creates a pipeline from a specified set of Nodes.
- *
- * @param pipelineID - Name of the pipeline
- * @param datanodes - The list of datanodes that make this pipeline.
- */
- @Override
- public void createPipeline(String pipelineID,
- List datanodes) {
- //return newPipelineFromNodes(datanodes, pipelineID);
+ public void initializePipeline(Pipeline pipeline) {
+ // Nothing to be done for standalone pipeline
}
/**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index bc3505fa7e..ffac6d5288 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -26,6 +26,8 @@
.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
@@ -51,6 +53,7 @@ public class TestNode2PipelineMap {
private static ContainerWithPipeline ratisContainer;
private static ContainerStateMap stateMap;
private static ContainerMapping mapping;
+ private static PipelineSelector pipelineSelector;
/**
* Create a MiniDFSCluster for testing.
@@ -66,6 +69,7 @@ public static void init() throws Exception {
mapping = (ContainerMapping)scm.getScmContainerManager();
stateMap = mapping.getStateManager().getContainerStateMap();
ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
+ pipelineSelector = mapping.getPipelineSelector();
}
/**
@@ -113,5 +117,15 @@ public void testPipelineMap() throws IOException {
NavigableSet set2 = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
Assert.assertEquals(0, set2.size());
+
+ try {
+ pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+ Assert.fail("closing of pipeline without finalize should fail");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof SCMException);
+ Assert.assertEquals(((SCMException)e).getResult(),
+ SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
+ }
}
}