HDDS-656. Add logic for pipeline report and action processing in new pipeline code. Contributed by Lokesh Jain.

This commit is contained in:
Nandakumar 2018-10-17 13:56:54 +05:30
parent 533138718c
commit 64a43c92c2
17 changed files with 804 additions and 280 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.protobuf
@ -73,6 +74,24 @@ public static XceiverClientRatis newXceiverClientRatis(
retryPolicy);
}
public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
Configuration ozoneConf) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
Pipeline pipeline1 =
new Pipeline(pipeline.getNodes().get(0).getUuidString(),
HddsProtos.LifeCycleState.OPEN, pipeline.getType(),
pipeline.getFactor(), PipelineID.valueOf(pipeline.getID().getId()));
return new XceiverClientRatis(pipeline1,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
retryPolicy);
}
private final Pipeline pipeline;
private final RpcType rpcType;
private final AtomicReference<RaftClient> client = new AtomicReference<>();

View File

@ -23,12 +23,14 @@
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@ -40,17 +42,17 @@ public final class Pipeline {
private final ReplicationType type;
private final ReplicationFactor factor;
private LifeCycleState state;
private List<DatanodeDetails> nodes;
private PipelineState state;
private Map<DatanodeDetails, Long> nodeStatus;
private Pipeline(PipelineID id, ReplicationType type,
ReplicationFactor factor, LifeCycleState state,
List<DatanodeDetails> nodes) {
ReplicationFactor factor, PipelineState state,
Map<DatanodeDetails, Long> nodeStatus) {
this.id = id;
this.type = type;
this.factor = factor;
this.state = state;
this.nodes = nodes;
this.nodeStatus = nodeStatus;
}
/**
@ -85,36 +87,68 @@ public ReplicationFactor getFactor() {
*
* @return - LifeCycleStates.
*/
public LifeCycleState getLifeCycleState() {
PipelineState getPipelineState() {
// TODO: See if we need to expose this.
return state;
}
public boolean isClosed() {
return state == PipelineState.CLOSED;
}
public boolean isOpen() {
return state == PipelineState.OPEN;
}
void reportDatanode(DatanodeDetails dn) throws IOException {
if (nodeStatus.get(dn) == null) {
throw new IOException(
String.format("Datanode=%s not part of pipeline=%s", dn, id));
}
nodeStatus.put(dn, System.currentTimeMillis());
}
boolean isHealthy() {
for (Long reportedTime : nodeStatus.values()) {
if (reportedTime < 0) {
return false;
}
}
return true;
}
/**
* Returns the list of nodes which form this pipeline.
*
* @return List of DatanodeDetails
*/
public List<DatanodeDetails> getNodes() {
return new ArrayList<>(nodes);
return new ArrayList<>(nodeStatus.keySet());
}
public HddsProtos.Pipeline getProtobufMessage() {
HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder();
builder.setId(id.getProtobuf());
builder.setType(type);
builder.setState(state);
builder.addAllMembers(nodes.stream().map(
DatanodeDetails::getProtoBufMessage).collect(Collectors.toList()));
HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder()
.setId(id.getProtobuf())
.setType(type)
.setFactor(factor)
.setLeaderID("")
.addAllMembers(nodeStatus.keySet().stream()
.map(DatanodeDetails::getProtoBufMessage)
.collect(Collectors.toList()));
return builder.build();
}
public static Pipeline fromProtobuf(HddsProtos.Pipeline pipeline) {
return new Pipeline(PipelineID.getFromProtobuf(pipeline.getId()),
pipeline.getType(), pipeline.getFactor(), pipeline.getState(),
pipeline.getMembersList().stream().map(DatanodeDetails::getFromProtoBuf)
.collect(Collectors.toList()));
return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
.setFactor(pipeline.getFactor())
.setType(pipeline.getType())
.setState(PipelineState.ALLOCATED)
.setNodes(pipeline.getMembersList().stream()
.map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
.build();
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -131,7 +165,7 @@ public boolean equals(Object o) {
.append(type, that.type)
.append(factor, that.factor)
.append(state, that.state)
.append(nodes, that.nodes)
.append(nodeStatus, that.nodeStatus)
.isEquals();
}
@ -142,7 +176,7 @@ public int hashCode() {
.append(type)
.append(factor)
.append(state)
.append(nodes)
.append(nodeStatus)
.toHashCode();
}
@ -161,17 +195,17 @@ public static class Builder {
private PipelineID id = null;
private ReplicationType type = null;
private ReplicationFactor factor = null;
private LifeCycleState state = null;
private List<DatanodeDetails> nodes = null;
private PipelineState state = null;
private Map<DatanodeDetails, Long> nodeStatus = null;
public Builder() {}
public Builder(Pipeline pipeline) {
this.id = pipeline.getID();
this.type = pipeline.getType();
this.factor = pipeline.getFactor();
this.state = pipeline.getLifeCycleState();
this.nodes = pipeline.getNodes();
this.id = pipeline.id;
this.type = pipeline.type;
this.factor = pipeline.factor;
this.state = pipeline.state;
this.nodeStatus = pipeline.nodeStatus;
}
public Builder setId(PipelineID id1) {
@ -189,13 +223,14 @@ public Builder setFactor(ReplicationFactor factor1) {
return this;
}
public Builder setState(LifeCycleState state1) {
public Builder setState(PipelineState state1) {
this.state = state1;
return this;
}
public Builder setNodes(List<DatanodeDetails> nodes1) {
this.nodes = nodes1;
public Builder setNodes(List<DatanodeDetails> nodes) {
this.nodeStatus = new LinkedHashMap<>();
nodes.forEach(node -> nodeStatus.put(node, -1L));
return this;
}
@ -204,8 +239,12 @@ public Pipeline build() {
Preconditions.checkNotNull(type);
Preconditions.checkNotNull(factor);
Preconditions.checkNotNull(state);
Preconditions.checkNotNull(nodes);
return new Pipeline(id, type, factor, state, nodes);
Preconditions.checkNotNull(nodeStatus);
return new Pipeline(id, type, factor, state, nodeStatus);
}
}
enum PipelineState {
ALLOCATED, OPEN, CLOSED
}
}

View File

@ -47,6 +47,7 @@ message PipelineID {
message Pipeline {
required string leaderID = 1;
repeated DatanodeDetailsProto members = 2;
// TODO: remove the state and leaderID from this class
optional LifeCycleState state = 3 [default = OPEN];
optional ReplicationType type = 4 [default = STAND_ALONE];
optional ReplicationFactor factor = 5 [default = ONE];

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Handles pipeline actions from datanode.
*/
public class PipelineActionHandler implements
EventHandler<PipelineActionsFromDatanode> {
public static final Logger LOG = LoggerFactory.getLogger(
PipelineActionHandler.class);
private final PipelineManager pipelineManager;
public PipelineActionHandler(PipelineManager pipelineManager) {
this.pipelineManager = pipelineManager;
}
@Override
public void onMessage(PipelineActionsFromDatanode report,
EventPublisher publisher) {
for (PipelineAction action : report.getReport().getPipelineActionsList()) {
if (action.getAction() == PipelineAction.Action.CLOSE) {
PipelineID pipelineID = null;
try {
pipelineID = PipelineID.
getFromProtobuf(action.getClosePipeline().getPipelineID());
pipelineManager.finalizePipeline(pipelineID);
} catch (IOException ioe) {
LOG.error("Could not execute pipeline action={} pipeline={} {}",
action, pipelineID, ioe);
}
} else {
LOG.error("unknown pipeline action:{}" + action.getAction());
}
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@ -35,13 +36,13 @@ public final class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;
PipelineFactory(NodeManager nodeManager,
PipelineStateManager stateManager) {
PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
Configuration conf) {
providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
new SimplePipelineProvider(nodeManager));
providers.put(ReplicationType.RATIS,
new RatisPipelineProvider(nodeManager, stateManager));
new RatisPipelineProvider(nodeManager, stateManager, conf));
}
public Pipeline create(ReplicationType type, ReplicationFactor factor)

View File

@ -41,18 +41,25 @@ Pipeline createPipeline(ReplicationType type, List<DatanodeDetails> nodes)
Pipeline getPipeline(PipelineID pipelineID) throws IOException;
List<Pipeline> getPipelinesByType(ReplicationType type);
List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
ReplicationFactor factor);
void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
throws IOException;
void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID)
throws IOException;
void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException;
Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
throws IOException;
int getNumberOfContainers(PipelineID pipelineID) throws IOException;
void finalizePipeline(PipelineID pipelineID) throws IOException;
void closePipeline(PipelineID pipelineId) throws IOException;
void openPipeline(PipelineID pipelineId) throws IOException;
void removePipeline(PipelineID pipelineID) throws IOException;
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Handles Pipeline Reports from datanode.
*/
public class PipelineReportHandler implements
EventHandler<PipelineReportFromDatanode> {
private static final Logger LOGGER = LoggerFactory
.getLogger(PipelineReportHandler.class);
private final PipelineManager pipelineManager;
private final Configuration conf;
public PipelineReportHandler(PipelineManager pipelineManager,
Configuration conf) {
Preconditions.checkNotNull(pipelineManager);
this.pipelineManager = pipelineManager;
this.conf = conf;
}
@Override
public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
EventPublisher publisher) {
Preconditions.checkNotNull(pipelineReportFromDatanode);
DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
PipelineReportsProto pipelineReport =
pipelineReportFromDatanode.getReport();
Preconditions.checkNotNull(dn, "Pipeline Report is "
+ "missing DatanodeDetails.");
LOGGER.trace("Processing pipeline report for dn: {}", dn);
for (PipelineReport report : pipelineReport.getPipelineReportList()) {
try {
processPipelineReport(report, dn);
} catch (IOException e) {
LOGGER.error("Could not process pipeline report={} from dn={} {}",
report, dn, e);
}
}
}
private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
throws IOException {
PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
pipeline.reportDatanode(dn);
if (pipeline.isHealthy()) {
// if all the dns have reported, pipeline can be moved to OPEN state
pipelineManager.openPipeline(pipelineID);
}
} else if (pipeline.isClosed()) {
int numContainers = pipelineManager.getNumberOfContainers(pipelineID);
if (numContainers == 0) {
// if all the containers have been closed the pipeline can be destroyed
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.destroyPipeline();
}
// after successfully destroying the pipeline, the pipeline can be
// removed from the pipeline manager
pipelineManager.removePipeline(pipelineID);
}
} else {
// In OPEN state case just report the datanode
pipeline.reportDatanode(dn);
}
}
}

View File

@ -19,25 +19,16 @@
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE;
/**
* Manages the state of pipelines in SCM. All write operations like pipeline
@ -52,95 +43,9 @@ class PipelineStateManager {
org.apache.hadoop.hdds.scm.pipelines.PipelineStateManager.class);
private final PipelineStateMap pipelineStateMap;
private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
private final LeaseManager<Pipeline> pipelineLeaseManager;
PipelineStateManager(Configuration conf) {
this.pipelineStateMap = new PipelineStateMap();
Set<LifeCycleState> finalStates = new HashSet<>();
long pipelineCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
// TODO: Use LeaseManager for creation of pipelines.
// Add pipeline initialization logic.
this.pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
pipelineCreationLeaseTimeout);
this.pipelineLeaseManager.start();
finalStates.add(LifeCycleState.CLOSED);
this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
finalStates);
initializeStateMachine();
}
/*
* Event and State Transition Mapping.
*
* State: ALLOCATED ---------------> CREATING
* Event: CREATE
*
* State: CREATING ---------------> OPEN
* Event: CREATED
*
* State: OPEN ---------------> CLOSING
* Event: FINALIZE
*
* State: CLOSING ---------------> CLOSED
* Event: CLOSE
*
* State: CREATING ---------------> CLOSED
* Event: TIMEOUT
*
*
* Container State Flow:
*
* [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
* (CREATE) | (CREATED) (FINALIZE) |
* | |
* | |
* |(TIMEOUT) |(CLOSE)
* | |
* +--------> [CLOSED] <--------+
*/
/**
* Add javadoc.
*/
private void initializeStateMachine() {
stateMachine.addTransition(LifeCycleState.ALLOCATED,
LifeCycleState.CREATING, LifeCycleEvent.CREATE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.OPEN, LifeCycleEvent.CREATED);
stateMachine.addTransition(LifeCycleState.OPEN,
LifeCycleState.CLOSING, LifeCycleEvent.FINALIZE);
stateMachine.addTransition(LifeCycleState.CLOSING,
LifeCycleState.CLOSED, LifeCycleEvent.CLOSE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.CLOSED, LifeCycleEvent.TIMEOUT);
}
Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleEvent event)
throws IOException {
Pipeline pipeline = null;
try {
pipeline = pipelineStateMap.getPipeline(pipelineID);
LifeCycleState newState =
stateMachine.getNextState(pipeline.getLifeCycleState(), event);
return pipelineStateMap.updatePipelineState(pipeline.getID(), newState);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update pipeline state %s, "
+ "reason: invalid state transition from state: %s upon "
+ "event: %s.", pipeline.getID(), pipeline.getLifeCycleState(),
event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
}
}
void addPipeline(Pipeline pipeline) throws IOException {
@ -156,14 +61,23 @@ Pipeline getPipeline(PipelineID pipelineID) throws IOException {
return pipelineStateMap.getPipeline(pipelineID);
}
List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
return pipelineStateMap.getPipelines(type);
List<Pipeline> getPipelinesByType(ReplicationType type) {
return pipelineStateMap.getPipelinesByType(type);
}
List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
ReplicationFactor factor) {
return pipelineStateMap.getPipelinesByTypeAndFactor(type, factor);
}
Set<ContainerID> getContainers(PipelineID pipelineID) throws IOException {
return pipelineStateMap.getContainers(pipelineID);
}
int getNumberOfContainers(PipelineID pipelineID) throws IOException {
return pipelineStateMap.getNumberOfContainers(pipelineID);
}
void removePipeline(PipelineID pipelineID) throws IOException {
pipelineStateMap.removePipeline(pipelineID);
}
@ -173,7 +87,24 @@ void removeContainerFromPipeline(PipelineID pipelineID,
pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
}
void close() {
pipelineLeaseManager.shutdown();
Pipeline finalizePipeline(PipelineID pipelineId) throws IOException {
Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
if (!pipeline.isClosed()) {
pipeline = pipelineStateMap
.updatePipelineState(pipelineId, PipelineState.CLOSED);
}
return pipeline;
}
Pipeline openPipeline(PipelineID pipelineId) throws IOException {
Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
}
if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
pipeline = pipelineStateMap
.updatePipelineState(pipelineId, PipelineState.OPEN);
}
return pipeline;
}
}

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,6 +46,7 @@ class PipelineStateMap {
PipelineStateMap() {
// TODO: Use TreeMap for range operations?
this.pipelineMap = new HashMap<>();
this.pipeline2container = new HashMap<>();
@ -86,8 +88,7 @@ void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
"container Id cannot be null");
Pipeline pipeline = getPipeline(pipelineID);
// TODO: verify the state we need the pipeline to be in
if (!isOpen(pipeline)) {
if (!pipeline.isOpen()) {
throw new IOException(
String.format("%s is not in open state", pipelineID));
}
@ -115,7 +116,7 @@ Pipeline getPipeline(PipelineID pipelineID) throws IOException {
* @param type - ReplicationType
* @return List of pipelines which have the specified replication type
*/
List<Pipeline> getPipelines(ReplicationType type) {
List<Pipeline> getPipelinesByType(ReplicationType type) {
Preconditions.checkNotNull(type, "Replication type cannot be null");
return pipelineMap.values().stream().filter(p -> p.getType().equals(type))
@ -123,10 +124,25 @@ List<Pipeline> getPipelines(ReplicationType type) {
}
/**
* Get set of containers corresponding to a pipeline.
* Get open pipeline corresponding to specified replication type and factor.
*
* @param type - ReplicationType
* @param factor - ReplicationFactor
* @return List of open pipelines with specified replication type and factor
*/
List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
ReplicationFactor factor) {
return pipelineMap.values().stream()
.filter(pipeline -> pipeline.isOpen() && pipeline.getType() == type
&& pipeline.getFactor() == factor)
.collect(Collectors.toList());
}
/**
* Get set of containerIDs corresponding to a pipeline.
*
* @param pipelineID - PipelineID
* @return Set of Containers belonging to the pipeline
* @return Set of containerIDs belonging to the pipeline
* @throws IOException if pipeline is not found
*/
Set<ContainerID> getContainers(PipelineID pipelineID)
@ -138,6 +154,21 @@ Set<ContainerID> getContainers(PipelineID pipelineID)
return new HashSet<>(containerIDs);
}
/**
* Get number of containers corresponding to a pipeline.
*
* @param pipelineID - PipelineID
* @return Number of containers belonging to the pipeline
* @throws IOException if pipeline is not found
*/
int getNumberOfContainers(PipelineID pipelineID) throws IOException {
Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
if (containerIDs == null) {
throw new IOException(String.format("%s not found", pipelineID));
}
return containerIDs.size();
}
/**
* Remove pipeline from the data structures.
*
@ -147,12 +178,18 @@ Set<ContainerID> getContainers(PipelineID pipelineID)
void removePipeline(PipelineID pipelineID) throws IOException {
Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
//TODO: Add a flag which suppresses exception if pipeline does not exist?
Set<ContainerID> containerIDs = getContainers(pipelineID);
Pipeline pipeline = getPipeline(pipelineID);
if (!pipeline.isClosed()) {
throw new IOException(
String.format("Pipeline with %s is not yet closed", pipelineID));
}
Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
if (containerIDs.size() != 0) {
throw new IOException(
String.format("Pipeline with %s is not empty", pipelineID));
}
pipelineMap.remove(pipelineID);
pipeline2container.remove(pipelineID);
}
@ -172,12 +209,8 @@ void removeContainerFromPipeline(PipelineID pipelineID,
Preconditions.checkNotNull(containerID,
"container Id cannot be null");
Pipeline pipeline = getPipeline(pipelineID);
Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
containerIDs.remove(containerID);
if (containerIDs.size() == 0 && isClosingOrClosed(pipeline)) {
removePipeline(pipelineID);
}
}
/**
@ -189,24 +222,13 @@ void removeContainerFromPipeline(PipelineID pipelineID,
* @return Pipeline with the updated state
* @throws IOException if pipeline does not exist
*/
Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleState state)
Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
throws IOException {
Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
Pipeline pipeline = getPipeline(pipelineID);
pipeline = pipelineMap
.put(pipelineID, Pipeline.newBuilder(pipeline).setState(state).build());
// TODO: Verify if need to throw exception for non-existent pipeline
return pipeline;
}
private boolean isClosingOrClosed(Pipeline pipeline) {
LifeCycleState state = pipeline.getLifeCycleState();
return state == LifeCycleState.CLOSING || state == LifeCycleState.CLOSED;
}
private boolean isOpen(Pipeline pipeline) {
return pipeline.getLifeCycleState() == LifeCycleState.OPEN;
final Pipeline pipeline = getPipeline(pipelineID);
return pipelineMap.compute(pipelineID,
(id, p) -> Pipeline.newBuilder(pipeline).setState(state).build());
}
}

View File

@ -23,11 +23,12 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import java.io.IOException;
import java.lang.reflect.Constructor;
@ -44,11 +45,13 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager) {
PipelineStateManager stateManager, Configuration conf) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
this.conf = conf;
}
/**
@ -90,7 +93,7 @@ private static ContainerPlacementPolicy createContainerPlacementPolicy(
public Pipeline create(ReplicationFactor factor) throws IOException {
// Get set of datanodes already used for ratis pipeline
Set<DatanodeDetails> dnsUsed = new HashSet<>();
stateManager.getPipelines(ReplicationType.RATIS)
stateManager.getPipelinesByType(ReplicationType.RATIS)
.forEach(p -> dnsUsed.addAll(p.getNodes()));
// Get list of healthy nodes
@ -107,13 +110,15 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
throw new IOException(e);
}
return Pipeline.newBuilder()
Pipeline pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setState(PipelineState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(dns)
.build();
initializePipeline(pipeline);
return pipeline;
}
@Override
@ -126,10 +131,19 @@ public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
}
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setState(PipelineState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
.build();
}
private void initializePipeline(Pipeline pipeline)
throws IOException {
// TODO: remove old code in XceiverClientRatis#newXceiverClientRatis
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline();
}
}
}

View File

@ -22,11 +22,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
@ -63,11 +64,14 @@ public class SCMPipelineManager implements PipelineManager {
private final PipelineStateManager stateManager;
private final MetadataStore pipelineStore;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager)
throws IOException {
private final EventPublisher eventPublisher;
private final NodeManager nodeManager;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher) throws IOException {
this.lock = new ReentrantReadWriteLock();
this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
File metaDir = getOzoneMetaDirPath(conf);
@ -78,8 +82,10 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager)
.setDbFile(pipelineDBPath)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
initializePipelineState();
this.eventPublisher = eventPublisher;
this.nodeManager = nodeManager;
}
private void initializePipelineState() throws IOException {
@ -95,6 +101,8 @@ private void initializePipelineState() throws IOException {
.fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
Preconditions.checkNotNull(pipeline);
stateManager.addPipeline(pipeline);
// TODO: add pipeline to node manager
// nodeManager.addPipeline(pipeline);
}
}
@ -104,16 +112,10 @@ public synchronized Pipeline createPipeline(
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
stateManager.addPipeline(pipeline);
try {
pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
} catch (IOException ioe) {
// if db operation fails we need to revert the pipeline creation in
// state manager.
stateManager.removePipeline(pipeline.getID());
throw ioe;
}
// TODO: add pipeline to node manager
return pipeline;
} finally {
lock.writeLock().unlock();
@ -143,6 +145,27 @@ public Pipeline getPipeline(PipelineID pipelineID) throws IOException {
}
}
@Override
public List<Pipeline> getPipelinesByType(ReplicationType type) {
lock.readLock().lock();
try {
return stateManager.getPipelinesByType(type);
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
ReplicationFactor factor) {
lock.readLock().lock();
try {
return stateManager.getPipelinesByTypeAndFactor(type, factor);
} finally {
lock.readLock().unlock();
}
}
@Override
public void addContainerToPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
@ -176,28 +199,30 @@ public Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
}
}
@Override
public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
return stateManager.getNumberOfContainers(pipelineID);
}
@Override
public void finalizePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
//TODO: close all containers in this pipeline
Pipeline pipeline =
stateManager.updatePipelineState(pipelineId, LifeCycleEvent.FINALIZE);
pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
stateManager.finalizePipeline(pipelineId);
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
for (ContainerID containerID : containerIDs) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
}
} finally {
lock.writeLock().unlock();
}
}
@Override
public void closePipeline(PipelineID pipelineId) throws IOException {
public void openPipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline =
stateManager.updatePipelineState(pipelineId, LifeCycleEvent.CLOSE);
pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
stateManager.openPipeline(pipelineId);
} finally {
lock.writeLock().unlock();
}
@ -209,6 +234,7 @@ public void removePipeline(PipelineID pipelineID) throws IOException {
try {
stateManager.removePipeline(pipelineID);
pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
// TODO: remove pipeline from node manager
} finally {
lock.writeLock().unlock();
}
@ -216,11 +242,8 @@ public void removePipeline(PipelineID pipelineID) throws IOException {
@Override
public void close() throws IOException {
lock.writeLock().lock();
try {
stateManager.close();
} finally {
lock.writeLock().unlock();
if (pipelineStore != null) {
pipelineStore.close();
}
}
}

View File

@ -22,8 +22,8 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import java.io.IOException;
import java.util.Collections;
@ -54,7 +54,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
Collections.shuffle(dns);
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setState(PipelineState.ALLOCATED)
.setType(ReplicationType.STAND_ALONE)
.setFactor(factor)
.setNodes(dns.subList(0, factor.getNumber()))
@ -71,7 +71,7 @@ public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
}
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setState(PipelineState.ALLOCATED)
.setType(ReplicationType.STAND_ALONE)
.setFactor(factor)
.setNodes(nodes)

View File

@ -17,8 +17,12 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.mockito.Mockito;
import static org.mockito.Mockito.when;
@ -307,6 +311,19 @@ public static PipelineReportsProto getRandomPipelineReports() {
return PipelineReportsProto.newBuilder().build();
}
public static PipelineReportFromDatanode getRandomPipelineReportFromDatanode(
DatanodeDetails dn,
org.apache.hadoop.hdds.scm.pipeline.PipelineID... pipelineIDs) {
PipelineReportsProto.Builder reportBuilder =
PipelineReportsProto.newBuilder();
for (org.apache.hadoop.hdds.scm.pipeline.PipelineID pipelineID :
pipelineIDs) {
reportBuilder.addPipelineReport(
PipelineReport.newBuilder().setPipelineID(pipelineID.getProtobuf()));
}
return new PipelineReportFromDatanode(dn, reportBuilder.build());
}
/**
* Creates container report with the given ContainerInfo(s).
*

View File

@ -48,15 +48,21 @@ public void init() throws Exception {
}
private Pipeline createDummyPipeline(int numNodes) {
return createDummyPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, numNodes);
}
private Pipeline createDummyPipeline(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, int numNodes) {
List<DatanodeDetails> nodes = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
nodes.add(TestUtils.randomDatanodeDetails());
}
return Pipeline.newBuilder()
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE)
.setType(type)
.setFactor(factor)
.setNodes(nodes)
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setState(Pipeline.PipelineState.ALLOCATED)
.setId(PipelineID.randomId())
.build();
}
@ -89,7 +95,7 @@ public void testAddAndGetPipeline() throws IOException {
Assert.assertTrue(pipeline == pipeline1);
// clean up
stateManager.removePipeline(pipeline1.getID());
removePipeline(pipeline);
}
@Test
@ -102,9 +108,63 @@ public void testGetPipelines() throws IOException {
pipelines.add(pipeline);
stateManager.addPipeline(pipeline);
Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelines(
Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelinesByType(
HddsProtos.ReplicationType.RATIS));
Assert.assertEquals(pipelines, pipelines1);
// clean up
for (Pipeline pipeline1 : pipelines) {
removePipeline(pipeline1);
}
}
@Test
public void testGetPipelinesByTypeAndFactor() throws IOException {
Set<Pipeline> pipelines = new HashSet<>();
for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
.values()) {
for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
.values()) {
for (int i = 0; i < 5; i++) {
// 5 pipelines in allocated state for each type and factor
Pipeline pipeline =
createDummyPipeline(type, factor, factor.getNumber());
stateManager.addPipeline(pipeline);
pipelines.add(pipeline);
// 5 pipelines in allocated state for each type and factor
pipeline = createDummyPipeline(type, factor, factor.getNumber());
stateManager.addPipeline(pipeline);
stateManager.openPipeline(pipeline.getID());
pipelines.add(pipeline);
// 5 pipelines in allocated state for each type and factor
pipeline = createDummyPipeline(type, factor, factor.getNumber());
stateManager.addPipeline(pipeline);
stateManager.finalizePipeline(pipeline.getID());
pipelines.add(pipeline);
}
}
}
for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
.values()) {
for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
.values()) {
// verify pipelines received
List<Pipeline> pipelines1 =
stateManager.getPipelinesByTypeAndFactor(type, factor);
Assert.assertEquals(5, pipelines1.size());
pipelines1.stream().forEach(p -> {
Assert.assertEquals(p.getType(), type);
Assert.assertEquals(p.getFactor(), factor);
});
}
}
//clean up
for (Pipeline pipeline : pipelines) {
removePipeline(pipeline);
}
}
@Test
@ -115,8 +175,8 @@ public void testAddAndGetContainer() throws IOException {
pipeline = stateManager.getPipeline(pipeline.getID());
try {
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(++containerID));
Assert.fail("Container should not have been added");
} catch (IOException e) {
// add container possible only in container with open state
@ -124,16 +184,15 @@ public void testAddAndGetContainer() throws IOException {
}
// move pipeline to open state
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED);
stateManager.openPipeline(pipeline.getID());
// add three containers
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(containerID));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(++containerID));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(++containerID));
//verify the number of containers returned
Set<ContainerID> containerIDs =
@ -142,8 +201,8 @@ public void testAddAndGetContainer() throws IOException {
removePipeline(pipeline);
try {
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(++containerID));
Assert.fail("Container should not have been added");
} catch (IOException e) {
// Can not add a container to removed pipeline
@ -155,11 +214,22 @@ public void testAddAndGetContainer() throws IOException {
public void testRemovePipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED);
// close the pipeline
stateManager.openPipeline(pipeline.getID());
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
try {
stateManager.removePipeline(pipeline.getID());
Assert.fail("Pipeline should not have been removed");
} catch (IOException e) {
// can not remove a pipeline which already has containers
Assert.assertTrue(e.getMessage().contains("not yet closed"));
}
// close the pipeline
stateManager.finalizePipeline(pipeline.getID());
try {
stateManager.removePipeline(pipeline.getID());
Assert.fail("Pipeline should not have been removed");
@ -178,64 +248,87 @@ public void testRemoveContainer() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
// create an open pipeline in stateMap
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED);
stateManager.openPipeline(pipeline.getID());
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
stateManager
.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
// removeContainerFromPipeline in open pipeline does not lead to removal of pipeline
Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(containerID));
Assert.assertEquals(1, stateManager.getContainers(pipeline.getID()).size());
stateManager.removeContainerFromPipeline(pipeline.getID(),
ContainerID.valueof(containerID));
Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size());
// add two containers in the pipeline
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(++containerID));
stateManager.addContainerToPipeline(pipeline.getID(),
ContainerID.valueof(++containerID));
Assert.assertEquals(2, stateManager.getContainers(pipeline.getID()).size());
// move pipeline to closing state
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.FINALIZE);
stateManager.finalizePipeline(pipeline.getID());
stateManager
.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
// removal of second last container in closing or closed pipeline should
// not lead to removal of pipeline
Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
stateManager
.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(--containerID));
// removal of last container in closing or closed pipeline should lead to
// removal of pipeline
try {
stateManager.getPipeline(pipeline.getID());
Assert.fail("getPipeline should have failed.");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(" not found"));
}
stateManager.removeContainerFromPipeline(pipeline.getID(),
ContainerID.valueof(containerID));
stateManager.removeContainerFromPipeline(pipeline.getID(),
ContainerID.valueof(--containerID));
Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size());
// clean up
stateManager.removePipeline(pipeline.getID());
}
@Test
public void testUpdatePipelineState() throws IOException {
public void testFinalizePipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED, HddsProtos.LifeCycleEvent.FINALIZE,
HddsProtos.LifeCycleEvent.CLOSE);
// finalize on ALLOCATED pipeline
stateManager.finalizePipeline(pipeline.getID());
Assert.assertEquals(Pipeline.PipelineState.CLOSED,
stateManager.getPipeline(pipeline.getID()).getPipelineState());
// clean up
removePipeline(pipeline);
pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.TIMEOUT);
stateManager.openPipeline(pipeline.getID());
// finalize on OPEN pipeline
stateManager.finalizePipeline(pipeline.getID());
Assert.assertEquals(Pipeline.PipelineState.CLOSED,
stateManager.getPipeline(pipeline.getID()).getPipelineState());
// clean up
removePipeline(pipeline);
pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
stateManager.openPipeline(pipeline.getID());
stateManager.finalizePipeline(pipeline.getID());
// finalize should work on already closed pipeline
stateManager.finalizePipeline(pipeline.getID());
Assert.assertEquals(Pipeline.PipelineState.CLOSED,
stateManager.getPipeline(pipeline.getID()).getPipelineState());
// clean up
removePipeline(pipeline);
}
private void updateEvents(PipelineID pipelineID,
HddsProtos.LifeCycleEvent... events) throws IOException {
for (HddsProtos.LifeCycleEvent event : events) {
stateManager.updatePipelineState(pipelineID, event);
}
@Test
public void testOpenPipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
// open on ALLOCATED pipeline
stateManager.openPipeline(pipeline.getID());
Assert.assertEquals(Pipeline.PipelineState.OPEN,
stateManager.getPipeline(pipeline.getID()).getPipelineState());
stateManager.openPipeline(pipeline.getID());
// open should work on already open pipeline
Assert.assertEquals(Pipeline.PipelineState.OPEN,
stateManager.getPipeline(pipeline.getID()).getPipelineState());
// clean up
removePipeline(pipeline);
}
private void removePipeline(Pipeline pipeline) throws IOException {
stateManager.finalizePipeline(pipeline.getID());
Set<ContainerID> containerIDs =
stateManager.getContainers(pipeline.getID());
for (ContainerID containerID : containerIDs) {

View File

@ -47,7 +47,7 @@ public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager(new OzoneConfiguration());
provider = new RatisPipelineProvider(nodeManager,
stateManager);
stateManager, new OzoneConfiguration());
}
@Test
@ -57,8 +57,8 @@ public void testCreatePipelineWithFactor() throws IOException {
stateManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
@ -70,8 +70,8 @@ public void testCreatePipelineWithFactor() throws IOException {
.isEmpty());
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline1.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
}
@ -89,16 +89,16 @@ public void testCreatePipelineWithNodes() throws IOException {
Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(
pipeline.getPipelineState(), Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
pipeline = provider.create(createListOfNodes(factor.getNumber()));
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
}
}

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestSCMContainerManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Test cases to verify PipelineManager.
*/
public class TestSCMPipelineManager {
private static MockNodeManager nodeManager;
private static File testDir;
private static Configuration conf;
@BeforeClass
public static void setUp() throws Exception {
conf = new OzoneConfiguration();
testDir = GenericTestUtils
.getTestDir(TestSCMContainerManager.class.getSimpleName());
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 20);
}
@AfterClass
public static void cleanup() throws IOException {
FileUtil.fullyDelete(testDir);
}
@Test
public void testPipelineReload() throws IOException {
PipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue());
Set<Pipeline> pipelines = new HashSet<>();
for (int i = 0; i < 5; i++) {
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelines.add(pipeline);
}
pipelineManager.close();
// new pipeline manager should be able to load the pipelines from the db
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
new EventQueue());
List<Pipeline> pipelineList =
pipelineManager.getPipelinesByType(HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipelines, new HashSet<>(pipelineList));
// clean up
for (Pipeline pipeline : pipelines) {
pipelineManager.finalizePipeline(pipeline.getID());
pipelineManager.removePipeline(pipeline.getID());
}
pipelineManager.close();
}
@Test
public void testRemovePipeline() throws IOException {
PipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue());
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
pipelineManager.openPipeline(pipeline.getID());
pipelineManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
pipelineManager.finalizePipeline(pipeline.getID());
pipelineManager
.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(1));
pipelineManager.removePipeline(pipeline.getID());
pipelineManager.close();
// new pipeline manager should not be able to load removed pipelines
pipelineManager =
new SCMPipelineManager(conf, nodeManager,
new EventQueue());
try {
pipelineManager.getPipeline(pipeline.getID());
Assert.fail("Pipeline should not have been retrieved");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("not found"));
}
// clean up
pipelineManager.close();
}
@Test
public void testPipelineReport() throws IOException {
PipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue());
// create a pipeline in allocated state with no dns yet reported
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert
.assertFalse(pipelineManager.getPipeline(pipeline.getID()).isHealthy());
Assert
.assertFalse(pipelineManager.getPipeline(pipeline.getID()).isOpen());
// get pipeline report from each dn in the pipeline
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(pipelineManager, conf);
for (DatanodeDetails dn: pipeline.getNodes()) {
PipelineReportFromDatanode pipelineReportFromDatanode =
TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID());
// pipeline is not healthy until all dns report
Assert.assertFalse(
pipelineManager.getPipeline(pipeline.getID()).isHealthy());
pipelineReportHandler
.onMessage(pipelineReportFromDatanode, new EventQueue());
}
// pipeline is healthy when all dns report
Assert
.assertTrue(pipelineManager.getPipeline(pipeline.getID()).isHealthy());
// pipeline should now move to open state
Assert
.assertTrue(pipelineManager.getPipeline(pipeline.getID()).isOpen());
// close the pipeline
pipelineManager.finalizePipeline(pipeline.getID());
for (DatanodeDetails dn: pipeline.getNodes()) {
PipelineReportFromDatanode pipelineReportFromDatanode =
TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID());
// pipeline report for a closed pipeline should destroy the pipeline
// and remove it from the pipeline manager
pipelineReportHandler
.onMessage(pipelineReportFromDatanode, new EventQueue());
}
try {
pipelineManager.getPipeline(pipeline.getID());
Assert.fail("Pipeline should not have been retrieved");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("not found"));
}
// clean up
pipelineManager.close();
}
}

View File

@ -56,8 +56,8 @@ public void testCreatePipelineWithFactor() throws IOException {
Assert.assertEquals(pipeline.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
@ -66,8 +66,8 @@ public void testCreatePipelineWithFactor() throws IOException {
Assert.assertEquals(pipeline1.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline1.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
}
@ -86,8 +86,8 @@ public void testCreatePipelineWithNodes() throws IOException {
Assert.assertEquals(pipeline.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
@ -95,8 +95,8 @@ public void testCreatePipelineWithNodes() throws IOException {
Assert.assertEquals(pipeline.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getPipelineState(),
Pipeline.PipelineState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
}
}