HDDS-175. Refactor ContainerInfo to remove Pipeline object from it.

Contributed by Ajay Kumar.
This commit is contained in:
Anu Engineer 2018-07-03 13:30:19 -07:00
parent 93ac01cb59
commit 7ca4f0cefa
47 changed files with 1139 additions and 504 deletions

View File

@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB
@ -87,16 +88,17 @@ public static void setContainerSizeB(long size) {
* @inheritDoc
*/
@Override
public ContainerInfo createContainer(String owner)
public ContainerWithPipeline createContainer(String owner)
throws IOException {
XceiverClientSpi client = null;
try {
ContainerInfo container =
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), owner);
Pipeline pipeline = container.getPipeline();
client = xceiverClientManager.acquireClient(pipeline, container.getContainerID());
Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline,
containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@ -106,8 +108,9 @@ public ContainerInfo createContainer(String owner)
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
createContainer(client, container.getContainerID());
return container;
createContainer(client,
containerWithPipeline.getContainerInfo().getContainerID());
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@ -197,17 +200,17 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
* @inheritDoc
*/
@Override
public ContainerInfo createContainer(HddsProtos.ReplicationType type,
public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
ContainerInfo container =
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(type, factor,
owner);
Pipeline pipeline = container.getPipeline();
Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline,
container.getContainerID());
containerWithPipeline.getContainerInfo().getContainerID());
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
@ -217,9 +220,10 @@ public ContainerInfo createContainer(HddsProtos.ReplicationType type,
}
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline,
container.getContainerID());
createContainer(client, container.getContainerID());
return container;
containerWithPipeline.getContainerInfo().getContainerID());
createContainer(client,
containerWithPipeline.getContainerInfo().getContainerID());
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
@ -256,24 +260,27 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
}
/**
* Delete the container, this will release any resource it uses.
* @param pipeline - Pipeline that represents the container.
* @param force - True to forcibly delete the container.
* Deletes an existing container.
*
* @param containerId - ID of the container.
* @param pipeline - Pipeline that represents the container.
* @param force - true to forcibly delete the container.
* @throws IOException
*/
@Override
public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
throws IOException {
public void deleteContainer(long containerId, Pipeline pipeline,
boolean force) throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline, containerID);
client = xceiverClientManager.acquireClient(pipeline, containerId);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.deleteContainer(client, containerID, force, traceID);
ContainerProtocolCalls
.deleteContainer(client, containerId, force, traceID);
storageContainerLocationClient
.deleteContainer(containerID);
.deleteContainer(containerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, leader: {}, machines: {} ",
containerID,
containerId,
pipeline.getLeader(),
pipeline.getMachines());
}
@ -284,6 +291,19 @@ public void deleteContainer(long containerID, Pipeline pipeline, boolean force)
}
}
/**
* Delete the container, this will release any resource it uses.
* @param containerID - containerID.
* @param force - True to forcibly delete the container.
* @throws IOException
*/
@Override
public void deleteContainer(long containerID, boolean force)
throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerID);
deleteContainer(containerID, info.getPipeline(), force);
}
/**
* {@inheritDoc}
*/
@ -297,9 +317,9 @@ public List<ContainerInfo> listContainer(long startContainerID,
/**
* Get meta data from an existing container.
*
* @param pipeline - pipeline that represents the container.
* @return ContainerInfo - a message of protobuf which has basic info
* of a container.
* @param containerID - ID of the container.
* @param pipeline - Pipeline where the container is located.
* @return ContainerInfo
* @throws IOException
*/
@Override
@ -325,6 +345,19 @@ public ContainerData readContainer(long containerID,
}
}
/**
* Get meta data from an existing container.
* @param containerID - ID of the container.
* @return ContainerInfo - a message of protobuf which has basic info
* of a container.
* @throws IOException
*/
@Override
public ContainerData readContainer(long containerID) throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerID);
return readContainer(containerID, info.getPipeline());
}
/**
* Given an id, return the pipeline associated with the container.
* @param containerId - String Container ID
@ -337,6 +370,19 @@ public ContainerInfo getContainer(long containerId) throws
return storageContainerLocationClient.getContainer(containerId);
}
/**
* Gets a container by Name -- Throws if the container does not exist.
*
* @param containerId - Container ID
* @return ContainerWithPipeline
* @throws IOException
*/
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerId)
throws IOException {
return storageContainerLocationClient.getContainerWithPipeline(containerId);
}
/**
* Close a container.
*
@ -391,6 +437,19 @@ public void closeContainer(long containerId, Pipeline pipeline)
}
}
/**
* Close a container.
*
* @throws IOException
*/
@Override
public void closeContainer(long containerId)
throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerId);
Pipeline pipeline = info.getPipeline();
closeContainer(containerId, pipeline);
}
/**
* Get the the current usage information.
* @param containerID - ID of the container.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.client;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -45,7 +46,7 @@ public interface ScmClient {
* @return ContainerInfo
* @throws IOException
*/
ContainerInfo createContainer(String owner) throws IOException;
ContainerWithPipeline createContainer(String owner) throws IOException;
/**
* Gets a container by Name -- Throws if the container does not exist.
@ -55,6 +56,14 @@ public interface ScmClient {
*/
ContainerInfo getContainer(long containerId) throws IOException;
/**
* Gets a container by Name -- Throws if the container does not exist.
* @param containerId - Container ID
* @return ContainerWithPipeline
* @throws IOException
*/
ContainerWithPipeline getContainerWithPipeline(long containerId) throws IOException;
/**
* Close a container.
*
@ -64,6 +73,14 @@ public interface ScmClient {
*/
void closeContainer(long containerId, Pipeline pipeline) throws IOException;
/**
* Close a container.
*
* @param containerId - ID of the container.
* @throws IOException
*/
void closeContainer(long containerId) throws IOException;
/**
* Deletes an existing container.
* @param containerId - ID of the container.
@ -73,6 +90,14 @@ public interface ScmClient {
*/
void deleteContainer(long containerId, Pipeline pipeline, boolean force) throws IOException;
/**
* Deletes an existing container.
* @param containerId - ID of the container.
* @param force - true to forcibly delete the container.
* @throws IOException
*/
void deleteContainer(long containerId, boolean force) throws IOException;
/**
* Lists a range of containers and get their info.
*
@ -95,6 +120,15 @@ List<ContainerInfo> listContainer(long startContainerID,
ContainerData readContainer(long containerID, Pipeline pipeline)
throws IOException;
/**
* Read meta data from an existing container.
* @param containerID - ID of the container.
* @return ContainerInfo
* @throws IOException
*/
ContainerData readContainer(long containerID)
throws IOException;
/**
* Gets the container size -- Computed by SCM from Container Reports.
* @param containerID - ID of the container.
@ -110,7 +144,7 @@ ContainerData readContainer(long containerID, Pipeline pipeline)
* @return ContainerInfo
* @throws IOException - in case of error.
*/
ContainerInfo createContainer(HddsProtos.ReplicationType type,
ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException;

View File

@ -15,34 +15,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.common.helpers;
import static java.lang.Math.max;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.util.Time;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Comparator;
import static java.lang.Math.max;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.util.Time;
/**
* Class wraps ozone container info.
*/
public class ContainerInfo
implements Comparator<ContainerInfo>, Comparable<ContainerInfo> {
public class ContainerInfo implements Comparator<ContainerInfo>,
Comparable<ContainerInfo>, Externalizable {
private static final ObjectWriter WRITER;
private static final String SERIALIZATION_ERROR_MSG = "Java serialization not"
+ " supported. Use protobuf instead.";
static {
ObjectMapper mapper = new ObjectMapper();
@ -53,7 +58,9 @@ public class ContainerInfo
}
private HddsProtos.LifeCycleState state;
private Pipeline pipeline;
private String pipelineName;
private ReplicationFactor replicationFactor;
private ReplicationType replicationType;
// Bytes allocated by SCM for clients.
private long allocatedBytes;
// Actual container usage, updated through heartbeat.
@ -75,15 +82,17 @@ public class ContainerInfo
ContainerInfo(
long containerID,
HddsProtos.LifeCycleState state,
Pipeline pipeline,
String pipelineName,
long allocatedBytes,
long usedBytes,
long numberOfKeys,
long stateEnterTime,
String owner,
long deleteTransactionId) {
long deleteTransactionId,
ReplicationFactor replicationFactor,
ReplicationType repType) {
this.containerID = containerID;
this.pipeline = pipeline;
this.pipelineName = pipelineName;
this.allocatedBytes = allocatedBytes;
this.usedBytes = usedBytes;
this.numberOfKeys = numberOfKeys;
@ -92,6 +101,8 @@ public class ContainerInfo
this.stateEnterTime = stateEnterTime;
this.owner = owner;
this.deleteTransactionId = deleteTransactionId;
this.replicationFactor = replicationFactor;
this.replicationType = repType;
}
/**
@ -102,16 +113,18 @@ public ContainerInfo() {
public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
builder.setAllocatedBytes(info.getAllocatedBytes());
builder.setUsedBytes(info.getUsedBytes());
builder.setNumberOfKeys(info.getNumberOfKeys());
builder.setState(info.getState());
builder.setStateEnterTime(info.getStateEnterTime());
builder.setOwner(info.getOwner());
builder.setContainerID(info.getContainerID());
builder.setDeleteTransactionId(info.getDeleteTransactionId());
return builder.build();
return builder.setPipelineName(info.getPipelineName())
.setAllocatedBytes(info.getAllocatedBytes())
.setUsedBytes(info.getUsedBytes())
.setNumberOfKeys(info.getNumberOfKeys())
.setState(info.getState())
.setStateEnterTime(info.getStateEnterTime())
.setOwner(info.getOwner())
.setContainerID(info.getContainerID())
.setDeleteTransactionId(info.getDeleteTransactionId())
.setReplicationFactor(info.getReplicationFactor())
.setReplicationType(info.getReplicationType())
.build();
}
public long getContainerID() {
@ -130,8 +143,12 @@ public long getStateEnterTime() {
return stateEnterTime;
}
public Pipeline getPipeline() {
return pipeline;
public ReplicationFactor getReplicationFactor() {
return replicationFactor;
}
public String getPipelineName() {
return pipelineName;
}
public long getAllocatedBytes() {
@ -177,6 +194,10 @@ public long getLastUsed() {
return lastUsed;
}
public ReplicationType getReplicationType() {
return replicationType;
}
public void updateLastUsedTime() {
lastUsed = Time.monotonicNow();
}
@ -190,19 +211,17 @@ public void allocate(long size) {
public HddsProtos.SCMContainerInfo getProtobuf() {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
builder.setPipeline(getPipeline().getProtobufMessage());
builder.setAllocatedBytes(getAllocatedBytes());
builder.setUsedBytes(getUsedBytes());
builder.setNumberOfKeys(getNumberOfKeys());
builder.setState(state);
builder.setStateEnterTime(stateEnterTime);
builder.setContainerID(getContainerID());
builder.setDeleteTransactionId(deleteTransactionId);
if (getOwner() != null) {
builder.setOwner(getOwner());
}
return builder.build();
return builder.setAllocatedBytes(getAllocatedBytes())
.setContainerID(getContainerID())
.setUsedBytes(getUsedBytes())
.setNumberOfKeys(getNumberOfKeys()).setState(getState())
.setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID())
.setDeleteTransactionId(getDeleteTransactionId())
.setPipelineName(getPipelineName())
.setReplicationFactor(getReplicationFactor())
.setReplicationType(getReplicationType())
.setOwner(getOwner())
.build();
}
public String getOwner() {
@ -217,7 +236,7 @@ public void setOwner(String owner) {
public String toString() {
return "ContainerInfo{"
+ "state=" + state
+ ", pipeline=" + pipeline
+ ", pipelineName=" + pipelineName
+ ", stateEnterTime=" + stateEnterTime
+ ", owner=" + owner
+ '}';
@ -252,9 +271,7 @@ public boolean equals(Object o) {
public int hashCode() {
return new HashCodeBuilder(11, 811)
.append(getContainerID())
.append(pipeline.getFactor())
.append(pipeline.getType())
.append(owner)
.append(getOwner())
.toHashCode();
}
@ -327,12 +344,44 @@ public void setData(byte[] data) {
this.data = Arrays.copyOf(data, data.length);
}
}
/**
* Throws IOException as default java serialization is not supported. Use
* serialization via protobuf instead.
*
* @param out the stream to write the object to
* @throws IOException Includes any I/O exceptions that may occur
* @serialData Overriding methods should use this tag to describe
* the data layout of this Externalizable object.
* List the sequence of element types and, if possible,
* relate the element to a public/protected field and/or
* method of this Externalizable class.
*/
@Override
public void writeExternal(ObjectOutput out) throws IOException {
throw new IOException(SERIALIZATION_ERROR_MSG);
}
/**
* Throws IOException as default java serialization is not supported. Use
* serialization via protobuf instead.
*
* @param in the stream to read data from in order to restore the object
* @throws IOException if I/O errors occur
* @throws ClassNotFoundException If the class for an object being
* restored cannot be found.
*/
@Override
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException {
throw new IOException(SERIALIZATION_ERROR_MSG);
}
/**
* Builder class for ContainerInfo.
*/
public static class Builder {
private HddsProtos.LifeCycleState state;
private Pipeline pipeline;
private long allocated;
private long used;
private long keys;
@ -340,6 +389,25 @@ public static class Builder {
private String owner;
private long containerID;
private long deleteTransactionId;
private String pipelineName;
private ReplicationFactor replicationFactor;
private ReplicationType replicationType;
public Builder setReplicationType(
ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
public Builder setPipelineName(String pipelineName) {
this.pipelineName = pipelineName;
return this;
}
public Builder setReplicationFactor(ReplicationFactor repFactor) {
this.replicationFactor = repFactor;
return this;
}
public Builder setContainerID(long id) {
Preconditions.checkState(id >= 0);
@ -352,11 +420,6 @@ public Builder setState(HddsProtos.LifeCycleState lifeCycleState) {
return this;
}
public Builder setPipeline(Pipeline containerPipeline) {
this.pipeline = containerPipeline;
return this;
}
public Builder setAllocatedBytes(long bytesAllocated) {
this.allocated = bytesAllocated;
return this;
@ -388,9 +451,9 @@ public Builder setDeleteTransactionId(long deleteTransactionId) {
}
public ContainerInfo build() {
return new
ContainerInfo(containerID, state, pipeline, allocated,
used, keys, stateEnterTime, owner, deleteTransactionId);
return new ContainerInfo(containerID, state, pipelineName, allocated,
used, keys, stateEnterTime, owner, deleteTransactionId,
replicationFactor, replicationType);
}
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.common.helpers;
import java.util.Comparator;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
/**
* Class wraps ozone container info.
*/
public class ContainerWithPipeline
implements Comparator<ContainerWithPipeline>, Comparable<ContainerWithPipeline> {
private final ContainerInfo containerInfo;
private final Pipeline pipeline;
public ContainerWithPipeline(ContainerInfo containerInfo, Pipeline pipeline) {
this.containerInfo = containerInfo;
this.pipeline = pipeline;
}
public ContainerInfo getContainerInfo() {
return containerInfo;
}
public Pipeline getPipeline() {
return pipeline;
}
public static ContainerWithPipeline fromProtobuf(HddsProtos.ContainerWithPipeline allocatedContainer) {
return new ContainerWithPipeline(
ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()),
Pipeline.getFromProtoBuf(allocatedContainer.getPipeline()));
}
public HddsProtos.ContainerWithPipeline getProtobuf() {
HddsProtos.ContainerWithPipeline.Builder builder =
HddsProtos.ContainerWithPipeline.newBuilder();
builder.setContainerInfo(getContainerInfo().getProtobuf())
.setPipeline(getPipeline().getProtobufMessage());
return builder.build();
}
@Override
public String toString() {
return containerInfo.toString() + " | " + pipeline.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerWithPipeline that = (ContainerWithPipeline) o;
return new EqualsBuilder()
.append(getContainerInfo(), that.getContainerInfo())
.append(getPipeline(), that.getPipeline())
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(11, 811)
.append(getContainerInfo())
.append(getPipeline())
.toHashCode();
}
/**
* Compares its two arguments for order. Returns a negative integer, zero, or
* a positive integer as the first argument is less than, equal to, or greater
* than the second.<p>
*
* @param o1 the first object to be compared.
* @param o2 the second object to be compared.
* @return a negative integer, zero, or a positive integer as the first
* argument is less than, equal to, or greater than the second.
* @throws NullPointerException if an argument is null and this comparator
* does not permit null arguments
* @throws ClassCastException if the arguments' types prevent them from
* being compared by this comparator.
*/
@Override
public int compare(ContainerWithPipeline o1, ContainerWithPipeline o2) {
return o1.getContainerInfo().compareTo(o2.getContainerInfo());
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less than,
* equal to, or greater than the specified object.
*
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object is
* less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(ContainerWithPipeline o) {
return this.compare(this, o);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.protocol;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -38,7 +39,7 @@ public interface StorageContainerLocationProtocol {
* set of datanodes that should be used creating this container.
*
*/
ContainerInfo allocateContainer(HddsProtos.ReplicationType replicationType,
ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor factor, String owner)
throws IOException;
@ -53,6 +54,16 @@ ContainerInfo allocateContainer(HddsProtos.ReplicationType replicationType,
*/
ContainerInfo getContainer(long containerID) throws IOException;
/**
* Ask SCM the location of the container. SCM responds with a group of
* nodes where this container and its replicas are located.
*
* @param containerID - ID of the container.
* @return ContainerWithPipeline - the container info with the pipeline.
* @throws IOException
*/
ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException;
/**
* Ask SCM a list of containers with a range of container names
* and the limit of count.

View File

@ -20,7 +20,10 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@ -95,7 +98,7 @@ public StorageContainerLocationProtocolClientSideTranslatorPB(
* @throws IOException
*/
@Override
public ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
ContainerRequestProto request = ContainerRequestProto.newBuilder()
@ -114,7 +117,7 @@ public ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate container failed.");
}
return ContainerInfo.fromProtobuf(response.getContainerInfo());
return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
}
public ContainerInfo getContainer(long containerID) throws IOException {
@ -133,6 +136,25 @@ public ContainerInfo getContainer(long containerID) throws IOException {
}
}
/**
* {@inheritDoc}
*/
public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
Preconditions.checkState(containerID >= 0,
"Container ID cannot be negative");
GetContainerWithPipelineRequestProto request = GetContainerWithPipelineRequestProto
.newBuilder()
.setContainerID(containerID)
.build();
try {
GetContainerWithPipelineResponseProto response =
rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
/**
* {@inheritDoc}
*/

View File

@ -21,7 +21,10 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
@ -82,10 +85,11 @@ public StorageContainerLocationProtocolServerSideTranslatorPB(
public ContainerResponseProto allocateContainer(RpcController unused,
ContainerRequestProto request) throws ServiceException {
try {
ContainerInfo container = impl.allocateContainer(request.getReplicationType(),
request.getReplicationFactor(), request.getOwner());
ContainerWithPipeline containerWithPipeline = impl
.allocateContainer(request.getReplicationType(),
request.getReplicationFactor(), request.getOwner());
return ContainerResponseProto.newBuilder()
.setContainerInfo(container.getProtobuf())
.setContainerWithPipeline(containerWithPipeline.getProtobuf())
.setErrorCode(ContainerResponseProto.Error.success)
.build();
@ -108,6 +112,21 @@ public GetContainerResponseProto getContainer(
}
}
@Override
public GetContainerWithPipelineResponseProto getContainerWithPipeline(
RpcController controller, GetContainerWithPipelineRequestProto request)
throws ServiceException {
try {
ContainerWithPipeline container = impl
.getContainerWithPipeline(request.getContainerID());
return GetContainerWithPipelineResponseProto.newBuilder()
.setContainerWithPipeline(container.getProtobuf())
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public SCMListContainerResponseProto listContainer(RpcController controller,
SCMListContainerRequestProto request) throws ServiceException {

View File

@ -52,7 +52,7 @@ message ContainerResponseProto {
errorContainerMissing = 3;
}
required Error errorCode = 1;
required SCMContainerInfo containerInfo = 2;
required ContainerWithPipeline containerWithPipeline = 2;
optional string errorMessage = 3;
}
@ -64,6 +64,14 @@ message GetContainerResponseProto {
required SCMContainerInfo containerInfo = 1;
}
message GetContainerWithPipelineRequestProto {
required int64 containerID = 1;
}
message GetContainerWithPipelineResponseProto {
required ContainerWithPipeline containerWithPipeline = 1;
}
message SCMListContainerRequestProto {
required uint32 count = 1;
optional uint64 startContainerID = 2;
@ -171,6 +179,11 @@ service StorageContainerLocationProtocolService {
*/
rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
/**
* Returns the pipeline for a given container.
*/
rpc getContainerWithPipeline(GetContainerWithPipelineRequestProto) returns (GetContainerWithPipelineResponseProto);
rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
/**

View File

@ -132,7 +132,7 @@ enum LifeCycleEvent {
message SCMContainerInfo {
required int64 containerID = 1;
required LifeCycleState state = 2;
required Pipeline pipeline = 3;
optional string pipelineName = 3;
// This is not total size of container, but space allocated by SCM for
// clients to write blocks
required uint64 allocatedBytes = 4;
@ -141,6 +141,13 @@ message SCMContainerInfo {
optional int64 stateEnterTime = 7;
required string owner = 8;
optional int64 deleteTransactionId = 9;
required ReplicationFactor replicationFactor = 10;
required ReplicationType replicationType = 11;
}
message ContainerWithPipeline {
required SCMContainerInfo containerInfo = 1;
required Pipeline pipeline = 2;
}
message GetScmInfoRequestProto {

View File

@ -16,10 +16,12 @@
*/
package org.apache.hadoop.hdds.scm.block;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@ -156,13 +158,13 @@ private void preAllocateContainers(int count, ReplicationType type,
lock.lock();
try {
for (int i = 0; i < count; i++) {
ContainerInfo containerInfo = null;
ContainerWithPipeline containerWithPipeline = null;
try {
// TODO: Fix this later when Ratis is made the Default.
containerInfo = containerManager.allocateContainer(type, factor,
containerWithPipeline = containerManager.allocateContainer(type, factor,
owner);
if (containerInfo == null) {
if (containerWithPipeline == null) {
LOG.warn("Unable to allocate container.");
continue;
}
@ -231,30 +233,27 @@ public AllocatedBlock allocateBlock(final long size,
can use different kind of policies.
*/
ContainerInfo containerInfo;
ContainerWithPipeline containerWithPipeline;
// Look for ALLOCATED container that matches all other parameters.
containerInfo =
containerManager
.getStateManager()
.getMatchingContainer(
size, owner, type, factor, HddsProtos.LifeCycleState
.ALLOCATED);
if (containerInfo != null) {
containerManager.updateContainerState(containerInfo.getContainerID(),
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.ALLOCATED);
if (containerWithPipeline != null) {
containerManager.updateContainerState(
containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
return newBlock(containerWithPipeline,
HddsProtos.LifeCycleState.ALLOCATED);
}
// Since we found no allocated containers that match our criteria, let us
// look for OPEN containers that match the criteria.
containerInfo =
containerManager
.getStateManager()
.getMatchingContainer(size, owner, type, factor, HddsProtos
.LifeCycleState.OPEN);
if (containerInfo != null) {
return newBlock(containerInfo, HddsProtos.LifeCycleState.OPEN);
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.OPEN);
if (containerWithPipeline != null) {
return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN);
}
// We found neither ALLOCATED or OPEN Containers. This generally means
@ -264,16 +263,15 @@ public AllocatedBlock allocateBlock(final long size,
preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
// Since we just allocated a set of containers this should work
containerInfo =
containerManager
.getStateManager()
.getMatchingContainer(
size, owner, type, factor, HddsProtos.LifeCycleState
.ALLOCATED);
if (containerInfo != null) {
containerManager.updateContainerState(containerInfo.getContainerID(),
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.ALLOCATED);
if (containerWithPipeline != null) {
containerManager.updateContainerState(
containerWithPipeline.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
return newBlock(containerWithPipeline,
HddsProtos.LifeCycleState.ALLOCATED);
}
// we have tried all strategies we know and but somehow we are not able
@ -290,18 +288,28 @@ public AllocatedBlock allocateBlock(final long size,
}
}
private String getChannelName(ReplicationType type) {
switch (type) {
case RATIS:
return "RA" + UUID.randomUUID().toString().substring(3);
case STAND_ALONE:
return "SA" + UUID.randomUUID().toString().substring(3);
default:
return "RA" + UUID.randomUUID().toString().substring(3);
}
}
/**
* newBlock - returns a new block assigned to a container.
*
* @param containerInfo - Container Info.
* @param containerWithPipeline - Container Info.
* @param state - Current state of the container.
* @return AllocatedBlock
*/
private AllocatedBlock newBlock(
ContainerInfo containerInfo, HddsProtos.LifeCycleState state)
throws IOException {
if (containerInfo.getPipeline().getMachines().size() == 0) {
private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline,
HddsProtos.LifeCycleState state) throws IOException {
ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
if (containerWithPipeline.getPipeline().getDatanodes().size() == 0) {
LOG.error("Pipeline Machine count is zero.");
return null;
}
@ -317,7 +325,7 @@ private AllocatedBlock newBlock(
AllocatedBlock.Builder abb =
new AllocatedBlock.Builder()
.setBlockID(new BlockID(containerID, localID))
.setPipeline(containerInfo.getPipeline())
.setPipeline(containerWithPipeline.getPipeline())
.setShouldCreateContainer(createContainer);
LOG.trace("New block allocated : {} Container ID: {}", localID,
containerID);

View File

@ -18,7 +18,6 @@
import com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@ -29,6 +28,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
/**
* A wrapper class to hold info about datanode and all deleted block
@ -54,21 +54,22 @@ public class DatanodeDeletedBlockTransactions {
}
public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
ContainerInfo info = null;
Pipeline pipeline = null;
try {
info = mappingService.getContainer(tx.getContainerID());
pipeline = mappingService.getContainerWithPipeline(tx.getContainerID())
.getPipeline();
} catch (IOException e) {
SCMBlockDeletingService.LOG.warn("Got container info error.", e);
}
if (info == null) {
if (pipeline == null) {
SCMBlockDeletingService.LOG.warn(
"Container {} not found, continue to process next",
tx.getContainerID());
return;
}
for (DatanodeDetails dd : info.getPipeline().getMachines()) {
for (DatanodeDetails dd : pipeline.getMachines()) {
UUID dnID = dd.getUuid();
if (transactions.containsKey(dnID)) {
List<DeletedBlocksTransaction> txs = transactions.get(dnID);

View File

@ -16,9 +16,11 @@
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@ -54,22 +56,32 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
LOG.info("Close container Event triggered for container : {}",
containerID.getId());
ContainerStateManager stateManager = containerManager.getStateManager();
ContainerInfo info = stateManager.getContainer(containerID);
if (info == null) {
LOG.info("Container with id : {} does not exist", containerID.getId());
ContainerWithPipeline containerWithPipeline = null;
ContainerInfo info;
try {
containerWithPipeline = containerManager.getContainerWithPipeline(containerID.getId());
info = containerWithPipeline.getContainerInfo();
if (info == null) {
LOG.info("Failed to update the container state. Container with id : {} "
+ "does not exist", containerID.getId());
return;
}
} catch (IOException e) {
LOG.info("Failed to update the container state. Container with id : {} "
+ "does not exist", containerID.getId());
return;
}
if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
for (DatanodeDetails datanode : info.getPipeline().getMachines()) {
for (DatanodeDetails datanode : containerWithPipeline.getPipeline().getMachines()) {
containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
new CloseContainerCommand(containerID.getId(),
info.getPipeline().getType()));
info.getReplicationType()));
}
try {
// Finalize event will make sure the state of the container transitions
// from OPEN to CLOSING in containerStateManager.
stateManager
containerManager.getStateManager()
.updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE);
} catch (SCMException ex) {
LOG.error("Failed to update the container state for container : {}"

View File

@ -21,6 +21,10 @@
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@ -166,6 +170,44 @@ public ContainerInfo getContainer(final long containerID) throws
}
}
/**
* Returns the ContainerInfo from the container ID.
*
* @param containerID - ID of container.
* @return - ContainerWithPipeline such as creation state and the pipeline.
* @throws IOException
*/
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerID)
throws IOException {
ContainerInfo contInfo;
lock.lock();
try {
byte[] containerBytes = containerStore.get(
Longs.toByteArray(containerID));
if (containerBytes == null) {
throw new SCMException(
"Specified key does not exist. key : " + containerID,
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
.parseFrom(containerBytes);
contInfo = ContainerInfo.fromProtobuf(temp);
Pipeline pipeline = pipelineSelector
.getPipeline(contInfo.getPipelineName(),
contInfo.getReplicationType());
if(pipeline == null) {
pipeline = pipelineSelector
.getReplicationPipeline(contInfo.getReplicationType(),
contInfo.getReplicationFactor());
}
return new ContainerWithPipeline(contInfo, pipeline);
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc}
*/
@ -208,13 +250,15 @@ public List<ContainerInfo> listContainer(long startContainerID,
* @throws IOException - Exception
*/
@Override
public ContainerInfo allocateContainer(
public ContainerWithPipeline allocateContainer(
ReplicationType type,
ReplicationFactor replicationFactor,
String owner)
throws IOException {
ContainerInfo containerInfo;
ContainerWithPipeline containerWithPipeline;
if (!nodeManager.isOutOfChillMode()) {
throw new SCMException(
"Unable to create container while in chill mode",
@ -223,9 +267,9 @@ public ContainerInfo allocateContainer(
lock.lock();
try {
containerInfo =
containerStateManager.allocateContainer(
containerWithPipeline = containerStateManager.allocateContainer(
pipelineSelector, type, replicationFactor, owner);
containerInfo = containerWithPipeline.getContainerInfo();
byte[] containerIDBytes = Longs.toByteArray(
containerInfo.getContainerID());
@ -234,7 +278,7 @@ public ContainerInfo allocateContainer(
} finally {
lock.unlock();
}
return containerInfo;
return containerWithPipeline;
}
/**
@ -380,6 +424,35 @@ public ContainerStateManager getStateManager() {
return containerStateManager;
}
/**
* Return a container matching the attributes specified.
*
* @param size - Space needed in the Container.
* @param owner - Owner of the container - A specific nameservice.
* @param type - Replication Type {StandAlone, Ratis}
* @param factor - Replication Factor {ONE, THREE}
* @param state - State of the Container-- {Open, Allocated etc.}
* @return ContainerInfo, null if there is no match found.
*/
public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) throws IOException {
ContainerInfo containerInfo = getStateManager()
.getMatchingContainer(size, owner, type, factor, state);
if (containerInfo == null) {
return null;
}
Pipeline pipeline = pipelineSelector
.getPipeline(containerInfo.getPipelineName(),
containerInfo.getReplicationType());
if (pipeline == null) {
pipelineSelector
.getReplicationPipeline(containerInfo.getReplicationType(),
containerInfo.getReplicationFactor());
}
return new ContainerWithPipeline(containerInfo, pipeline);
}
/**
* Process container report from Datanode.
* <p>
@ -415,7 +488,7 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
HddsProtos.SCMContainerInfo newState =
reconcileState(datanodeState, knownState);
reconcileState(datanodeState, knownState, datanodeDetails);
// FIX ME: This can be optimized, we write twice to memory, where a
// single write would work well.
@ -425,8 +498,14 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
containerStore.put(dbKey, newState.toByteArray());
// If the container is closed, then state is already written to SCM
Pipeline pipeline = pipelineSelector.getPipeline(newState.getPipelineName(), newState.getReplicationType());
if(pipeline == null) {
pipeline = pipelineSelector
.getReplicationPipeline(newState.getReplicationType(),
newState.getReplicationFactor());
}
// DB.TODO: So can we can write only once to DB.
if (closeContainerIfNeeded(newState)) {
if (closeContainerIfNeeded(newState, pipeline)) {
LOG.info("Closing the Container: {}", newState.getContainerID());
}
} else {
@ -447,15 +526,22 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
*
* @param datanodeState - State from the Datanode.
* @param knownState - State inside SCM.
* @param dnDetails
* @return new SCM State for this container.
*/
private HddsProtos.SCMContainerInfo reconcileState(
StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
HddsProtos.SCMContainerInfo knownState) {
SCMContainerInfo knownState, DatanodeDetails dnDetails) {
HddsProtos.SCMContainerInfo.Builder builder =
HddsProtos.SCMContainerInfo.newBuilder();
builder.setContainerID(knownState.getContainerID());
builder.setPipeline(knownState.getPipeline());
builder.setContainerID(knownState.getContainerID())
.setPipelineName(knownState.getPipelineName())
.setReplicationType(knownState.getReplicationType())
.setReplicationFactor(knownState.getReplicationFactor());
// TODO: If current state doesn't have this DN in list of DataNodes with replica
// then add it in list of replicas.
// If used size is greater than allocated size, we will be updating
// allocated size with used size. This update is done as a fallback
// mechanism in case SCM crashes without properly updating allocated
@ -464,13 +550,13 @@ private HddsProtos.SCMContainerInfo reconcileState(
long usedSize = datanodeState.getUsed();
long allocated = knownState.getAllocatedBytes() > usedSize ?
knownState.getAllocatedBytes() : usedSize;
builder.setAllocatedBytes(allocated);
builder.setUsedBytes(usedSize);
builder.setNumberOfKeys(datanodeState.getKeyCount());
builder.setState(knownState.getState());
builder.setStateEnterTime(knownState.getStateEnterTime());
builder.setContainerID(knownState.getContainerID());
builder.setDeleteTransactionId(knownState.getDeleteTransactionId());
builder.setAllocatedBytes(allocated)
.setUsedBytes(usedSize)
.setNumberOfKeys(datanodeState.getKeyCount())
.setState(knownState.getState())
.setStateEnterTime(knownState.getStateEnterTime())
.setContainerID(knownState.getContainerID())
.setDeleteTransactionId(knownState.getDeleteTransactionId());
if (knownState.getOwner() != null) {
builder.setOwner(knownState.getOwner());
}
@ -485,9 +571,11 @@ private HddsProtos.SCMContainerInfo reconcileState(
* one protobuf in one file and another definition in another file.
*
* @param newState - This is the state we maintain in SCM.
* @param pipeline
* @throws IOException
*/
private boolean closeContainerIfNeeded(HddsProtos.SCMContainerInfo newState)
private boolean closeContainerIfNeeded(SCMContainerInfo newState,
Pipeline pipeline)
throws IOException {
float containerUsedPercentage = 1.0f *
newState.getUsedBytes() / this.size;
@ -498,7 +586,7 @@ private boolean closeContainerIfNeeded(HddsProtos.SCMContainerInfo newState)
// We will call closer till get to the closed state.
// That is SCM will make this call repeatedly until we reach the closed
// state.
closer.close(newState);
closer.close(newState, pipeline);
if (shouldClose(scmInfo)) {
// This event moves the Container from Open to Closing State, this is
@ -598,10 +686,12 @@ public void flushContainerInfo() throws IOException {
.setAllocatedBytes(info.getAllocatedBytes())
.setNumberOfKeys(oldInfo.getNumberOfKeys())
.setOwner(oldInfo.getOwner())
.setPipeline(oldInfo.getPipeline())
.setPipelineName(oldInfo.getPipelineName())
.setState(oldInfo.getState())
.setUsedBytes(oldInfo.getUsedBytes())
.setDeleteTransactionId(oldInfo.getDeleteTransactionId())
.setReplicationFactor(oldInfo.getReplicationFactor())
.setReplicationType(oldInfo.getReplicationType())
.build();
containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
} else {

View File

@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
@ -279,10 +280,10 @@ private void initializeStateMachine() {
* @param selector -- Pipeline selector class.
* @param type -- Replication type.
* @param replicationFactor - Replication replicationFactor.
* @return Container Info.
* @return ContainerWithPipeline
* @throws IOException on Failure.
*/
public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
public ContainerWithPipeline allocateContainer(PipelineSelector selector, HddsProtos
.ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException {
@ -295,7 +296,7 @@ public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setPipeline(pipeline)
.setPipelineName(pipeline.getPipelineName())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@ -305,11 +306,13 @@ public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
.setOwner(owner)
.setContainerID(containerCount.incrementAndGet())
.setDeleteTransactionId(0)
.setReplicationFactor(replicationFactor)
.setReplicationType(pipeline.getType())
.build();
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
return containerInfo;
return new ContainerWithPipeline(containerInfo, pipeline);
}
/**
@ -432,8 +435,8 @@ private ContainerInfo findContainerWithSpace(long size,
containerInfo.updateLastUsedTime();
ContainerState key = new ContainerState(owner,
containerInfo.getPipeline().getType(),
containerInfo.getPipeline().getFactor());
containerInfo.getReplicationType(),
containerInfo.getReplicationFactor());
lastUsedMap.put(key, containerInfo.containerID());
return containerInfo;
}
@ -457,6 +460,20 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
factor, type);
}
/**
* Returns the containerInfo with pipeline for the given container id.
* @param selector -- Pipeline selector class.
* @param containerID id of the container
* @return ContainerInfo containerInfo
* @throws IOException
*/
public ContainerWithPipeline getContainer(PipelineSelector selector,
ContainerID containerID) throws IOException {
ContainerInfo info = containers.getContainerInfo(containerID.getId());
Pipeline pipeline = selector.getPipeline(info.getPipelineName(), info.getReplicationType());
return new ContainerWithPipeline(info, pipeline);
}
/**
* Returns the containerInfo for the given container id.
* @param containerID id of the container
@ -466,6 +483,7 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
public ContainerInfo getContainer(ContainerID containerID) {
return containers.getContainerInfo(containerID.getId());
}
@Override
public void close() throws IOException {
}

View File

@ -17,6 +17,10 @@
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
@ -42,6 +46,16 @@ public interface Mapping extends Closeable {
*/
ContainerInfo getContainer(long containerID) throws IOException;
/**
* Returns the ContainerInfo from the container ID.
*
* @param containerID - ID of container.
* @return - ContainerWithPipeline such as creation state and the pipeline.
* @throws IOException
*/
ContainerWithPipeline getContainerWithPipeline(long containerID)
throws IOException;
/**
* Returns containers under certain conditions.
* Search container IDs from start ID(exclusive),
@ -65,10 +79,10 @@ List<ContainerInfo> listContainer(long startContainerID, int count)
*
* @param replicationFactor - replication factor of the container.
* @param owner
* @return - Container Info.
* @return - ContainerWithPipeline.
* @throws IOException
*/
ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor, String owner)
throws IOException;
@ -120,4 +134,12 @@ void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
* @return NodeManager
*/
NodeManager getNodeManager();
/**
* Returns the ContainerWithPipeline.
* @return NodeManager
*/
public ContainerWithPipeline getMatchingContainerWithPipeline(final long size,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) throws IOException;
}

View File

@ -22,6 +22,8 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -90,8 +92,10 @@ public static int getCleanupWaterMark() {
* lives.
*
* @param info - ContainerInfo.
* @param pipeline
*/
public void close(HddsProtos.SCMContainerInfo info) {
public void close(SCMContainerInfo info,
Pipeline pipeline) {
if (commandIssued.containsKey(info.getContainerID())) {
// We check if we issued a close command in last 3 * reportInterval secs.
@ -126,13 +130,10 @@ public void close(HddsProtos.SCMContainerInfo info) {
// this queue can be emptied by a datanode after a close report is send
// to SCM. In that case also, data node will ignore this command.
HddsProtos.Pipeline pipeline = info.getPipeline();
for (HddsProtos.DatanodeDetailsProto datanodeDetails :
pipeline.getMembersList()) {
nodeManager.addDatanodeCommand(
DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
for (DatanodeDetails datanodeDetails : pipeline.getMachines()) {
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(info.getContainerID(),
pipeline.getType()));
info.getReplicationType()));
}
if (!commandIssued.containsKey(info.getContainerID())) {
commandIssued.put(info.getContainerID(),

View File

@ -116,7 +116,8 @@ public ContainerStateMap() {
public void addContainer(ContainerInfo info)
throws SCMException {
Preconditions.checkNotNull(info, "Container Info cannot be null");
Preconditions.checkNotNull(info.getPipeline(), "Pipeline cannot be null");
Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
"ExpectedReplicaCount should be greater than 0");
try (AutoCloseableLock lock = autoLock.acquire()) {
ContainerID id = ContainerID.valueof(info.getContainerID());
@ -129,8 +130,8 @@ public void addContainer(ContainerInfo info)
lifeCycleStateMap.insert(info.getState(), id);
ownerMap.insert(info.getOwner(), id);
factorMap.insert(info.getPipeline().getFactor(), id);
typeMap.insert(info.getPipeline().getType(), id);
factorMap.insert(info.getReplicationFactor(), id);
typeMap.insert(info.getReplicationType(), id);
LOG.trace("Created container with {} successfully.", id);
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.hadoop.hdds.scm.pipelines;
import java.util.LinkedList;
import java.util.Map;
import java.util.WeakHashMap;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@ -25,7 +28,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -36,11 +38,13 @@ public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
private final List<Pipeline> activePipelines;
private final Map<String, Pipeline> activePipelineMap;
private final AtomicInteger pipelineIndex;
public PipelineManager() {
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
activePipelineMap = new WeakHashMap<>();
}
/**
@ -76,6 +80,7 @@ public synchronized final Pipeline getPipeline(
"replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline);
} else {
pipeline =
findOpenPipeline(replicationType, replicationFactor);
@ -94,6 +99,26 @@ public synchronized final Pipeline getPipeline(
}
}
/**
* This function to get pipeline with given pipeline name.
*
* @param pipelineName
* @return a Pipeline.
*/
public synchronized final Pipeline getPipeline(String pipelineName) {
Pipeline pipeline = null;
// 1. Check if pipeline channel already exists
if (activePipelineMap.containsKey(pipelineName)) {
pipeline = activePipelineMap.get(pipelineName);
LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
return pipeline;
} else {
LOG.debug("Unable to find pipeline for pipelineName:{}", pipelineName);
}
return pipeline;
}
protected int getReplicationCount(ReplicationFactor factor) {
switch (factor) {
case ONE:

View File

@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
@ -176,6 +177,21 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType,
getPipeline(replicationFactor, replicationType);
}
/**
* This function to return pipeline for given pipeline name and replication
* type.
*/
public Pipeline getPipeline(String pipelineName,
ReplicationType replicationType) throws IOException {
if (pipelineName == null) {
return null;
}
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting replication pipeline forReplicationType {} :" +
" pipelineName:{}", replicationType, pipelineName);
return manager.getPipeline(pipelineName);
}
/**
* Creates a pipeline from a specified set of Nodes.
*/

View File

@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.pipelines.standalone;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;

View File

@ -31,6 +31,7 @@
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@ -145,11 +146,12 @@ public String getRpcRemoteUsername() {
}
@Override
public ContainerInfo allocateContainer(HddsProtos.ReplicationType
public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
replicationType, HddsProtos.ReplicationFactor factor,
String owner) throws IOException {
String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
return scm.getScmContainerManager()
.allocateContainer(replicationType, factor, owner);
}
@ -162,6 +164,14 @@ public ContainerInfo getContainer(long containerID) throws IOException {
.getContainer(containerID);
}
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
return scm.getScmContainerManager()
.getContainerWithPipeline(containerID);
}
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
@ -248,7 +258,7 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
throws IOException {
// TODO: will be addressed in future patch.
// This is needed only for debugging purposes to make sure cluster is
// working correctly.
// working correctly.
return null;
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -362,10 +363,16 @@ private void mockContainerInfo(Mapping mappingService, long containerID,
pipeline.addMember(dd);
ContainerInfo.Builder builder = new ContainerInfo.Builder();
builder.setPipeline(pipeline);
builder.setPipelineName(pipeline.getPipelineName())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor());
ContainerInfo conatinerInfo = builder.build();
Mockito.doReturn(conatinerInfo).when(mappingService)
ContainerInfo containerInfo = builder.build();
ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(
containerInfo, pipeline);
Mockito.doReturn(containerInfo).when(mappingService)
.getContainer(containerID);
Mockito.doReturn(containerWithPipeline).when(mappingService)
.getContainerWithPipeline(containerID);
}
}

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
@ -97,7 +97,7 @@ public void testCloseContainerEventWithInvalidContainer() {
new ContainerID(id));
eventQueue.processAll(1000);
Assert.assertTrue(logCapturer.getOutput()
.contains("Container with id : " + id + " does not exist"));
.contains("Failed to update the container state"));
}
@Test
@ -105,11 +105,12 @@ public void testCloseContainerEventWithValidContainers() throws IOException {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
ContainerInfo info = mapping
ContainerWithPipeline containerWithPipeline = mapping
.allocateContainer(HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
ContainerID id = new ContainerID(info.getContainerID());
DatanodeDetails datanode = info.getPipeline().getLeader();
ContainerID id = new ContainerID(
containerWithPipeline.getContainerInfo().getContainerID());
DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader();
int closeCount = nodeManager.getCommandCount(datanode);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.processAll(1000);
@ -125,7 +126,8 @@ public void testCloseContainerEventWithValidContainers() throws IOException {
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
new ContainerID(info.getContainerID()));
new ContainerID(
containerWithPipeline.getContainerInfo().getContainerID()));
eventQueue.processAll(1000);
Assert.assertEquals(closeCount + 1, nodeManager.getCommandCount(datanode));
Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,
@ -137,20 +139,23 @@ public void testCloseContainerEventWithRatis() throws IOException {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
ContainerInfo info = mapping
ContainerWithPipeline containerWithPipeline = mapping
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, "ozone");
ContainerID id = new ContainerID(info.getContainerID());
ContainerID id = new ContainerID(
containerWithPipeline.getContainerInfo().getContainerID());
int[] closeCount = new int[3];
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.processAll(1000);
int i = 0;
for (DatanodeDetails details : info.getPipeline().getMachines()) {
for (DatanodeDetails details : containerWithPipeline.getPipeline()
.getMachines()) {
closeCount[i] = nodeManager.getCommandCount(details);
i++;
}
i = 0;
for (DatanodeDetails details : info.getPipeline().getMachines()) {
for (DatanodeDetails details : containerWithPipeline.getPipeline()
.getMachines()) {
Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
i++;
}
@ -161,12 +166,12 @@ public void testCloseContainerEventWithRatis() throws IOException {
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
new ContainerID(info.getContainerID()));
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.processAll(1000);
i = 0;
// Make sure close is queued for each datanode on the pipeline
for (DatanodeDetails details : info.getPipeline().getMachines()) {
for (DatanodeDetails details : containerWithPipeline.getPipeline()
.getMachines()) {
Assert.assertEquals(closeCount[i] + 1,
nodeManager.getCommandCount(details));
Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -103,7 +104,7 @@ public void clearChillMode() {
@Test
public void testallocateContainer() throws Exception {
ContainerInfo containerInfo = mapping.allocateContainer(
ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
@ -120,7 +121,7 @@ public void testallocateContainerDistributesAllocation() throws Exception {
*/
Set<UUID> pipelineList = new TreeSet<>();
for (int x = 0; x < 30; x++) {
ContainerInfo containerInfo = mapping.allocateContainer(
ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
@ -135,14 +136,13 @@ public void testallocateContainerDistributesAllocation() throws Exception {
@Test
public void testGetContainer() throws IOException {
ContainerInfo containerInfo = mapping.allocateContainer(
ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
Pipeline pipeline = containerInfo.getPipeline();
Assert.assertNotNull(pipeline);
Pipeline newPipeline = mapping.getContainer(
containerInfo.getContainerID()).getPipeline();
Pipeline newPipeline = containerInfo.getPipeline();
Assert.assertEquals(pipeline.getLeader().getUuid(),
newPipeline.getLeader().getUuid());
}
@ -165,12 +165,12 @@ public void testChillModeAllocateContainerFails() throws IOException {
public void testContainerCreationLeaseTimeout() throws IOException,
InterruptedException {
nodeManager.setChillmode(false);
ContainerInfo containerInfo = mapping.allocateContainer(
ContainerWithPipeline containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
mapping.updateContainerState(containerInfo.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
mapping.updateContainerState(containerInfo.getContainerInfo()
.getContainerID(), HddsProtos.LifeCycleEvent.CREATE);
Thread.sleep(TIMEOUT + 1000);
NavigableSet<ContainerID> deleteContainers = mapping.getStateManager()
@ -179,12 +179,14 @@ public void testContainerCreationLeaseTimeout() throws IOException,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.DELETING);
Assert.assertTrue(deleteContainers.contains(containerInfo.containerID()));
Assert.assertTrue(deleteContainers
.contains(containerInfo.getContainerInfo().containerID()));
thrown.expect(IOException.class);
thrown.expectMessage("Lease Exception");
mapping.updateContainerState(containerInfo.getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
mapping
.updateContainerState(containerInfo.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
}
@Test
@ -294,10 +296,11 @@ public void testCloseContainer() throws IOException {
private ContainerInfo createContainer()
throws IOException {
nodeManager.setChillmode(false);
ContainerInfo containerInfo = mapping.allocateContainer(
ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
mapping.updateContainerState(containerInfo.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
mapping.updateContainerState(containerInfo.getContainerID(),

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -91,9 +92,10 @@ public static void tearDown() throws Exception {
@Test
public void testClose() throws IOException {
ContainerInfo info = mapping.allocateContainer(
ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
ContainerInfo info = containerWithPipeline.getContainerInfo();
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(info.getContainerID(), CREATE);
@ -101,7 +103,7 @@ public void testClose() throws IOException {
long currentCount = mapping.getCloser().getCloseCount();
long runCount = mapping.getCloser().getThreadRunCount();
DatanodeDetails datanode = info.getPipeline().getLeader();
DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader();
// Send a container report with used set to 1 GB. This should not close.
sendContainerReport(info, 1 * GIGABYTE);
@ -138,9 +140,10 @@ public void testRepeatedClose() throws IOException,
configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
ContainerInfo info = mapping.allocateContainer(
ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
ContainerInfo info = containerWithPipeline.getContainerInfo();
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(info.getContainerID(), CREATE);
@ -148,10 +151,10 @@ public void testRepeatedClose() throws IOException,
long currentCount = mapping.getCloser().getCloseCount();
long runCount = mapping.getCloser().getThreadRunCount();
DatanodeDetails datanodeDetails = containerWithPipeline.getPipeline()
.getLeader();
DatanodeDetails datanodeDetails = info.getPipeline().getLeader();
// Send this command twice and assert we have only one command in the queue.
// Send this command twice and assert we have only one command in queue.
sendContainerReport(info, 5 * GIGABYTE);
sendContainerReport(info, 5 * GIGABYTE);
@ -183,9 +186,10 @@ public void testCleanupThreadRuns() throws IOException,
long runCount = mapping.getCloser().getThreadRunCount();
for (int x = 0; x < ContainerCloser.getCleanupWaterMark() + 10; x++) {
ContainerInfo info = mapping.allocateContainer(
ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.ONE, "ozone");
ContainerInfo info = containerWithPipeline.getContainerInfo();
mapping.updateContainerState(info.getContainerID(), CREATE);
mapping.updateContainerState(info.getContainerID(), CREATED);
sendContainerReport(info, 5 * GIGABYTE);

View File

@ -25,7 +25,7 @@
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@ -151,11 +151,11 @@ public void testContainerPlacementCapacity() throws IOException,
assertTrue(nodeManager.isOutOfChillMode());
ContainerInfo containerInfo = containerManager.allocateContainer(
ContainerWithPipeline containerWithPipeline = containerManager.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), "OZONE");
assertEquals(xceiverClientManager.getFactor().getNumber(),
containerInfo.getPipeline().getMachines().size());
containerWithPipeline.getPipeline().getMachines().size());
} finally {
IOUtils.closeQuietly(containerManager);
IOUtils.closeQuietly(nodeManager);

View File

@ -24,9 +24,9 @@
import org.apache.hadoop.hdds.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import java.io.IOException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
/**
* The handler of close container command.
@ -51,15 +51,15 @@ public void execute(CommandLine cmd) throws IOException {
}
String containerID = cmd.getOptionValue(OPT_CONTAINER_ID);
ContainerInfo container = getScmClient().
getContainer(Long.parseLong(containerID));
ContainerWithPipeline container = getScmClient().
getContainerWithPipeline(Long.parseLong(containerID));
if (container == null) {
throw new IOException("Cannot close an non-exist container "
+ containerID);
}
logOut("Closing container : %s.", containerID);
getScmClient().closeContainer(container.getContainerID(),
container.getPipeline());
getScmClient()
.closeContainer(container.getContainerInfo().getContainerID());
logOut("Container closed.");
}

View File

@ -25,9 +25,9 @@
import org.apache.commons.cli.Options;
import org.apache.hadoop.hdds.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import java.io.IOException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.CMD_WIDTH;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.HELP_OP;
@ -60,7 +60,7 @@ public void execute(CommandLine cmd) throws IOException {
String containerID = cmd.getOptionValue(OPT_CONTAINER_ID);
ContainerInfo container = getScmClient().getContainer(
ContainerWithPipeline container = getScmClient().getContainerWithPipeline(
Long.parseLong(containerID));
if (container == null) {
throw new IOException("Cannot delete an non-exist container "
@ -68,8 +68,9 @@ public void execute(CommandLine cmd) throws IOException {
}
logOut("Deleting container : %s.", containerID);
getScmClient().deleteContainer(container.getContainerID(),
container.getPipeline(), cmd.hasOption(OPT_FORCE));
getScmClient()
.deleteContainer(container.getContainerInfo().getContainerID(),
container.getPipeline(), cmd.hasOption(OPT_FORCE));
logOut("Container %s deleted.", containerID);
}

View File

@ -24,7 +24,6 @@
import org.apache.commons.cli.Options;
import org.apache.hadoop.hdds.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerData;
@ -33,6 +32,7 @@
import java.io.IOException;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.CMD_WIDTH;
import static org.apache.hadoop.hdds.scm.cli.SCMCLI.HELP_OP;
@ -68,13 +68,12 @@ public void execute(CommandLine cmd) throws IOException {
}
}
String containerID = cmd.getOptionValue(OPT_CONTAINER_ID);
ContainerInfo container = getScmClient().
getContainer(Long.parseLong(containerID));
ContainerWithPipeline container = getScmClient().
getContainerWithPipeline(Long.parseLong(containerID));
Preconditions.checkNotNull(container, "Container cannot be null");
ContainerData containerData =
getScmClient().readContainer(container.getContainerID(),
container.getPipeline());
ContainerData containerData = getScmClient().readContainer(container
.getContainerInfo().getContainerID(), container.getPipeline());
// Print container report info.
logOut("Container id: %s", containerID);

View File

@ -21,8 +21,8 @@
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
@ -271,17 +271,17 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
KsmKeyLocationInfo ksmKeyLocationInfo = keyLocationInfos.get(i);
BlockID blockID = ksmKeyLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
ContainerInfo container =
storageContainerLocationClient.getContainer(containerID);
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(container.getPipeline(), containerID);
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.getContainerWithPipeline(containerID);
XceiverClientSpi xceiverClient = xceiverClientManager
.acquireClient(containerWithPipeline.getPipeline(), containerID);
boolean success = false;
containerKey = ksmKeyLocationInfo.getLocalID();
try {
LOG.debug("get key accessing {} {}",
containerID, containerKey);
groupInputStream.streamOffset[i] = length;
ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
.containerKeyDataForRead(blockID);
ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
.getKey(xceiverClient, containerKeyData, requestId);
@ -292,7 +292,8 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
}
success = true;
ChunkInputStream inputStream = new ChunkInputStream(
ksmKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
ksmKeyLocationInfo.getBlockID(), xceiverClientManager,
xceiverClient,
chunks, requestId);
groupInputStream.addStream(inputStream,
ksmKeyLocationInfo.getLength());

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@ -163,10 +164,12 @@ public void addPreallocateBlocks(KsmKeyLocationInfoGroup version,
private void checkKeyLocationInfo(KsmKeyLocationInfo subKeyInfo)
throws IOException {
ContainerInfo container = scmClient.getContainer(
subKeyInfo.getContainerID());
ContainerWithPipeline containerWithPipeline = scmClient
.getContainerWithPipeline(subKeyInfo.getContainerID());
ContainerInfo container = containerWithPipeline.getContainerInfo();
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(container.getPipeline(),
xceiverClientManager.acquireClient(containerWithPipeline.getPipeline(),
container.getContainerID());
// create container if needed
if (subKeyInfo.getShouldCreateContainer()) {

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocolPB;
/**
* Helper class for converting protobuf objects.
*/
public final class OzonePBHelper {
private OzonePBHelper() {
/** Hidden constructor */
}
}

View File

@ -18,6 +18,7 @@
import com.google.common.primitives.Longs;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -30,7 +31,6 @@
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
@ -71,31 +71,35 @@ public void cleanUp() {
@Test
public void testAllocateContainer() throws IOException {
// Allocate a container and verify the container info
ContainerInfo container1 = scm.getClientProtocolServer().allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container1.getContainerID(), info.getContainerID());
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
Assert.assertEquals(containerOwner, info.getOwner());
Assert.assertEquals(xceiverClientManager.getType(),
info.getPipeline().getType());
info.getReplicationType());
Assert.assertEquals(xceiverClientManager.getFactor(),
info.getPipeline().getFactor());
info.getReplicationFactor());
Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
// Check there are two containers in ALLOCATED state after allocation
ContainerInfo container2 = scm.getClientProtocolServer().allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerWithPipeline container2 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
int numContainers = containerStateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
Assert.assertNotEquals(container1.getContainerID(), container2.getContainerID());
Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
container2.getContainerInfo().getContainerID());
Assert.assertEquals(2, numContainers);
}
@ -105,14 +109,15 @@ public void testContainerStateManagerRestart() throws IOException {
List<ContainerInfo> containers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
ContainerInfo container = scm.getClientProtocolServer().allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containers.add(container);
ContainerWithPipeline container = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containers.add(container.getContainerInfo());
if (i >= 5) {
scm.getScmContainerManager()
.updateContainerState(container.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scm.getScmContainerManager().updateContainerState(container
.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
}
}
@ -134,34 +139,40 @@ public void testContainerStateManagerRestart() throws IOException {
@Test
public void testGetMatchingContainer() throws IOException {
ContainerInfo container1 = scm.getClientProtocolServer().
ContainerWithPipeline container1 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
scmContainerMapping.updateContainerState(container1.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container1.getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
xceiverClientManager.getFactor(), containerOwner);
scmContainerMapping
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
ContainerInfo container2 = scm.getClientProtocolServer().
ContainerWithPipeline container2 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1.getContainerID(), info.getContainerID());
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container2.getContainerID(), info.getContainerID());
Assert.assertEquals(container2.getContainerInfo().getContainerID(),
info.getContainerID());
scmContainerMapping.updateContainerState(container2.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container2.getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
scmContainerMapping
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
// space has already been allocated in container1, now container 2 should
// be chosen.
@ -169,7 +180,8 @@ public void testGetMatchingContainer() throws IOException {
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
Assert.assertEquals(container2.getContainerID(), info.getContainerID());
Assert.assertEquals(container2.getContainerInfo().getContainerID(),
info.getContainerID());
}
@Test
@ -183,30 +195,33 @@ public void testUpdateContainerState() throws IOException {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
ContainerInfo container1 = scm.getClientProtocolServer().allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1.getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
scmContainerMapping
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1.getContainerID(),
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@ -214,7 +229,7 @@ public void testUpdateContainerState() throws IOException {
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1.getContainerID(),
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLOSE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@ -222,7 +237,7 @@ public void testUpdateContainerState() throws IOException {
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1.getContainerID(),
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.DELETE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@ -230,7 +245,7 @@ public void testUpdateContainerState() throws IOException {
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1.getContainerID(),
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLEANUP);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@ -239,13 +254,15 @@ public void testUpdateContainerState() throws IOException {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// DELETING
ContainerInfo container2 = scm.getClientProtocolServer().allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
scmContainerMapping.updateContainerState(container2.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
ContainerWithPipeline container2 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
scmContainerMapping
.updateContainerState(container2.getContainerID(),
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.TIMEOUT);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@ -254,17 +271,21 @@ public void testUpdateContainerState() throws IOException {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED
ContainerInfo container3 = scm.getClientProtocolServer().allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
scmContainerMapping.updateContainerState(container3.getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container3.getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
scmContainerMapping.updateContainerState(container3.getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
ContainerWithPipeline container3 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
scmContainerMapping
.updateContainerState(container3.getContainerID(),
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
scmContainerMapping
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
scmContainerMapping
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLOSE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@ -274,12 +295,14 @@ public void testUpdateContainerState() throws IOException {
@Test
public void testUpdatingAllocatedBytes() throws Exception {
ContainerInfo container1 = scm.getClientProtocolServer().allocateContainer(
xceiverClientManager.getType(),
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
scmContainerMapping.updateContainerState(container1.getContainerID(),
scmContainerMapping.updateContainerState(container1
.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container1.getContainerID(),
scmContainerMapping.updateContainerState(container1
.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
Random ran = new Random();
@ -292,18 +315,18 @@ public void testUpdatingAllocatedBytes() throws Exception {
.getMatchingContainer(size, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1.getContainerID(), info.getContainerID());
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
ContainerMapping containerMapping =
(ContainerMapping)scmContainerMapping;
(ContainerMapping) scmContainerMapping;
// manually trigger a flush, this will persist the allocated bytes value
// to disk
containerMapping.flushContainerInfo();
// the persisted value should always be equal to allocated size.
byte[] containerBytes =
containerMapping.getContainerStore().get(
Longs.toByteArray(container1.getContainerID()));
byte[] containerBytes = containerMapping.getContainerStore().get(
Longs.toByteArray(container1.getContainerInfo().getContainerID()));
HddsProtos.SCMContainerInfo infoProto =
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.ozone;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -28,7 +28,6 @@
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.junit.AfterClass;
@ -78,12 +77,12 @@ public static void cleanup() throws Exception {
*/
@Test
public void testCreate() throws Exception {
ContainerInfo container = storageClient.createContainer(HddsProtos
ContainerWithPipeline container = storageClient.createContainer(HddsProtos
.ReplicationType.STAND_ALONE, HddsProtos.ReplicationFactor
.ONE, "OZONE");
assertEquals(container.getContainerID(),
storageClient.getContainer(container.getContainerID()).
getContainerID());
assertEquals(container.getContainerInfo().getContainerID(), storageClient
.getContainer(container.getContainerInfo().getContainerID())
.getContainerID());
}
}

View File

@ -22,7 +22,7 @@
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@ -131,7 +131,7 @@ private void testRpcPermissionWithConf(
}
try {
ContainerInfo container2 = mockClientServer
ContainerWithPipeline container2 = mockClientServer
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "OZONE");
if (expectPermissionDenied) {
@ -144,7 +144,7 @@ private void testRpcPermissionWithConf(
}
try {
ContainerInfo container3 = mockClientServer
ContainerWithPipeline container3 = mockClientServer
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "OZONE");
if (expectPermissionDenied) {

View File

@ -23,7 +23,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -158,9 +158,11 @@ public List<Long> getAllBlocks(Long containeID) throws IOException {
private MetadataStore getContainerMetadata(Long containerID)
throws IOException {
ContainerInfo container = cluster.getStorageContainerManager()
.getClientProtocolServer().getContainer(containerID);
DatanodeDetails leadDN = container.getPipeline().getLeader();
ContainerWithPipeline containerWithPipeline = cluster
.getStorageContainerManager().getClientProtocolServer()
.getContainerWithPipeline(containerID);
DatanodeDetails leadDN = containerWithPipeline.getPipeline().getLeader();
OzoneContainer containerServer =
getContainerServerByDatanodeUuid(leadDN.getUuidString());
ContainerData containerData = containerServer.getContainerManager()

View File

@ -390,8 +390,8 @@ private boolean verifyRatisReplication(String volumeName, String bucketName,
keyInfo.getLatestVersionLocations().getLocationList()) {
ContainerInfo container =
storageContainerLocationClient.getContainer(info.getContainerID());
if ((container.getPipeline().getFactor() != replicationFactor) ||
(container.getPipeline().getType() != replicationType)) {
if (!container.getReplicationFactor().equals(replicationFactor) || (
container.getReplicationType() != replicationType)) {
return false;
}
}

View File

@ -23,8 +23,6 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -35,7 +33,6 @@
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@ -112,9 +109,9 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = ksmKeyLocationInfo.getContainerID();
List<DatanodeDetails> datanodes =
cluster.getStorageContainerManager().getContainerInfo(containerID)
.getPipeline().getMachines();
List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
.getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 1);
DatanodeDetails datanodeDetails = datanodes.get(0);
@ -167,9 +164,9 @@ public void testCloseContainerViaStandaAlone()
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = ksmKeyLocationInfo.getContainerID();
List<DatanodeDetails> datanodes =
cluster.getStorageContainerManager().getContainerInfo(containerID)
.getPipeline().getMachines();
List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
.getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 1);
DatanodeDetails datanodeDetails = datanodes.get(0);
@ -220,9 +217,9 @@ public void testCloseContainerViaRatis() throws IOException,
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = ksmKeyLocationInfo.getContainerID();
List<DatanodeDetails> datanodes =
cluster.getStorageContainerManager().getContainerInfo(containerID)
.getPipeline().getMachines();
List<DatanodeDetails> datanodes = cluster.getStorageContainerManager()
.getScmContainerManager().getContainerWithPipeline(containerID)
.getPipeline().getMachines();
Assert.assertTrue(datanodes.size() == 3);
GenericTestUtils.LogCapturer logCapturer =

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@ -32,7 +33,6 @@
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.junit.AfterClass;
import org.junit.BeforeClass;

View File

@ -17,14 +17,12 @@
*/
package org.apache.hadoop.ozone.scm;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -68,7 +66,7 @@ public static void shutdown() throws InterruptedException {
@Test
public void testAllocate() throws Exception {
ContainerInfo container = storageContainerLocationClient.allocateContainer(
ContainerWithPipeline container = storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);

View File

@ -19,7 +19,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -81,17 +81,18 @@ public static void shutdown() throws InterruptedException {
@Test
public void testAllocateWrite() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerInfo container =
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client = xceiverClientManager.acquireClient(
container.getPipeline(), container.getContainerID());
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(),
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client,
container.getContainerID(), traceID);
container.getContainerInfo().getContainerID(), traceID);
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerID());
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.writeSmallFile(client, blockID,
"data123".getBytes(), traceID);
ContainerProtos.GetSmallFileResponseProto response =
@ -104,20 +105,21 @@ public void testAllocateWrite() throws Exception {
@Test
public void testInvalidKeyRead() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerInfo container =
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client = xceiverClientManager.acquireClient(
container.getPipeline(), container.getContainerID());
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(),
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client,
container.getContainerID(), traceID);
container.getContainerInfo().getContainerID(), traceID);
thrown.expect(StorageContainerException.class);
thrown.expectMessage("Unable to find the key");
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerID());
container.getContainerInfo().getContainerID());
// Try to read a Key Container Name
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
@ -128,20 +130,20 @@ public void testInvalidKeyRead() throws Exception {
public void testInvalidContainerRead() throws Exception {
String traceID = UUID.randomUUID().toString();
long nonExistContainerID = 8888L;
ContainerInfo container =
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client = xceiverClientManager.
acquireClient(container.getPipeline(), container.getContainerID());
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline(),
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.createContainer(client,
container.getContainerID(), traceID);
container.getContainerInfo().getContainerID(), traceID);
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerID());
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.writeSmallFile(client, blockID,
"data123".getBytes(), traceID);
thrown.expect(StorageContainerException.class);
thrown.expectMessage("Unable to find the container");

View File

@ -19,6 +19,7 @@
import com.google.common.primitives.Longs;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -136,7 +137,7 @@ public void testCreateContainer() throws Exception {
private boolean containerExist(long containerID) {
try {
ContainerInfo container = scm.getClientProtocolServer()
.getContainer(containerID);
.getContainerWithPipeline(containerID).getContainerInfo();
return container != null
&& containerID == container.getContainerID();
} catch (IOException e) {
@ -157,31 +158,34 @@ public void testDeleteContainer() throws Exception {
// 1. Test to delete a non-empty container.
// ****************************************
// Create an non-empty container
ContainerInfo container = containerOperationClient
ContainerWithPipeline container = containerOperationClient
.createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
ContainerData cdata = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(
container.getContainerID(), container.getPipeline()), conf);
KeyUtils.getDB(cdata, conf).put(Longs.toByteArray(container.getContainerID()),
"someKey".getBytes());
Assert.assertTrue(containerExist(container.getContainerID()));
container.getContainerInfo().getContainerID()), conf);
KeyUtils.getDB(cdata, conf)
.put(Longs.toByteArray(container.getContainerInfo().getContainerID()),
"someKey".getBytes());
Assert.assertTrue(
containerExist(container.getContainerInfo().getContainerID()));
// Gracefully delete a container should fail because it is open.
delCmd = new String[] {"-container", "-delete", "-c",
Long.toString(container.getContainerID())};
delCmd = new String[]{"-container", "-delete", "-c",
Long.toString(container.getContainerInfo().getContainerID())};
testErr = new ByteArrayOutputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(delCmd, out, testErr);
assertEquals(EXECUTION_ERROR, exitCode);
assertTrue(testErr.toString()
.contains("Deleting an open container is not allowed."));
Assert.assertTrue(containerExist(container.getContainerID()));
Assert.assertTrue(
containerExist(container.getContainerInfo().getContainerID()));
// Close the container
containerOperationClient.closeContainer(
container.getContainerID(), container.getPipeline());
container.getContainerInfo().getContainerID());
// Gracefully delete a container should fail because it is not empty.
testErr = new ByteArrayOutputStream();
@ -189,45 +193,49 @@ public void testDeleteContainer() throws Exception {
assertEquals(EXECUTION_ERROR, exitCode2);
assertTrue(testErr.toString()
.contains("Container cannot be deleted because it is not empty."));
Assert.assertTrue(containerExist(container.getContainerID()));
Assert.assertTrue(
containerExist(container.getContainerInfo().getContainerID()));
// Try force delete again.
delCmd = new String[] {"-container", "-delete", "-c",
Long.toString(container.getContainerID()), "-f"};
delCmd = new String[]{"-container", "-delete", "-c",
Long.toString(container.getContainerInfo().getContainerID()), "-f"};
exitCode = runCommandAndGetOutput(delCmd, out, null);
assertEquals("Expected success, found:", ResultCode.SUCCESS, exitCode);
assertFalse(containerExist(container.getContainerID()));
assertFalse(containerExist(container.getContainerInfo().getContainerID()));
// ****************************************
// 2. Test to delete an empty container.
// ****************************************
// Create an empty container
ContainerInfo emptyContainer = containerOperationClient
ContainerWithPipeline emptyContainer = containerOperationClient
.createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
containerOperationClient.closeContainer(emptyContainer.getContainerID(),
container.getPipeline());
Assert.assertTrue(containerExist(emptyContainer.getContainerID()));
containerOperationClient
.closeContainer(emptyContainer.getContainerInfo().getContainerID());
Assert.assertTrue(
containerExist(emptyContainer.getContainerInfo().getContainerID()));
// Successfully delete an empty container.
delCmd = new String[] {"-container", "-delete", "-c",
Long.toString(emptyContainer.getContainerID())};
delCmd = new String[]{"-container", "-delete", "-c",
Long.toString(emptyContainer.getContainerInfo().getContainerID())};
exitCode = runCommandAndGetOutput(delCmd, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
assertFalse(containerExist(emptyContainer.getContainerID()));
assertFalse(
containerExist(emptyContainer.getContainerInfo().getContainerID()));
// After the container is deleted,
// another container can now be recreated.
ContainerInfo newContainer = containerOperationClient.
ContainerWithPipeline newContainer = containerOperationClient.
createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
Assert.assertTrue(containerExist(newContainer.getContainerID()));
Assert.assertTrue(
containerExist(newContainer.getContainerInfo().getContainerID()));
// ****************************************
// 3. Test to delete a non-exist container.
// ****************************************
long nonExistContainerID = ContainerTestHelper.getTestContainerID();
delCmd = new String[] {"-container", "-delete", "-c",
long nonExistContainerID = ContainerTestHelper.getTestContainerID();
delCmd = new String[]{"-container", "-delete", "-c",
Long.toString(nonExistContainerID)};
testErr = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(delCmd, out, testErr);
@ -250,45 +258,33 @@ public void testInfoContainer() throws Exception {
"LeaderID: %s\n" +
"Datanodes: [%s]\n";
String formatStrWithHash =
"Container id: %s\n" +
"Container State: %s\n" +
"Container Hash: %s\n" +
"Container DB Path: %s\n" +
"Container Path: %s\n" +
"Container Metadata: {%s}\n" +
"LeaderID: %s\n" +
"Datanodes: [%s]\n";
// Test a non-exist container
String containerID =
Long.toString(ContainerTestHelper.getTestContainerID());
String[] info = { "-container", "-info", containerID };
String[] info = {"-container", "-info", containerID};
int exitCode = runCommandAndGetOutput(info, null, null);
assertEquals("Expected Execution Error, Did not find that.",
EXECUTION_ERROR, exitCode);
// Create an empty container.
ContainerInfo container = containerOperationClient
ContainerWithPipeline container = containerOperationClient
.createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
ContainerData data = ContainerData
.getFromProtBuf(containerOperationClient.
readContainer(container.getContainerID(),
container.getPipeline()), conf);
ContainerData data = ContainerData.getFromProtBuf(containerOperationClient
.readContainer(container.getContainerInfo().getContainerID()), conf);
info = new String[] { "-container", "-info", "-c",
Long.toString(container.getContainerID()) };
info = new String[]{"-container", "-info", "-c",
Long.toString(container.getContainerInfo().getContainerID())};
ByteArrayOutputStream out = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals("Expected Success, did not find it.", ResultCode.SUCCESS,
exitCode);
exitCode);
String openStatus = data.isOpen() ? "OPEN" : "CLOSED";
String expected =
String.format(formatStr, container.getContainerID(), openStatus,
data.getDBPath(), data.getContainerPath(), "",
datanodeDetails.getHostName(), datanodeDetails.getHostName());
String expected = String.format(formatStr, container.getContainerInfo()
.getContainerID(), openStatus, data.getDBPath(),
data.getContainerPath(), "", datanodeDetails.getHostName(),
datanodeDetails.getHostName());
assertEquals(expected, out.toString());
out.reset();
@ -299,40 +295,39 @@ public void testInfoContainer() throws Exception {
HddsProtos.ReplicationFactor.ONE, containerOwner);
data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(
container.getContainerID(), container.getPipeline()), conf);
container.getContainerInfo().getContainerID()), conf);
KeyUtils.getDB(data, conf)
.put(containerID.getBytes(), "someKey".getBytes());
info = new String[] { "-container", "-info", "-c",
Long.toString(container.getContainerID()) };
info = new String[]{"-container", "-info", "-c",
Long.toString(container.getContainerInfo().getContainerID())};
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
openStatus = data.isOpen() ? "OPEN" : "CLOSED";
expected = String.format(formatStr, container.getContainerID(), openStatus,
data.getDBPath(), data.getContainerPath(), "",
datanodeDetails.getHostName(), datanodeDetails.getHostName());
expected = String.format(formatStr, container.getContainerInfo().
getContainerID(), openStatus, data.getDBPath(),
data.getContainerPath(), "", datanodeDetails.getHostName(),
datanodeDetails.getHostName());
assertEquals(expected, out.toString());
out.reset();
// Close last container and test info again.
containerOperationClient.closeContainer(
container.getContainerID(), container.getPipeline());
containerOperationClient
.closeContainer(container.getContainerInfo().getContainerID());
info = new String[] { "-container", "-info", "-c",
Long.toString(container.getContainerID()) };
info = new String[]{"-container", "-info", "-c",
Long.toString(container.getContainerInfo().getContainerID())};
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(
container.getContainerID(), container.getPipeline()), conf);
data = ContainerData.getFromProtBuf(containerOperationClient
.readContainer(container.getContainerInfo().getContainerID()), conf);
openStatus = data.isOpen() ? "OPEN" : "CLOSED";
expected = String
.format(formatStr, container.getContainerID(), openStatus,
data.getDBPath(), data.getContainerPath(), "",
.format(formatStr, container.getContainerInfo().getContainerID(),
openStatus, data.getDBPath(), data.getContainerPath(), "",
datanodeDetails.getHostName(), datanodeDetails.getHostName());
assertEquals(expected, out.toString());
}
@ -360,10 +355,10 @@ public void testListContainerCommand() throws Exception {
// Create 20 containers for testing.
List<ContainerInfo> containers = new ArrayList<>();
for (int index = 0; index < 20; index++) {
ContainerInfo container = containerOperationClient.createContainer(
ContainerWithPipeline container = containerOperationClient.createContainer(
xceiverClientManager.getType(), HddsProtos.ReplicationFactor.ONE,
containerOwner);
containers.add(container);
containers.add(container.getContainerInfo());
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
@ -417,11 +412,11 @@ public void testListContainerCommand() throws Exception {
@Test
public void testCloseContainer() throws Exception {
long containerID = containerOperationClient
.createContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner).getContainerID();
long containerID = containerOperationClient.createContainer(
xceiverClientManager.getType(), HddsProtos.ReplicationFactor.ONE,
containerOwner).getContainerInfo().getContainerID();
ContainerInfo container = scm.getClientProtocolServer()
.getContainer(containerID);
.getContainerWithPipeline(containerID).getContainerInfo();
assertNotNull(container);
assertEquals(containerID, container.getContainerID());

View File

@ -20,7 +20,7 @@
import com.google.common.cache.Cache;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -98,22 +98,25 @@ public void testCaching() throws IOException {
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerInfo container1 = storageContainerLocationClient
ContainerWithPipeline container1 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
XceiverClientSpi client1 = clientManager.acquireClient(container1.getPipeline(),
container1.getContainerID());
XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline(),
container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount());
ContainerInfo container2 = storageContainerLocationClient
ContainerWithPipeline container2 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
XceiverClientSpi client2 = clientManager.acquireClient(container2.getPipeline(),
container2.getContainerID());
XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline(),
container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount());
XceiverClientSpi client3 = clientManager.acquireClient(container1.getPipeline(),
container1.getContainerID());
XceiverClientSpi client3 = clientManager
.acquireClient(container1.getPipeline(),
container1.getContainerInfo().getContainerID());
Assert.assertEquals(2, client3.getRefcount());
Assert.assertEquals(2, client1.getRefcount());
Assert.assertEquals(client1, client3);
@ -132,32 +135,35 @@ public void testFreeByReference() throws IOException {
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
ContainerInfo container1 =
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(), HddsProtos.ReplicationFactor.ONE,
containerOwner);
XceiverClientSpi client1 = clientManager.acquireClient(container1.getPipeline(),
container1.getContainerID());
XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline(),
container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount());
Assert.assertEquals(container1.getPipeline(),
client1.getPipeline());
ContainerInfo container2 =
ContainerWithPipeline container2 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
XceiverClientSpi client2 = clientManager.acquireClient(container2.getPipeline(),
container2.getContainerID());
XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline(),
container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount());
Assert.assertNotEquals(client1, client2);
// least recent container (i.e containerName1) is evicted
XceiverClientSpi nonExistent1 = cache.getIfPresent(container1.getContainerID());
XceiverClientSpi nonExistent1 = cache
.getIfPresent(container1.getContainerInfo().getContainerID());
Assert.assertEquals(null, nonExistent1);
// However container call should succeed because of refcount on the client.
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID1);
container1.getContainerInfo().getContainerID(), traceID1);
// After releasing the client, this connection should be closed
// and any container operations should fail
@ -166,7 +172,7 @@ public void testFreeByReference() throws IOException {
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID1);
container1.getContainerInfo().getContainerID(), traceID1);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {
@ -186,28 +192,30 @@ public void testFreeByEviction() throws IOException {
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
ContainerInfo container1 =
ContainerWithPipeline container1 =
storageContainerLocationClient.allocateContainer(
clientManager.getType(),
clientManager.getFactor(), containerOwner);
XceiverClientSpi client1 = clientManager.acquireClient(container1.getPipeline(),
container1.getContainerID());
XceiverClientSpi client1 = clientManager
.acquireClient(container1.getPipeline(),
container1.getContainerInfo().getContainerID());
Assert.assertEquals(1, client1.getRefcount());
clientManager.releaseClient(client1);
Assert.assertEquals(0, client1.getRefcount());
ContainerInfo container2 = storageContainerLocationClient
ContainerWithPipeline container2 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
XceiverClientSpi client2 = clientManager.acquireClient(container2.getPipeline(),
container2.getContainerID());
XceiverClientSpi client2 = clientManager
.acquireClient(container2.getPipeline(),
container2.getContainerInfo().getContainerID());
Assert.assertEquals(1, client2.getRefcount());
Assert.assertNotEquals(client1, client2);
// now client 1 should be evicted
XceiverClientSpi nonExistent = cache.getIfPresent(container1.getContainerID());
XceiverClientSpi nonExistent = cache
.getIfPresent(container1.getContainerInfo().getContainerID());
Assert.assertEquals(null, nonExistent);
// Any container operation should now fail
@ -215,7 +223,7 @@ public void testFreeByEviction() throws IOException {
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID2);
container1.getContainerInfo().getContainerID(), traceID2);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {

View File

@ -32,7 +32,7 @@
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -79,14 +79,16 @@ public void testMetrics() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerInfo container = storageContainerLocationClient
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
XceiverClientSpi client = clientManager.acquireClient(
container.getPipeline(), container.getContainerID());
XceiverClientSpi client = clientManager
.acquireClient(container.getPipeline(),
container.getContainerInfo().getContainerID());
ContainerCommandRequestProto request = ContainerTestHelper
.getCreateContainerRequest(container.getContainerID(),
.getCreateContainerRequest(
container.getContainerInfo().getContainerID(),
container.getPipeline());
client.sendCommand(request);
@ -112,7 +114,7 @@ public void testMetrics() throws Exception {
// use async interface for testing pending metrics
for (int i = 0; i < numRequest; i++) {
BlockID blockID = ContainerTestHelper.
getTestBlockID(container.getContainerID());
getTestBlockID(container.getContainerInfo().getContainerID());
ContainerProtos.ContainerCommandRequestProto smallFileRequest;
smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(

View File

@ -60,7 +60,9 @@ public void initialize() throws IOException {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
.setPipeline(pipeline)
.setPipelineName(pipeline.getPipelineName())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@ -81,7 +83,9 @@ public void initialize() throws IOException {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
.setPipeline(pipeline)
.setPipelineName(pipeline.getPipelineName())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@ -101,7 +105,9 @@ public void initialize() throws IOException {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
.setPipeline(pipeline)
.setPipelineName(pipeline.getPipelineName())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
@ -166,7 +172,9 @@ public void createContainerBenchMark(BenchMarkContainerStateMap state,
int cid = state.containerID.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(CLOSED)
.setPipeline(pipeline)
.setPipelineName(pipeline.getPipelineName())
.setReplicationType(pipeline.getType())
.setReplicationFactor(pipeline.getFactor())
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)

View File

@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@ -37,7 +38,6 @@
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -86,12 +86,12 @@ public class SQLCLI extends Configured implements Tool {
private static final String CREATE_CONTAINER_INFO =
"CREATE TABLE containerInfo (" +
"containerID LONG PRIMARY KEY NOT NULL, " +
"leaderUUID TEXT NOT NULL)";
private static final String CREATE_CONTAINER_MEMBERS =
"CREATE TABLE containerMembers (" +
"containerName TEXT NOT NULL, " +
"datanodeUUID TEXT NOT NULL," +
"PRIMARY KEY(containerName, datanodeUUID));";
"replicationType TEXT NOT NULL," +
"replicationFactor TEXT NOT NULL," +
"usedBytes LONG NOT NULL," +
"allocatedBytes LONG NOT NULL," +
"owner TEXT," +
"numberOfKeys LONG)";
private static final String CREATE_DATANODE_INFO =
"CREATE TABLE datanodeInfo (" +
"hostName TEXT NOT NULL, " +
@ -99,8 +99,10 @@ public class SQLCLI extends Configured implements Tool {
"ipAddress TEXT, " +
"containerPort INTEGER NOT NULL);";
private static final String INSERT_CONTAINER_INFO =
"INSERT INTO containerInfo (containerID, leaderUUID) " +
"VALUES (\"%d\", \"%s\")";
"INSERT INTO containerInfo (containerID, replicationType, "
+ "replicationFactor, usedBytes, allocatedBytes, owner, "
+ "numberOfKeys) VALUES (\"%d\", \"%s\", \"%s\", \"%d\", \"%d\", "
+ "\"%s\", \"%d\")";
private static final String INSERT_DATANODE_INFO =
"INSERT INTO datanodeInfo (hostname, datanodeUUid, ipAddress, " +
"containerPort) " +
@ -469,10 +471,7 @@ private void convertContainerDB(Path dbPath, Path outPath)
.setConf(conf).setDbFile(dbFile).build();
Connection conn = connectDB(outPath.toString())) {
executeSQL(conn, CREATE_CONTAINER_INFO);
executeSQL(conn, CREATE_CONTAINER_MEMBERS);
executeSQL(conn, CREATE_DATANODE_INFO);
HashSet<String> uuidChecked = new HashSet<>();
dbStore.iterate(null, (key, value) -> {
long containerID = Longs.fromByteArray(key);
ContainerInfo containerInfo = null;
@ -481,8 +480,7 @@ private void convertContainerDB(Path dbPath, Path outPath)
Preconditions.checkNotNull(containerInfo);
try {
//TODO: include container state to sqllite schema
insertContainerDB(conn, containerID,
containerInfo.getPipeline().getProtobufMessage(), uuidChecked);
insertContainerDB(conn, containerInfo, containerID);
return true;
} catch (SQLException e) {
throw new IOException(e);
@ -494,38 +492,23 @@ private void convertContainerDB(Path dbPath, Path outPath)
/**
* Insert into the sqlite DB of container.db.
* @param conn the connection to the sqlite DB.
* @param containerID the id of the container.
* @param pipeline the actual container pipeline object.
* @param uuidChecked the uuid that has been already inserted.
* @param containerInfo
* @param containerID
* @throws SQLException throws exception.
*/
private void insertContainerDB(Connection conn, long containerID,
Pipeline pipeline, Set<String> uuidChecked) throws SQLException {
private void insertContainerDB(Connection conn, ContainerInfo containerInfo,
long containerID) throws SQLException {
LOG.info("Insert to sql container db, for container {}", containerID);
String insertContainerInfo = String.format(
INSERT_CONTAINER_INFO, containerID,
pipeline.getLeaderID());
executeSQL(conn, insertContainerInfo);
containerInfo.getReplicationType(),
containerInfo.getReplicationFactor(),
containerInfo.getUsedBytes(),
containerInfo.getAllocatedBytes(),
containerInfo.getOwner(),
containerInfo.getNumberOfKeys());
for (HddsProtos.DatanodeDetailsProto dd :
pipeline.getMembersList()) {
String uuid = dd.getUuid();
if (!uuidChecked.contains(uuid)) {
// we may also not use this checked set, but catch exception instead
// but this seems a bit cleaner.
String ipAddr = dd.getIpAddress();
String hostName = dd.getHostName();
int containerPort = DatanodeDetails.getFromProtoBuf(dd)
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
String insertMachineInfo = String.format(
INSERT_DATANODE_INFO, hostName, uuid, ipAddr, containerPort);
executeSQL(conn, insertMachineInfo);
uuidChecked.add(uuid);
}
String insertContainerMembers = String.format(
INSERT_CONTAINER_MEMBERS, containerID, uuid);
executeSQL(conn, insertContainerMembers);
}
executeSQL(conn, insertContainerInfo);
LOG.info("Insertion completed.");
}