HDDS-324. Use pipeline name as Ratis groupID to allow datanode to report pipeline info. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-08-13 12:39:05 -07:00
parent f760a544a7
commit b4031a8f1b
43 changed files with 362 additions and 200 deletions

View File

@ -598,7 +598,7 @@ function hadoop_bootstrap
MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
HDDS_DIR=${HDDS_DIR:-"share/hadoop/hdds"}
HDDS_LIB_JARS_DIR=${HDDS_LIB_JARS_DIR:-"share/hadoop/hdds/lib"}
OZONE_DIR=${OZONE_DIR:-"share/hadoop/ozone"}
OZONE_DIR=${OZONE_DIR:-"share/hadoop/ozone"}q
OZONE_LIB_JARS_DIR=${OZONE_LIB_JARS_DIR:-"share/hadoop/ozone/lib"}
OZONEFS_DIR=${OZONEFS_DIR:-"share/hadoop/ozonefs"}

View File

@ -188,11 +188,10 @@ public ContainerProtos.ContainerCommandResponseProto sendCommand(
/**
* Create a pipeline.
*
* @param pipelineID - Name of the pipeline.
* @param datanodes - Datanodes
* @param pipeline - pipeline to be created.
*/
@Override
public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
public void createPipeline(Pipeline pipeline)
throws IOException {
// For stand alone pipeline, there is no notion called setup pipeline.
return;

View File

@ -218,11 +218,10 @@ private void reconnect() throws IOException {
/**
* Create a pipeline.
*
* @param pipelineID - Name of the pipeline.
* @param datanodes - Datanodes
* @param pipeline - pipeline to be created.
*/
@Override
public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
public void createPipeline(Pipeline pipeline)
throws IOException {
// For stand alone pipeline, there is no notion called setup pipeline.
return;

View File

@ -35,6 +35,7 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
@ -87,12 +88,13 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
/**
* {@inheritDoc}
*/
public void createPipeline(String clusterId, List<DatanodeDetails> datanodes)
public void createPipeline(Pipeline pipeline)
throws IOException {
RaftGroup group = RatisHelper.newRaftGroup(datanodes);
LOG.debug("initializing pipeline:{} with nodes:{}", clusterId,
group.getPeers());
reinitialize(datanodes, group);
RaftGroupId groupId = pipeline.getId().getRaftGroupID();
RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
LOG.debug("initializing pipeline:{} with nodes:{}",
pipeline.getId(), group.getPeers());
reinitialize(pipeline.getMachines(), group);
}
/**
@ -157,7 +159,7 @@ public Pipeline getPipeline() {
@Override
public void connect() throws Exception {
LOG.debug("Connecting to pipeline:{} leader:{}",
getPipeline().getPipelineName(),
getPipeline().getId(),
RatisHelper.toRaftPeerId(pipeline.getLeader()));
// TODO : XceiverClient ratis should pass the config value of
// maxOutstandingRequests so as to set the upper bound on max no of async

View File

@ -158,7 +158,7 @@ public void createContainer(XceiverClientSpi client,
private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
throws IOException {
Preconditions.checkNotNull(pipeline.getPipelineName(), "Pipeline " +
Preconditions.checkNotNull(pipeline.getId(), "Pipeline " +
"name cannot be null when client create flag is set.");
// Pipeline creation is a three step process.
@ -180,8 +180,7 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.begin);
client.createPipeline(pipeline.getPipelineName(),
pipeline.getMachines());
client.createPipeline(pipeline);
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,

View File

@ -114,11 +114,9 @@ public abstract ContainerCommandResponseProto sendCommand(
/**
* Create a pipeline.
*
* @param pipelineID - Name of the pipeline.
* @param datanodes - Datanodes
* @param pipeline - pipeline to be created.
*/
public abstract void createPipeline(String pipelineID,
List<DatanodeDetails> datanodes) throws IOException;
public abstract void createPipeline(Pipeline pipeline) throws IOException;
/**
* Returns pipeline Type.

View File

@ -58,7 +58,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
}
private HddsProtos.LifeCycleState state;
private String pipelineName;
private PipelineID pipelineID;
private ReplicationFactor replicationFactor;
private ReplicationType replicationType;
// Bytes allocated by SCM for clients.
@ -82,7 +82,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
ContainerInfo(
long containerID,
HddsProtos.LifeCycleState state,
String pipelineName,
PipelineID pipelineID,
long allocatedBytes,
long usedBytes,
long numberOfKeys,
@ -92,7 +92,7 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
ReplicationFactor replicationFactor,
ReplicationType repType) {
this.containerID = containerID;
this.pipelineName = pipelineName;
this.pipelineID = pipelineID;
this.allocatedBytes = allocatedBytes;
this.usedBytes = usedBytes;
this.numberOfKeys = numberOfKeys;
@ -113,7 +113,8 @@ public ContainerInfo() {
public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
return builder.setPipelineName(info.getPipelineName())
return builder.setPipelineID(
PipelineID.getFromProtobuf(info.getPipelineID()))
.setAllocatedBytes(info.getAllocatedBytes())
.setUsedBytes(info.getUsedBytes())
.setNumberOfKeys(info.getNumberOfKeys())
@ -147,8 +148,8 @@ public ReplicationFactor getReplicationFactor() {
return replicationFactor;
}
public String getPipelineName() {
return pipelineName;
public PipelineID getPipelineID() {
return pipelineID;
}
public long getAllocatedBytes() {
@ -217,7 +218,7 @@ public HddsProtos.SCMContainerInfo getProtobuf() {
.setNumberOfKeys(getNumberOfKeys()).setState(getState())
.setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID())
.setDeleteTransactionId(getDeleteTransactionId())
.setPipelineName(getPipelineName())
.setPipelineID(getPipelineID().getProtobuf())
.setReplicationFactor(getReplicationFactor())
.setReplicationType(getReplicationType())
.setOwner(getOwner())
@ -236,7 +237,7 @@ public void setOwner(String owner) {
public String toString() {
return "ContainerInfo{"
+ "state=" + state
+ ", pipelineName=" + pipelineName
+ ", pipelineID=" + pipelineID
+ ", stateEnterTime=" + stateEnterTime
+ ", owner=" + owner
+ '}';
@ -389,7 +390,7 @@ public static class Builder {
private String owner;
private long containerID;
private long deleteTransactionId;
private String pipelineName;
private PipelineID pipelineID;
private ReplicationFactor replicationFactor;
private ReplicationType replicationType;
@ -399,8 +400,8 @@ public Builder setReplicationType(
return this;
}
public Builder setPipelineName(String pipelineName) {
this.pipelineName = pipelineName;
public Builder setPipelineID(PipelineID pipelineID) {
this.pipelineID = pipelineID;
return this;
}
@ -451,7 +452,7 @@ public Builder setDeleteTransactionId(long deleteTransactionId) {
}
public ContainerInfo build() {
return new ContainerInfo(containerID, state, pipelineName, allocated,
return new ContainerInfo(containerID, state, pipelineID, allocated,
used, keys, stateEnterTime, owner, deleteTransactionId,
replicationFactor, replicationType);
}

View File

@ -64,9 +64,7 @@ public class Pipeline {
private HddsProtos.LifeCycleState lifeCycleState;
private HddsProtos.ReplicationType type;
private HddsProtos.ReplicationFactor factor;
private String name;
// TODO: change to long based id
//private long id;
private PipelineID id;
/**
* Constructs a new pipeline data structure.
@ -75,16 +73,16 @@ public class Pipeline {
* @param lifeCycleState - Pipeline State
* @param replicationType - Replication protocol
* @param replicationFactor - replication count on datanodes
* @param name - pipelineName
* @param id - pipeline ID
*/
public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState,
HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor replicationFactor, String name) {
HddsProtos.ReplicationFactor replicationFactor, PipelineID id) {
this.leaderID = leaderID;
this.lifeCycleState = lifeCycleState;
this.type = replicationType;
this.factor = replicationFactor;
this.name = name;
this.id = id;
datanodes = new TreeMap<>();
}
@ -102,7 +100,7 @@ public static Pipeline getFromProtoBuf(
pipelineProto.getState(),
pipelineProto.getType(),
pipelineProto.getFactor(),
pipelineProto.getName());
PipelineID.getFromProtobuf(pipelineProto.getId()));
for (HddsProtos.DatanodeDetailsProto dataID :
pipelineProto.getMembersList()) {
@ -191,15 +189,19 @@ public HddsProtos.Pipeline getProtobufMessage() {
}
builder.setLeaderID(leaderID);
if (this.getLifeCycleState() != null) {
builder.setState(this.getLifeCycleState());
if (lifeCycleState != null) {
builder.setState(lifeCycleState);
}
if (this.getType() != null) {
builder.setType(this.getType());
if (type != null) {
builder.setType(type);
}
if (this.getFactor() != null) {
builder.setFactor(this.getFactor());
if (factor != null) {
builder.setFactor(factor);
}
if (id != null) {
builder.setId(id.getProtobuf());
}
return builder.build();
}
@ -221,12 +223,12 @@ public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
}
/**
* Gets the pipeline Name.
* Gets the pipeline id.
*
* @return - Name of the pipeline
* @return - Id of the pipeline
*/
public String getPipelineName() {
return name;
public PipelineID getId() {
return id;
}
/**
@ -245,7 +247,7 @@ public String toString() {
getDatanodes().keySet().stream()
.forEach(id -> b.
append(id.endsWith(getLeaderID()) ? "*" + id : id));
b.append(" name:").append(getPipelineName());
b.append(" id:").append(id);
if (getType() != null) {
b.append(" type:").append(getType().toString());
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.common.helpers;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.ratis.protocol.RaftGroupId;
import java.util.UUID;
/**
* ID for the pipeline, the ID is based on UUID so that it can be used
* in Ratis as RaftGroupId, GroupID is used by the datanodes to initialize
* the ratis group they are part of.
*/
public class PipelineID {
private UUID id;
private RaftGroupId groupId;
private PipelineID(UUID id) {
this.id = id;
this.groupId = RaftGroupId.valueOf(id);
}
public static PipelineID randomId() {
return new PipelineID(UUID.randomUUID());
}
public static PipelineID valueOf(RaftGroupId groupId) {
return new PipelineID(groupId.getUuid());
}
public RaftGroupId getRaftGroupID() {
return groupId;
}
public UUID getId() {
return id;
}
public HddsProtos.PipelineID getProtobuf() {
return HddsProtos.PipelineID.newBuilder().setId(id.toString()).build();
}
public static PipelineID getFromProtobuf(HddsProtos.PipelineID protos) {
return new PipelineID(UUID.fromString(protos.getId()));
}
@Override
public String toString() {
return "pipelineId=" + id;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PipelineID that = (PipelineID) o;
return id.equals(that.id);
}
@Override
public int hashCode() {
return id.hashCode();
}
}

View File

@ -88,25 +88,28 @@ static RaftGroup emptyRaftGroup() {
return EMPTY_GROUP;
}
static RaftGroup newRaftGroup(List<DatanodeDetails> datanodes) {
final List<RaftPeer> newPeers = datanodes.stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
return RatisHelper.newRaftGroup(newPeers);
}
static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
return peers.isEmpty()? emptyRaftGroup()
: new RaftGroup(DUMMY_GROUP_ID, peers);
}
static RaftGroup newRaftGroup(RaftGroupId groupId,
Collection<DatanodeDetails> peers) {
final List<RaftPeer> newPeers = peers.stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
return peers.isEmpty() ? new RaftGroup(groupId, Collections.emptyList())
: new RaftGroup(groupId, newPeers);
}
static RaftGroup newRaftGroup(Pipeline pipeline) {
return newRaftGroup(toRaftPeers(pipeline));
}
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
newRaftGroup(pipeline));
newRaftGroup(pipeline.getId().getRaftGroupID(),
pipeline.getMachines()));
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {

View File

@ -40,13 +40,17 @@ message Port {
required uint32 value = 2;
}
message PipelineID {
required string id = 1;
}
message Pipeline {
required string leaderID = 1;
repeated DatanodeDetailsProto members = 2;
optional LifeCycleState state = 3 [default = OPEN];
optional ReplicationType type = 4 [default = STAND_ALONE];
optional ReplicationFactor factor = 5 [default = ONE];
optional string name = 6;
required PipelineID id = 6;
}
message KeyValue {
@ -129,7 +133,7 @@ enum LifeCycleEvent {
message SCMContainerInfo {
required int64 containerID = 1;
required LifeCycleState state = 2;
optional string pipelineName = 3;
optional PipelineID pipelineID = 3;
// This is not total size of container, but space allocated by SCM for
// clients to write blocks
required uint64 allocatedBytes = 4;

View File

@ -73,6 +73,7 @@ public void handle(SCMCommand command, OzoneContainer container,
CloseContainerCommandProto
.parseFrom(command.getProtoBufMessage());
containerID = closeContainerProto.getContainerID();
HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
HddsProtos.ReplicationType replicationType =
closeContainerProto.getReplicationType();
@ -87,7 +88,7 @@ public void handle(SCMCommand command, OzoneContainer container,
context.getParent().getDatanodeDetails().getUuidString());
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
request.build(), replicationType);
request.build(), replicationType, pipelineID);
cmdExecuted = true;
} catch (Exception e) {
LOG.error("Can't close container " + containerID, e);

View File

@ -19,7 +19,8 @@
package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.shaded.io.netty.channel.Channel;
import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
@ -132,8 +133,8 @@ public void stop() {
}
@Override
public void submitRequest(
ContainerProtos.ContainerCommandRequestProto request) throws IOException {
public void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID) {
storageContainer.dispatch(request);
}
}

View File

@ -21,7 +21,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@ -108,8 +109,8 @@ public void stop() {
}
@Override
public void submitRequest(
ContainerProtos.ContainerCommandRequestProto request) throws IOException {
public void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID) {
storageContainer.dispatch(request);
}
}

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.ozone.container.common.transport.server;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.io.IOException;
@ -45,6 +46,7 @@ public interface XceiverServerSpi {
* submits a containerRequest to be performed by the replication pipeline.
* @param request ContainerCommandRequest
*/
void submitRequest(ContainerProtos.ContainerCommandRequestProto request)
void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID)
throws IOException;
}

View File

@ -23,8 +23,10 @@
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server
@ -35,12 +37,17 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@ -73,6 +80,7 @@ private static long nextCallId() {
private final int port;
private final RaftServer server;
private ThreadPoolExecutor chunkExecutor;
private ClientId clientId = ClientId.randomId();
private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@ -282,17 +290,23 @@ private void processReply(RaftClientReply reply) {
@Override
public void submitRequest(
ContainerProtos.ContainerCommandRequestProto request) throws IOException {
ClientId clientId = ClientId.randomId();
ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
throws IOException {
// ReplicationLevel.ALL ensures the transactions corresponding to
// the request here are applied on all the raft servers.
RaftClientRequest raftClientRequest =
new RaftClientRequest(clientId, server.getId(),
RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
Message.valueOf(request.toByteString()), RaftClientRequest
// ReplicationLevel.ALL ensures the transactions corresponding to
// the request here are applied on all the raft servers.
.writeRequestType(RaftProtos.ReplicationLevel.ALL));
createRaftClientRequest(request, pipelineID,
RaftClientRequest.writeRequestType(ReplicationLevel.ALL));
CompletableFuture<RaftClientReply> reply =
server.submitClientRequestAsync(raftClientRequest);
reply.thenAccept(this::processReply);
}
private RaftClientRequest createRaftClientRequest(
ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID,
RaftClientRequest.Type type) {
return new RaftClientRequest(clientId, server.getId(),
PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
nextCallId(),0, Message.valueOf(request.toByteString()), type);
}
}

View File

@ -168,23 +168,25 @@ public ContainerSet getContainerSet() {
* Submit ContainerRequest.
* @param request
* @param replicationType
* @param pipelineID
* @throws IOException
*/
public void submitContainerRequest(
ContainerProtos.ContainerCommandRequestProto request,
HddsProtos.ReplicationType replicationType) throws IOException {
HddsProtos.ReplicationType replicationType,
HddsProtos.PipelineID pipelineID) throws IOException {
XceiverServerSpi serverInstance;
long containerId = getContainerIdForCmd(request);
if (replicationType == HddsProtos.ReplicationType.RATIS) {
serverInstance = getRatisSerer();
Preconditions.checkNotNull(serverInstance);
serverInstance.submitRequest(request);
serverInstance.submitRequest(request, pipelineID);
LOG.info("submitting {} request over RATIS server for container {}",
request.getCmdType(), containerId);
} else {
serverInstance = getStandaAloneSerer();
Preconditions.checkNotNull(serverInstance);
getStandaAloneSerer().submitRequest(request);
getStandaAloneSerer().submitRequest(request, pipelineID);
LOG.info(
"submitting {} request over STAND_ALONE server for container {}",
request.getCmdType(), containerId);

View File

@ -22,6 +22,7 @@
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
/**
* Asks datanode to close a container.
@ -31,20 +32,25 @@ public class CloseContainerCommand
private long containerID;
private HddsProtos.ReplicationType replicationType;
private PipelineID pipelineID;
public CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType) {
HddsProtos.ReplicationType replicationType,
PipelineID pipelineID) {
super();
this.containerID = containerID;
this.replicationType = replicationType;
this.pipelineID = pipelineID;
}
// Should be called only for protobuf conversion
private CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType, long id) {
HddsProtos.ReplicationType replicationType,
PipelineID pipelineID, long id) {
super(id);
this.containerID = containerID;
this.replicationType = replicationType;
this.pipelineID = pipelineID;
}
/**
@ -71,15 +77,18 @@ public CloseContainerCommandProto getProto() {
return CloseContainerCommandProto.newBuilder()
.setContainerID(containerID)
.setCmdId(getId())
.setReplicationType(replicationType).build();
.setReplicationType(replicationType)
.setPipelineID(pipelineID.getProtobuf())
.build();
}
public static CloseContainerCommand getFromProtobuf(
CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerID(),
closeContainerProto.getReplicationType(), closeContainerProto
.getCmdId());
closeContainerProto.getReplicationType(),
PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()),
closeContainerProto.getCmdId());
}
public long getContainerID() {

View File

@ -247,6 +247,7 @@ message CloseContainerCommandProto {
required int64 containerID = 1;
required hadoop.hdds.ReplicationType replicationType = 2;
required int64 cmdId = 3;
required PipelineID pipelineID = 4;
}
/**

View File

@ -78,7 +78,7 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
CommandForDatanode closeContainerCommand = new CommandForDatanode<>(
datanode.getUuid(),
new CloseContainerCommand(containerID.getId(),
info.getReplicationType()));
info.getReplicationType(), info.getPipelineID()));
publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
}
try {

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@ -201,7 +202,7 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID)
.parseFrom(containerBytes);
contInfo = ContainerInfo.fromProtobuf(temp);
Pipeline pipeline = pipelineSelector
.getPipeline(contInfo.getPipelineName(),
.getPipeline(contInfo.getPipelineID(),
contInfo.getReplicationType());
if(pipeline == null) {
@ -381,7 +382,7 @@ public HddsProtos.LifeCycleState updateContainerState(
.updateContainerState(containerInfo, event);
if (!updatedContainer.isContainerOpen()) {
Pipeline pipeline = pipelineSelector
.getPipeline(containerInfo.getPipelineName(),
.getPipeline(containerInfo.getPipelineID(),
containerInfo.getReplicationType());
pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
}
@ -462,7 +463,7 @@ public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
return null;
}
Pipeline pipeline = pipelineSelector
.getPipeline(containerInfo.getPipelineName(),
.getPipeline(containerInfo.getPipelineID(),
containerInfo.getReplicationType());
if (pipeline == null) {
pipeline = pipelineSelector
@ -527,7 +528,8 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
// If the container is closed, then state is already written to SCM
Pipeline pipeline =
pipelineSelector.getPipeline(newState.getPipelineName(),
pipelineSelector.getPipeline(
PipelineID.getFromProtobuf(newState.getPipelineID()),
newState.getReplicationType());
if(pipeline == null) {
pipeline = pipelineSelector
@ -570,7 +572,7 @@ private HddsProtos.SCMContainerInfo reconcileState(
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
builder.setContainerID(knownState.getContainerID())
.setPipelineName(knownState.getPipelineName())
.setPipelineID(knownState.getPipelineID())
.setReplicationType(knownState.getReplicationType())
.setReplicationFactor(knownState.getReplicationFactor());
@ -725,7 +727,7 @@ public void flushContainerInfo() throws IOException {
.setAllocatedBytes(info.getAllocatedBytes())
.setNumberOfKeys(oldInfo.getNumberOfKeys())
.setOwner(oldInfo.getOwner())
.setPipelineName(oldInfo.getPipelineName())
.setPipelineID(oldInfo.getPipelineID())
.setState(oldInfo.getState())
.setUsedBytes(oldInfo.getUsedBytes())
.setDeleteTransactionId(oldInfo.getDeleteTransactionId())

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@ -299,7 +300,7 @@ public ContainerWithPipeline allocateContainer(PipelineSelector selector, HddsPr
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setPipelineName(pipeline.getPipelineName())
.setPipelineID(pipeline.getId())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@ -467,12 +468,12 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
/**
* Returns a set of open ContainerIDs that reside on a pipeline.
*
* @param pipeline Pipeline of the Containers.
* @param pipelineID PipelineID of the Containers.
* @return Set of containers that match the specific query parameters.
*/
public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(String
pipeline) {
return containers.getOpenContainerIDsByPipeline(pipeline);
public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(PipelineID
pipelineID) {
return containers.getOpenContainerIDsByPipeline(pipelineID);
}
/**
@ -485,7 +486,8 @@ public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(String
public ContainerWithPipeline getContainer(PipelineSelector selector,
ContainerID containerID) throws IOException {
ContainerInfo info = containers.getContainerInfo(containerID.getId());
Pipeline pipeline = selector.getPipeline(info.getPipelineName(), info.getReplicationType());
Pipeline pipeline = selector.getPipeline(info.getPipelineID(),
info.getReplicationType());
return new ContainerWithPipeline(info, pipeline);
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@ -132,7 +133,8 @@ public void close(SCMContainerInfo info,
for (DatanodeDetails datanodeDetails : pipeline.getMachines()) {
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(info.getContainerID(),
info.getReplicationType()));
info.getReplicationType(),
PipelineID.getFromProtobuf(info.getPipelineID())));
}
if (!commandIssued.containsKey(info.getContainerID())) {
commandIssued.put(info.getContainerID(),

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@ -96,7 +97,7 @@ public class ContainerStateMap {
//1. Dead datanode.
//2. Datanode out of space.
//3. Volume loss or volume out of space.
private final ContainerAttribute<String> openPipelineMap;
private final ContainerAttribute<PipelineID> openPipelineMap;
private final Map<ContainerID, ContainerInfo> containerMap;
// Map to hold replicas of given container.
@ -153,7 +154,7 @@ public void addContainer(ContainerInfo info)
factorMap.insert(info.getReplicationFactor(), id);
typeMap.insert(info.getReplicationType(), id);
if (info.isContainerOpen()) {
openPipelineMap.insert(info.getPipelineName(), id);
openPipelineMap.insert(info.getPipelineID(), id);
}
LOG.trace("Created container with {} successfully.", id);
}
@ -347,7 +348,7 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
// In case the container is set to closed state, it needs to be removed from
// the pipeline Map.
if (!info.isContainerOpen()) {
openPipelineMap.remove(info.getPipelineName(), id);
openPipelineMap.remove(info.getPipelineID(), id);
}
}
@ -382,14 +383,15 @@ NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
/**
* Returns Open containers in the SCM by the Pipeline
*
* @param pipeline - Pipeline name.
* @param pipelineID - Pipeline id.
* @return NavigableSet<ContainerID>
*/
public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(String pipeline) {
Preconditions.checkNotNull(pipeline);
public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
PipelineID pipelineID) {
Preconditions.checkNotNull(pipelineID);
try (AutoCloseableLock lock = autoLock.acquire()) {
return openPipelineMap.getCollection(pipeline);
return openPipelineMap.getCollection(pipelineID);
}
}

View File

@ -107,6 +107,14 @@ public List<StorageReportProto> getStorageReports() {
}
}
/**
* Returns the last updated time of datanode info.
* @return the last updated time of datanode info.
*/
public long getLastStatsUpdatedTime() {
return lastStatsUpdatedTime;
}
@Override
public int hashCode() {
return super.hashCode();

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,8 +38,8 @@
public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
private final List<Pipeline> activePipelines;
private final Map<String, Pipeline> pipelineMap;
private final List<PipelineID> activePipelines;
private final Map<PipelineID, Pipeline> pipelineMap;
private final AtomicInteger pipelineIndex;
private final Node2PipelineMap node2PipelineMap;
@ -64,7 +65,7 @@ public synchronized final Pipeline getPipeline(
if (pipeline != null) {
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
pipeline.getId(), replicationType, replicationFactor);
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
@ -78,19 +79,19 @@ public synchronized final Pipeline getPipeline(
/**
* This function to get pipeline with given pipeline name.
*
* @param pipelineName
* @param id
* @return a Pipeline.
*/
public synchronized final Pipeline getPipeline(String pipelineName) {
public synchronized final Pipeline getPipeline(PipelineID id) {
Pipeline pipeline = null;
// 1. Check if pipeline already exists
if (pipelineMap.containsKey(pipelineName)) {
pipeline = pipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
if (pipelineMap.containsKey(id)) {
pipeline = pipelineMap.get(id);
LOG.debug("Returning pipeline for pipelineName:{}", id);
return pipeline;
} else {
LOG.debug("Unable to find pipeline for pipelineName:{}", pipelineName);
LOG.debug("Unable to find pipeline for pipelineName:{}", id);
}
return pipeline;
}
@ -132,9 +133,10 @@ private Pipeline findOpenPipeline(
int nextIndex = sentinal;
for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
// Just walk the list in a circular way.
Pipeline temp =
PipelineID id =
activePipelines
.get(nextIndex != sentinal ? nextIndex : startIndex);
Pipeline temp = pipelineMap.get(id);
// if we find an operational pipeline just return that.
if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
(temp.getFactor() == factor) && (temp.getType() == type)) {
@ -165,9 +167,9 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor,
if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with "
+ "replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
pipelineMap.put(pipeline.getPipelineName(), pipeline);
pipeline.getId(), replicationType, replicationFactor);
activePipelines.add(pipeline.getId());
pipelineMap.put(pipeline.getId(), pipeline);
node2PipelineMap.addPipeline(pipeline);
}
return pipeline;
@ -178,7 +180,7 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor,
* @param pipeline pipeline to be finalized
*/
public synchronized void finalizePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
activePipelines.remove(pipeline.getId());
}
/**
@ -186,7 +188,7 @@ public synchronized void finalizePipeline(Pipeline pipeline) {
* @param pipeline
*/
public void closePipeline(Pipeline pipeline) {
pipelineMap.remove(pipeline.getPipelineName());
pipelineMap.remove(pipeline.getId());
node2PipelineMap.removePipeline(pipeline);
}
@ -194,12 +196,12 @@ public void closePipeline(Pipeline pipeline) {
* list members in the pipeline .
* @return the datanode
*/
public abstract List<DatanodeDetails> getMembers(String pipelineID)
public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID)
throws IOException;
/**
* Update the datanode list of the pipeline.
*/
public abstract void updatePipeline(String pipelineID,
public abstract void updatePipeline(PipelineID pipelineID,
List<DatanodeDetails> newDatanodes) throws IOException;
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@ -184,13 +185,13 @@ private void initializeStateMachine() {
*/
public static Pipeline newPipelineFromNodes(
List<DatanodeDetails> nodes, ReplicationType replicationType,
ReplicationFactor replicationFactor, String name) {
ReplicationFactor replicationFactor, PipelineID id) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getUuidString();
// A new pipeline always starts in allocated state
Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
replicationType, replicationFactor, name);
replicationType, replicationFactor, id);
for (DatanodeDetails node : nodes) {
pipeline.addMember(node);
}
@ -304,16 +305,16 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType,
* This function to return pipeline for given pipeline name and replication
* type.
*/
public Pipeline getPipeline(String pipelineName,
public Pipeline getPipeline(PipelineID pipelineID,
ReplicationType replicationType) throws IOException {
if (pipelineName == null) {
if (pipelineID == null) {
return null;
}
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting replication pipeline forReplicationType {} :" +
" pipelineName:{}", replicationType, pipelineName);
return manager.getPipeline(pipelineName);
" pipelineName:{}", replicationType, pipelineID);
return manager.getPipeline(pipelineID);
}
/**
@ -322,7 +323,7 @@ public Pipeline getPipeline(String pipelineName,
public void finalizePipeline(Pipeline pipeline) throws IOException {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getPipelineName());
LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getId());
// Remove the pipeline from active allocation
manager.finalizePipeline(pipeline);
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
@ -337,10 +338,10 @@ public void closePipelineIfNoOpenContainers(Pipeline pipeline) throws IOExceptio
return;
}
NavigableSet<ContainerID> containerIDS = containerStateManager
.getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
.getMatchingContainerIDsByPipeline(pipeline.getId());
if (containerIDS.size() == 0) {
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
LOG.info("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
}
}
@ -350,10 +351,10 @@ public void closePipelineIfNoOpenContainers(Pipeline pipeline) throws IOExceptio
private void closePipeline(Pipeline pipeline) {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
NavigableSet<ContainerID> containers =
containerStateManager
.getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
.getMatchingContainerIDsByPipeline(pipeline.getId());
Preconditions.checkArgument(containers.size() == 0);
manager.closePipeline(pipeline);
}
@ -361,7 +362,7 @@ private void closePipeline(Pipeline pipeline) {
private void closeContainersByPipeline(Pipeline pipeline) {
NavigableSet<ContainerID> containers =
containerStateManager
.getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
.getMatchingContainerIDsByPipeline(pipeline.getId());
for (ContainerID id : containers) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
}
@ -372,7 +373,7 @@ private void closeContainersByPipeline(Pipeline pipeline) {
*/
public List<DatanodeDetails> getDatanodes(ReplicationType replicationType,
String pipelineID) throws IOException {
PipelineID pipelineID) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting data nodes from pipeline : {}", pipelineID);
@ -383,7 +384,7 @@ public List<DatanodeDetails> getDatanodes(ReplicationType replicationType,
* Update the datanodes in the list of the pipeline.
*/
public void updateDatanodes(ReplicationType replicationType, String
public void updateDatanodes(ReplicationType replicationType, PipelineID
pipelineID, List<DatanodeDetails> newDatanodes) throws IOException {
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
@ -423,7 +424,7 @@ public void updatePipelineState(Pipeline pipeline,
String error = String.format("Failed to update pipeline state %s, " +
"reason: invalid state transition from state: %s upon " +
"event: %s.",
pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
pipeline.getId(), pipeline.getLifeCycleState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
}

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@ -38,7 +39,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
* Implementation of {@link PipelineManager}.
@ -48,7 +48,6 @@
public class RatisManagerImpl extends PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(RatisManagerImpl.class);
private static final String PREFIX = "Ratis-";
private final Configuration conf;
private final NodeManager nodeManager;
private final Set<DatanodeDetails> ratisMembers;
@ -87,12 +86,11 @@ public Pipeline allocatePipeline(ReplicationFactor factor) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new ratis pipeline of size: {}", count);
// Start all pipeline names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
PipelineID pipelineID = PipelineID.randomId();
LOG.info("Allocating a new ratis pipeline of size: {} id: {}",
count, pipelineID);
return PipelineSelector.newPipelineFromNodes(newNodesList,
ReplicationType.RATIS, factor, pipelineName);
ReplicationType.RATIS, factor, pipelineID);
}
}
}
@ -103,7 +101,7 @@ public void initializePipeline(Pipeline pipeline) throws IOException {
//TODO:move the initialization from SCM to client
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
client.createPipeline(pipeline);
}
}
@ -126,7 +124,7 @@ public void closePipeline(Pipeline pipeline) {
* @return the datanode
*/
@Override
public List<DatanodeDetails> getMembers(String pipelineID)
public List<DatanodeDetails> getMembers(PipelineID pipelineID)
throws IOException {
return null;
}
@ -138,7 +136,7 @@ public List<DatanodeDetails> getMembers(String pipelineID)
* @param newDatanodes
*/
@Override
public void updatePipeline(String pipelineID,
public void updatePipeline(PipelineID pipelineID,
List<DatanodeDetails> newDatanodes)
throws IOException {

View File

@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@ -36,7 +37,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
* Standalone Manager Impl to prove that pluggable interface
@ -85,11 +85,11 @@ public Pipeline allocatePipeline(ReplicationFactor factor) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
LOG.info("Allocating a new standalone pipeline of size: {}", count);
String pipelineName =
"SA-" + UUID.randomUUID().toString().substring(3);
PipelineID pipelineID = PipelineID.randomId();
LOG.info("Allocating a new standalone pipeline of size: {} id: {}",
count, pipelineID);
return PipelineSelector.newPipelineFromNodes(newNodesList,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID);
}
}
}
@ -118,7 +118,7 @@ public void closePipeline(Pipeline pipeline) {
* @return the datanode
*/
@Override
public List<DatanodeDetails> getMembers(String pipelineID)
public List<DatanodeDetails> getMembers(PipelineID pipelineID)
throws IOException {
return null;
}
@ -130,7 +130,7 @@ public List<DatanodeDetails> getMembers(String pipelineID)
* @param newDatanodes
*/
@Override
public void updatePipeline(String pipelineID, List<DatanodeDetails>
public void updatePipeline(PipelineID pipelineID, List<DatanodeDetails>
newDatanodes) throws IOException {
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -381,11 +382,12 @@ public void testDeletedBlockTransactions() throws IOException {
private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException {
Pipeline pipeline =
new Pipeline("fake", LifeCycleState.OPEN,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
PipelineID.randomId());
pipeline.addMember(dd);
ContainerInfo.Builder builder = new ContainerInfo.Builder();
builder.setPipelineName(pipeline.getPipelineName())
builder.setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor());

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
.Builder;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
@ -109,7 +110,8 @@ public void test() throws IOException {
PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,
ReplicationType.STAND_ALONE, ReplicationFactor.THREE, "pipeline1");
ReplicationType.STAND_ALONE, ReplicationFactor.THREE,
PipelineID.randomId());
when(pipelineSelector.getReplicationPipeline(ReplicationType.STAND_ALONE,
ReplicationFactor.THREE)).thenReturn(pipeline);

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
@ -206,10 +207,10 @@ public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
final Iterator<DatanodeDetails> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeDetails leader = i.next();
String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
final Pipeline pipeline =
new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
PipelineID.randomId());
pipeline.addMember(leader);
for (; i.hasNext(); ) {
pipeline.addMember(i.next());

View File

@ -24,6 +24,7 @@
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -1123,7 +1124,8 @@ public void testHandlingSCMCommandEvent() {
.register(datanodeDetails, TestUtils.createNodeReport(report));
eq.fireEvent(DATANODE_COMMAND,
new CommandForDatanode<>(datanodeDetails.getUuid(),
new CloseContainerCommand(1L, ReplicationType.STAND_ALONE)));
new CloseContainerCommand(1L, ReplicationType.STAND_ALONE,
PipelineID.randomId())));
eq.processAll(1000L);
List<SCMCommand> command =

View File

@ -51,6 +51,7 @@
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.statemachine
@ -430,6 +431,7 @@ private void addScmCommands() {
CloseContainerCommandProto.newBuilder().setCmdId(1)
.setContainerID(1)
.setReplicationType(ReplicationType.RATIS)
.setPipelineID(PipelineID.randomId().getProtobuf())
.build())
.setCommandType(Type.closeContainerCommand)
.build();

View File

@ -87,7 +87,7 @@ public static void shutdown() {
public void testPipelineMap() throws IOException {
NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
ratisContainer.getPipeline().getId());
long cId = ratisContainer.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
@ -100,8 +100,8 @@ public void testPipelineMap() throws IOException {
Set<Pipeline> pipelines = mapping.getPipelineSelector()
.getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
Assert.assertEquals(1, pipelines.size());
pipelines.forEach(p -> Assert.assertEquals(p.getPipelineName(),
ratisContainer.getPipeline().getPipelineName()));
pipelines.forEach(p -> Assert.assertEquals(p.getId(),
ratisContainer.getPipeline().getId()));
// Now close the container and it should not show up while fetching
@ -115,7 +115,7 @@ public void testPipelineMap() throws IOException {
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getPipelineName());
ratisContainer.getPipeline().getId());
Assert.assertEquals(0, set2.size());
try {

View File

@ -93,7 +93,7 @@ public static void shutdown() {
@Test
public void testPipelineCloseWithClosedContainer() throws IOException {
NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getPipelineName());
ratisContainer1.getPipeline().getId());
long cId = ratisContainer1.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
@ -111,12 +111,12 @@ public void testPipelineCloseWithClosedContainer() throws IOException {
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getPipelineName());
ratisContainer1.getPipeline().getId());
Assert.assertEquals(0, setClosed.size());
pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
Pipeline pipeline1 = pipelineSelector
.getPipeline(ratisContainer1.getPipeline().getPipelineName(),
.getPipeline(ratisContainer1.getPipeline().getId(),
ratisContainer1.getContainerInfo().getReplicationType());
Assert.assertNull(pipeline1);
Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
@ -132,7 +132,7 @@ public void testPipelineCloseWithClosedContainer() throws IOException {
public void testPipelineCloseWithOpenContainer() throws IOException,
TimeoutException, InterruptedException {
NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline(
ratisContainer2.getPipeline().getPipelineName());
ratisContainer2.getPipeline().getId());
Assert.assertEquals(1, setOpen.size());
long cId2 = ratisContainer2.getContainerInfo().getContainerID();
@ -144,7 +144,7 @@ public void testPipelineCloseWithOpenContainer() throws IOException,
Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);
Pipeline pipeline2 = pipelineSelector
.getPipeline(ratisContainer2.getPipeline().getPipelineName(),
.getPipeline(ratisContainer2.getPipeline().getId(),
ratisContainer2.getContainerInfo().getReplicationType());
Assert.assertEquals(pipeline2.getLifeCycleState(),
HddsProtos.LifeCycleState.CLOSING);

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
@ -95,7 +96,7 @@ public void testStartMultipleDatanodes() throws Exception {
new Pipeline(datanodeDetails.getUuidString(),
HddsProtos.LifeCycleState.OPEN,
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "test");
HddsProtos.ReplicationFactor.ONE, PipelineID.randomId());
pipeline.addMember(datanodeDetails);
// Verify client is able to connect to the container

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdds.client.BlockID;
@ -136,10 +137,10 @@ public static Pipeline createPipeline(
final Iterator<DatanodeDetails> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeDetails leader = i.next();
String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
final Pipeline pipeline =
new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
PipelineID.randomId());
pipeline.addMember(leader);
for(; i.hasNext();) {
pipeline.addMember(i.next());

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -108,10 +108,11 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
Pipeline pipeline = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
.getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 1);
.getPipeline();
List<DatanodeDetails> datanodes = pipeline.getMachines();
Assert.assertEquals(datanodes.size(), 1);
DatanodeDetails datanodeDetails = datanodes.get(0);
HddsDatanodeService datanodeService = null;
@ -131,7 +132,7 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID,
HddsProtos.ReplicationType.STAND_ALONE));
HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
GenericTestUtils
.waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
500, 5 * 1000);
@ -142,7 +143,7 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
}
@Test
public void testCloseContainerViaStandaAlone()
public void testCloseContainerViaStandAlone()
throws IOException, TimeoutException, InterruptedException {
OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
@ -163,10 +164,11 @@ public void testCloseContainerViaStandaAlone()
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
Pipeline pipeline = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
.getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 1);
.getPipeline();
List<DatanodeDetails> datanodes = pipeline.getMachines();
Assert.assertEquals(datanodes.size(), 1);
DatanodeDetails datanodeDetails = datanodes.get(0);
Assert
@ -178,7 +180,7 @@ public void testCloseContainerViaStandaAlone()
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID,
HddsProtos.ReplicationType.STAND_ALONE));
HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
GenericTestUtils
.waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
@ -216,10 +218,11 @@ public void testCloseContainerViaRatis() throws IOException,
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
Pipeline pipeline = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
.getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 3);
.getPipeline();
List<DatanodeDetails> datanodes = pipeline.getMachines();
Assert.assertEquals(3, datanodes.size());
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);
@ -230,7 +233,7 @@ public void testCloseContainerViaRatis() throws IOException,
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(details.getUuid(),
new CloseContainerCommand(containerID,
HddsProtos.ReplicationType.RATIS));
HddsProtos.ReplicationType.RATIS, pipeline.getId()));
}
for (DatanodeDetails datanodeDetails : datanodes) {

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
@ -33,7 +34,6 @@
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@ -80,6 +80,9 @@ public void test() throws IOException, TimeoutException, InterruptedException,
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
Pipeline pipeline = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
.getPipeline();
Assert.assertFalse(isContainerClosed(cluster, containerID));
@ -89,7 +92,7 @@ public void test() throws IOException, TimeoutException, InterruptedException,
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID,
HddsProtos.ReplicationType.STAND_ALONE));
HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId()));
GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerID),
500,

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.util.Time;
@ -60,7 +61,7 @@ public void initialize() throws IOException {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
.setPipelineName(pipeline.getPipelineName())
.setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
@ -83,7 +84,7 @@ public void initialize() throws IOException {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
.setPipelineName(pipeline.getPipelineName())
.setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
@ -105,7 +106,7 @@ public void initialize() throws IOException {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
.setPipelineName(pipeline.getPipelineName())
.setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
@ -154,10 +155,10 @@ public static Pipeline createPipeline(String containerName,
final Iterator<DatanodeDetails> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeDetails leader = i.next();
String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(5);
final Pipeline pipeline =
new Pipeline(leader.getUuidString(), OPEN,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
PipelineID.randomId());
pipeline.addMember(leader);
for (; i.hasNext();) {
pipeline.addMember(i.next());
@ -172,7 +173,7 @@ public void createContainerBenchMark(BenchMarkContainerStateMap state,
int cid = state.containerID.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
.setPipelineName(pipeline.getPipelineName())
.setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.genesis;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine
@ -92,7 +93,7 @@ public void initialize() throws IOException {
datanodeUuid = UUID.randomUUID().toString();
pipeline = new Pipeline("127.0.0.1",
LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, "SA-" + UUID.randomUUID());
ReplicationFactor.ONE, PipelineID.randomId());
// 1 MB of data
data = ByteString.copyFromUtf8(RandomStringUtils.randomAscii(1048576));

View File

@ -97,7 +97,7 @@
<ldap-api.version>1.0.0-M33</ldap-api.version>
<!-- Apache Ratis version -->
<ratis.version>0.3.0-c242317-SNAPSHOT</ratis.version>
<ratis.version>0.3.0-e4a016f-SNAPSHOT</ratis.version>
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>2.4.12</hikari.version>