From cf571133b89643e4db96ec4cc7988d4a2f850be9 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Wed, 19 Dec 2018 13:42:06 +0530 Subject: [PATCH] HDDS-893. pipeline status is ALLOCATED in scmcli listPipelines command. Contributed by Lokesh Jain. --- .../common/helpers/ContainerWithPipeline.java | 7 ++- .../hadoop/hdds/scm/pipeline/Pipeline.java | 38 +++++++++++++-- .../UnknownPipelineStateException.java | 46 +++++++++++++++++++ ...ocationProtocolClientSideTranslatorPB.java | 9 ++-- ...ocationProtocolServerSideTranslatorPB.java | 8 ++-- hadoop-hdds/common/src/main/proto/hdds.proto | 8 +++- .../hdds/scm/pipeline/SCMPipelineManager.java | 6 ++- 7 files changed, 107 insertions(+), 15 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/UnknownPipelineStateException.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java index 8f4925571d..5b01bd2c65 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException; /** * Class wraps ozone container info. @@ -48,13 +49,15 @@ public Pipeline getPipeline() { } public static ContainerWithPipeline fromProtobuf( - HddsProtos.ContainerWithPipeline allocatedContainer) { + HddsProtos.ContainerWithPipeline allocatedContainer) + throws UnknownPipelineStateException { return new ContainerWithPipeline( ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()), Pipeline.getFromProtobuf(allocatedContainer.getPipeline())); } - public HddsProtos.ContainerWithPipeline getProtobuf() { + public HddsProtos.ContainerWithPipeline getProtobuf() + throws UnknownPipelineStateException { HddsProtos.ContainerWithPipeline.Builder builder = HddsProtos.ContainerWithPipeline.newBuilder(); builder.setContainerInfo(getContainerInfo().getProtobuf()) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index a103bd7439..7fcfc8aad7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -136,11 +136,13 @@ public boolean isEmpty() { return nodeStatus.isEmpty(); } - public HddsProtos.Pipeline getProtobufMessage() { + public HddsProtos.Pipeline getProtobufMessage() + throws UnknownPipelineStateException { HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder() .setId(id.getProtobuf()) .setType(type) .setFactor(factor) + .setState(PipelineState.getProtobuf(state)) .setLeaderID("") .addAllMembers(nodeStatus.keySet().stream() .map(DatanodeDetails::getProtoBufMessage) @@ -148,11 +150,13 @@ public HddsProtos.Pipeline getProtobufMessage() { return builder.build(); } - public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) { + public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) + throws UnknownPipelineStateException { + Preconditions.checkNotNull(pipeline, "Pipeline is null"); return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId())) .setFactor(pipeline.getFactor()) .setType(pipeline.getType()) - .setState(PipelineState.ALLOCATED) + .setState(PipelineState.fromProtobuf(pipeline.getState())) .setNodes(pipeline.getMembersList().stream() .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList())) .build(); @@ -270,6 +274,32 @@ public Pipeline build() { * Possible Pipeline states in SCM. */ public enum PipelineState { - ALLOCATED, OPEN, CLOSED + ALLOCATED, OPEN, CLOSED; + + public static PipelineState fromProtobuf(HddsProtos.PipelineState state) + throws UnknownPipelineStateException { + Preconditions.checkNotNull(state, "Pipeline state is null"); + switch (state) { + case PIPELINE_ALLOCATED: return ALLOCATED; + case PIPELINE_OPEN: return OPEN; + case PIPELINE_CLOSED: return CLOSED; + default: + throw new UnknownPipelineStateException( + "Pipeline state: " + state + " is not recognized."); + } + } + + public static HddsProtos.PipelineState getProtobuf(PipelineState state) + throws UnknownPipelineStateException { + Preconditions.checkNotNull(state, "Pipeline state is null"); + switch (state) { + case ALLOCATED: return HddsProtos.PipelineState.PIPELINE_ALLOCATED; + case OPEN: return HddsProtos.PipelineState.PIPELINE_OPEN; + case CLOSED: return HddsProtos.PipelineState.PIPELINE_CLOSED; + default: + throw new UnknownPipelineStateException( + "Pipeline state: " + state + " is not recognized."); + } + } } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/UnknownPipelineStateException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/UnknownPipelineStateException.java new file mode 100644 index 0000000000..1aa935803b --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/UnknownPipelineStateException.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + + import java.io.IOException; + +/** + * Signals that a pipeline state is not recognized. + */ +public class UnknownPipelineStateException extends IOException { + /** + * Constructs an {@code UnknownPipelineStateException} with {@code null} + * as its error detail message. + */ + public UnknownPipelineStateException() { + super(); + } + + /** + * Constructs an {@code UnknownPipelineStateException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public UnknownPipelineStateException(String message) { + super(message); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 117e58dd4a..a24ba88a39 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -315,9 +315,12 @@ public List listPipelines() throws IOException { .newBuilder().build(); ListPipelineResponseProto response = rpcProxy.listPipelines( NULL_RPC_CONTROLLER, request); - return response.getPipelinesList().stream() - .map(Pipeline::getFromProtobuf) - .collect(Collectors.toList()); + List list = new ArrayList<>(); + for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) { + Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline); + list.add(fromProtobuf); + } + return list; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index 2ae559a4aa..1630875b7e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -227,9 +227,11 @@ public ListPipelineResponseProto listPipelines( try { ListPipelineResponseProto.Builder builder = ListPipelineResponseProto .newBuilder(); - List pipelineIDs = impl.listPipelines(); - pipelineIDs.stream().map(Pipeline::getProtobufMessage) - .forEach(builder::addPipelines); + List pipelines = impl.listPipelines(); + for (Pipeline pipeline : pipelines) { + HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage(); + builder.addPipelines(protobufMessage); + } return builder.build(); } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index cf3d6d4519..2d8f43ff1e 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -44,11 +44,17 @@ message PipelineID { required string id = 1; } +enum PipelineState { + PIPELINE_ALLOCATED = 1; + PIPELINE_OPEN = 2; + PIPELINE_CLOSED = 3; +} + message Pipeline { required string leaderID = 1; repeated DatanodeDetailsProto members = 2; // TODO: remove the state and leaderID from this class - optional LifeCycleState state = 3 [default = OPEN]; + optional PipelineState state = 3 [default = PIPELINE_ALLOCATED]; optional ReplicationType type = 4 [default = STAND_ALONE]; optional ReplicationFactor factor = 5 [default = ONE]; required PipelineID id = 6; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 382483f87a..8b7a56dd3d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -99,8 +99,10 @@ private void initializePipelineState() throws IOException { (MetadataKeyFilters.MetadataKeyFilter[])null); for (Map.Entry entry : pipelines) { - Pipeline pipeline = Pipeline.getFromProtobuf( - HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); + HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline + .newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); + Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState( + HddsProtos.PipelineState.PIPELINE_ALLOCATED).build()); Preconditions.checkNotNull(pipeline); stateManager.addPipeline(pipeline); nodeManager.addPipeline(pipeline);