HDDS-238. Add Node2Pipeline Map in SCM to track ratis/standalone pipelines. Contributed by Mukul Kumar Singh.

Xiaoyu Yao 2018-07-12 22:02:57 -07:00
parent f89e265905
commit 3f3f72221f
10 changed files with 343 additions and 23 deletions

ContainerInfo.java View File

@@ -456,4 +456,15 @@ public ContainerInfo build() {
replicationFactor, replicationType);
}
}
/**
* Checks if a container is in an open state; that is, whether it is
* OPEN, ALLOCATED or CREATING. Containers in any of these states are
* managed as open containers by SCM.
*/
public boolean isContainerOpen() {
return state == HddsProtos.LifeCycleState.ALLOCATED ||
state == HddsProtos.LifeCycleState.CREATING ||
state == HddsProtos.LifeCycleState.OPEN;
}
}
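
The new isContainerOpen() predicate treats ALLOCATED, CREATING and OPEN as one logical "open" class. A minimal standalone sketch of the same check, assuming a stand-in enum rather than the real HddsProtos.LifeCycleState:

import java.util.EnumSet;
import java.util.Set;

public class OpenStateCheckSketch {
  // Stand-in for HddsProtos.LifeCycleState; values mirror the ones the
  // real predicate tests, plus two closed-side states for contrast.
  enum LifeCycleState { ALLOCATED, CREATING, OPEN, CLOSING, CLOSED }

  // SCM manages a container as "open" while it can still be written to
  // or is on its way to becoming writable.
  private static final Set<LifeCycleState> OPEN_STATES =
      EnumSet.of(LifeCycleState.ALLOCATED, LifeCycleState.CREATING,
          LifeCycleState.OPEN);

  static boolean isContainerOpen(LifeCycleState state) {
    return OPEN_STATES.contains(state);
  }

  public static void main(String[] args) {
    System.out.println(isContainerOpen(LifeCycleState.CREATING)); // true
    System.out.println(isContainerOpen(LifeCycleState.CLOSED));   // false
  }
}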

ContainerMapping.java View File

@@ -477,7 +477,7 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos = reports.getReportsList();
for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
containerInfos) {
byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
lock.lock();
@@ -498,7 +498,9 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
containerStore.put(dbKey, newState.toByteArray());
// If the container is closed, then state is already written to SCM
Pipeline pipeline =
pipelineSelector.getPipeline(newState.getPipelineName(),
newState.getReplicationType());
if (pipeline == null) {
pipeline = pipelineSelector
.getReplicationPipeline(newState.getReplicationType(),
@@ -713,4 +715,9 @@ public void flushContainerInfo() throws IOException {
public MetadataStore getContainerStore() {
return containerStore;
}
@VisibleForTesting
public PipelineSelector getPipelineSelector() {
return pipelineSelector;
}
}
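
The notable change in processContainerReports() above is the fallback: the container's pipeline is re-resolved by name, and if it no longer exists (for example, torn down after a dead datanode), a replication pipeline of the same type is requested instead. A hedged sketch of that control flow; both interfaces are hypothetical stand-ins, not PipelineSelector's real API:

public class PipelineFallbackSketch {
  // Hypothetical stand-in for the subset of PipelineSelector used here.
  interface Selector {
    String getPipeline(String pipelineName, String replicationType);
    String getReplicationPipeline(String replicationType,
        String replicationFactor);
  }

  static String resolvePipeline(Selector selector, String pipelineName,
      String type, String factor) {
    String pipeline = selector.getPipeline(pipelineName, type);
    if (pipeline == null) {
      // The original pipeline has been removed; fall back to any open
      // pipeline with a matching replication type and factor.
      pipeline = selector.getReplicationPipeline(type, factor);
    }
    return pipeline;
  }

  public static void main(String[] args) {
    Selector s = new Selector() {
      public String getPipeline(String name, String type) { return null; }
      public String getReplicationPipeline(String type, String factor) {
        return "Ratis-fresh";
      }
    };
    // Prints Ratis-fresh: the old pipeline is gone, so we fell back.
    System.out.println(resolvePipeline(s, "Ratis-old", "RATIS", "THREE"));
  }
}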

ContainerStateManager.java View File

@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.container;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -522,4 +523,9 @@ public boolean removeContainerReplica(ContainerID containerID,
DatanodeDetails dn) throws SCMException {
return containers.removeContainerReplica(containerID, dn);
}
@VisibleForTesting
public ContainerStateMap getContainerStateMap() {
return containers;
}
}

ContainerStateMap.java View File

@@ -51,7 +51,7 @@
* Container State Map acts like a unified map for various attributes that are
* used to select containers when we need allocated blocks.
* <p>
* This class provides the ability to query 5 classes of attributes. They are
* <p>
* 1. LifeCycleStates - LifeCycle States of container describe in which state
* a container is. For example, a container needs to be in Open State for a
@@ -72,6 +72,9 @@
* Replica and THREE Replica. User can specify how many copies should be made
* for a ozone key.
* <p>
* 5. Pipeline - The pipeline constitutes the set of Datanodes on which the
* open container resides physically.
* <p>
* The most common access pattern of this class is to select a container based
* on all these parameters, for example, when allocating a block we will
* select a container that belongs to user1, with Ratis replication which can
@@ -86,6 +89,14 @@ public class ContainerStateMap {
private final ContainerAttribute<String> ownerMap;
private final ContainerAttribute<ReplicationFactor> factorMap;
private final ContainerAttribute<ReplicationType> typeMap;
// This map constitutes the pipeline to open container mappings.
// It is queried for the list of open containers on a particular pipeline
// so that a close can be issued on those containers when any of the
// following events occurs:
// 1. Dead datanode.
// 2. Datanode out of space.
// 3. Volume loss or volume out of space.
private final ContainerAttribute<String> openPipelineMap;
private final Map<ContainerID, ContainerInfo> containerMap;
// Map to hold replicas of given container.
@@ -106,6 +117,7 @@ public ContainerStateMap() {
ownerMap = new ContainerAttribute<>();
factorMap = new ContainerAttribute<>();
typeMap = new ContainerAttribute<>();
openPipelineMap = new ContainerAttribute<>();
containerMap = new HashMap<>();
autoLock = new AutoCloseableLock();
contReplicaMap = new HashMap<>();
@@ -140,6 +152,9 @@ public void addContainer(ContainerInfo info)
ownerMap.insert(info.getOwner(), id);
factorMap.insert(info.getReplicationFactor(), id);
typeMap.insert(info.getReplicationType(), id);
if (info.isContainerOpen()) {
openPipelineMap.insert(info.getPipelineName(), id);
}
LOG.trace("Created container with {} successfully.", id);
}
}
@@ -329,6 +344,11 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
throw new SCMException("Updating the container map failed.", ex,
FAILED_TO_CHANGE_CONTAINER_STATE);
}
// Once a container transitions to the CLOSED state, it must be removed
// from the open-pipeline map.
if (newState == LifeCycleState.CLOSED) {
openPipelineMap.remove(info.getPipelineName(), id);
}
}
/**
@@ -359,6 +379,20 @@ NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
}
}
/**
* Returns the open containers in SCM for the given pipeline.
*
* @param pipeline - Pipeline name.
* @return NavigableSet<ContainerID>
*/
public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
    String pipeline) {
Preconditions.checkNotNull(pipeline);
try (AutoCloseableLock lock = autoLock.acquire()) {
return openPipelineMap.getCollection(pipeline);
}
}
/**
* Returns Containers by replication factor.
*
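
The openPipelineMap added above is one more ContainerAttribute index, keyed by pipeline name and holding open container IDs. A simplified stand-in showing just the insert/remove/query behavior this diff relies on (ContainerAttribute itself is SCM's own class; this is not its real implementation):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

public class OpenPipelineIndexSketch {
  private final Map<String, NavigableSet<Long>> index = new HashMap<>();

  // addContainer(): an open container is indexed under its pipeline name.
  public void insert(String pipelineName, long containerId) {
    index.computeIfAbsent(pipelineName, k -> new TreeSet<>())
        .add(containerId);
  }

  // updateState() to CLOSED: the container leaves the open index.
  public void remove(String pipelineName, long containerId) {
    NavigableSet<Long> ids = index.get(pipelineName);
    if (ids != null) {
      ids.remove(containerId);
    }
  }

  // getOpenContainerIDsByPipeline(): everything returned here is a
  // candidate for a close when the pipeline must be torn down.
  public NavigableSet<Long> getCollection(String pipelineName) {
    NavigableSet<Long> ids = index.get(pipelineName);
    return ids == null ? Collections.emptyNavigableSet()
        : Collections.unmodifiableNavigableSet(ids);
  }
}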

Node2PipelineMap.java View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.hdds.scm.pipelines;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import java.util.Set;
import java.util.UUID;
import java.util.Map;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.DUPLICATE_DATANODE;
/**
* This data structure maintains the list of pipelines which the given datanode
* is a part of.
* This information will be added whenever a new pipeline allocation happens.
*
* TODO: this information needs to be regenerated from pipeline reports on
* SCM restart
*/
public class Node2PipelineMap {
private final Map<UUID, Set<Pipeline>> dn2PipelineMap;
/**
* Constructs a Node2PipelineMap Object.
*/
public Node2PipelineMap() {
dn2PipelineMap = new ConcurrentHashMap<>();
}
/**
* Returns true if this is a datanode that is already tracked by
* Node2PipelineMap.
*
* @param datanodeID - UUID of the Datanode.
* @return True if this is tracked, false if this map does not know about it.
*/
private boolean isKnownDatanode(UUID datanodeID) {
Preconditions.checkNotNull(datanodeID);
return dn2PipelineMap.containsKey(datanodeID);
}
/**
* Insert a new datanode into Node2Pipeline Map.
*
* @param datanodeID - Datanode UUID.
* @param pipelines - set of pipelines.
*/
private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines)
throws SCMException {
Preconditions.checkNotNull(pipelines);
Preconditions.checkNotNull(datanodeID);
if (dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
throw new SCMException("Node already exists in the map",
DUPLICATE_DATANODE);
}
}
/**
* Removes the datanode entry from the map.
* @param datanodeID - Datanode ID.
*/
public synchronized void removeDatanode(UUID datanodeID) {
Preconditions.checkNotNull(datanodeID);
dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null);
}
/**
* Returns null if there are no pipelines associated with this datanode ID.
*
* @param datanode - UUID
* @return Set of pipelines or Null.
*/
public Set<Pipeline> getPipelines(UUID datanode) {
Preconditions.checkNotNull(datanode);
return dn2PipelineMap.computeIfPresent(datanode, (k, v) ->
Collections.unmodifiableSet(v));
}
/**
* Adds a pipeline entry to a given datanode in the map.
* @param pipeline Pipeline to be added
*/
public synchronized void addPipeline(Pipeline pipeline) throws SCMException {
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
UUID dnId = details.getUuid();
dn2PipelineMap
.computeIfAbsent(dnId, k -> Collections.synchronizedSet(new HashSet<>()))
.add(pipeline);
}
}
public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
return Collections.unmodifiableMap(dn2PipelineMap);
}
}
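
Node2PipelineMap is, at heart, a concurrent multimap from datanode UUID to the set of pipelines that node participates in. The load-bearing idiom is computeIfAbsent paired with a synchronized set; here is a standalone sketch of that pattern, with String standing in for Pipeline:

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMultimapSketch {
  private final Map<UUID, Set<String>> map = new ConcurrentHashMap<>();

  // computeIfAbsent atomically creates the per-node set on first use, so
  // two threads adding pipelines for the same new node cannot race.
  void add(UUID node, String pipeline) {
    map.computeIfAbsent(node,
        k -> Collections.synchronizedSet(new HashSet<>())).add(pipeline);
  }

  // computeIfPresent mapping to null removes the whole entry atomically,
  // which is exactly what removeDatanode() above does.
  void removeNode(UUID node) {
    map.computeIfPresent(node, (k, v) -> null);
  }

  public static void main(String[] args) {
    ConcurrentMultimapSketch m = new ConcurrentMultimapSketch();
    UUID dn = UUID.randomUUID();
    m.add(dn, "Ratis-abcd");
    m.add(dn, "Ratis-efgh");
    System.out.println(m.map.get(dn).size()); // 2
    m.removeNode(dn);
    System.out.println(m.map.get(dn));        // null
  }
}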

PipelineManager.java View File

@@ -40,11 +40,13 @@ public abstract class PipelineManager {
private final List<Pipeline> activePipelines;
private final Map<String, Pipeline> activePipelineMap;
private final AtomicInteger pipelineIndex;
private final Node2PipelineMap node2PipelineMap;
public PipelineManager(Node2PipelineMap map) {
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
activePipelineMap = new WeakHashMap<>();
node2PipelineMap = map;
}
/**
@@ -66,24 +68,23 @@ public synchronized final Pipeline getPipeline(
*
* 2. This allows all nodes to be part of a pipeline quickly.
*
* 3. If there are not enough free nodes, return pipelines in a
* round-robin fashion.
*
* TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns pipelines in round robin
* fashion.
*/
Pipeline pipeline = allocatePipeline(replicationFactor);
if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline);
node2PipelineMap.addPipeline(pipeline);
} else {
pipeline = findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) {
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
@@ -133,6 +134,11 @@ protected int getReplicationCount(ReplicationFactor factor) {
public abstract Pipeline allocatePipeline(
ReplicationFactor replicationFactor) throws IOException;
public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
activePipelineMap.remove(pipeline.getPipelineName());
}
/**
* Find a Pipeline that is operational.
*
@@ -143,7 +149,7 @@ private Pipeline findOpenPipeline(
Pipeline pipeline = null;
final int sentinal = -1;
if (activePipelines.size() == 0) {
LOG.error("No Operational conduits found. Returning null.");
LOG.error("No Operational pipelines found. Returning null.");
return null;
}
int startIndex = getNextIndex();
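
The policy described by the comment block above is: allocate a fresh pipeline while free nodes exist, otherwise hand out existing open pipelines round-robin. A condensed sketch of that flow, with allocatePipeline() stubbed out (the real one consults the placement policy for nodes):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSelectionSketch {
  private final List<String> activePipelines = new ArrayList<>();
  private final AtomicInteger pipelineIndex = new AtomicInteger(0);

  // Stub: returns null when there are not enough free nodes to form a
  // brand-new pipeline.
  private String allocatePipeline() {
    return null;
  }

  synchronized String getPipeline() {
    String pipeline = allocatePipeline();
    if (pipeline != null) {
      activePipelines.add(pipeline);     // track it for future reuse
      return pipeline;
    }
    if (activePipelines.isEmpty()) {
      return null;                       // nothing to reuse either
    }
    // Fall back to round-robin over the pipelines we already have.
    int idx = Math.abs(
        pipelineIndex.getAndIncrement() % activePipelines.size());
    return activePipelines.get(idx);
  }
}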

PipelineSelector.java View File

@@ -19,7 +19,6 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
@@ -41,6 +40,8 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
@@ -55,7 +56,7 @@ public class PipelineSelector {
private final RatisManagerImpl ratisManager;
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
private final Node2PipelineMap node2PipelineMap;
/**
* Constructs a pipeline Selector.
*
@@ -69,12 +70,13 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
this.containerSize = OzoneConsts.GB * this.conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
node2PipelineMap = new Node2PipelineMap();
this.standaloneManager =
new StandaloneManagerImpl(this.nodeManager, placementPolicy,
containerSize, node2PipelineMap);
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
conf, node2PipelineMap);
}
/**
@@ -243,4 +245,18 @@ public void updateDatanodes(ReplicationType replicationType, String
.collect(Collectors.joining(",")));
manager.updatePipeline(pipelineID, newDatanodes);
}
public Node2PipelineMap getNode2PipelineMap() {
return node2PipelineMap;
}
public void removePipeline(UUID dnId) {
Set<Pipeline> pipelineChannelSet =
node2PipelineMap.getPipelines(dnId);
for (Pipeline pipelineChannel : pipelineChannelSet) {
getPipelineManager(pipelineChannel.getType())
.removePipeline(pipelineChannel);
}
node2PipelineMap.removeDatanode(dnId);
}
}
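
removePipeline(UUID) above is the dead-datanode teardown path: collect the node's pipelines, retire each through its manager, then drop the node from the map. One sharp edge: getPipelines() returns null for a datanode the map has never tracked, so this path would throw a NullPointerException for an unknown node. A defensive sketch of the same sequence, using hypothetical stand-in interfaces:

import java.util.Collections;
import java.util.Set;
import java.util.UUID;

public class DeadNodeTeardownSketch {
  interface PipelineMap {
    Set<String> getPipelines(UUID dnId);   // may be null for unknown nodes
    void removeDatanode(UUID dnId);
  }

  interface Manager {
    void removePipeline(String pipeline);  // drops it from active lists
  }

  // On a dead-datanode event: retire every pipeline the node was part of,
  // then forget the node itself.
  static void onDeadDatanode(UUID dnId, PipelineMap map, Manager manager) {
    Set<String> pipelines = map.getPipelines(dnId);
    if (pipelines == null) {
      pipelines = Collections.emptySet();  // node was never tracked
    }
    for (String pipeline : pipelines) {
      manager.removePipeline(pipeline);
    }
    map.removeDatanode(dnId);
  }
}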

RatisManagerImpl.java View File

@@ -19,11 +19,11 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -60,8 +60,9 @@ public class RatisManagerImpl extends PipelineManager {
* @param nodeManager
*/
public RatisManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
Node2PipelineMap map) {
super(map);
this.conf = conf;
this.nodeManager = nodeManager;
ratisMembers = new HashSet<>();
@@ -89,11 +90,11 @@ public Pipeline allocatePipeline(ReplicationFactor factor) {
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new ratis pipeline of size: {}", count);
// Start all pipeline names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
Pipeline pipeline =
PipelineSelector.newPipelineFromNodes(newNodesList,
LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline(pipeline.getPipelineName(), newNodesList);

StandaloneManagerImpl.java View File

@@ -17,11 +17,11 @@
package org.apache.hadoop.hdds.scm.pipelines.standalone;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -58,8 +58,9 @@ public class StandaloneManagerImpl extends PipelineManager {
* @param containerSize - Container Size.
*/
public StandaloneManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long containerSize,
Node2PipelineMap map) {
super(map);
this.nodeManager = nodeManager;
this.placementPolicy = placementPolicy;
this.containerSize = containerSize;

TestNode2PipelineMap.java View File

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ReplicationType.RATIS;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ReplicationFactor.THREE;
public class TestNode2PipelineMap {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static StorageContainerManager scm;
private static ContainerWithPipeline ratisContainer;
private static ContainerStateMap stateMap;
private static ContainerMapping mapping;
/**
* Create a MiniOzoneCluster for testing.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
scm = cluster.getStorageContainerManager();
mapping = (ContainerMapping) scm.getScmContainerManager();
stateMap = mapping.getStateManager().getContainerStateMap();
ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
}
/**
* Shutdown the MiniOzoneCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testPipelineMap() throws IOException {
NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
long cId = ratisContainer.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
Assert.assertEquals(cId, set.first().getId());
List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines();
Assert.assertEquals(3, dns.size());
// get pipeline details by dnid
Set<Pipeline> pipelines = mapping.getPipelineSelector()
.getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
Assert.assertEquals(1, pipelines.size());
pipelines.forEach(p -> Assert.assertEquals(p.getPipelineName(),
ratisContainer.getPipeline().getPipelineName()));
// Now close the container and it should not show up while fetching
// containers by pipeline
mapping.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE);
mapping.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED);
mapping.updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
mapping.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
Assert.assertEquals(0, set2.size());
}
}