HDDS-755. ContainerInfo and ContainerReplica protobuf changes.
Contributed by Nanda kumar.
This commit is contained in:
parent
773f0d1519
commit
e4f22b08e0
@ -27,7 +27,7 @@
|
||||
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerData;
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ReadContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
@ -309,7 +309,7 @@ public List<ContainerInfo> listContainer(long startContainerID,
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public ContainerData readContainer(long containerID,
|
||||
public ContainerDataProto readContainer(long containerID,
|
||||
Pipeline pipeline) throws IOException {
|
||||
XceiverClientSpi client = null;
|
||||
try {
|
||||
@ -337,7 +337,7 @@ public ContainerData readContainer(long containerID,
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public ContainerData readContainer(long containerID) throws IOException {
|
||||
public ContainerDataProto readContainer(long containerID) throws IOException {
|
||||
ContainerWithPipeline info = getContainerWithPipeline(containerID);
|
||||
return readContainer(containerID, info.getPipeline());
|
||||
}
|
||||
|
@ -22,7 +22,7 @@
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerData;
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
|
||||
import java.io.Closeable;
|
||||
@ -119,7 +119,7 @@ List<ContainerInfo> listContainer(long startContainerID,
|
||||
* @return ContainerInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
ContainerData readContainer(long containerID, Pipeline pipeline)
|
||||
ContainerDataProto readContainer(long containerID, Pipeline pipeline)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
@ -128,7 +128,7 @@ ContainerData readContainer(long containerID, Pipeline pipeline)
|
||||
* @return ContainerInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
ContainerData readContainer(long containerID)
|
||||
ContainerDataProto readContainer(long containerID)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -114,7 +114,7 @@ public ContainerInfo(ContainerInfo info) {
|
||||
public ContainerInfo() {
|
||||
}
|
||||
|
||||
public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) {
|
||||
public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
|
||||
ContainerInfo.Builder builder = new ContainerInfo.Builder();
|
||||
return builder.setPipelineID(
|
||||
PipelineID.getFromProtobuf(info.getPipelineID()))
|
||||
@ -191,9 +191,9 @@ public void updateLastUsedTime() {
|
||||
lastUsed = Time.monotonicNow();
|
||||
}
|
||||
|
||||
public HddsProtos.SCMContainerInfo getProtobuf() {
|
||||
HddsProtos.SCMContainerInfo.Builder builder =
|
||||
HddsProtos.SCMContainerInfo.newBuilder();
|
||||
public HddsProtos.ContainerInfoProto getProtobuf() {
|
||||
HddsProtos.ContainerInfoProto.Builder builder =
|
||||
HddsProtos.ContainerInfoProto.newBuilder();
|
||||
Preconditions.checkState(containerID > 0);
|
||||
return builder.setContainerID(getContainerID())
|
||||
.setUsedBytes(getUsedBytes())
|
||||
|
@ -181,7 +181,7 @@ public List<ContainerInfo> listContainer(long startContainerID, int count)
|
||||
SCMListContainerResponseProto response =
|
||||
rpcProxy.listContainer(NULL_RPC_CONTROLLER, request);
|
||||
List<ContainerInfo> containerList = new ArrayList<>();
|
||||
for (HddsProtos.SCMContainerInfo containerInfoProto : response
|
||||
for (HddsProtos.ContainerInfoProto containerInfoProto : response
|
||||
.getContainersList()) {
|
||||
containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
|
||||
}
|
||||
|
@ -158,17 +158,6 @@ message KeyValue {
|
||||
optional string value = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Lifecycle states of a container in Datanode.
|
||||
*/
|
||||
enum ContainerLifeCycleState {
|
||||
OPEN = 1;
|
||||
CLOSING = 2;
|
||||
CLOSED = 3;
|
||||
UNHEALTHY = 4;
|
||||
INVALID = 5;
|
||||
}
|
||||
|
||||
message ContainerCommandRequestProto {
|
||||
required Type cmdType = 1; // Type of the command
|
||||
|
||||
@ -235,14 +224,22 @@ message ContainerCommandResponseProto {
|
||||
optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
|
||||
}
|
||||
|
||||
message ContainerData {
|
||||
message ContainerDataProto {
|
||||
enum State {
|
||||
OPEN = 1;
|
||||
CLOSING = 2;
|
||||
CLOSED = 3;
|
||||
QUASI_CLOSED = 4;
|
||||
UNHEALTHY = 5;
|
||||
INVALID = 6;
|
||||
}
|
||||
required int64 containerID = 1;
|
||||
repeated KeyValue metadata = 2;
|
||||
optional string containerPath = 4;
|
||||
optional int64 bytesUsed = 6;
|
||||
optional int64 size = 7;
|
||||
optional int64 blockCount = 8;
|
||||
optional ContainerLifeCycleState state = 9 [default = OPEN];
|
||||
optional State state = 9 [default = OPEN];
|
||||
optional ContainerType containerType = 10 [default = KeyValueContainer];
|
||||
}
|
||||
|
||||
@ -264,7 +261,7 @@ message ReadContainerRequestProto {
|
||||
}
|
||||
|
||||
message ReadContainerResponseProto {
|
||||
optional ContainerData containerData = 1;
|
||||
optional ContainerDataProto containerData = 1;
|
||||
}
|
||||
|
||||
message UpdateContainerRequestProto {
|
||||
@ -287,7 +284,7 @@ message ListContainerRequestProto {
|
||||
}
|
||||
|
||||
message ListContainerResponseProto {
|
||||
repeated ContainerData containerData = 1;
|
||||
repeated ContainerDataProto containerData = 1;
|
||||
}
|
||||
|
||||
message CloseContainerRequestProto {
|
||||
|
@ -60,7 +60,7 @@ message GetContainerRequestProto {
|
||||
}
|
||||
|
||||
message GetContainerResponseProto {
|
||||
required SCMContainerInfo containerInfo = 1;
|
||||
required ContainerInfoProto containerInfo = 1;
|
||||
}
|
||||
|
||||
message GetContainerWithPipelineRequestProto {
|
||||
@ -77,7 +77,7 @@ message SCMListContainerRequestProto {
|
||||
}
|
||||
|
||||
message SCMListContainerResponseProto {
|
||||
repeated SCMContainerInfo containers = 1;
|
||||
repeated ContainerInfoProto containers = 1;
|
||||
}
|
||||
|
||||
message SCMDeleteContainerRequestProto {
|
||||
|
@ -131,7 +131,7 @@ enum LifeCycleEvent {
|
||||
CLEANUP = 8;
|
||||
}
|
||||
|
||||
message SCMContainerInfo {
|
||||
message ContainerInfoProto {
|
||||
required int64 containerID = 1;
|
||||
required LifeCycleState state = 2;
|
||||
optional PipelineID pipelineID = 3;
|
||||
@ -145,7 +145,7 @@ message SCMContainerInfo {
|
||||
}
|
||||
|
||||
message ContainerWithPipeline {
|
||||
required SCMContainerInfo containerInfo = 1;
|
||||
required ContainerInfoProto containerInfo = 1;
|
||||
required Pipeline pipeline = 2;
|
||||
}
|
||||
|
||||
|
@ -25,8 +25,8 @@
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||
ContainerType;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||
ContainerLifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
|
||||
@ -65,7 +65,7 @@ public abstract class ContainerData {
|
||||
private final Map<String, String> metadata;
|
||||
|
||||
// State of the Container
|
||||
private ContainerLifeCycleState state;
|
||||
private ContainerDataProto.State state;
|
||||
|
||||
private final long maxSize;
|
||||
|
||||
@ -121,7 +121,7 @@ protected ContainerData(ContainerType type, long containerId,
|
||||
this.containerID = containerId;
|
||||
this.layOutVersion = layOutVersion;
|
||||
this.metadata = new TreeMap<>();
|
||||
this.state = ContainerLifeCycleState.OPEN;
|
||||
this.state = ContainerDataProto.State.OPEN;
|
||||
this.readCount = new AtomicLong(0L);
|
||||
this.readBytes = new AtomicLong(0L);
|
||||
this.writeCount = new AtomicLong(0L);
|
||||
@ -158,7 +158,7 @@ public ContainerType getContainerType() {
|
||||
* Returns the state of the container.
|
||||
* @return ContainerLifeCycleState
|
||||
*/
|
||||
public synchronized ContainerLifeCycleState getState() {
|
||||
public synchronized ContainerDataProto.State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@ -166,7 +166,7 @@ public synchronized ContainerLifeCycleState getState() {
|
||||
* Set the state of the container.
|
||||
* @param state
|
||||
*/
|
||||
public synchronized void setState(ContainerLifeCycleState state) {
|
||||
public synchronized void setState(ContainerDataProto.State state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@ -222,7 +222,7 @@ public void setMetadata(Map<String, String> metadataMap) {
|
||||
* @return - boolean
|
||||
*/
|
||||
public synchronized boolean isOpen() {
|
||||
return ContainerLifeCycleState.OPEN == state;
|
||||
return ContainerDataProto.State.OPEN == state;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -230,7 +230,7 @@ public synchronized boolean isOpen() {
|
||||
* @return - boolean
|
||||
*/
|
||||
public synchronized boolean isValid() {
|
||||
return !(ContainerLifeCycleState.INVALID == state);
|
||||
return !(ContainerDataProto.State.INVALID == state);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -238,14 +238,14 @@ public synchronized boolean isValid() {
|
||||
* @return - boolean
|
||||
*/
|
||||
public synchronized boolean isClosed() {
|
||||
return ContainerLifeCycleState.CLOSED == state;
|
||||
return ContainerDataProto.State.CLOSED == state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks this container as closed.
|
||||
*/
|
||||
public synchronized void closeContainer() {
|
||||
setState(ContainerLifeCycleState.CLOSED);
|
||||
setState(ContainerDataProto.State.CLOSED);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -431,5 +431,5 @@ public void computeAndSetChecksum(Yaml yaml) throws IOException {
|
||||
*
|
||||
* @return Protocol Buffer Message
|
||||
*/
|
||||
public abstract ContainerProtos.ContainerData getProtoBufMessage();
|
||||
public abstract ContainerProtos.ContainerDataProto getProtoBufMessage();
|
||||
}
|
||||
|
@ -253,13 +253,13 @@ public Object construct(Node node) {
|
||||
String state = (String) nodes.get(OzoneConsts.STATE);
|
||||
switch (state) {
|
||||
case "OPEN":
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
|
||||
kvData.setState(ContainerProtos.ContainerDataProto.State.OPEN);
|
||||
break;
|
||||
case "CLOSING":
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
|
||||
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
|
||||
break;
|
||||
case "CLOSED":
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
|
||||
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Unexpected " +
|
||||
|
@ -24,6 +24,8 @@
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerAction;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
@ -40,8 +42,6 @@
|
||||
.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerType;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerLifeCycleState;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -156,9 +156,9 @@ public ContainerCommandResponseProto dispatch(
|
||||
// which has failed, so the container is marked unhealthy right here.
|
||||
// Once container is marked unhealthy, all the subsequent write
|
||||
// transactions will fail with UNHEALTHY_CONTAINER exception.
|
||||
if (container.getContainerState() == ContainerLifeCycleState.OPEN) {
|
||||
if (container.getContainerState() == ContainerDataProto.State.OPEN) {
|
||||
container.getContainerData()
|
||||
.setState(ContainerLifeCycleState.UNHEALTHY);
|
||||
.setState(ContainerDataProto.State.UNHEALTHY);
|
||||
sendCloseContainerActionIfNeeded(container);
|
||||
}
|
||||
}
|
||||
@ -191,7 +191,7 @@ private void sendCloseContainerActionIfNeeded(Container container) {
|
||||
|
||||
private boolean isContainerFull(Container container) {
|
||||
boolean isOpen = Optional.ofNullable(container)
|
||||
.map(cont -> cont.getContainerState() == ContainerLifeCycleState.OPEN)
|
||||
.map(cont -> cont.getContainerState() == ContainerDataProto.State.OPEN)
|
||||
.orElse(Boolean.FALSE);
|
||||
if (isOpen) {
|
||||
ContainerData containerData = container.getContainerData();
|
||||
@ -205,7 +205,8 @@ private boolean isContainerFull(Container container) {
|
||||
|
||||
private boolean isContainerUnhealthy(Container container) {
|
||||
return Optional.ofNullable(container).map(
|
||||
cont -> (cont.getContainerState() == ContainerLifeCycleState.UNHEALTHY))
|
||||
cont -> (cont.getContainerState() ==
|
||||
ContainerDataProto.State.UNHEALTHY))
|
||||
.orElse(Boolean.FALSE);
|
||||
}
|
||||
|
||||
|
@ -25,11 +25,10 @@
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerLifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
.StorageContainerException;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
@ -80,7 +79,7 @@ void update(Map<String, String> metaData, boolean forceUpdate)
|
||||
* @return ContainerLifeCycleState - Container State.
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
ContainerLifeCycleState getContainerState();
|
||||
ContainerProtos.ContainerDataProto.State getContainerState();
|
||||
|
||||
/**
|
||||
* Closes a open container, if it is already closed or does not exist a
|
||||
@ -130,7 +129,7 @@ void exportContainerData(OutputStream stream,
|
||||
/**
|
||||
* Returns containerReport for the container.
|
||||
*/
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo getContainerReport()
|
||||
ContainerReplicaProto getContainerReport()
|
||||
throws StorageContainerException;
|
||||
|
||||
/**
|
||||
|
@ -30,13 +30,11 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerLifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
.StorageContainerException;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
@ -284,7 +282,7 @@ public void close() throws StorageContainerException {
|
||||
|
||||
} catch (StorageContainerException ex) {
|
||||
// Failed to update .container file. Reset the state to CLOSING
|
||||
containerData.setState(ContainerLifeCycleState.CLOSING);
|
||||
containerData.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
|
||||
throw ex;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
@ -309,7 +307,7 @@ public KeyValueContainerData getContainerData() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerLifeCycleState getContainerState() {
|
||||
public ContainerProtos.ContainerDataProto.State getContainerState() {
|
||||
return containerData.getState();
|
||||
}
|
||||
|
||||
@ -427,7 +425,8 @@ public void importContainerData(InputStream input,
|
||||
@Override
|
||||
public void exportContainerData(OutputStream destination,
|
||||
ContainerPacker<KeyValueContainerData> packer) throws IOException {
|
||||
if (getContainerData().getState() != ContainerLifeCycleState.CLOSED) {
|
||||
if (getContainerData().getState() !=
|
||||
ContainerProtos.ContainerDataProto.State.CLOSED) {
|
||||
throw new IllegalStateException(
|
||||
"Only closed containers could be exported: ContainerId="
|
||||
+ getContainerData().getContainerID());
|
||||
@ -518,10 +517,10 @@ public void updateBlockCommitSequenceId(long blockCommitSequenceId) {
|
||||
* Returns KeyValueContainerReport for the KeyValueContainer.
|
||||
*/
|
||||
@Override
|
||||
public StorageContainerDatanodeProtocolProtos.ContainerInfo
|
||||
getContainerReport() throws StorageContainerException{
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
||||
public ContainerReplicaProto getContainerReport()
|
||||
throws StorageContainerException {
|
||||
ContainerReplicaProto.Builder ciBuilder =
|
||||
ContainerReplicaProto.newBuilder();
|
||||
ciBuilder.setContainerID(containerData.getContainerID())
|
||||
.setReadCount(containerData.getReadCount())
|
||||
.setWriteCount(containerData.getWriteCount())
|
||||
@ -540,18 +539,18 @@ public void updateBlockCommitSequenceId(long blockCommitSequenceId) {
|
||||
* @return LifeCycle State of the container in HddsProtos format
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
private HddsProtos.LifeCycleState getHddsState()
|
||||
private ContainerReplicaProto.State getHddsState()
|
||||
throws StorageContainerException {
|
||||
HddsProtos.LifeCycleState state;
|
||||
ContainerReplicaProto.State state;
|
||||
switch (containerData.getState()) {
|
||||
case OPEN:
|
||||
state = HddsProtos.LifeCycleState.OPEN;
|
||||
state = ContainerReplicaProto.State.OPEN;
|
||||
break;
|
||||
case CLOSING:
|
||||
state = HddsProtos.LifeCycleState.CLOSING;
|
||||
state = ContainerReplicaProto.State.CLOSING;
|
||||
break;
|
||||
case CLOSED:
|
||||
state = HddsProtos.LifeCycleState.CLOSED;
|
||||
state = ContainerReplicaProto.State.CLOSED;
|
||||
break;
|
||||
default:
|
||||
throw new StorageContainerException("Invalid Container state found: " +
|
||||
|
@ -24,6 +24,8 @@
|
||||
|
||||
import org.apache.hadoop.conf.StorageSize;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
@ -245,9 +247,8 @@ public long getDeleteTransactionId() {
|
||||
*
|
||||
* @return Protocol Buffer Message
|
||||
*/
|
||||
public ContainerProtos.ContainerData getProtoBufMessage() {
|
||||
ContainerProtos.ContainerData.Builder builder = ContainerProtos
|
||||
.ContainerData.newBuilder();
|
||||
public ContainerDataProto getProtoBufMessage() {
|
||||
ContainerDataProto.Builder builder = ContainerDataProto.newBuilder();
|
||||
builder.setContainerID(this.getContainerID());
|
||||
builder.setContainerPath(this.getMetadataPath());
|
||||
builder.setState(this.getState());
|
||||
@ -282,7 +283,7 @@ public static List<String> getYamlFields() {
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static KeyValueContainerData getFromProtoBuf(
|
||||
ContainerProtos.ContainerData protoData) throws IOException {
|
||||
ContainerDataProto protoData) throws IOException {
|
||||
// TODO: Add containerMaxSize to ContainerProtos.ContainerData
|
||||
StorageSize storageSize = StorageSize.parse(
|
||||
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
|
||||
|
@ -31,12 +31,12 @@
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerCommandRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerLifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerType;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
@ -385,13 +385,13 @@ ContainerCommandResponseProto handleCloseContainer(
|
||||
}
|
||||
|
||||
long containerID = kvContainer.getContainerData().getContainerID();
|
||||
ContainerLifeCycleState containerState = kvContainer.getContainerState();
|
||||
ContainerDataProto.State containerState = kvContainer.getContainerState();
|
||||
|
||||
try {
|
||||
if (containerState == ContainerLifeCycleState.CLOSED) {
|
||||
if (containerState == ContainerDataProto.State .CLOSED) {
|
||||
LOG.debug("Container {} is already closed.", containerID);
|
||||
return ContainerUtils.getSuccessResponse(request);
|
||||
} else if (containerState == ContainerLifeCycleState.INVALID) {
|
||||
} else if (containerState == ContainerDataProto.State .INVALID) {
|
||||
LOG.debug("Invalid container data. ContainerID: {}", containerID);
|
||||
throw new StorageContainerException("Invalid container data. " +
|
||||
"ContainerID: " + containerID, INVALID_CONTAINER_STATE);
|
||||
@ -401,7 +401,7 @@ ContainerCommandResponseProto handleCloseContainer(
|
||||
|
||||
// remove the container from open block map once, all the blocks
|
||||
// have been committed and the container is closed
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
|
||||
kvData.setState(ContainerDataProto.State.CLOSING);
|
||||
commitPendingBlocks(kvContainer);
|
||||
kvContainer.close();
|
||||
// make sure the the container open keys from BlockMap gets removed
|
||||
@ -798,9 +798,9 @@ ContainerCommandResponseProto handleUnsupportedOp(
|
||||
private void checkContainerOpen(KeyValueContainer kvContainer)
|
||||
throws StorageContainerException {
|
||||
|
||||
ContainerLifeCycleState containerState = kvContainer.getContainerState();
|
||||
ContainerDataProto.State containerState = kvContainer.getContainerState();
|
||||
|
||||
if (containerState == ContainerLifeCycleState.OPEN) {
|
||||
if (containerState == ContainerDataProto.State.OPEN) {
|
||||
return;
|
||||
} else {
|
||||
String msg = "Requested operation not allowed as ContainerState is " +
|
||||
|
@ -80,10 +80,11 @@ message SCMHeartbeatRequestProto {
|
||||
required DatanodeDetailsProto datanodeDetails = 1;
|
||||
optional NodeReportProto nodeReport = 2;
|
||||
optional ContainerReportsProto containerReport = 3;
|
||||
repeated CommandStatusReportsProto commandStatusReports = 4;
|
||||
optional ContainerActionsProto containerActions = 5;
|
||||
optional PipelineActionsProto pipelineActions = 6;
|
||||
optional PipelineReportsProto pipelineReports = 7;
|
||||
optional IncrementalContainerReportProto incrementalContainerReport = 4;
|
||||
repeated CommandStatusReportsProto commandStatusReports = 5;
|
||||
optional ContainerActionsProto containerActions = 6;
|
||||
optional PipelineActionsProto pipelineActions = 7;
|
||||
optional PipelineReportsProto pipelineReports = 8;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -128,7 +129,34 @@ enum StorageTypeProto {
|
||||
}
|
||||
|
||||
message ContainerReportsProto {
|
||||
repeated ContainerInfo reports = 1;
|
||||
repeated ContainerReplicaProto reports = 1;
|
||||
}
|
||||
|
||||
message IncrementalContainerReportProto {
|
||||
repeated ContainerReplicaProto report = 1;
|
||||
}
|
||||
|
||||
message ContainerReplicaProto {
|
||||
enum State {
|
||||
OPEN = 1;
|
||||
CLOSING = 2;
|
||||
CLOSED = 3;
|
||||
QUASI_CLOSED = 4;
|
||||
UNHEALTHY = 5;
|
||||
INVALID = 6;
|
||||
}
|
||||
required int64 containerID = 1;
|
||||
required State state = 2;
|
||||
optional int64 size = 3;
|
||||
optional int64 used = 4;
|
||||
optional int64 keyCount = 5;
|
||||
optional int64 readCount = 6;
|
||||
optional int64 writeCount = 7;
|
||||
optional int64 readBytes = 8;
|
||||
optional int64 writeBytes = 9;
|
||||
optional string finalhash = 10;
|
||||
optional int64 deleteTransactionId = 11;
|
||||
optional uint64 blockCommitSequenceId = 12;
|
||||
}
|
||||
|
||||
message CommandStatusReportsProto {
|
||||
@ -200,25 +228,6 @@ message PipelineAction {
|
||||
optional ClosePipelineInfo closePipeline = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
A container report contains the following information.
|
||||
*/
|
||||
message ContainerInfo {
|
||||
required int64 containerID = 1;
|
||||
optional int64 size = 2;
|
||||
optional int64 used = 3;
|
||||
optional int64 keyCount = 4;
|
||||
// TODO: move the io count to separate message
|
||||
optional int64 readCount = 5;
|
||||
optional int64 writeCount = 6;
|
||||
optional int64 readBytes = 7;
|
||||
optional int64 writeBytes = 8;
|
||||
optional string finalhash = 9;
|
||||
optional hadoop.hdds.LifeCycleState state = 10;
|
||||
optional int64 deleteTransactionId = 11;
|
||||
optional uint64 blockCommitSequenceId = 12;
|
||||
}
|
||||
|
||||
/*
|
||||
* These are commands returned by SCM for to the datanode to execute.
|
||||
*/
|
||||
|
@ -18,6 +18,8 @@
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
@ -31,8 +33,6 @@
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
@ -66,8 +66,9 @@ public ScmTestMock() {
|
||||
}
|
||||
|
||||
// Map of datanode to containers
|
||||
private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
|
||||
new HashMap();
|
||||
private Map<DatanodeDetails,
|
||||
Map<String, ContainerReplicaProto>> nodeContainers =
|
||||
new HashMap<>();
|
||||
private Map<DatanodeDetails, NodeReportProto> nodeReports = new HashMap<>();
|
||||
private AtomicInteger commandStatusReport = new AtomicInteger(0);
|
||||
private List<CommandStatus> cmdStatusList = new LinkedList<>();
|
||||
@ -274,7 +275,7 @@ public void updateContainerReport(
|
||||
nodeContainers.put(datanode, containers);
|
||||
}
|
||||
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerInfo report : reports
|
||||
for (ContainerReplicaProto report : reports
|
||||
.getReportsList()) {
|
||||
containers.put(report.getContainerID(), report);
|
||||
}
|
||||
@ -297,7 +298,8 @@ public int getNodeReportsCount(DatanodeDetails datanodeDetails) {
|
||||
* @return count of storage reports of a datanode
|
||||
*/
|
||||
public int getContainerCountsForDatanode(DatanodeDetails datanodeDetails) {
|
||||
Map<String, ContainerInfo> cr = nodeContainers.get(datanodeDetails);
|
||||
Map<String, ContainerReplicaProto> cr =
|
||||
nodeContainers.get(datanodeDetails);
|
||||
if(cr != null) {
|
||||
return cr.size();
|
||||
}
|
||||
|
@ -40,8 +40,8 @@ public void testKeyValueData() {
|
||||
.ContainerType.KeyValueContainer;
|
||||
String path = "/tmp";
|
||||
String containerDBType = "RocksDB";
|
||||
ContainerProtos.ContainerLifeCycleState state = ContainerProtos
|
||||
.ContainerLifeCycleState.CLOSED;
|
||||
ContainerProtos.ContainerDataProto.State state =
|
||||
ContainerProtos.ContainerDataProto.State.CLOSED;
|
||||
AtomicLong val = new AtomicLong(0);
|
||||
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(containerId,
|
||||
@ -49,7 +49,7 @@ public void testKeyValueData() {
|
||||
|
||||
assertEquals(containerType, kvData.getContainerType());
|
||||
assertEquals(containerId, kvData.getContainerID());
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN, kvData
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, kvData
|
||||
.getState());
|
||||
assertEquals(0, kvData.getMetadata().size());
|
||||
assertEquals(0, kvData.getNumPendingDeletionBlocks());
|
||||
|
@ -91,7 +91,7 @@ public void testCreateContainerFile() throws IOException {
|
||||
assertEquals("RocksDB", kvData.getContainerDBType());
|
||||
assertEquals(containerFile.getParent(), kvData.getMetadataPath());
|
||||
assertEquals(containerFile.getParent(), kvData.getChunksPath());
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN, kvData
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, kvData
|
||||
.getState());
|
||||
assertEquals(1, kvData.getLayOutVersion());
|
||||
assertEquals(0, kvData.getMetadata().size());
|
||||
@ -100,7 +100,7 @@ public void testCreateContainerFile() throws IOException {
|
||||
// Update ContainerData.
|
||||
kvData.addMetadata("VOLUME", "hdfs");
|
||||
kvData.addMetadata("OWNER", "ozone");
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
|
||||
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
|
||||
|
||||
|
||||
ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
|
||||
@ -117,7 +117,7 @@ public void testCreateContainerFile() throws IOException {
|
||||
assertEquals("RocksDB", kvData.getContainerDBType());
|
||||
assertEquals(containerFile.getParent(), kvData.getMetadataPath());
|
||||
assertEquals(containerFile.getParent(), kvData.getChunksPath());
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, kvData
|
||||
.getState());
|
||||
assertEquals(1, kvData.getLayOutVersion());
|
||||
assertEquals(2, kvData.getMetadata().size());
|
||||
@ -161,7 +161,7 @@ public void testCheckBackWardCompatabilityOfContainerFile() throws
|
||||
ContainerUtils.verifyChecksum(kvData);
|
||||
|
||||
//Checking the Container file data is consistent or not
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, kvData
|
||||
.getState());
|
||||
assertEquals("RocksDB", kvData.getContainerDBType());
|
||||
assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
|
||||
|
@ -51,8 +51,8 @@ public class TestContainerSet {
|
||||
public void testAddGetRemoveContainer() throws StorageContainerException {
|
||||
ContainerSet containerSet = new ContainerSet();
|
||||
long containerId = 100L;
|
||||
ContainerProtos.ContainerLifeCycleState state = ContainerProtos
|
||||
.ContainerLifeCycleState.CLOSED;
|
||||
ContainerProtos.ContainerDataProto.State state = ContainerProtos
|
||||
.ContainerDataProto.State.CLOSED;
|
||||
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(containerId,
|
||||
(long) StorageUnit.GB.toBytes(5));
|
||||
@ -101,10 +101,10 @@ public void testIteratorsAndCount() throws StorageContainerException {
|
||||
ContainerData containerData = kv.getContainerData();
|
||||
long containerId = containerData.getContainerID();
|
||||
if (containerId%2 == 0) {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
containerData.getState());
|
||||
} else {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN,
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
|
||||
containerData.getState());
|
||||
}
|
||||
count++;
|
||||
@ -121,10 +121,10 @@ public void testIteratorsAndCount() throws StorageContainerException {
|
||||
ContainerData containerData = kv.getContainerData();
|
||||
long containerId = containerData.getContainerID();
|
||||
if (containerId%2 == 0) {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
containerData.getState());
|
||||
} else {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN,
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
|
||||
containerData.getState());
|
||||
}
|
||||
count++;
|
||||
@ -168,9 +168,9 @@ private ContainerSet createContainerSet() throws StorageContainerException {
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(i,
|
||||
(long) StorageUnit.GB.toBytes(5));
|
||||
if (i%2 == 0) {
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
|
||||
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
|
||||
} else {
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
|
||||
kvData.setState(ContainerProtos.ContainerDataProto.State.OPEN);
|
||||
}
|
||||
KeyValueContainer kv = new KeyValueContainer(kvData, new
|
||||
OzoneConfiguration());
|
||||
|
@ -24,8 +24,6 @@
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerLifeCycleState;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||
@ -185,7 +183,8 @@ public void testContainerImportExport() throws Exception {
|
||||
keyValueContainerData = keyValueContainer
|
||||
.getContainerData();
|
||||
|
||||
keyValueContainerData.setState(ContainerLifeCycleState.CLOSED);
|
||||
keyValueContainerData.setState(
|
||||
ContainerProtos.ContainerDataProto.State.CLOSED);
|
||||
|
||||
int numberOfKeysToWrite = 12;
|
||||
//write one few keys to check the key count after import
|
||||
@ -286,7 +285,7 @@ public void testDiskFullExceptionCreateContainer() throws Exception {
|
||||
|
||||
@Test
|
||||
public void testDeleteContainer() throws Exception {
|
||||
keyValueContainerData.setState(ContainerProtos.ContainerLifeCycleState
|
||||
keyValueContainerData.setState(ContainerProtos.ContainerDataProto.State
|
||||
.CLOSED);
|
||||
keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, conf);
|
||||
@ -315,7 +314,7 @@ public void testCloseContainer() throws Exception {
|
||||
keyValueContainerData = keyValueContainer
|
||||
.getContainerData();
|
||||
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
keyValueContainerData.getState());
|
||||
|
||||
//Check state in the .container file
|
||||
@ -325,7 +324,7 @@ public void testCloseContainer() throws Exception {
|
||||
|
||||
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
|
||||
.readContainerFile(containerFile);
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
|
||||
assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
keyValueContainerData.getState());
|
||||
}
|
||||
|
||||
@ -354,8 +353,8 @@ public void testUpdateContainer() throws IOException {
|
||||
@Test
|
||||
public void testUpdateContainerUnsupportedRequest() throws Exception {
|
||||
try {
|
||||
keyValueContainerData.setState(ContainerProtos.ContainerLifeCycleState
|
||||
.CLOSED);
|
||||
keyValueContainerData.setState(
|
||||
ContainerProtos.ContainerDataProto.State.CLOSED);
|
||||
keyValueContainer = new KeyValueContainer(keyValueContainerData, conf);
|
||||
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
|
@ -266,7 +266,7 @@ public void testCloseInvalidContainer() {
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(containerID,
|
||||
(long) StorageUnit.GB.toBytes(1));
|
||||
KeyValueContainer container = new KeyValueContainer(kvData, conf);
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.INVALID);
|
||||
kvData.setState(ContainerProtos.ContainerDataProto.State.INVALID);
|
||||
|
||||
// Create Close container request
|
||||
ContainerCommandRequestProto closeContainerRequest =
|
||||
|
@ -78,7 +78,7 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
|
||||
|
||||
Set<ContainerID> containerIds = containerReport.getReportsList().stream()
|
||||
.map(StorageContainerDatanodeProtocolProtos
|
||||
.ContainerInfo::getContainerID)
|
||||
.ContainerReplicaProto::getContainerID)
|
||||
.map(ContainerID::new)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
|
||||
@ -37,7 +38,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
@ -144,7 +145,7 @@ private void loadExistingContainers() throws IOException {
|
||||
.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
|
||||
for (Map.Entry<byte[], byte[]> entry : range) {
|
||||
ContainerInfo container = ContainerInfo.fromProtobuf(
|
||||
HddsProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue()));
|
||||
ContainerInfoProto.PARSER.parseFrom(entry.getValue()));
|
||||
Preconditions.checkNotNull(container);
|
||||
containerStateManager.loadContainer(container);
|
||||
if (container.isOpen()) {
|
||||
@ -452,7 +453,7 @@ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
|
||||
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
|
||||
}
|
||||
ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
|
||||
HddsProtos.SCMContainerInfo.parseFrom(containerBytes));
|
||||
HddsProtos.ContainerInfoProto.parseFrom(containerBytes));
|
||||
containerInfo.updateDeleteTransactionId(entry.getValue());
|
||||
batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
|
||||
}
|
||||
@ -507,11 +508,11 @@ public ContainerWithPipeline getMatchingContainerWithPipeline(
|
||||
@Override
|
||||
public void processContainerReports(DatanodeDetails datanodeDetails,
|
||||
ContainerReportsProto reports) throws IOException {
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
|
||||
List<ContainerReplicaProto>
|
||||
containerInfos = reports.getReportsList();
|
||||
PendingDeleteStatusList pendingDeleteStatusList =
|
||||
new PendingDeleteStatusList(datanodeDetails);
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerInfo newInfo :
|
||||
for (ContainerReplicaProto newInfo :
|
||||
containerInfos) {
|
||||
ContainerID id = ContainerID.valueof(newInfo.getContainerID());
|
||||
ContainerReplica replica = ContainerReplica.newBuilder()
|
||||
@ -523,7 +524,7 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
|
||||
try {
|
||||
containerStateManager.updateContainerReplica(id, replica);
|
||||
ContainerInfo currentInfo = containerStateManager.getContainer(id);
|
||||
if (newInfo.getState() == LifeCycleState.CLOSED
|
||||
if (newInfo.getState() == ContainerReplicaProto.State.CLOSED
|
||||
&& currentInfo.getState() == LifeCycleState.CLOSING) {
|
||||
currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE);
|
||||
if (!currentInfo.isOpen()) {
|
||||
@ -532,7 +533,7 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
|
||||
}
|
||||
}
|
||||
|
||||
HddsProtos.SCMContainerInfo newState =
|
||||
ContainerInfoProto newState =
|
||||
reconcileState(newInfo, currentInfo);
|
||||
|
||||
if (currentInfo.getDeleteTransactionId() >
|
||||
@ -567,11 +568,11 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
|
||||
* @param knownState - State inside SCM.
|
||||
* @return new SCM State for this container.
|
||||
*/
|
||||
private HddsProtos.SCMContainerInfo reconcileState(
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
|
||||
private HddsProtos.ContainerInfoProto reconcileState(
|
||||
ContainerReplicaProto datanodeState,
|
||||
ContainerInfo knownState) {
|
||||
HddsProtos.SCMContainerInfo.Builder builder =
|
||||
HddsProtos.SCMContainerInfo.newBuilder();
|
||||
HddsProtos.ContainerInfoProto.Builder builder =
|
||||
HddsProtos.ContainerInfoProto.newBuilder();
|
||||
builder.setContainerID(knownState.getContainerID())
|
||||
.setPipelineID(knownState.getPipelineID().getProtobuf())
|
||||
.setState(knownState.getState())
|
||||
|
@ -55,7 +55,7 @@ private HddsTestUtils() {
|
||||
*/
|
||||
public static NodeRegistrationContainerReport
|
||||
createNodeRegistrationContainerReport(List<ContainerInfo> dnContainers) {
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerReplicaProto>
|
||||
containers = new ArrayList<>();
|
||||
dnContainers.forEach(c -> {
|
||||
containers.add(TestUtils.getRandomContainerInfo(c.getContainerID()));
|
||||
|
@ -17,6 +17,8 @@
|
||||
package org.apache.hadoop.hdds.scm;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.PipelineReport;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
@ -27,8 +29,6 @@
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerManager;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.protocol
|
||||
@ -293,7 +293,7 @@ public static ContainerReportsProto getRandomContainerReports() {
|
||||
*/
|
||||
public static ContainerReportsProto getRandomContainerReports(
|
||||
int numberOfContainers) {
|
||||
List<ContainerInfo> containerInfos = new ArrayList<>();
|
||||
List<ContainerReplicaProto> containerInfos = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfContainers; i++) {
|
||||
containerInfos.add(getRandomContainerInfo(i));
|
||||
}
|
||||
@ -326,7 +326,7 @@ public static PipelineReportFromDatanode getRandomPipelineReportFromDatanode(
|
||||
* @return ContainerReportsProto
|
||||
*/
|
||||
public static ContainerReportsProto getContainerReports(
|
||||
ContainerInfo... containerInfos) {
|
||||
ContainerReplicaProto... containerInfos) {
|
||||
return getContainerReports(Arrays.asList(containerInfos));
|
||||
}
|
||||
|
||||
@ -338,10 +338,10 @@ public static ContainerReportsProto getContainerReports(
|
||||
* @return ContainerReportsProto
|
||||
*/
|
||||
public static ContainerReportsProto getContainerReports(
|
||||
List<ContainerInfo> containerInfos) {
|
||||
List<ContainerReplicaProto> containerInfos) {
|
||||
ContainerReportsProto.Builder
|
||||
reportsBuilder = ContainerReportsProto.newBuilder();
|
||||
for (ContainerInfo containerInfo : containerInfos) {
|
||||
for (ContainerReplicaProto containerInfo : containerInfos) {
|
||||
reportsBuilder.addReports(containerInfo);
|
||||
}
|
||||
return reportsBuilder.build();
|
||||
@ -354,7 +354,8 @@ public static ContainerReportsProto getContainerReports(
|
||||
*
|
||||
* @return ContainerInfo
|
||||
*/
|
||||
public static ContainerInfo getRandomContainerInfo(long containerId) {
|
||||
public static ContainerReplicaProto getRandomContainerInfo(
|
||||
long containerId) {
|
||||
return createContainerInfo(containerId,
|
||||
OzoneConsts.GB * 5,
|
||||
random.nextLong(1000),
|
||||
@ -379,11 +380,12 @@ public static ContainerInfo getRandomContainerInfo(long containerId) {
|
||||
*
|
||||
* @return ContainerInfo
|
||||
*/
|
||||
public static ContainerInfo createContainerInfo(
|
||||
public static ContainerReplicaProto createContainerInfo(
|
||||
long containerId, long size, long keyCount, long bytesUsed,
|
||||
long readCount, long readBytes, long writeCount, long writeBytes) {
|
||||
return ContainerInfo.newBuilder()
|
||||
return ContainerReplicaProto.newBuilder()
|
||||
.setContainerID(containerId)
|
||||
.setState(ContainerReplicaProto.State.OPEN)
|
||||
.setSize(size)
|
||||
.setKeyCount(keyCount)
|
||||
.setUsed(bytesUsed)
|
||||
|
@ -191,9 +191,11 @@ private ContainerReportsProto createContainerReport(long[] containerIds) {
|
||||
|
||||
for (long containerId : containerIds) {
|
||||
org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder
|
||||
.StorageContainerDatanodeProtocolProtos
|
||||
.ContainerReplicaProto.Builder
|
||||
ciBuilder = org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
||||
.StorageContainerDatanodeProtocolProtos
|
||||
.ContainerReplicaProto.newBuilder();
|
||||
ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
|
||||
.setSize(5368709120L)
|
||||
.setUsed(2000000000L)
|
||||
|
@ -28,7 +28,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
@ -244,10 +244,10 @@ public void testContainerCreationLeaseTimeout() throws IOException,
|
||||
public void testFullContainerReport() throws Exception {
|
||||
ContainerInfo info = createContainer();
|
||||
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
|
||||
List<ContainerReplicaProto> reports =
|
||||
new ArrayList<>();
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
||||
ContainerReplicaProto.Builder ciBuilder =
|
||||
ContainerReplicaProto.newBuilder();
|
||||
ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
|
||||
.setSize(5368709120L)
|
||||
.setUsed(2000000000L)
|
||||
@ -257,6 +257,7 @@ public void testFullContainerReport() throws Exception {
|
||||
.setReadBytes(2000000000L)
|
||||
.setWriteBytes(2000000000L)
|
||||
.setContainerID(info.getContainerID())
|
||||
.setState(ContainerReplicaProto.State.CLOSED)
|
||||
.setDeleteTransactionId(0);
|
||||
|
||||
reports.add(ciBuilder.build());
|
||||
@ -274,14 +275,14 @@ public void testFullContainerReport() throws Exception {
|
||||
updatedContainer.getNumberOfKeys());
|
||||
Assert.assertEquals(2000000000L, updatedContainer.getUsedBytes());
|
||||
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerInfo c : reports) {
|
||||
for (ContainerReplicaProto c : reports) {
|
||||
Assert.assertEquals(containerManager.getContainerReplicas(
|
||||
ContainerID.valueof(c.getContainerID())).size(), 1);
|
||||
}
|
||||
|
||||
containerManager.processContainerReports(TestUtils.randomDatanodeDetails(),
|
||||
crBuilder.build());
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerInfo c : reports) {
|
||||
for (ContainerReplicaProto c : reports) {
|
||||
Assert.assertEquals(containerManager.getContainerReplicas(
|
||||
ContainerID.valueof(c.getContainerID())).size(), 2);
|
||||
}
|
||||
@ -292,10 +293,10 @@ public void testListContainerAfterReport() throws Exception {
|
||||
ContainerInfo info1 = createContainer();
|
||||
ContainerInfo info2 = createContainer();
|
||||
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
|
||||
List<ContainerReplicaProto> reports =
|
||||
new ArrayList<>();
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
||||
ContainerReplicaProto.Builder ciBuilder =
|
||||
ContainerReplicaProto.newBuilder();
|
||||
long cID1 = info1.getContainerID();
|
||||
long cID2 = info2.getContainerID();
|
||||
ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
|
||||
@ -304,7 +305,8 @@ public void testListContainerAfterReport() throws Exception {
|
||||
.setKeyCount(100000000L)
|
||||
.setReadBytes(1000000000L)
|
||||
.setWriteBytes(1000000000L)
|
||||
.setContainerID(cID1);
|
||||
.setContainerID(cID1)
|
||||
.setState(ContainerReplicaProto.State.CLOSED);
|
||||
reports.add(ciBuilder.build());
|
||||
|
||||
ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea54a9")
|
||||
|
@ -23,9 +23,7 @@
|
||||
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerData;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerLifeCycleState;
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
|
||||
import org.apache.hadoop.hdds.scm.client.ScmClient;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
@ -64,13 +62,13 @@ public Void call() throws Exception {
|
||||
getContainerWithPipeline(containerID);
|
||||
Preconditions.checkNotNull(container, "Container cannot be null");
|
||||
|
||||
ContainerData containerData = scmClient.readContainer(container
|
||||
ContainerDataProto containerData = scmClient.readContainer(container
|
||||
.getContainerInfo().getContainerID(), container.getPipeline());
|
||||
|
||||
// Print container report info.
|
||||
LOG.info("Container id: {}", containerID);
|
||||
String openStatus =
|
||||
containerData.getState() == ContainerLifeCycleState.OPEN ? "OPEN" :
|
||||
containerData.getState() == ContainerDataProto.State.OPEN ? "OPEN" :
|
||||
"CLOSED";
|
||||
LOG.info("Container State: {}", openStatus);
|
||||
LOG.info("Container Path: {}", containerData.getContainerPath());
|
||||
|
@ -464,7 +464,7 @@ public void testRetriesOnBlockNotCommittedException() throws Exception {
|
||||
if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) {
|
||||
datanodeService.getDatanodeStateMachine().getContainer()
|
||||
.getContainerSet().getContainer(containerID).getContainerData()
|
||||
.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
|
||||
.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
|
||||
}
|
||||
}
|
||||
dataString = fixedLengthString(keyString, (chunkSize * 1 / 2));
|
||||
|
@ -161,7 +161,7 @@ public void testContainerStateMachineFailures() throws Exception {
|
||||
.getContainer().getContainerSet()
|
||||
.getContainer(omKeyLocationInfo.getContainerID())
|
||||
.getContainerState()
|
||||
== ContainerProtos.ContainerLifeCycleState.UNHEALTHY);
|
||||
== ContainerProtos.ContainerDataProto.State.UNHEALTHY);
|
||||
try {
|
||||
// subsequent requests will fail with unhealthy container exception
|
||||
key.close();
|
||||
|
@ -268,10 +268,10 @@ public void testGetContainerReports() throws Exception {
|
||||
|
||||
// ContainerSet#getContainerReport currently returns all containers (open
|
||||
// and closed) reports.
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerReplicaProto> reports =
|
||||
containerSet.getContainerReport().getReportsList();
|
||||
Assert.assertEquals(10, reports.size());
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerInfo report :
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerReplicaProto report :
|
||||
reports) {
|
||||
long actualContainerID = report.getContainerID();
|
||||
Assert.assertTrue(containerIDs.remove(actualContainerID));
|
||||
|
@ -24,7 +24,7 @@
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
|
||||
@ -233,9 +233,11 @@ private void verifyPendingDeleteEvent()
|
||||
ContainerReportsProto containerReport = dnContainerSet.getContainerReport();
|
||||
ContainerReportsProto.Builder dummyReportsBuilder =
|
||||
ContainerReportsProto.newBuilder();
|
||||
for (ContainerInfo containerInfo : containerReport.getReportsList()) {
|
||||
for (ContainerReplicaProto containerInfo :
|
||||
containerReport.getReportsList()) {
|
||||
dummyReportsBuilder.addReports(
|
||||
ContainerInfo.newBuilder(containerInfo).setDeleteTransactionId(0)
|
||||
ContainerReplicaProto.newBuilder(containerInfo)
|
||||
.setDeleteTransactionId(0)
|
||||
.build());
|
||||
}
|
||||
ContainerReportsProto dummyReport = dummyReportsBuilder.build();
|
||||
@ -246,7 +248,7 @@ private void verifyPendingDeleteEvent()
|
||||
// wait for event to be handled by event handler
|
||||
Thread.sleep(1000);
|
||||
String output = logCapturer.getOutput();
|
||||
for (ContainerInfo containerInfo : dummyReport.getReportsList()) {
|
||||
for (ContainerReplicaProto containerInfo : dummyReport.getReportsList()) {
|
||||
long containerId = containerInfo.getContainerID();
|
||||
// Event should be triggered only for containers which have deleted blocks
|
||||
if (containerIdsWithDeletedBlocks.contains(containerId)) {
|
||||
|
@ -469,7 +469,7 @@ private void convertContainerDB(Path dbPath, Path outPath)
|
||||
long containerID = Longs.fromByteArray(key);
|
||||
ContainerInfo containerInfo = null;
|
||||
containerInfo = ContainerInfo.fromProtobuf(
|
||||
HddsProtos.SCMContainerInfo.PARSER.parseFrom(value));
|
||||
HddsProtos.ContainerInfoProto.PARSER.parseFrom(value));
|
||||
Preconditions.checkNotNull(containerInfo);
|
||||
try {
|
||||
//TODO: include container state to sqllite schema
|
||||
|
Loading…
Reference in New Issue
Block a user