HDDS-277. PipelineStateMachine should handle closure of pipelines in SCM. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-07-26 13:15:27 -07:00
parent be150a17b1
commit fd31cb6cfe
18 changed files with 332 additions and 81 deletions

View File

@ -459,12 +459,13 @@ public ContainerInfo build() {
/**
* Check if a container is in open state, this will check if the
* container is either open or allocated or creating. Any containers in
* these states is managed as an open container by SCM.
* container is either open, allocated, creating or creating.
* Any containers in these states is managed as an open container by SCM.
*/
public boolean isContainerOpen() {
return state == HddsProtos.LifeCycleState.ALLOCATED ||
state == HddsProtos.LifeCycleState.CREATING ||
state == HddsProtos.LifeCycleState.OPEN;
state == HddsProtos.LifeCycleState.OPEN ||
state == HddsProtos.LifeCycleState.CLOSING;
}
}

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@ -63,13 +62,13 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
containerManager.getContainerWithPipeline(containerID.getId());
info = containerWithPipeline.getContainerInfo();
if (info == null) {
LOG.info("Failed to update the container state. Container with id : {} "
LOG.error("Failed to update the container state. Container with id : {} "
+ "does not exist", containerID.getId());
return;
}
} catch (IOException e) {
LOG.info("Failed to update the container state. Container with id : {} "
+ "does not exist", containerID.getId());
LOG.error("Failed to update the container state. Container with id : {} "
+ "does not exist", containerID.getId(), e);
return;
}
@ -85,11 +84,22 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
try {
// Finalize event will make sure the state of the container transitions
// from OPEN to CLOSING in containerStateManager.
containerManager.getStateManager()
.updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE);
} catch (SCMException ex) {
LOG.error("Failed to update the container state for container : {}"
+ containerID);
containerManager.updateContainerState(containerID.getId(),
HddsProtos.LifeCycleEvent.FINALIZE);
} catch (IOException ex) {
LOG.error("Failed to update the container state to FINALIZE for"
+ "container : {}" + containerID, ex);
}
} else if (info.getState() == HddsProtos.LifeCycleState.ALLOCATED) {
try {
// Create event will make sure the state of the container transitions
// from OPEN to CREATING in containerStateManager, this will move
// the container out of active allocation path.
containerManager.updateContainerState(containerID.getId(),
HddsProtos.LifeCycleEvent.CREATE);
} catch (IOException ex) {
LOG.error("Failed to update the container state to CREATE for"
+ "container:{}" + containerID, ex);
}
} else {
LOG.info("container with id : {} is in {} state and need not be closed.",

View File

@ -38,6 +38,7 @@
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
@ -104,7 +105,7 @@ public class ContainerMapping implements Mapping {
@SuppressWarnings("unchecked")
public ContainerMapping(
final Configuration conf, final NodeManager nodeManager, final int
cacheSizeMB) throws IOException {
cacheSizeMB, EventPublisher eventPublisher) throws IOException {
this.nodeManager = nodeManager;
this.cacheSize = cacheSizeMB;
this.closer = new ContainerCloser(nodeManager, conf);
@ -122,14 +123,15 @@ public ContainerMapping(
this.lock = new ReentrantLock();
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
// To be replaced with code getStorageSize once it is committed.
size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB,
OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
this.containerStateManager =
new ContainerStateManager(conf, this);
this.pipelineSelector = new PipelineSelector(nodeManager,
containerStateManager, conf, eventPublisher);
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@ -372,6 +374,12 @@ public HddsProtos.LifeCycleState updateContainerState(
// Like releasing the lease in case of BEGIN_CREATE.
ContainerInfo updatedContainer = containerStateManager
.updateContainerState(containerInfo, event);
if (!updatedContainer.isContainerOpen()) {
Pipeline pipeline = pipelineSelector
.getPipeline(containerInfo.getPipelineName(),
containerInfo.getReplicationType());
pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
}
containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
return updatedContainer.getState();
} catch (LeaseException e) {
@ -446,7 +454,7 @@ public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
.getPipeline(containerInfo.getPipelineName(),
containerInfo.getReplicationType());
if (pipeline == null) {
pipelineSelector
pipeline = pipelineSelector
.getReplicationPipeline(containerInfo.getReplicationType(),
containerInfo.getReplicationFactor());
}

View File

@ -462,6 +462,17 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
factor, type);
}
/**
* Returns a set of open ContainerIDs that reside on a pipeline.
*
* @param pipeline Pipeline of the Containers.
* @return Set of containers that match the specific query parameters.
*/
public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(String
pipeline) {
return containers.getOpenContainerIDsByPipeline(pipeline);
}
/**
* Returns the containerInfo with pipeline for the given container id.
* @param selector -- Pipeline selector class.

View File

@ -346,7 +346,7 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
}
// In case the container is set to closed state, it needs to be removed from
// the pipeline Map.
if (newState == LifeCycleState.CLOSED) {
if (!info.isContainerOpen()) {
openPipelineMap.remove(info.getPipelineName(), id);
}
}

View File

@ -102,19 +102,28 @@ public Set<Pipeline> getPipelines(UUID datanode) {
Collections.unmodifiableSet(v));
}
/**
/**
* Adds a pipeline entry to a given dataNode in the map.
* @param pipeline Pipeline to be added
*/
public synchronized void addPipeline(Pipeline pipeline) throws SCMException {
public synchronized void addPipeline(Pipeline pipeline) {
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
UUID dnId = details.getUuid();
dn2PipelineMap
.computeIfAbsent(dnId,k->Collections.synchronizedSet(new HashSet<>()))
.computeIfAbsent(dnId,
k -> Collections.synchronizedSet(new HashSet<>()))
.add(pipeline);
}
}
public synchronized void removePipeline(Pipeline pipeline) {
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
UUID dnId = details.getUuid();
dn2PipelineMap.computeIfPresent(dnId,
(k, v) -> {v.remove(pipeline); return v;});
}
}
public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
return Collections.unmodifiableMap(dn2PipelineMap);
}

View File

@ -38,14 +38,14 @@ public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
private final List<Pipeline> activePipelines;
private final Map<String, Pipeline> activePipelineMap;
private final Map<String, Pipeline> pipelineMap;
private final AtomicInteger pipelineIndex;
private final Node2PipelineMap node2PipelineMap;
public PipelineManager(Node2PipelineMap map) {
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
activePipelineMap = new WeakHashMap<>();
pipelineMap = new WeakHashMap<>();
node2PipelineMap = map;
}
@ -85,8 +85,8 @@ public synchronized final Pipeline getPipeline(String pipelineName) {
Pipeline pipeline = null;
// 1. Check if pipeline already exists
if (activePipelineMap.containsKey(pipelineName)) {
pipeline = activePipelineMap.get(pipelineName);
if (pipelineMap.containsKey(pipelineName)) {
pipeline = pipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
return pipeline;
} else {
@ -115,11 +115,6 @@ public abstract Pipeline allocatePipeline(
*/
public abstract void initializePipeline(Pipeline pipeline) throws IOException;
public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
activePipelineMap.remove(pipeline.getPipelineName());
}
/**
* Find a Pipeline that is operational.
*
@ -172,16 +167,28 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor,
+ "replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline);
pipelineMap.put(pipeline.getPipelineName(), pipeline);
node2PipelineMap.addPipeline(pipeline);
}
return pipeline;
}
/**
* Close the pipeline with the given clusterId.
* Remove the pipeline from active allocation
* @param pipeline pipeline to be finalized
*/
public abstract void closePipeline(String pipelineID) throws IOException;
public synchronized void finalizePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
}
/**
*
* @param pipeline
*/
public void closePipeline(Pipeline pipeline) {
pipelineMap.remove(pipeline.getPipelineName());
node2PipelineMap.removePipeline(pipeline);
}
/**
* list members in the pipeline .

View File

@ -19,11 +19,14 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
@ -33,6 +36,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.statemachine
.InvalidStateTransitionException;
@ -48,6 +52,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -65,6 +70,8 @@ public class PipelineSelector {
private final ContainerPlacementPolicy placementPolicy;
private final NodeManager nodeManager;
private final Configuration conf;
private final ContainerStateManager containerStateManager;
private final EventPublisher eventPublisher;
private final RatisManagerImpl ratisManager;
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
@ -79,9 +86,12 @@ public class PipelineSelector {
* @param nodeManager - node manager
* @param conf - Ozone Config
*/
public PipelineSelector(NodeManager nodeManager, Configuration conf) {
public PipelineSelector(NodeManager nodeManager,
ContainerStateManager containerStateManager, Configuration conf,
EventPublisher eventPublisher) {
this.nodeManager = nodeManager;
this.conf = conf;
this.eventPublisher = eventPublisher;
this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
this.containerSize = OzoneConsts.GB * this.conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
@ -99,6 +109,7 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
this.containerStateManager = containerStateManager;
pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
pipelineCreationLeaseTimeout);
pipelineLeaseManager.start();
@ -306,15 +317,54 @@ public Pipeline getPipeline(String pipelineName,
}
/**
* Close the pipeline with the given clusterId.
* Finalize a given pipeline.
*/
public void closePipeline(ReplicationType replicationType, String
pipelineID) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
public void finalizePipeline(Pipeline pipeline) throws IOException {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Closing pipeline. pipelineID: {}", pipelineID);
manager.closePipeline(pipelineID);
LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getPipelineName());
// Remove the pipeline from active allocation
manager.finalizePipeline(pipeline);
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
closePipelineIfNoOpenContainers(pipeline);
}
/**
* Close a given pipeline.
*/
public void closePipelineIfNoOpenContainers(Pipeline pipeline) throws IOException {
if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
return;
}
NavigableSet<ContainerID> containerIDS = containerStateManager
.getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
if (containerIDS.size() == 0) {
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
LOG.info("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
}
}
/**
* Close a given pipeline.
*/
private void closePipeline(Pipeline pipeline) {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
NavigableSet<ContainerID> containers =
containerStateManager
.getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
Preconditions.checkArgument(containers.size() == 0);
manager.closePipeline(pipeline);
}
private void closeContainersByPipeline(Pipeline pipeline) {
NavigableSet<ContainerID> containers =
containerStateManager
.getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
for (ContainerID id : containers) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
}
}
/**
@ -352,7 +402,7 @@ public void removePipeline(UUID dnId) {
node2PipelineMap.getPipelines(dnId);
for (Pipeline pipeline : pipelineSet) {
getPipelineManager(pipeline.getType())
.removePipeline(pipeline);
.closePipeline(pipeline);
}
node2PipelineMap.removeDatanode(dnId);
}
@ -398,12 +448,12 @@ public void updatePipelineState(Pipeline pipeline,
break;
case FINALIZE:
//TODO: cleanup pipeline by closing all the containers on the pipeline
closeContainersByPipeline(pipeline);
break;
case CLOSE:
case TIMEOUT:
// TODO: Release the nodes here when pipelines are destroyed
closePipeline(pipeline);
break;
default:
throw new SCMException("Unsupported pipeline LifeCycleEvent.",

View File

@ -108,13 +108,15 @@ public void initializePipeline(Pipeline pipeline) throws IOException {
}
/**
* Close the pipeline with the given clusterId.
*
* @param pipelineID
* Close the pipeline.
*/
@Override
public void closePipeline(String pipelineID) throws IOException {
public void closePipeline(Pipeline pipeline) {
super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be the in ratis members list.
Preconditions.checkArgument(ratisMembers.remove(node));
}
//TODO: should the raft ring also be destroyed as well?
}
/**

View File

@ -101,13 +101,14 @@ public void initializePipeline(Pipeline pipeline) {
}
/**
* Close the pipeline with the given clusterId.
*
* @param pipelineID
* Close the pipeline.
*/
@Override
public void closePipeline(String pipelineID) throws IOException {
public void closePipeline(Pipeline pipeline) {
super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be the in standalone members list.
Preconditions.checkArgument(standAloneMembers.remove(node));
}
}
/**

View File

@ -192,7 +192,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
scmNodeManager = new SCMNodeManager(
conf, scmStorage.getClusterID(), this, eventQueue);
scmContainerManager = new ContainerMapping(
conf, getScmNodeManager(), cacheSize);
conf, getScmNodeManager(), cacheSize, eventQueue);
scmBlockManager = new BlockManagerImpl(
conf, getScmNodeManager(), scmContainerManager, eventQueue);

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -73,7 +74,8 @@ public static void setUp() throws Exception {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(conf, nodeManager, 128);
mapping =
new ContainerMapping(conf, nodeManager, 128, new EventQueue());
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){

View File

@ -66,7 +66,8 @@ public static void setUp() throws Exception {
configuration
.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(configuration, nodeManager, 128);
mapping = new ContainerMapping(configuration, nodeManager, 128,
new EventQueue());
eventQueue = new EventQueue();
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(mapping));
@ -122,12 +123,7 @@ public void testCloseContainerEventWithValidContainers() throws IOException {
// state, so firing close container event should not queue CLOSE
// command in the Datanode
Assert.assertEquals(0, nodeManager.getCommandCount(datanode));
// Make sure the information is logged
Assert.assertTrue(logCapturer.getOutput().contains(
"container with id : " + id.getId()
+ " is in ALLOCATED state and need not be closed"));
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
eventQueue.fireEvent(CLOSE_CONTAINER,
new ContainerID(
@ -164,12 +160,7 @@ public void testCloseContainerEventWithRatis() throws IOException {
Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
i++;
}
// Make sure the information is logged
Assert.assertTrue(logCapturer.getOutput().contains(
"container with id : " + id.getId()
+ " is in ALLOCATED state and need not be closed"));
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
eventQueue.fireEvent(CLOSE_CONTAINER, id);
eventQueue.processAll(1000);

View File

@ -30,6 +30,7 @@
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -84,7 +85,8 @@ public static void setUp() throws Exception {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(conf, nodeManager, 128);
mapping = new ContainerMapping(conf, nodeManager, 128,
new EventQueue());
xceiverClientManager = new XceiverClientManager(conf);
random = new Random();
}

View File

@ -32,6 +32,7 @@
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -79,7 +80,8 @@ public static void setUp() throws Exception {
configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
testDir.getAbsolutePath());
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(configuration, nodeManager, 128);
mapping = new ContainerMapping(configuration, nodeManager, 128,
new EventQueue());
}
@AfterClass

View File

@ -105,9 +105,10 @@ SCMNodeManager createNodeManager(OzoneConfiguration config)
ContainerMapping createContainerManager(Configuration config,
NodeManager scmNodeManager) throws IOException {
EventQueue eventQueue = new EventQueue();
final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
return new ContainerMapping(config, scmNodeManager, cacheSize);
return new ContainerMapping(config, scmNodeManager, cacheSize, eventQueue);
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ReplicationType.RATIS;
public class TestPipelineClose {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static StorageContainerManager scm;
private static ContainerWithPipeline ratisContainer1;
private static ContainerWithPipeline ratisContainer2;
private static ContainerStateMap stateMap;
private static ContainerMapping mapping;
private static PipelineSelector pipelineSelector;
/**
* Create a MiniDFSCluster for testing.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(6).build();
cluster.waitForClusterToBeReady();
scm = cluster.getStorageContainerManager();
mapping = (ContainerMapping)scm.getScmContainerManager();
stateMap = mapping.getStateManager().getContainerStateMap();
ratisContainer1 = mapping.allocateContainer(RATIS, THREE, "testOwner");
ratisContainer2 = mapping.allocateContainer(RATIS, THREE, "testOwner");
pipelineSelector = mapping.getPipelineSelector();
// At this stage, there should be 2 pipeline one with 1 open container each.
// Try closing the both the pipelines, one with a closed container and
// the other with an open container.
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testPipelineCloseWithClosedContainer() throws IOException {
NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getPipelineName());
long cId = ratisContainer1.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
Assert.assertEquals(cId, set.first().getId());
// Now close the container and it should not show up while fetching
// containers by pipeline
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE);
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED);
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getPipelineName());
Assert.assertEquals(0, setClosed.size());
pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
Pipeline pipeline1 = pipelineSelector
.getPipeline(ratisContainer1.getPipeline().getPipelineName(),
ratisContainer1.getContainerInfo().getReplicationType());
Assert.assertNull(pipeline1);
Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSED);
for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
// Assert that the pipeline has been removed from Node2PipelineMap as well
Assert.assertEquals(pipelineSelector.getNode2PipelineMap()
.getPipelines(dn.getUuid()).size(), 0);
}
}
@Test
public void testPipelineCloseWithOpenContainer() throws IOException,
TimeoutException, InterruptedException {
NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline(
ratisContainer2.getPipeline().getPipelineName());
Assert.assertEquals(1, setOpen.size());
long cId2 = ratisContainer2.getContainerInfo().getContainerID();
mapping
.updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATE);
mapping
.updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATED);
pipelineSelector.finalizePipeline(ratisContainer2.getPipeline());
Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);
Pipeline pipeline2 = pipelineSelector
.getPipeline(ratisContainer2.getPipeline().getPipelineName(),
ratisContainer2.getContainerInfo().getReplicationType());
Assert.assertEquals(pipeline2.getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -115,7 +116,8 @@ public void setup() throws Exception {
cluster.getStorageContainerManager().stop();
nodeManager = cluster.getStorageContainerManager().getScmNodeManager();
mapping = new ContainerMapping(conf, nodeManager, 128);
mapping = new ContainerMapping(conf, nodeManager, 128,
new EventQueue());
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
// blockManager.allocateBlock() will create containers if there is none