HDDS-141. Remove PipeLine Class from SCM and move the data field in the Pipeline to ContainerInfo. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
f34744603e
commit
2a4632d3d7
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.hdds.scm.container.common.helpers;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
@ -30,6 +31,7 @@
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
||||
import static java.lang.Math.max;
|
||||
@ -63,6 +65,13 @@ public class ContainerInfo
|
||||
private String owner;
|
||||
private long containerID;
|
||||
private long deleteTransactionId;
|
||||
/**
|
||||
* Allows you to maintain private data on ContainerInfo. This is not
|
||||
* serialized via protobuf, just allows us to maintain some private data.
|
||||
*/
|
||||
@JsonIgnore
|
||||
private byte[] data;
|
||||
|
||||
ContainerInfo(
|
||||
long containerID,
|
||||
HddsProtos.LifeCycleState state,
|
||||
@ -295,6 +304,29 @@ public String toJsonString() throws IOException {
|
||||
return WRITER.writeValueAsString(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns private data that is set on this containerInfo.
|
||||
*
|
||||
* @return blob, the user can interpret it any way they like.
|
||||
*/
|
||||
public byte[] getData() {
|
||||
if (this.data != null) {
|
||||
return Arrays.copyOf(this.data, this.data.length);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set private data on ContainerInfo object.
|
||||
*
|
||||
* @param data -- private data.
|
||||
*/
|
||||
public void setData(byte[] data) {
|
||||
if (data != null) {
|
||||
this.data = Arrays.copyOf(data, data.length);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Builder class for ContainerInfo.
|
||||
*/
|
||||
|
@ -27,14 +27,14 @@
|
||||
import com.fasterxml.jackson.databind.ser.FilterProvider;
|
||||
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
|
||||
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -46,7 +46,7 @@ public class Pipeline {
|
||||
|
||||
static {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
String[] ignorableFieldNames = {"data"};
|
||||
String[] ignorableFieldNames = {"leaderID", "datanodes"};
|
||||
FilterProvider filters = new SimpleFilterProvider()
|
||||
.addFilter(PIPELINE_INFO, SimpleBeanPropertyFilter
|
||||
.serializeAllExcept(ignorableFieldNames));
|
||||
@ -57,38 +57,66 @@ public class Pipeline {
|
||||
WRITER = mapper.writer(filters);
|
||||
}
|
||||
|
||||
private PipelineChannel pipelineChannel;
|
||||
/**
|
||||
* Allows you to maintain private data on pipelines. This is not serialized
|
||||
* via protobuf, just allows us to maintain some private data.
|
||||
*/
|
||||
@JsonIgnore
|
||||
private byte[] data;
|
||||
private String leaderID;
|
||||
@JsonIgnore
|
||||
private Map<String, DatanodeDetails> datanodes;
|
||||
private HddsProtos.LifeCycleState lifeCycleState;
|
||||
private HddsProtos.ReplicationType type;
|
||||
private HddsProtos.ReplicationFactor factor;
|
||||
private String name;
|
||||
// TODO: change to long based id
|
||||
//private long id;
|
||||
|
||||
/**
|
||||
* Constructs a new pipeline data structure.
|
||||
*
|
||||
* @param pipelineChannel - transport information for this container
|
||||
* @param leaderID - Leader datanode id
|
||||
* @param lifeCycleState - Pipeline State
|
||||
* @param replicationType - Replication protocol
|
||||
* @param replicationFactor - replication count on datanodes
|
||||
* @param name - pipelineName
|
||||
*/
|
||||
public Pipeline(PipelineChannel pipelineChannel) {
|
||||
this.pipelineChannel = pipelineChannel;
|
||||
data = null;
|
||||
public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState,
|
||||
HddsProtos.ReplicationType replicationType,
|
||||
HddsProtos.ReplicationFactor replicationFactor, String name) {
|
||||
this.leaderID = leaderID;
|
||||
this.lifeCycleState = lifeCycleState;
|
||||
this.type = replicationType;
|
||||
this.factor = replicationFactor;
|
||||
this.name = name;
|
||||
datanodes = new TreeMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets pipeline object from protobuf.
|
||||
*
|
||||
* @param pipeline - ProtoBuf definition for the pipeline.
|
||||
* @param pipelineProto - ProtoBuf definition for the pipeline.
|
||||
* @return Pipeline Object
|
||||
*/
|
||||
public static Pipeline getFromProtoBuf(HddsProtos.Pipeline pipeline) {
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
PipelineChannel pipelineChannel =
|
||||
PipelineChannel.getFromProtoBuf(pipeline.getPipelineChannel());
|
||||
return new Pipeline(pipelineChannel);
|
||||
public static Pipeline getFromProtoBuf(
|
||||
HddsProtos.Pipeline pipelineProto) {
|
||||
Preconditions.checkNotNull(pipelineProto);
|
||||
Pipeline pipeline =
|
||||
new Pipeline(pipelineProto.getLeaderID(),
|
||||
pipelineProto.getState(),
|
||||
pipelineProto.getType(),
|
||||
pipelineProto.getFactor(),
|
||||
pipelineProto.getName());
|
||||
|
||||
for (HddsProtos.DatanodeDetailsProto dataID :
|
||||
pipelineProto.getMembersList()) {
|
||||
pipeline.addMember(DatanodeDetails.getFromProtoBuf(dataID));
|
||||
}
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
/**
|
||||
* returns the replication count.
|
||||
* @return Replication Factor
|
||||
*/
|
||||
public HddsProtos.ReplicationFactor getFactor() {
|
||||
return pipelineChannel.getFactor();
|
||||
return factor;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -98,19 +126,34 @@ public HddsProtos.ReplicationFactor getFactor() {
|
||||
*/
|
||||
@JsonIgnore
|
||||
public DatanodeDetails getLeader() {
|
||||
return pipelineChannel.getDatanodes().get(pipelineChannel.getLeaderID());
|
||||
return getDatanodes().get(leaderID);
|
||||
}
|
||||
|
||||
public void addMember(DatanodeDetails datanodeDetails) {
|
||||
datanodes.put(datanodeDetails.getUuid().toString(),
|
||||
datanodeDetails);
|
||||
}
|
||||
|
||||
public Map<String, DatanodeDetails> getDatanodes() {
|
||||
return datanodes;
|
||||
}
|
||||
/**
|
||||
* Returns the leader host.
|
||||
*
|
||||
* @return First Machine.
|
||||
*/
|
||||
public String getLeaderHost() {
|
||||
return pipelineChannel.getDatanodes()
|
||||
.get(pipelineChannel.getLeaderID()).getHostName();
|
||||
return getDatanodes()
|
||||
.get(leaderID).getHostName();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return lead
|
||||
*/
|
||||
public String getLeaderID() {
|
||||
return leaderID;
|
||||
}
|
||||
/**
|
||||
* Returns all machines that make up this pipeline.
|
||||
*
|
||||
@ -118,7 +161,7 @@ public String getLeaderHost() {
|
||||
*/
|
||||
@JsonIgnore
|
||||
public List<DatanodeDetails> getMachines() {
|
||||
return new ArrayList<>(pipelineChannel.getDatanodes().values());
|
||||
return new ArrayList<>(getDatanodes().values());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -128,7 +171,7 @@ public List<DatanodeDetails> getMachines() {
|
||||
*/
|
||||
public List<String> getDatanodeHosts() {
|
||||
List<String> dataHosts = new ArrayList<>();
|
||||
for (DatanodeDetails id : pipelineChannel.getDatanodes().values()) {
|
||||
for (DatanodeDetails id :getDatanodes().values()) {
|
||||
dataHosts.add(id.getHostName());
|
||||
}
|
||||
return dataHosts;
|
||||
@ -143,46 +186,31 @@ public List<String> getDatanodeHosts() {
|
||||
public HddsProtos.Pipeline getProtobufMessage() {
|
||||
HddsProtos.Pipeline.Builder builder =
|
||||
HddsProtos.Pipeline.newBuilder();
|
||||
builder.setPipelineChannel(this.pipelineChannel.getProtobufMessage());
|
||||
for (DatanodeDetails datanode : datanodes.values()) {
|
||||
builder.addMembers(datanode.getProtoBufMessage());
|
||||
}
|
||||
builder.setLeaderID(leaderID);
|
||||
|
||||
if (this.getLifeCycleState() != null) {
|
||||
builder.setState(this.getLifeCycleState());
|
||||
}
|
||||
if (this.getType() != null) {
|
||||
builder.setType(this.getType());
|
||||
}
|
||||
|
||||
if (this.getFactor() != null) {
|
||||
builder.setFactor(this.getFactor());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns private data that is set on this pipeline.
|
||||
*
|
||||
* @return blob, the user can interpret it any way they like.
|
||||
*/
|
||||
public byte[] getData() {
|
||||
if (this.data != null) {
|
||||
return Arrays.copyOf(this.data, this.data.length);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public PipelineChannel getPipelineChannel() {
|
||||
return pipelineChannel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set private data on pipeline.
|
||||
*
|
||||
* @param data -- private data.
|
||||
*/
|
||||
public void setData(byte[] data) {
|
||||
if (data != null) {
|
||||
this.data = Arrays.copyOf(data, data.length);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the State of the pipeline.
|
||||
*
|
||||
* @return - LifeCycleStates.
|
||||
*/
|
||||
public HddsProtos.LifeCycleState getLifeCycleState() {
|
||||
return pipelineChannel.getLifeCycleState();
|
||||
return lifeCycleState;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -191,7 +219,7 @@ public HddsProtos.LifeCycleState getLifeCycleState() {
|
||||
* @return - Name of the pipeline
|
||||
*/
|
||||
public String getPipelineName() {
|
||||
return pipelineChannel.getName();
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -200,16 +228,16 @@ public String getPipelineName() {
|
||||
* @return type - Standalone, Ratis, Chained.
|
||||
*/
|
||||
public HddsProtos.ReplicationType getType() {
|
||||
return pipelineChannel.getType();
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
|
||||
.append("[");
|
||||
pipelineChannel.getDatanodes().keySet().stream()
|
||||
getDatanodes().keySet().stream()
|
||||
.forEach(id -> b.
|
||||
append(id.endsWith(pipelineChannel.getLeaderID()) ? "*" + id : id));
|
||||
append(id.endsWith(getLeaderID()) ? "*" + id : id));
|
||||
b.append(" name:").append(getPipelineName());
|
||||
if (getType() != null) {
|
||||
b.append(" type:").append(getType().toString());
|
||||
|
@ -1,124 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container.common.helpers;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
* PipelineChannel information for a {@link Pipeline}.
|
||||
*/
|
||||
public class PipelineChannel {
|
||||
@JsonIgnore
|
||||
private String leaderID;
|
||||
@JsonIgnore
|
||||
private Map<String, DatanodeDetails> datanodes;
|
||||
private LifeCycleState lifeCycleState;
|
||||
private ReplicationType type;
|
||||
private ReplicationFactor factor;
|
||||
private String name;
|
||||
// TODO: change to long based id
|
||||
//private long id;
|
||||
|
||||
public PipelineChannel(String leaderID, LifeCycleState lifeCycleState,
|
||||
ReplicationType replicationType, ReplicationFactor replicationFactor,
|
||||
String name) {
|
||||
this.leaderID = leaderID;
|
||||
this.lifeCycleState = lifeCycleState;
|
||||
this.type = replicationType;
|
||||
this.factor = replicationFactor;
|
||||
this.name = name;
|
||||
datanodes = new TreeMap<>();
|
||||
}
|
||||
|
||||
public String getLeaderID() {
|
||||
return leaderID;
|
||||
}
|
||||
|
||||
public Map<String, DatanodeDetails> getDatanodes() {
|
||||
return datanodes;
|
||||
}
|
||||
|
||||
public LifeCycleState getLifeCycleState() {
|
||||
return lifeCycleState;
|
||||
}
|
||||
|
||||
public ReplicationType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public ReplicationFactor getFactor() {
|
||||
return factor;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void addMember(DatanodeDetails datanodeDetails) {
|
||||
datanodes.put(datanodeDetails.getUuid().toString(),
|
||||
datanodeDetails);
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public HddsProtos.PipelineChannel getProtobufMessage() {
|
||||
HddsProtos.PipelineChannel.Builder builder =
|
||||
HddsProtos.PipelineChannel.newBuilder();
|
||||
for (DatanodeDetails datanode : datanodes.values()) {
|
||||
builder.addMembers(datanode.getProtoBufMessage());
|
||||
}
|
||||
builder.setLeaderID(leaderID);
|
||||
|
||||
if (this.getLifeCycleState() != null) {
|
||||
builder.setState(this.getLifeCycleState());
|
||||
}
|
||||
if (this.getType() != null) {
|
||||
builder.setType(this.getType());
|
||||
}
|
||||
|
||||
if (this.getFactor() != null) {
|
||||
builder.setFactor(this.getFactor());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static PipelineChannel getFromProtoBuf(
|
||||
HddsProtos.PipelineChannel transportProtos) {
|
||||
Preconditions.checkNotNull(transportProtos);
|
||||
PipelineChannel pipelineChannel =
|
||||
new PipelineChannel(transportProtos.getLeaderID(),
|
||||
transportProtos.getState(),
|
||||
transportProtos.getType(),
|
||||
transportProtos.getFactor(),
|
||||
transportProtos.getName());
|
||||
|
||||
for (HddsProtos.DatanodeDetailsProto dataID :
|
||||
transportProtos.getMembersList()) {
|
||||
pipelineChannel.addMember(DatanodeDetails.getFromProtoBuf(dataID));
|
||||
}
|
||||
return pipelineChannel;
|
||||
}
|
||||
}
|
@ -40,7 +40,7 @@ message Port {
|
||||
required uint32 value = 2;
|
||||
}
|
||||
|
||||
message PipelineChannel {
|
||||
message Pipeline {
|
||||
required string leaderID = 1;
|
||||
repeated DatanodeDetailsProto members = 2;
|
||||
optional LifeCycleState state = 3 [default = OPEN];
|
||||
@ -49,12 +49,6 @@ message PipelineChannel {
|
||||
optional string name = 6;
|
||||
}
|
||||
|
||||
// A pipeline is composed of PipelineChannel (Ratis/StandAlone) that back a
|
||||
// container.
|
||||
message Pipeline {
|
||||
required PipelineChannel pipelineChannel = 2;
|
||||
}
|
||||
|
||||
message KeyValue {
|
||||
required string key = 1;
|
||||
optional string value = 2;
|
||||
|
@ -127,12 +127,12 @@ public void close(HddsProtos.SCMContainerInfo info) {
|
||||
// to SCM. In that case also, data node will ignore this command.
|
||||
|
||||
HddsProtos.Pipeline pipeline = info.getPipeline();
|
||||
for (HddsProtos.DatanodeDetailsProto datanodeDetails : pipeline
|
||||
.getPipelineChannel().getMembersList()) {
|
||||
for (HddsProtos.DatanodeDetailsProto datanodeDetails :
|
||||
pipeline.getMembersList()) {
|
||||
nodeManager.addDatanodeCommand(
|
||||
DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
|
||||
new CloseContainerCommand(info.getContainerID(),
|
||||
pipeline.getPipelineChannel().getType()));
|
||||
pipeline.getType()));
|
||||
}
|
||||
if (!commandIssued.containsKey(info.getContainerID())) {
|
||||
commandIssued.put(info.getContainerID(),
|
||||
|
@ -17,7 +17,6 @@
|
||||
package org.apache.hadoop.hdds.scm.pipelines;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
@ -36,12 +35,12 @@
|
||||
public abstract class PipelineManager {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(PipelineManager.class);
|
||||
private final List<PipelineChannel> activePipelineChannels;
|
||||
private final AtomicInteger conduitsIndex;
|
||||
private final List<Pipeline> activePipelines;
|
||||
private final AtomicInteger pipelineIndex;
|
||||
|
||||
public PipelineManager() {
|
||||
activePipelineChannels = new LinkedList<>();
|
||||
conduitsIndex = new AtomicInteger(0);
|
||||
activePipelines = new LinkedList<>();
|
||||
pipelineIndex = new AtomicInteger(0);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -59,9 +58,9 @@ public synchronized final Pipeline getPipeline(
|
||||
/**
|
||||
* In the Ozone world, we have a very simple policy.
|
||||
*
|
||||
* 1. Try to create a pipelineChannel if there are enough free nodes.
|
||||
* 1. Try to create a pipeline if there are enough free nodes.
|
||||
*
|
||||
* 2. This allows all nodes to part of a pipelineChannel quickly.
|
||||
* 2. This allows all nodes to part of a pipeline quickly.
|
||||
*
|
||||
* 3. if there are not enough free nodes, return conduits in a
|
||||
* round-robin fashion.
|
||||
@ -70,28 +69,28 @@ public synchronized final Pipeline getPipeline(
|
||||
* Create a new placement policy that returns conduits in round robin
|
||||
* fashion.
|
||||
*/
|
||||
PipelineChannel pipelineChannel =
|
||||
allocatePipelineChannel(replicationFactor);
|
||||
if (pipelineChannel != null) {
|
||||
LOG.debug("created new pipelineChannel:{} for container with " +
|
||||
Pipeline pipeline =
|
||||
allocatePipeline(replicationFactor);
|
||||
if (pipeline != null) {
|
||||
LOG.debug("created new pipeline:{} for container with " +
|
||||
"replicationType:{} replicationFactor:{}",
|
||||
pipelineChannel.getName(), replicationType, replicationFactor);
|
||||
activePipelineChannels.add(pipelineChannel);
|
||||
pipeline.getPipelineName(), replicationType, replicationFactor);
|
||||
activePipelines.add(pipeline);
|
||||
} else {
|
||||
pipelineChannel =
|
||||
findOpenPipelineChannel(replicationType, replicationFactor);
|
||||
if (pipelineChannel != null) {
|
||||
LOG.debug("re-used pipelineChannel:{} for container with " +
|
||||
pipeline =
|
||||
findOpenPipeline(replicationType, replicationFactor);
|
||||
if (pipeline != null) {
|
||||
LOG.debug("re-used pipeline:{} for container with " +
|
||||
"replicationType:{} replicationFactor:{}",
|
||||
pipelineChannel.getName(), replicationType, replicationFactor);
|
||||
pipeline.getPipelineName(), replicationType, replicationFactor);
|
||||
}
|
||||
}
|
||||
if (pipelineChannel == null) {
|
||||
LOG.error("Get pipelineChannel call failed. We are not able to find" +
|
||||
"free nodes or operational pipelineChannel.");
|
||||
if (pipeline == null) {
|
||||
LOG.error("Get pipeline call failed. We are not able to find" +
|
||||
"free nodes or operational pipeline.");
|
||||
return null;
|
||||
} else {
|
||||
return new Pipeline(pipelineChannel);
|
||||
return pipeline;
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,19 +105,19 @@ protected int getReplicationCount(ReplicationFactor factor) {
|
||||
}
|
||||
}
|
||||
|
||||
public abstract PipelineChannel allocatePipelineChannel(
|
||||
public abstract Pipeline allocatePipeline(
|
||||
ReplicationFactor replicationFactor) throws IOException;
|
||||
|
||||
/**
|
||||
* Find a PipelineChannel that is operational.
|
||||
* Find a Pipeline that is operational.
|
||||
*
|
||||
* @return - Pipeline or null
|
||||
*/
|
||||
private PipelineChannel findOpenPipelineChannel(
|
||||
private Pipeline findOpenPipeline(
|
||||
ReplicationType type, ReplicationFactor factor) {
|
||||
PipelineChannel pipelineChannel = null;
|
||||
Pipeline pipeline = null;
|
||||
final int sentinal = -1;
|
||||
if (activePipelineChannels.size() == 0) {
|
||||
if (activePipelines.size() == 0) {
|
||||
LOG.error("No Operational conduits found. Returning null.");
|
||||
return null;
|
||||
}
|
||||
@ -126,26 +125,26 @@ private PipelineChannel findOpenPipelineChannel(
|
||||
int nextIndex = sentinal;
|
||||
for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
|
||||
// Just walk the list in a circular way.
|
||||
PipelineChannel temp =
|
||||
activePipelineChannels
|
||||
Pipeline temp =
|
||||
activePipelines
|
||||
.get(nextIndex != sentinal ? nextIndex : startIndex);
|
||||
// if we find an operational pipelineChannel just return that.
|
||||
// if we find an operational pipeline just return that.
|
||||
if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
|
||||
(temp.getFactor() == factor) && (temp.getType() == type)) {
|
||||
pipelineChannel = temp;
|
||||
pipeline = temp;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return pipelineChannel;
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the next index of the PipelineChannel to get.
|
||||
* gets the next index of the Pipeline to get.
|
||||
*
|
||||
* @return index in the link list to get.
|
||||
*/
|
||||
private int getNextIndex() {
|
||||
return conduitsIndex.incrementAndGet() % activePipelineChannels.size();
|
||||
return pipelineIndex.incrementAndGet() % activePipelines.size();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -20,7 +20,6 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
||||
.ContainerPlacementPolicy;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
||||
@ -85,20 +84,20 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
|
||||
* The first of the list will be the leader node.
|
||||
* @return pipeline corresponding to nodes
|
||||
*/
|
||||
public static PipelineChannel newPipelineFromNodes(
|
||||
public static Pipeline newPipelineFromNodes(
|
||||
List<DatanodeDetails> nodes, LifeCycleState state,
|
||||
ReplicationType replicationType, ReplicationFactor replicationFactor,
|
||||
String name) {
|
||||
Preconditions.checkNotNull(nodes);
|
||||
Preconditions.checkArgument(nodes.size() > 0);
|
||||
String leaderId = nodes.get(0).getUuidString();
|
||||
PipelineChannel
|
||||
pipelineChannel = new PipelineChannel(leaderId, state, replicationType,
|
||||
Pipeline
|
||||
pipeline = new Pipeline(leaderId, state, replicationType,
|
||||
replicationFactor, name);
|
||||
for (DatanodeDetails node : nodes) {
|
||||
pipelineChannel.addMember(node);
|
||||
pipeline.addMember(node);
|
||||
}
|
||||
return pipelineChannel;
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -20,7 +20,6 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
||||
.ContainerPlacementPolicy;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
@ -68,12 +67,12 @@ public RatisManagerImpl(NodeManager nodeManager,
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocates a new ratis PipelineChannel from the free nodes.
|
||||
* Allocates a new ratis Pipeline from the free nodes.
|
||||
*
|
||||
* @param factor - One or Three
|
||||
* @return PipelineChannel.
|
||||
*/
|
||||
public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
|
||||
public Pipeline allocatePipeline(ReplicationFactor factor) {
|
||||
List<DatanodeDetails> newNodesList = new LinkedList<>();
|
||||
List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
|
||||
int count = getReplicationCount(factor);
|
||||
@ -87,22 +86,20 @@ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
|
||||
// once a datanode has been added to a pipeline, exclude it from
|
||||
// further allocations
|
||||
ratisMembers.addAll(newNodesList);
|
||||
LOG.info("Allocating a new ratis pipelineChannel of size: {}", count);
|
||||
LOG.info("Allocating a new ratis pipeline of size: {}", count);
|
||||
// Start all channel names with "Ratis", easy to grep the logs.
|
||||
String conduitName = PREFIX +
|
||||
UUID.randomUUID().toString().substring(PREFIX.length());
|
||||
PipelineChannel pipelineChannel =
|
||||
Pipeline pipeline=
|
||||
PipelineSelector.newPipelineFromNodes(newNodesList,
|
||||
LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName);
|
||||
Pipeline pipeline =
|
||||
new Pipeline(pipelineChannel);
|
||||
try (XceiverClientRatis client =
|
||||
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
|
||||
client.createPipeline(pipeline.getPipelineName(), newNodesList);
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
return pipelineChannel;
|
||||
return pipeline;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
package org.apache.hadoop.hdds.scm.pipelines.standalone;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
||||
.ContainerPlacementPolicy;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
@ -67,12 +67,12 @@ public StandaloneManagerImpl(NodeManager nodeManager,
|
||||
|
||||
|
||||
/**
|
||||
* Allocates a new standalone PipelineChannel from the free nodes.
|
||||
* Allocates a new standalone Pipeline from the free nodes.
|
||||
*
|
||||
* @param factor - One
|
||||
* @return PipelineChannel.
|
||||
* @return Pipeline.
|
||||
*/
|
||||
public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
|
||||
public Pipeline allocatePipeline(ReplicationFactor factor) {
|
||||
List<DatanodeDetails> newNodesList = new LinkedList<>();
|
||||
List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
|
||||
int count = getReplicationCount(factor);
|
||||
|
@ -23,7 +23,6 @@
|
||||
import org.apache.hadoop.hdds.scm.container.Mapping;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
@ -357,11 +356,10 @@ public void testDeletedBlockTransactions() throws IOException {
|
||||
|
||||
private void mockContainerInfo(Mapping mappingService, long containerID,
|
||||
DatanodeDetails dd) throws IOException {
|
||||
PipelineChannel pipelineChannel =
|
||||
new PipelineChannel("fake", LifeCycleState.OPEN,
|
||||
Pipeline pipeline =
|
||||
new Pipeline("fake", LifeCycleState.OPEN,
|
||||
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
|
||||
pipelineChannel.addMember(dd);
|
||||
Pipeline pipeline = new Pipeline(pipelineChannel);
|
||||
pipeline.addMember(dd);
|
||||
|
||||
ContainerInfo.Builder builder = new ContainerInfo.Builder();
|
||||
builder.setPipeline(pipeline);
|
||||
|
@ -28,7 +28,6 @@
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClient;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.test.TestGenericTestUtils;
|
||||
@ -92,13 +91,12 @@ public void testStartMultipleDatanodes() throws Exception {
|
||||
for(HddsDatanodeService dn : datanodes) {
|
||||
// Create a single member pipe line
|
||||
DatanodeDetails datanodeDetails = dn.getDatanodeDetails();
|
||||
final PipelineChannel pipelineChannel =
|
||||
new PipelineChannel(datanodeDetails.getUuidString(),
|
||||
final Pipeline pipeline =
|
||||
new Pipeline(datanodeDetails.getUuidString(),
|
||||
HddsProtos.LifeCycleState.OPEN,
|
||||
HddsProtos.ReplicationType.STAND_ALONE,
|
||||
HddsProtos.ReplicationFactor.ONE, "test");
|
||||
pipelineChannel.addMember(datanodeDetails);
|
||||
Pipeline pipeline = new Pipeline(pipelineChannel);
|
||||
pipeline.addMember(datanodeDetails);
|
||||
|
||||
// Verify client is able to connect to the container
|
||||
try (XceiverClient client = new XceiverClient(pipeline, conf)){
|
||||
|
@ -37,7 +37,6 @@
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
@ -136,14 +135,14 @@ public static Pipeline createPipeline(
|
||||
Preconditions.checkArgument(i.hasNext());
|
||||
final DatanodeDetails leader = i.next();
|
||||
String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
|
||||
final PipelineChannel pipelineChannel =
|
||||
new PipelineChannel(leader.getUuidString(), LifeCycleState.OPEN,
|
||||
final Pipeline pipeline =
|
||||
new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
|
||||
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
|
||||
pipelineChannel.addMember(leader);
|
||||
pipeline.addMember(leader);
|
||||
for(; i.hasNext();) {
|
||||
pipelineChannel.addMember(i.next());
|
||||
pipeline.addMember(i.next());
|
||||
}
|
||||
return new Pipeline(pipelineChannel);
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -207,8 +206,6 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
|
||||
ContainerProtos.WriteChunkRequestProto
|
||||
.newBuilder();
|
||||
|
||||
Pipeline newPipeline =
|
||||
new Pipeline(pipeline.getPipelineChannel());
|
||||
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
|
||||
|
||||
byte[] data = getData(datalen);
|
||||
@ -223,7 +220,7 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
|
||||
request.setCmdType(ContainerProtos.Type.WriteChunk);
|
||||
request.setWriteChunk(writeRequest);
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(newPipeline.getLeader().getUuidString());
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
|
||||
return request.build();
|
||||
}
|
||||
@ -241,8 +238,6 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest(
|
||||
throws Exception {
|
||||
ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest =
|
||||
ContainerProtos.PutSmallFileRequestProto.newBuilder();
|
||||
Pipeline newPipeline =
|
||||
new Pipeline(pipeline.getPipelineChannel());
|
||||
byte[] data = getData(dataLen);
|
||||
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, dataLen);
|
||||
setDataChecksum(info, data);
|
||||
@ -266,7 +261,7 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest(
|
||||
request.setCmdType(ContainerProtos.Type.PutSmallFile);
|
||||
request.setPutSmallFile(smallFileRequest);
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(newPipeline.getLeader().getUuidString());
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
return request.build();
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,6 @@
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
@ -150,14 +149,14 @@ public static Pipeline createPipeline(String containerName,
|
||||
Preconditions.checkArgument(i.hasNext());
|
||||
final DatanodeDetails leader = i.next();
|
||||
String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(5);
|
||||
final PipelineChannel pipelineChannel =
|
||||
new PipelineChannel(leader.getUuidString(), OPEN,
|
||||
final Pipeline pipeline =
|
||||
new Pipeline(leader.getUuidString(), OPEN,
|
||||
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
|
||||
pipelineChannel.addMember(leader);
|
||||
pipeline.addMember(leader);
|
||||
for (; i.hasNext();) {
|
||||
pipelineChannel.addMember(i.next());
|
||||
pipeline.addMember(i.next());
|
||||
}
|
||||
return new Pipeline(pipelineChannel);
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone.genesis;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
@ -32,7 +33,6 @@
|
||||
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.Level;
|
||||
@ -78,7 +78,7 @@ public class BenchMarkDatanodeDispatcher {
|
||||
private String baseDir;
|
||||
private String datanodeUuid;
|
||||
private Dispatcher dispatcher;
|
||||
private PipelineChannel pipelineChannel;
|
||||
private Pipeline pipeline;
|
||||
private ByteString data;
|
||||
private Random random;
|
||||
private AtomicInteger containerCount;
|
||||
@ -96,7 +96,7 @@ public class BenchMarkDatanodeDispatcher {
|
||||
@Setup(Level.Trial)
|
||||
public void initialize() throws IOException {
|
||||
datanodeUuid = UUID.randomUUID().toString();
|
||||
pipelineChannel = new PipelineChannel("127.0.0.1",
|
||||
pipeline = new Pipeline("127.0.0.1",
|
||||
LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
|
||||
ReplicationFactor.ONE, "SA-" + UUID.randomUUID());
|
||||
|
||||
|
@ -519,11 +519,11 @@ private void insertContainerDB(Connection conn, long containerID,
|
||||
LOG.info("Insert to sql container db, for container {}", containerID);
|
||||
String insertContainerInfo = String.format(
|
||||
INSERT_CONTAINER_INFO, containerID,
|
||||
pipeline.getPipelineChannel().getLeaderID());
|
||||
pipeline.getLeaderID());
|
||||
executeSQL(conn, insertContainerInfo);
|
||||
|
||||
for (HddsProtos.DatanodeDetailsProto dd :
|
||||
pipeline.getPipelineChannel().getMembersList()) {
|
||||
pipeline.getMembersList()) {
|
||||
String uuid = dd.getUuid();
|
||||
if (!uuidChecked.contains(uuid)) {
|
||||
// we may also not use this checked set, but catch exception instead
|
||||
|
Loading…
Reference in New Issue
Block a user