HDDS-399. Persist open pipeline information across SCM restart. Contributed by Mukul Kumar Singh.

Committed by Nanda kumar on 2018-09-17 21:51:54 +05:30
parent 9a265fa673
commit 846936698b
19 changed files with 506 additions and 320 deletions
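In short: SCM now writes every pipeline it creates into a dedicated on-disk store (scm-pipeline.db) and reads that store back on startup, so open pipelines survive an SCM restart instead of being lost. A minimal sketch of the round trip, using only calls that appear in the diff below (MetadataStore, Pipeline.getFromProtoBuf); the wrapper class and method names here are illustrative, not part of the patch:

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.MetadataStore;

/** Sketch of the persistence round trip this commit adds to PipelineSelector. */
final class PipelinePersistenceSketch {

  // Persist: key = PipelineID protobuf bytes, value = Pipeline protobuf bytes.
  static void persist(MetadataStore pipelineStore, Pipeline pipeline)
      throws IOException {
    pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
        pipeline.getProtobufMessage().toByteArray());
  }

  // Reload after a restart: scan the store and rebuild the in-memory maps.
  static void reload(MetadataStore pipelineStore) throws IOException {
    for (Map.Entry<byte[], byte[]> entry :
        pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null)) {
      Pipeline pipeline = Pipeline.getFromProtoBuf(
          HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
      // CREATING/OPEN/CLOSING pipelines are restored; ALLOCATED and CLOSED
      // entries are skipped (see addExistingPipeline in PipelineSelector below).
    }
  }
}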

View File

@@ -86,6 +86,30 @@ public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState,
datanodes = new TreeMap<>();
}

+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Pipeline that = (Pipeline) o;
+ return id.equals(that.id)
+ && factor.equals(that.factor)
+ && type.equals(that.type)
+ && lifeCycleState.equals(that.lifeCycleState)
+ && leaderID.equals(that.leaderID);
+ }
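These overrides give Pipeline value equality instead of identity equality, which matters once pipelines are serialized into the new pipeline DB: the object deserialized after a restart is a different instance but must compare equal to the one it replaced (TestSCMRestart at the end of this diff asserts exactly that). A small sketch of the contract, with a hypothetical helper name:

// Sketch: two references to the "same" pipeline, before and after an SCM restart.
static void checkRestoredPipeline(Pipeline before, Pipeline after) {
  assert before != after;                       // distinct instances after deserialization
  assert before.equals(after);                  // equal by id, factor, type, state, leader
  assert before.hashCode() == after.hashCode(); // safe to use as a hash map key
}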
/**
* Gets pipeline object from protobuf.
*

View File

@@ -90,7 +90,9 @@ public final class OzoneConsts {
* level DB names used by SCM and data nodes.
*/
public static final String CONTAINER_DB_SUFFIX = "container.db";
+ public static final String PIPELINE_DB_SUFFIX = "pipeline.db";
public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX;
+ public static final String SCM_PIPELINE_DB = "scm-" + PIPELINE_DB_SUFFIX;
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
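The new pipeline store sits next to the existing container store under the SCM metadata directory. A sketch of how PipelineSelector (later in this diff) derives the path and opens the store; conf and cacheSizeMB are assumed to be in scope, as they are in the real constructor:

// Sketch of the store setup performed in PipelineSelector's constructor.
File metaDir = getOzoneMetaDirPath(conf);                 // SCM metadata directory
File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB); // "scm-pipeline.db"
MetadataStore pipelineStore = MetadataStoreBuilder.newBuilder()
    .setConf(conf)
    .setDbFile(pipelineDBPath)
    .setCacheSize(cacheSizeMB * OzoneConsts.MB)
    .build();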

View File

@@ -130,12 +130,13 @@ public ContainerMapping(
size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
- this.containerStateManager =
- new ContainerStateManager(conf, this);
- LOG.trace("Container State Manager created.");
this.pipelineSelector = new PipelineSelector(nodeManager,
- containerStateManager, conf, eventPublisher);
+ conf, eventPublisher, cacheSizeMB);
+ this.containerStateManager =
+ new ContainerStateManager(conf, this, pipelineSelector);
+ LOG.trace("Container State Manager created.");
this.eventPublisher = eventPublisher;
@@ -202,11 +203,6 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID)
if (contInfo.isContainerOpen()) {
// If a pipeline with the given pipeline Id already exists, return it
pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
- if (pipeline == null) {
- pipeline = pipelineSelector
- .getReplicationPipeline(contInfo.getReplicationType(),
- contInfo.getReplicationFactor());
- }
} else {
// For closed containers create pipeline from datanodes with replicas
Set<DatanodeDetails> dnWithReplicas = containerStateManager
@@ -392,9 +388,8 @@ public HddsProtos.LifeCycleState updateContainerState(
ContainerInfo updatedContainer = containerStateManager
.updateContainerState(containerInfo, event);
if (!updatedContainer.isContainerOpen()) {
- Pipeline pipeline = pipelineSelector
- .getPipeline(containerInfo.getPipelineID());
- pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
+ pipelineSelector.removeContainerFromPipeline(
+ containerInfo.getPipelineID(), containerID);
}
containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
return updatedContainer.getState();
@@ -474,11 +469,6 @@ public ContainerWithPipeline getMatchingContainerWithPipeline(
}
Pipeline pipeline = pipelineSelector
.getPipeline(containerInfo.getPipelineID());
- if (pipeline == null) {
- pipeline = pipelineSelector
- .getReplicationPipeline(containerInfo.getReplicationType(),
- containerInfo.getReplicationFactor());
- }
return new ContainerWithPipeline(containerInfo, pipeline);
}

View File

@@ -27,7 +27,6 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
- import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
@@ -137,7 +136,7 @@ public class ContainerStateManager implements Closeable {
*/
@SuppressWarnings("unchecked")
public ContainerStateManager(Configuration configuration,
- Mapping containerMapping) {
+ Mapping containerMapping, PipelineSelector pipelineSelector) {
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
@@ -159,10 +158,11 @@ public ContainerStateManager(Configuration configuration,
lastUsedMap = new ConcurrentHashMap<>();
containerCount = new AtomicLong(0);
containers = new ContainerStateMap();
- loadExistingContainers(containerMapping);
+ loadExistingContainers(containerMapping, pipelineSelector);
}

- private void loadExistingContainers(Mapping containerMapping) {
+ private void loadExistingContainers(Mapping containerMapping,
+ PipelineSelector pipelineSelector) {
List<ContainerInfo> containerList;
try {
@@ -184,6 +184,8 @@ private void loadExistingContainers(Mapping containerMapping) {
long maxID = 0;
for (ContainerInfo container : containerList) {
containers.addContainer(container);
+ pipelineSelector.addContainerToPipeline(
+ container.getPipelineID(), container.getContainerID());
if (maxID < container.getContainerID()) {
maxID = container.getContainerID();
@@ -303,6 +305,7 @@ public ContainerWithPipeline allocateContainer(PipelineSelector selector,
+ "replication=%s couldn't be found for the new container. "
+ "Do you have enough nodes?", type, replicationFactor);

+ long containerID = containerCount.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setPipelineID(pipeline.getId())
@@ -313,11 +316,12 @@ public ContainerWithPipeline allocateContainer(PipelineSelector selector,
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner(owner)
- .setContainerID(containerCount.incrementAndGet())
+ .setContainerID(containerID)
.setDeleteTransactionId(0)
.setReplicationFactor(replicationFactor)
.setReplicationType(pipeline.getType())
.build();
+ selector.addContainerToPipeline(pipeline.getId(), containerID);
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
@@ -470,17 +474,6 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
factor, type);
}

- /**
- * Returns a set of open ContainerIDs that reside on a pipeline.
- *
- * @param pipelineID PipelineID of the Containers.
- * @return Set of containers that match the specific query parameters.
- */
- public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(PipelineID
- pipelineID) {
- return containers.getOpenContainerIDsByPipeline(pipelineID);
- }
/**
* Returns the containerInfo with pipeline for the given container id.
* @param selector -- Pipeline selector class.

View File

@@ -25,7 +25,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
- import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -75,9 +74,6 @@
* Replica and THREE Replica. User can specify how many copies should be made
* for a ozone key.
* <p>
- * 5.Pipeline - The pipeline constitute the set of Datanodes on which the
- * open container resides physically.
- * <p>
* The most common access pattern of this class is to select a container based
* on all these parameters, for example, when allocating a block we will
* select a container that belongs to user1, with Ratis replication which can
@@ -92,14 +88,6 @@ public class ContainerStateMap {
private final ContainerAttribute<String> ownerMap;
private final ContainerAttribute<ReplicationFactor> factorMap;
private final ContainerAttribute<ReplicationType> typeMap;
- // This map constitutes the pipeline to open container mappings.
- // This map will be queried for the list of open containers on a particular
- // pipeline and issue a close on corresponding containers in case of
- // following events:
- //1. Dead datanode.
- //2. Datanode out of space.
- //3. Volume loss or volume out of space.
- private final ContainerAttribute<PipelineID> openPipelineMap;
private final Map<ContainerID, ContainerInfo> containerMap;
// Map to hold replicas of given container.
@@ -121,7 +109,6 @@ public ContainerStateMap() {
ownerMap = new ContainerAttribute<>();
factorMap = new ContainerAttribute<>();
typeMap = new ContainerAttribute<>();
- openPipelineMap = new ContainerAttribute<>();
containerMap = new HashMap<>();
lock = new ReentrantReadWriteLock();
contReplicaMap = new HashMap<>();
@@ -158,9 +145,6 @@ public void addContainer(ContainerInfo info)
ownerMap.insert(info.getOwner(), id);
factorMap.insert(info.getReplicationFactor(), id);
typeMap.insert(info.getReplicationType(), id);
- if (info.isContainerOpen()) {
- openPipelineMap.insert(info.getPipelineID(), id);
- }

// Flush the cache of this container type, will be added later when
// get container queries are executed.
@@ -391,11 +375,6 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
throw new SCMException("Updating the container map failed.", ex,
FAILED_TO_CHANGE_CONTAINER_STATE);
}
- // In case the container is set to closed state, it needs to be removed
- // from the pipeline Map.
- if (!info.isContainerOpen()) {
- openPipelineMap.remove(info.getPipelineID(), id);
- }
} finally {
lock.writeLock().unlock();
}
@@ -433,23 +412,6 @@ NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
}
}

- /**
- * Returns Open containers in the SCM by the Pipeline.
- *
- * @param pipelineID - Pipeline id.
- * @return NavigableSet<ContainerID>
- */
- public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
- PipelineID pipelineID) {
- Preconditions.checkNotNull(pipelineID);
- lock.readLock().lock();
- try {
- return openPipelineMap.getCollection(pipelineID);
- } finally {
- lock.readLock().unlock();
- }
- }
/**
* Returns Containers by replication factor.
*

View File

@@ -16,11 +16,10 @@
*/
package org.apache.hadoop.hdds.scm.pipelines;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
@@ -37,17 +36,53 @@
public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
- private final List<PipelineID> activePipelines;
- private final Map<PipelineID, Pipeline> pipelineMap;
- private final AtomicInteger pipelineIndex;
- private final Node2PipelineMap node2PipelineMap;
+ private final ArrayList<ActivePipelines> activePipelines;

- public PipelineManager(Node2PipelineMap map,
- Map<PipelineID, Pipeline> pipelineMap) {
- activePipelines = new LinkedList<>();
- pipelineIndex = new AtomicInteger(0);
- this.pipelineMap = pipelineMap;
- this.node2PipelineMap = map;
+ public PipelineManager() {
+ activePipelines = new ArrayList<>();
+ for (ReplicationFactor factor : ReplicationFactor.values()) {
+ activePipelines.add(factor.ordinal(), new ActivePipelines());
+ }
}
private static class ActivePipelines {
private final List<PipelineID> activePipelines;
private final AtomicInteger pipelineIndex;
ActivePipelines() {
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
}
void addPipeline(PipelineID pipelineID) {
activePipelines.add(pipelineID);
}
void removePipeline(PipelineID pipelineID) {
activePipelines.remove(pipelineID);
}
/**
* Find a Pipeline that is operational.
*
* @return - Pipeline or null
*/
PipelineID findOpenPipeline() {
if (activePipelines.size() == 0) {
LOG.error("No Operational pipelines found. Returning null.");
return null;
}
return activePipelines.get(getNextIndex());
}
/**
* Gets the next index of the Pipeline to get.
*
* @return index in the linked list to get.
*/
private int getNextIndex() {
return pipelineIndex.incrementAndGet() % activePipelines.size();
}
}
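ActivePipelines keeps one bucket per ReplicationFactor (indexed by ordinal) and hands out pipeline IDs round-robin through an atomic counter taken modulo the list size. A self-contained sketch of the same selection pattern; the class and method names are illustrative, not from the patch:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Round-robin selection over a list, as ActivePipelines does for PipelineIDs.
// Note: like the original, the modulo goes negative once the int counter
// overflows, so this is a sketch of the pattern rather than hardened code.
final class RoundRobinSketch<T> {
  private final List<T> items = new ArrayList<>();
  private final AtomicInteger index = new AtomicInteger(0);

  void add(T item) {
    items.add(item);
  }

  T next() {
    if (items.isEmpty()) {
      return null; // mirrors findOpenPipeline() returning null
    }
    return items.get(index.incrementAndGet() % items.size());
  }
}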
/**
@@ -59,44 +94,30 @@ public PipelineManager(Node2PipelineMap map,
* @param replicationFactor - Replication Factor
* @return a Pipeline.
*/
- public synchronized final Pipeline getPipeline(
+ public synchronized final PipelineID getPipeline(
ReplicationFactor replicationFactor, ReplicationType replicationType) {
- Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
- if (pipeline != null) {
+ PipelineID id =
+ activePipelines.get(replicationFactor.ordinal()).findOpenPipeline();
+ if (id != null) {
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
- pipeline.getId(), replicationType, replicationFactor);
+ id, replicationType, replicationFactor);
}
- if (pipeline == null) {
+ if (id == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
" operational pipeline.");
return null;
} else {
- return pipeline;
+ return id;
}
}
- /**
- * This function to get pipeline with given pipeline name.
- *
- * @param id
- * @return a Pipeline.
- */
- public synchronized final Pipeline getPipeline(PipelineID id) {
- Pipeline pipeline = null;
- // 1. Check if pipeline already exists
- if (pipelineMap.containsKey(id)) {
- pipeline = pipelineMap.get(id);
- LOG.debug("Returning pipeline for pipelineName:{}", id);
- return pipeline;
- } else {
- LOG.debug("Unable to find pipeline for pipelineName:{}", id);
- }
- return pipeline;
+ void addOpenPipeline(Pipeline pipeline) {
+ activePipelines.get(pipeline.getFactor().ordinal())
+ .addPipeline(pipeline.getId());
}
- protected int getReplicationCount(ReplicationFactor factor) {
+ protected static int getReplicationCount(ReplicationFactor factor) {
switch (factor) {
case ONE:
return 1;
@@ -116,46 +137,6 @@ public abstract Pipeline allocatePipeline(
*/
public abstract void initializePipeline(Pipeline pipeline) throws IOException;

- /**
- * Find a Pipeline that is operational.
- *
- * @return - Pipeline or null
- */
- private Pipeline findOpenPipeline(
- ReplicationType type, ReplicationFactor factor) {
- Pipeline pipeline = null;
- final int sentinal = -1;
- if (activePipelines.size() == 0) {
- LOG.error("No Operational pipelines found. Returning null.");
- return null;
- }
- int startIndex = getNextIndex();
- int nextIndex = sentinal;
- for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
- // Just walk the list in a circular way.
- PipelineID id =
- activePipelines
- .get(nextIndex != sentinal ? nextIndex : startIndex);
- Pipeline temp = pipelineMap.get(id);
- // if we find an operational pipeline just return that.
- if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
- (temp.getFactor() == factor) && (temp.getType() == type)) {
- pipeline = temp;
- break;
- }
- }
- return pipeline;
- }

- /**
- * gets the next index of the Pipeline to get.
- *
- * @return index in the link list to get.
- */
- private int getNextIndex() {
- return pipelineIndex.incrementAndGet() % activePipelines.size();
- }
/**
* Creates a pipeline with a specified replication factor and type.
* @param replicationFactor - Replication Factor.
@@ -168,9 +149,6 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor,
LOG.debug("created new pipeline:{} for container with "
+ "replicationType:{} replicationFactor:{}",
pipeline.getId(), replicationType, replicationFactor);
- activePipelines.add(pipeline.getId());
- pipelineMap.put(pipeline.getId(), pipeline);
- node2PipelineMap.addPipeline(pipeline);
}
return pipeline;
}
@@ -180,17 +158,15 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor,
* @param pipeline pipeline to be finalized
*/
public synchronized void finalizePipeline(Pipeline pipeline) {
- activePipelines.remove(pipeline.getId());
+ activePipelines.get(pipeline.getFactor().ordinal())
+ .removePipeline(pipeline.getId());
}

/**
*
* @param pipeline
*/
- public void closePipeline(Pipeline pipeline) throws IOException {
- pipelineMap.remove(pipeline.getId());
- node2PipelineMap.removePipeline(pipeline);
- }
+ public abstract void closePipeline(Pipeline pipeline) throws IOException;
/**
* list members in the pipeline.

View File

@@ -21,7 +21,6 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
- import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -39,30 +38,34 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.server.events.EventPublisher;
- import org.apache.hadoop.ozone.common.statemachine
- .InvalidStateTransitionException;
- import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
+ import org.apache.hadoop.utils.MetadataStore;
+ import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
- import java.util.NavigableSet;
+ import java.util.HashMap;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_PIPELINE_STATE;
+ import static org.apache.hadoop.hdds.server
+ .ServerUtils.getOzoneMetaDirPath;
+ import static org.apache.hadoop.ozone
+ .OzoneConsts.SCM_PIPELINE_DB;
/**
* Sends the request to the right pipeline manager.
@@ -73,16 +76,16 @@ public class PipelineSelector {
private final ContainerPlacementPolicy placementPolicy;
private final NodeManager nodeManager;
private final Configuration conf;
- private final ContainerStateManager containerStateManager;
private final EventPublisher eventPublisher;
private final RatisManagerImpl ratisManager;
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
+ private final MetadataStore pipelineStore;
+ private final PipelineStateManager stateManager;
private final Node2PipelineMap node2PipelineMap;
+ private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap;
private final Map<PipelineID, Pipeline> pipelineMap;
private final LeaseManager<Pipeline> pipelineLeaseManager;
- private final StateMachine<LifeCycleState,
- HddsProtos.LifeCycleEvent> stateMachine;
/**
* Constructs a pipeline Selector.
@@ -90,9 +93,8 @@ public class PipelineSelector {
* @param nodeManager - node manager
* @param conf - Ozone Config
*/
- public PipelineSelector(NodeManager nodeManager,
- ContainerStateManager containerStateManager, Configuration conf,
- EventPublisher eventPublisher) {
+ public PipelineSelector(NodeManager nodeManager, Configuration conf,
+ EventPublisher eventPublisher, int cacheSizeMB) throws IOException {
this.nodeManager = nodeManager;
this.conf = conf;
this.eventPublisher = eventPublisher;
@@ -105,79 +107,66 @@ public PipelineSelector(NodeManager nodeManager,
pipelineMap = new ConcurrentHashMap<>();
this.standaloneManager =
new StandaloneManagerImpl(this.nodeManager, placementPolicy,
- containerSize, node2PipelineMap, pipelineMap);
+ containerSize);
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
- conf, node2PipelineMap, pipelineMap);
+ conf);
- // Initialize the container state machine.
- Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
long pipelineCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- this.containerStateManager = containerStateManager;
pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
pipelineCreationLeaseTimeout);
pipelineLeaseManager.start();

- // These are the steady states of a container.
- finalStates.add(HddsProtos.LifeCycleState.OPEN);
- finalStates.add(HddsProtos.LifeCycleState.CLOSED);
- this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
- finalStates);
- initializeStateMachine();
+ stateManager = new PipelineStateManager();
+ pipeline2ContainerMap = new HashMap<>();
+ // Write the container name to pipeline mapping.
+ File metaDir = getOzoneMetaDirPath(conf);
+ File containerDBPath = new File(metaDir, SCM_PIPELINE_DB);
+ pipelineStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(containerDBPath)
+ .setCacheSize(cacheSizeMB * OzoneConsts.MB)
+ .build();
+ reloadExistingPipelines();
}
- /**
- * Event and State Transition Mapping.
- *
- * State: ALLOCATED ---------------> CREATING
- * Event: CREATE
- *
- * State: CREATING ---------------> OPEN
- * Event: CREATED
- *
- * State: OPEN ---------------> CLOSING
- * Event: FINALIZE
- *
- * State: CLOSING ---------------> CLOSED
- * Event: CLOSE
- *
- * State: CREATING ---------------> CLOSED
- * Event: TIMEOUT
- *
- *
- * Container State Flow:
- *
- * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
- *     (CREATE)        |  (CREATED)     (FINALIZE)     |
- *                     |                               |
- *                     |                               |
- *                     |(TIMEOUT)                      |(CLOSE)
- *                     |                               |
- *                     +--------> [CLOSED] <-----------+
- */
- private void initializeStateMachine() {
- stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
- HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleEvent.CREATE);
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleState.OPEN,
- HddsProtos.LifeCycleEvent.CREATED);
- stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
- HddsProtos.LifeCycleState.CLOSING,
- HddsProtos.LifeCycleEvent.FINALIZE);
- stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
- HddsProtos.LifeCycleState.CLOSED,
- HddsProtos.LifeCycleEvent.CLOSE);
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleState.CLOSED,
- HddsProtos.LifeCycleEvent.TIMEOUT);
- }
+ private void reloadExistingPipelines() throws IOException {
+ if (pipelineStore.isEmpty()) {
+ // Nothing to do just return
+ return;
+ }
+ List<Map.Entry<byte[], byte[]>> range =
+ pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
+ // Transform the values into the pipelines.
+ // TODO: filter by pipeline state
+ for (Map.Entry<byte[], byte[]> entry : range) {
+ Pipeline pipeline = Pipeline.getFromProtoBuf(
+ HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
+ Preconditions.checkNotNull(pipeline);
+ addExistingPipeline(pipeline);
+ }
+ }
+
+ public Set<ContainerID> getOpenContainerIDsByPipeline(PipelineID pipelineID) {
+ return pipeline2ContainerMap.get(pipelineID);
+ }
+
+ public void addContainerToPipeline(PipelineID pipelineID, long containerID) {
+ pipeline2ContainerMap.get(pipelineID)
+ .add(ContainerID.valueof(containerID));
+ }
+
+ public void removeContainerFromPipeline(PipelineID pipelineID,
+ long containerID) throws IOException {
+ pipeline2ContainerMap.get(pipelineID)
+ .remove(ContainerID.valueof(containerID));
+ closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
+ }
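Together these methods move the pipeline-to-open-container bookkeeping from ContainerStateMap (where openPipelineMap was deleted above) into PipelineSelector. The life cycle implied by this diff, summarized as comments; the step descriptions are editorial, the calls are real:

// 1. CREATE event            -> pipeline2ContainerMap.put(id, new HashSet<>())
// 2. container allocated     -> addContainerToPipeline(pipelineID, containerID)
// 3. container closed        -> removeContainerFromPipeline(pipelineID, containerID),
//    which calls closePipelineIfNoOpenContainers(...) and fires a CLOSE event
//    once the set is empty and the pipeline is already CLOSING
// 4. CLOSE or TIMEOUT event  -> pipeline2ContainerMap.remove(id)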
/**
@@ -294,8 +283,14 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType,
manager.createPipeline(replicationFactor, replicationType);
if (pipeline == null) {
// try to return a pipeline from already allocated pipelines
- pipeline = manager.getPipeline(replicationFactor, replicationType);
+ PipelineID pipelineId =
+ manager.getPipeline(replicationFactor, replicationType);
+ pipeline = pipelineMap.get(pipelineId);
+ Preconditions.checkArgument(pipeline.getLifeCycleState() ==
+ LifeCycleState.OPEN);
} else {
+ pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
+ pipeline.getProtobufMessage().toByteArray());
// if a new pipeline is created, initialize its state machine
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
@@ -343,8 +338,8 @@ public void closePipelineIfNoOpenContainers(Pipeline pipeline)
if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
return;
}
- NavigableSet<ContainerID> containerIDS = containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getId());
+ HashSet<ContainerID> containerIDS =
+ pipeline2ContainerMap.get(pipeline.getId());
if (containerIDS.size() == 0) {
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
@@ -358,56 +353,58 @@ private void closePipeline(Pipeline pipeline) throws IOException {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
- NavigableSet<ContainerID> containers =
- containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getId());
+ HashSet<ContainerID> containers =
+ pipeline2ContainerMap.get(pipeline.getId());
Preconditions.checkArgument(containers.size() == 0);
manager.closePipeline(pipeline);
}
/**
* Add a pipeline to the list of active (open) pipelines.
*/
private void addOpenPipeline(Pipeline pipeline) {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId());
manager.addOpenPipeline(pipeline);
}
private void closeContainersByPipeline(Pipeline pipeline) {
- NavigableSet<ContainerID> containers =
- containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getId());
+ HashSet<ContainerID> containers =
+ pipeline2ContainerMap.get(pipeline.getId());
for (ContainerID id : containers) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
}
}
/**
* List members in the pipeline.
*/
public List<DatanodeDetails> getDatanodes(ReplicationType replicationType,
PipelineID pipelineID) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting data nodes from pipeline : {}", pipelineID);
return manager.getMembers(pipelineID);
}
/**
* Update the datanodes in the list of the pipeline.
*/
public void updateDatanodes(ReplicationType replicationType, PipelineID
pipelineID, List<DatanodeDetails> newDatanodes) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID,
newDatanodes.stream().map(DatanodeDetails::toString)
.collect(Collectors.joining(",")));
manager.updatePipeline(pipelineID, newDatanodes);
}
- public Node2PipelineMap getNode2PipelineMap() {
- return node2PipelineMap;
- }
+ public Set<PipelineID> getPipelineId(UUID dnId) {
+ return node2PipelineMap.getPipelines(dnId);
+ }
private void addExistingPipeline(Pipeline pipeline) throws IOException {
LifeCycleState state = pipeline.getLifeCycleState();
switch (state) {
case ALLOCATED:
// a pipeline in allocated state is only present in SCM and does not exist
// on datanodes; on SCM restart, this pipeline can be ignored.
break;
case CREATING:
case OPEN:
case CLOSING:
//TODO: process pipeline report and move pipeline to active queue
// when all the nodes have reported.
pipelineMap.put(pipeline.getId(), pipeline);
pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
node2PipelineMap.addPipeline(pipeline);
break;
case CLOSED:
// if the pipeline is in closed state, nothing to do.
break;
default:
throw new IOException("invalid pipeline state:" + state);
}
}
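Only pipelines that may actually exist on datanodes (CREATING, OPEN, CLOSING) are rebuilt into pipelineMap, pipeline2ContainerMap, and node2PipelineMap on reload; ALLOCATED pipelines never reached the datanodes and CLOSED ones need no recovery. A hypothetical predicate expressing the same policy:

// Hypothetical helper mirroring addExistingPipeline's reload policy.
static boolean shouldRestoreOnRestart(HddsProtos.LifeCycleState state) {
  switch (state) {
  case CREATING:
  case OPEN:
  case CLOSING:
    return true;   // may exist on datanodes; rebuild the in-memory maps
  case ALLOCATED:  // never created on datanodes
  case CLOSED:     // nothing left to manage
    return false;
  default:
    return false;  // the real code throws IOException for unknown states
  }
}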
/**
* Update the Pipeline State to the next state.
*
@@ -417,24 +414,12 @@ public Set<PipelineID> getPipelineId(UUID dnId) {
*/
public void updatePipelineState(Pipeline pipeline,
HddsProtos.LifeCycleEvent event) throws IOException {
- HddsProtos.LifeCycleState newState;
- try {
- newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
- } catch (InvalidStateTransitionException ex) {
- String error = String.format("Failed to update pipeline state %s, " +
- "reason: invalid state transition from state: %s upon " +
- "event: %s.",
- pipeline.getId(), pipeline.getLifeCycleState(), event);
- LOG.error(error);
- throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
- }
- // This is a post condition after executing getNextState.
- Preconditions.checkNotNull(newState);
Preconditions.checkNotNull(pipeline);
try {
switch (event) {
case CREATE:
+ pipelineMap.put(pipeline.getId(), pipeline);
+ pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
+ node2PipelineMap.addPipeline(pipeline);
// Acquire lease on pipeline
Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
// Register callback to be executed in case of timeout
@@ -446,6 +431,7 @@ public void updatePipelineState(Pipeline pipeline,
case CREATED:
// Release the lease on pipeline
pipelineLeaseManager.release(pipeline);
+ addOpenPipeline(pipeline);
break;
case FINALIZE:
@@ -455,21 +441,30 @@ public void updatePipelineState(Pipeline pipeline,
case CLOSE:
case TIMEOUT:
closePipeline(pipeline);
+ pipeline2ContainerMap.remove(pipeline.getId());
+ node2PipelineMap.removePipeline(pipeline);
+ pipelineMap.remove(pipeline.getId());
break;
default:
throw new SCMException("Unsupported pipeline LifeCycleEvent.",
FAILED_TO_CHANGE_PIPELINE_STATE);
}
- pipeline.setLifeCycleState(newState);
+ stateManager.updatePipelineState(pipeline, event);
+ pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
+ pipeline.getProtobufMessage().toByteArray());
} catch (LeaseException e) {
throw new IOException("Lease Exception.", e);
}
}
- public void shutdown() {
+ public void shutdown() throws IOException {
if (pipelineLeaseManager != null) {
pipelineLeaseManager.shutdown();
}
+ if (pipelineStore != null) {
+ pipelineStore.close();
+ }
}
}

View File

@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.pipelines;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.common.statemachine
.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_PIPELINE_STATE;
/**
* Manages Pipeline states.
*/
public class PipelineStateManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineStateManager.class);
private final StateMachine<HddsProtos.LifeCycleState,
HddsProtos.LifeCycleEvent> stateMachine;
PipelineStateManager() {
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
// These are the steady states of a container.
finalStates.add(HddsProtos.LifeCycleState.OPEN);
finalStates.add(HddsProtos.LifeCycleState.CLOSED);
this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
finalStates);
initializeStateMachine();
}
/**
* Event and State Transition Mapping.
*
* State: ALLOCATED ---------------> CREATING
* Event: CREATE
*
* State: CREATING ---------------> OPEN
* Event: CREATED
*
* State: OPEN ---------------> CLOSING
* Event: FINALIZE
*
* State: CLOSING ---------------> CLOSED
* Event: CLOSE
*
* State: CREATING ---------------> CLOSED
* Event: TIMEOUT
*
*
* Container State Flow:
*
* [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
*     (CREATE)        |  (CREATED)     (FINALIZE)     |
*                     |                               |
*                     |                               |
*                     |(TIMEOUT)                      |(CLOSE)
*                     |                               |
*                     +--------> [CLOSED] <-----------+
*/
private void initializeStateMachine() {
stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleEvent.CREATE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleState.OPEN,
HddsProtos.LifeCycleEvent.CREATED);
stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
HddsProtos.LifeCycleState.CLOSING,
HddsProtos.LifeCycleEvent.FINALIZE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleEvent.CLOSE);
stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
HddsProtos.LifeCycleState.CLOSED,
HddsProtos.LifeCycleEvent.TIMEOUT);
}
/**
* Update the Pipeline State to the next state.
*
* @param pipeline - Pipeline
* @param event - LifeCycle Event
* @throws SCMException on Failure.
*/
public void updatePipelineState(Pipeline pipeline,
HddsProtos.LifeCycleEvent event) throws IOException {
HddsProtos.LifeCycleState newState;
try {
newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update pipeline state %s, " +
"reason: invalid state transition from state: %s upon " +
"event: %s.",
pipeline.getId(), pipeline.getLifeCycleState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
}
// This is a post condition after executing getNextState.
Preconditions.checkNotNull(newState);
Preconditions.checkNotNull(pipeline);
pipeline.setLifeCycleState(newState);
}
}
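PipelineStateManager now owns the lifecycle state machine that previously lived inline in PipelineSelector. A minimal usage sketch, assuming same-package access (the constructor is package-private) and a pipeline currently in ALLOCATED state:

// Hypothetical driver for the state machine above.
PipelineStateManager stateManager = new PipelineStateManager();
stateManager.updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
// pipeline is now CREATING
stateManager.updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
// pipeline is now OPEN; an out-of-order event (e.g. CREATE again) would make
// updatePipelineState throw SCMException(FAILED_TO_CHANGE_PIPELINE_STATE)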

View File

@@ -24,7 +24,6 @@
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
- import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -39,7 +38,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
- import java.util.Map;
/**
* Implementation of {@link PipelineManager}.
@@ -59,9 +57,8 @@ public class RatisManagerImpl extends PipelineManager {
* @param nodeManager
*/
public RatisManagerImpl(NodeManager nodeManager,
- ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
- Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
- super(map, pipelineMap);
+ ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
+ super();
this.conf = conf;
this.nodeManager = nodeManager;
ratisMembers = new HashSet<>();
@@ -114,7 +111,6 @@ public void closePipeline(Pipeline pipeline) throws IOException {
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.destroyPipeline();
}
- super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be in the ratis members list.
Preconditions.checkArgument(ratisMembers.remove(node));

View File

@@ -22,7 +22,6 @@
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
- import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -37,7 +36,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
- import java.util.Map;
/**
* Standalone Manager Impl to prove that pluggable interface
@@ -58,9 +56,8 @@ public class StandaloneManagerImpl extends PipelineManager {
* @param containerSize - Container Size.
*/
public StandaloneManagerImpl(NodeManager nodeManager,
- ContainerPlacementPolicy placementPolicy, long containerSize,
- Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
- super(map, pipelineMap);
+ ContainerPlacementPolicy placementPolicy, long containerSize) {
+ super();
this.nodeManager = nodeManager;
this.placementPolicy = placementPolicy;
this.containerSize = containerSize;
@@ -105,7 +102,6 @@ public void initializePipeline(Pipeline pipeline) {
* Close the pipeline.
*/
public void closePipeline(Pipeline pipeline) throws IOException {
- super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be in the standalone members list.
Preconditions.checkArgument(standAloneMembers.remove(node));

View File

@@ -75,6 +75,7 @@ public void test() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
Mapping mapping = Mockito.mock(Mapping.class);
+ PipelineSelector selector = Mockito.mock(PipelineSelector.class);
when(mapping.getContainer(anyLong()))
.thenAnswer(
@@ -87,7 +88,7 @@ public void test() throws IOException {
);
ContainerStateManager containerStateManager =
- new ContainerStateManager(conf, mapping);
+ new ContainerStateManager(conf, mapping, selector);
when(mapping.getStateManager()).thenReturn(containerStateManager);

View File

@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+ import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -41,7 +42,8 @@ public class TestContainerStateManager {
public void init() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
Mapping mapping = Mockito.mock(Mapping.class);
- containerStateManager = new ContainerStateManager(conf, mapping);
+ PipelineSelector selector = Mockito.mock(PipelineSelector.class);
+ containerStateManager = new ContainerStateManager(conf, mapping, selector);
}

View File

@@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+ import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.Assert;
@@ -60,7 +61,8 @@ public void testOnMessage() throws IOException {
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
ContainerStateManager containerStateManager = new ContainerStateManager(
new OzoneConfiguration(),
- Mockito.mock(Mapping.class)
+ Mockito.mock(Mapping.class),
+ Mockito.mock(PipelineSelector.class)
);
ContainerInfo container1 =

View File

@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+ import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -54,8 +55,9 @@ public class TestContainerStateManagerIntegration {
private MiniOzoneCluster cluster;
private XceiverClientManager xceiverClientManager;
private StorageContainerManager scm;
- private Mapping scmContainerMapping;
+ private ContainerMapping scmContainerMapping;
private ContainerStateManager containerStateManager;
+ private PipelineSelector selector;
private String containerOwner = "OZONE";
@@ -66,8 +68,9 @@ public void setup() throws Exception {
cluster.waitForClusterToBeReady();
xceiverClientManager = new XceiverClientManager(conf);
scm = cluster.getStorageContainerManager();
- scmContainerMapping = scm.getScmContainerManager();
+ scmContainerMapping = (ContainerMapping) scm.getScmContainerManager();
containerStateManager = scmContainerMapping.getStateManager();
+ selector = scmContainerMapping.getPipelineSelector();
}

@After
@After
@@ -133,8 +136,7 @@ public void testContainerStateManagerRestart() throws IOException {
// New instance of ContainerStateManager should load all the containers in
// container store.
ContainerStateManager stateManager =
- new ContainerStateManager(conf, scmContainerMapping
- );
+ new ContainerStateManager(conf, scmContainerMapping, selector);
int matchCount = stateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),

View File

@@ -30,14 +30,13 @@
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
- import org.junit.AfterClass;
+ import org.junit.After;
import org.junit.Assert;
- import org.junit.BeforeClass;
+ import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;
- import java.util.NavigableSet;
import java.util.Set;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
@@ -60,8 +59,8 @@ public class TestNode2PipelineMap {
*
* @throws IOException
*/
- @BeforeClass
- public static void init() throws Exception {
+ @Before
+ public void init() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
@@ -75,8 +74,8 @@ public static void init() throws Exception {
/**
* Shutdown MiniDFSCluster.
*/
- @AfterClass
- public static void shutdown() {
+ @After
+ public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
@@ -86,19 +85,20 @@ public static void shutdown() {
@Test
public void testPipelineMap() throws IOException {
- NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getId());

long cId = ratisContainer.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
- Assert.assertEquals(cId, set.first().getId());
+ set.forEach(containerID ->
+ Assert.assertEquals(containerID, ContainerID.valueof(cId)));

List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines();
Assert.assertEquals(3, dns.size());

// get pipeline details by dnid
Set<PipelineID> pipelines = mapping.getPipelineSelector()
- .getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
+ .getPipelineId(dns.get(0).getUuid());
Assert.assertEquals(1, pipelines.size());
pipelines.forEach(p -> Assert.assertEquals(p,
ratisContainer.getPipeline().getId()));
@@ -114,7 +114,7 @@ public void testPipelineMap() throws IOException {
.updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
- NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> set2 = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getId());
Assert.assertEquals(0, set2.size());

View File

@@ -35,7 +35,7 @@
import org.junit.Test;

import java.io.IOException;
- import java.util.NavigableSet;
+ import java.util.Set;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
@@ -88,12 +88,13 @@ public static void shutdown() {
@Test
public void testPipelineCloseWithClosedContainer() throws IOException {
- NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getId());

long cId = ratisContainer1.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
- Assert.assertEquals(cId, set.first().getId());
+ set.forEach(containerID ->
+ Assert.assertEquals(containerID, ContainerID.valueof(cId)));

// Now close the container and it should not show up while fetching
// containers by pipeline
@@ -106,7 +107,7 @@ public void testPipelineCloseWithClosedContainer() throws IOException {
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);

- NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> setClosed = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getId());
Assert.assertEquals(0, setClosed.size());
@@ -118,15 +119,15 @@ public void testPipelineCloseWithClosedContainer() throws IOException {
HddsProtos.LifeCycleState.CLOSED);
for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
// Assert that the pipeline has been removed from Node2PipelineMap as well
- Assert.assertEquals(pipelineSelector.getNode2PipelineMap()
- .getPipelines(dn.getUuid()).size(), 0);
+ Assert.assertEquals(pipelineSelector.getPipelineId(
+ dn.getUuid()).size(), 0);
}
}

@Test
public void testPipelineCloseWithOpenContainer() throws IOException,
TimeoutException, InterruptedException {
- NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> setOpen = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer2.getPipeline().getId());
Assert.assertEquals(1, setOpen.size());

View File

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.protocol.proto
.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.protocol.proto
.HddsProtos.ReplicationType.RATIS;
/**
* Test SCM restart and recovery wrt pipelines.
*/
public class TestSCMRestart {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static Pipeline ratisPipeline1;
private static Pipeline ratisPipeline2;
private static ContainerMapping mapping;
private static ContainerMapping newMapping;
/**
* Create a MiniDFSCluster for testing.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(6)
.setHbInterval(1000)
.setHbProcessorInterval(1000)
.build();
cluster.waitForClusterToBeReady();
StorageContainerManager scm = cluster.getStorageContainerManager();
mapping = (ContainerMapping)scm.getScmContainerManager();
ratisPipeline1 =
mapping.allocateContainer(RATIS, THREE, "Owner1").getPipeline();
ratisPipeline2 =
mapping.allocateContainer(RATIS, ONE, "Owner2").getPipeline();
// At this stage, there should be 2 pipelines, each with 1 open container.
// Restart the SCM and verify that the pipelines are restored in the
// correct state.
cluster.restartStorageContainerManager();
newMapping = (ContainerMapping)(cluster.getStorageContainerManager()
.getScmContainerManager());
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testPipelineWithScmRestart() {
// After restart, make sure that the pipelines are still present
Pipeline ratisPipeline1AfterRestart = newMapping.getPipelineSelector()
.getPipeline(ratisPipeline1.getId());
Pipeline ratisPipeline2AfterRestart = newMapping.getPipelineSelector()
.getPipeline(ratisPipeline2.getId());
Assert.assertNotSame(ratisPipeline1AfterRestart, ratisPipeline1);
Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2);
Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1);
Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2);
}
}
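This test is where the new Pipeline.equals/hashCode from the first hunk earns its keep: after the restart, newMapping returns freshly deserialized Pipeline instances (hence assertNotSame) that must still be value-equal to the pre-restart ones (hence assertEquals).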

View File

@@ -138,8 +138,11 @@ static Builder newBuilder(OzoneConfiguration conf) {
* Restarts StorageContainerManager instance.
*
* @throws IOException
+ * @throws TimeoutException
+ * @throws InterruptedException
*/
- void restartStorageContainerManager() throws IOException;
+ void restartStorageContainerManager() throws InterruptedException,
+ TimeoutException, IOException;
/**
* Restarts OzoneManager instance.

View File

@@ -85,7 +85,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
LoggerFactory.getLogger(MiniOzoneClusterImpl.class);

private final OzoneConfiguration conf;
- private final StorageContainerManager scm;
+ private StorageContainerManager scm;
private final OzoneManager ozoneManager;
private final List<HddsDatanodeService> hddsDatanodes;
@@ -215,9 +215,13 @@ public OzoneClient getRestClient() throws IOException {
}

@Override
- public void restartStorageContainerManager() throws IOException {
+ public void restartStorageContainerManager()
+ throws TimeoutException, InterruptedException, IOException {
scm.stop();
scm.join();
+ scm = StorageContainerManager.createSCM(null, conf);
+ scm.start();
+ waitForClusterToBeReady();
}
@Override