HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-07-20 14:22:02 -07:00
parent d2acf8d560
commit 9be25e3476
10 changed files with 273 additions and 100 deletions

View File

@ -236,6 +236,11 @@ public final class ScmConfigKeys {
public static final String
OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
"ozone.scm.pipeline.creation.lease.timeout";
public static final String
OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
"ozone.scm.block.deletion.max.retry";

View File

@ -213,6 +213,13 @@ public HddsProtos.LifeCycleState getLifeCycleState() {
return lifeCycleState;
}
/**
* Update the State of the pipeline.
*/
public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
lifeCycleState = nextState;
}
/**
* Gets the pipeline Name.
*

View File

@ -1085,5 +1085,17 @@
executed since last report. Unit could be defined with
postfix (ns,ms,s,m,h,d)</description>
</property>
<property>
<name>ozone.scm.pipeline.creation.lease.timeout</name>
<value>60s</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>
Pipeline creation timeout in milliseconds to be used by SCM. When
BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to
CREATING state, SCM will now wait for the configured amount of time
to get COMPLETE_CREATE event if it doesn't receive it will move the
pipeline to DELETING.
</description>
</property>
</configuration>

View File

@ -658,6 +658,10 @@ public void close() throws IOException {
if (containerStore != null) {
containerStore.close();
}
if (pipelineSelector != null) {
pipelineSelector.shutdown();
}
}
/**

View File

@ -107,6 +107,7 @@ public enum ResultCodes {
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
FAILED_TO_CHANGE_CONTAINER_STATE,
FAILED_TO_CHANGE_PIPELINE_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SPACE,

View File

@ -59,41 +59,16 @@ public PipelineManager(Node2PipelineMap map) {
* @return a Pipeline.
*/
public synchronized final Pipeline getPipeline(
ReplicationFactor replicationFactor, ReplicationType replicationType)
throws IOException {
/**
* In the Ozone world, we have a very simple policy.
*
* 1. Try to create a pipeline if there are enough free nodes.
*
* 2. This allows all nodes to part of a pipeline quickly.
*
* 3. if there are not enough free nodes, return pipeline in a
* round-robin fashion.
*
* TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns pipelines in round robin
* fashion.
*/
Pipeline pipeline = allocatePipeline(replicationFactor);
ReplicationFactor replicationFactor, ReplicationType replicationType) {
Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with " +
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline);
node2PipelineMap.addPipeline(pipeline);
} else {
pipeline = findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) {
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
}
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
"free nodes or operational pipeline.");
" operational pipeline.");
return null;
} else {
return pipeline;
@ -109,7 +84,7 @@ public synchronized final Pipeline getPipeline(
public synchronized final Pipeline getPipeline(String pipelineName) {
Pipeline pipeline = null;
// 1. Check if pipeline channel already exists
// 1. Check if pipeline already exists
if (activePipelineMap.containsKey(pipelineName)) {
pipeline = activePipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@ -132,7 +107,13 @@ protected int getReplicationCount(ReplicationFactor factor) {
}
public abstract Pipeline allocatePipeline(
ReplicationFactor replicationFactor) throws IOException;
ReplicationFactor replicationFactor);
/**
* Initialize the pipeline
* TODO: move the initialization to Ozone Client later
*/
public abstract void initializePipeline(Pipeline pipeline) throws IOException;
public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
@ -179,12 +160,23 @@ private int getNextIndex() {
}
/**
* Creates a pipeline from a specified set of Nodes.
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
* Creates a pipeline with a specified replication factor and type.
* @param replicationFactor - Replication Factor.
* @param replicationType - Replication Type.
*/
public abstract void createPipeline(String pipelineID,
List<DatanodeDetails> datanodes) throws IOException;
public Pipeline createPipeline(ReplicationFactor replicationFactor,
ReplicationType replicationType) throws IOException {
Pipeline pipeline = allocatePipeline(replicationFactor);
if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with "
+ "replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline);
node2PipelineMap.addPipeline(pipeline);
}
return pipeline;
}
/**
* Close the pipeline with the given clusterId.

View File

@ -24,6 +24,7 @@
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@ -33,17 +34,28 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.statemachine
.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_PIPELINE_STATE;
/**
* Sends the request to the right pipeline manager.
*/
@ -57,6 +69,10 @@ public class PipelineSelector {
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
private final Node2PipelineMap node2PipelineMap;
private final LeaseManager<Pipeline> pipelineLeaseManager;
private final StateMachine<LifeCycleState,
HddsProtos.LifeCycleEvent> stateMachine;
/**
* Constructs a pipeline Selector.
*
@ -77,6 +93,74 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
conf, node2PipelineMap);
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
long pipelineCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
LOG.trace("Starting Pipeline Lease Manager.");
pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
pipelineLeaseManager.start();
// These are the steady states of a container.
finalStates.add(HddsProtos.LifeCycleState.OPEN);
finalStates.add(HddsProtos.LifeCycleState.CLOSED);
this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
finalStates);
initializeStateMachine();
}
/**
* Event and State Transition Mapping:
*
* State: ALLOCATED ---------------> CREATING
* Event: CREATE
*
* State: CREATING ---------------> OPEN
* Event: CREATED
*
* State: OPEN ---------------> CLOSING
* Event: FINALIZE
*
* State: CLOSING ---------------> CLOSED
* Event: CLOSE
*
* State: CREATING ---------------> CLOSED
* Event: TIMEOUT
*
*
* Container State Flow:
*
* [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
* (CREATE) | (CREATED) (FINALIZE) |
* | |
* | |
* |(TIMEOUT) |(CLOSE)
* | |
* +--------> [CLOSED] <--------+
*/
private void initializeStateMachine() {
stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleEvent.CREATE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleState.OPEN,
HddsProtos.LifeCycleEvent.CREATED);
stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
HddsProtos.LifeCycleState.CLOSING,
HddsProtos.LifeCycleEvent.FINALIZE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleEvent.CLOSE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleEvent.TIMEOUT);
}
/**
@ -88,15 +172,14 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
* @return pipeline corresponding to nodes
*/
public static Pipeline newPipelineFromNodes(
List<DatanodeDetails> nodes, LifeCycleState state,
ReplicationType replicationType, ReplicationFactor replicationFactor,
String name) {
List<DatanodeDetails> nodes, ReplicationType replicationType,
ReplicationFactor replicationFactor, String name) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getUuidString();
Pipeline
pipeline = new Pipeline(leaderId, state, replicationType,
replicationFactor, name);
// A new pipeline always starts in allocated state
Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
replicationType, replicationFactor, name);
for (DatanodeDetails node : nodes) {
pipeline.addMember(node);
}
@ -175,8 +258,35 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType,
LOG.debug("Getting replication pipeline forReplicationType {} :" +
" ReplicationFactor {}", replicationType.toString(),
replicationFactor.toString());
return manager.
getPipeline(replicationFactor, replicationType);
/**
* In the Ozone world, we have a very simple policy.
*
* 1. Try to create a pipeline if there are enough free nodes.
*
* 2. This allows all nodes to part of a pipeline quickly.
*
* 3. if there are not enough free nodes, return already allocated pipeline
* in a round-robin fashion.
*
* TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns pipelines in round robin
* fashion.
*/
Pipeline pipeline =
manager.createPipeline(replicationFactor, replicationType);
if (pipeline == null) {
// try to return a pipeline from already allocated pipelines
pipeline = manager.getPipeline(replicationFactor, replicationType);
} else {
// if a new pipeline is created, initialize its state machine
updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
//TODO: move the initialization of pipeline to Ozone Client
manager.initializePipeline(pipeline);
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
}
return pipeline;
}
/**
@ -194,19 +304,6 @@ public Pipeline getPipeline(String pipelineName,
" pipelineName:{}", replicationType, pipelineName);
return manager.getPipeline(pipelineName);
}
/**
* Creates a pipeline from a specified set of Nodes.
*/
public void createPipeline(ReplicationType replicationType, String
pipelineID, List<DatanodeDetails> datanodes) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
datanodes.stream().map(DatanodeDetails::toString)
.collect(Collectors.joining(",")));
manager.createPipeline(pipelineID, datanodes);
}
/**
* Close the pipeline with the given clusterId.
@ -251,12 +348,77 @@ public Node2PipelineMap getNode2PipelineMap() {
}
public void removePipeline(UUID dnId) {
Set<Pipeline> pipelineChannelSet =
Set<Pipeline> pipelineSet =
node2PipelineMap.getPipelines(dnId);
for (Pipeline pipelineChannel : pipelineChannelSet) {
getPipelineManager(pipelineChannel.getType())
.removePipeline(pipelineChannel);
for (Pipeline pipeline : pipelineSet) {
getPipelineManager(pipeline.getType())
.removePipeline(pipeline);
}
node2PipelineMap.removeDatanode(dnId);
}
/**
* Update the Pipeline State to the next state.
*
* @param pipeline - Pipeline
* @param event - LifeCycle Event
* @throws SCMException on Failure.
*/
public void updatePipelineState(Pipeline pipeline,
HddsProtos.LifeCycleEvent event) throws IOException {
HddsProtos.LifeCycleState newState;
try {
newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update pipeline state %s, " +
"reason: invalid state transition from state: %s upon " +
"event: %s.",
pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
}
// This is a post condition after executing getNextState.
Preconditions.checkNotNull(newState);
Preconditions.checkNotNull(pipeline);
try {
switch (event) {
case CREATE:
// Acquire lease on pipeline
Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
// Register callback to be executed in case of timeout
pipelineLease.registerCallBack(() -> {
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
return null;
});
break;
case CREATED:
// Release the lease on pipeline
pipelineLeaseManager.release(pipeline);
break;
case FINALIZE:
//TODO: cleanup pipeline by closing all the containers on the pipeline
break;
case CLOSE:
case TIMEOUT:
// TODO: Release the nodes here when pipelines are destroyed
break;
default:
throw new SCMException("Unsupported pipeline LifeCycleEvent.",
FAILED_TO_CHANGE_PIPELINE_STATE);
}
pipeline.setLifeCycleState(newState);
} catch (LeaseException e) {
throw new IOException("Lease Exception.", e);
}
}
public void shutdown() {
if (pipelineLeaseManager != null) {
pipelineLeaseManager.shutdown();
}
}
}

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@ -72,7 +71,7 @@ public RatisManagerImpl(NodeManager nodeManager,
* Allocates a new ratis Pipeline from the free nodes.
*
* @param factor - One or Three
* @return PipelineChannel.
* @return Pipeline.
*/
public Pipeline allocatePipeline(ReplicationFactor factor) {
List<DatanodeDetails> newNodesList = new LinkedList<>();
@ -89,35 +88,23 @@ public Pipeline allocatePipeline(ReplicationFactor factor) {
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new ratis pipeline of size: {}", count);
// Start all channel names with "Ratis", easy to grep the logs.
// Start all pipeline names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
Pipeline pipeline=
PipelineSelector.newPipelineFromNodes(newNodesList,
LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline(pipeline.getPipelineName(), newNodesList);
} catch (IOException e) {
return null;
}
return pipeline;
return PipelineSelector.newPipelineFromNodes(newNodesList,
ReplicationType.RATIS, factor, pipelineName);
}
}
}
return null;
}
/**
* Creates a pipeline from a specified set of Nodes.
*
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
*/
@Override
public void createPipeline(String pipelineID,
List<DatanodeDetails> datanodes) {
public void initializePipeline(Pipeline pipeline) throws IOException {
//TODO:move the initialization from SCM to client
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
}
}
/**

View File

@ -25,7 +25,6 @@
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@ -86,29 +85,19 @@ public Pipeline allocatePipeline(ReplicationFactor factor) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
LOG.info("Allocating a new standalone pipeline channel of size: {}",
count);
String channelName =
LOG.info("Allocating a new standalone pipeline of size: {}", count);
String pipelineName =
"SA-" + UUID.randomUUID().toString().substring(3);
return PipelineSelector.newPipelineFromNodes(newNodesList,
LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, channelName);
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
}
}
}
return null;
}
/**
* Creates a pipeline from a specified set of Nodes.
*
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
*/
@Override
public void createPipeline(String pipelineID,
List<DatanodeDetails> datanodes) {
//return newPipelineFromNodes(datanodes, pipelineID);
public void initializePipeline(Pipeline pipeline) {
// Nothing to be done for standalone pipeline
}
/**

View File

@ -26,6 +26,8 @@
.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
@ -51,6 +53,7 @@ public class TestNode2PipelineMap {
private static ContainerWithPipeline ratisContainer;
private static ContainerStateMap stateMap;
private static ContainerMapping mapping;
private static PipelineSelector pipelineSelector;
/**
* Create a MiniDFSCluster for testing.
@ -66,6 +69,7 @@ public static void init() throws Exception {
mapping = (ContainerMapping)scm.getScmContainerManager();
stateMap = mapping.getStateManager().getContainerStateMap();
ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
pipelineSelector = mapping.getPipelineSelector();
}
/**
@ -113,5 +117,15 @@ public void testPipelineMap() throws IOException {
NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
Assert.assertEquals(0, set2.size());
try {
pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
HddsProtos.LifeCycleEvent.CLOSE);
Assert.fail("closing of pipeline without finalize should fail");
} catch (Exception e) {
Assert.assertTrue(e instanceof SCMException);
Assert.assertEquals(((SCMException)e).getResult(),
SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
}
}
}