HDDS-203. Add getCommittedBlockLength API in datanode. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
81d59506e5
commit
955f795101
@ -20,8 +20,10 @@
|
|||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* BlockID of ozone (containerID + localID)
|
* BlockID of ozone (containerID localID)
|
||||||
*/
|
*/
|
||||||
public class BlockID {
|
public class BlockID {
|
||||||
private long containerID;
|
private long containerID;
|
||||||
@ -68,4 +70,20 @@ public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) {
|
|||||||
blockID.getLocalID());
|
blockID.getLocalID());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
BlockID blockID = (BlockID) o;
|
||||||
|
return containerID == blockID.containerID && localID == blockID.localID;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(containerID, localID);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,6 +98,34 @@ public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
|
|||||||
return response.getGetKey();
|
return response.getGetKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls the container protocol to get the length of a committed block.
|
||||||
|
*
|
||||||
|
* @param xceiverClient client to perform call
|
||||||
|
* @param blockID blockId for the Block
|
||||||
|
* @param traceID container protocol call args
|
||||||
|
* @return container protocol getLastCommittedBlockLength response
|
||||||
|
* @throws IOException if there is an I/O error while performing the call
|
||||||
|
*/
|
||||||
|
public static ContainerProtos.GetCommittedBlockLengthResponseProto
|
||||||
|
getCommittedBlockLength(
|
||||||
|
XceiverClientSpi xceiverClient, BlockID blockID, String traceID)
|
||||||
|
throws IOException {
|
||||||
|
ContainerProtos.GetCommittedBlockLengthRequestProto.Builder
|
||||||
|
getBlockLengthRequestBuilder =
|
||||||
|
ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder().
|
||||||
|
setBlockID(blockID.getDatanodeBlockIDProtobuf());
|
||||||
|
String id = xceiverClient.getPipeline().getLeader().getUuidString();
|
||||||
|
ContainerCommandRequestProto request =
|
||||||
|
ContainerCommandRequestProto.newBuilder()
|
||||||
|
.setCmdType(Type.GetCommittedBlockLength).setTraceID(traceID)
|
||||||
|
.setDatanodeUuid(id)
|
||||||
|
.setGetCommittedBlockLength(getBlockLengthRequestBuilder).build();
|
||||||
|
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
|
||||||
|
validateContainerResponse(response);
|
||||||
|
return response.getGetCommittedBlockLength();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls the container protocol to put a container key.
|
* Calls the container protocol to put a container key.
|
||||||
*
|
*
|
||||||
|
@ -41,6 +41,11 @@ public class KeyData {
|
|||||||
*/
|
*/
|
||||||
private List<ContainerProtos.ChunkInfo> chunks;
|
private List<ContainerProtos.ChunkInfo> chunks;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* total size of the key.
|
||||||
|
*/
|
||||||
|
private long size;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a KeyData Object.
|
* Constructs a KeyData Object.
|
||||||
*
|
*
|
||||||
@ -49,6 +54,7 @@ public class KeyData {
|
|||||||
public KeyData(BlockID blockID) {
|
public KeyData(BlockID blockID) {
|
||||||
this.blockID = blockID;
|
this.blockID = blockID;
|
||||||
this.metadata = new TreeMap<>();
|
this.metadata = new TreeMap<>();
|
||||||
|
this.size = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -66,6 +72,9 @@ public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws
|
|||||||
data.getMetadata(x).getValue());
|
data.getMetadata(x).getValue());
|
||||||
}
|
}
|
||||||
keyData.setChunks(data.getChunksList());
|
keyData.setChunks(data.getChunksList());
|
||||||
|
if (data.hasSize()) {
|
||||||
|
keyData.setSize(data.getSize());
|
||||||
|
}
|
||||||
return keyData;
|
return keyData;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,6 +93,7 @@ public ContainerProtos.KeyData getProtoBufMessage() {
|
|||||||
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
|
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
|
||||||
.setValue(entry.getValue()).build());
|
.setValue(entry.getValue()).build());
|
||||||
}
|
}
|
||||||
|
builder.setSize(size);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,11 +192,26 @@ public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
|
|||||||
this.chunks = chunks;
|
this.chunks = chunks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sets the total size of the block
|
||||||
|
* @param size size of the block
|
||||||
|
*/
|
||||||
|
public void setSize(long size) {
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the total size of chunks allocated for the key.
|
* Get the total size of chunks allocated for the key.
|
||||||
* @return total size of the key.
|
* @return total size of the key.
|
||||||
*/
|
*/
|
||||||
public long getSize() {
|
public long getSize() {
|
||||||
return chunks.parallelStream().mapToLong(e->e.getLen()).sum();
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* computes the total size of chunks allocated for the key.
|
||||||
|
*/
|
||||||
|
public void computeSize() {
|
||||||
|
setSize(chunks.parallelStream().mapToLong(e -> e.getLen()).sum());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ enum Type {
|
|||||||
PutSmallFile = 15;
|
PutSmallFile = 15;
|
||||||
GetSmallFile = 16;
|
GetSmallFile = 16;
|
||||||
CloseContainer = 17;
|
CloseContainer = 17;
|
||||||
|
GetCommittedBlockLength = 18;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -193,8 +193,8 @@ message ContainerCommandRequestProto {
|
|||||||
optional PutSmallFileRequestProto putSmallFile = 16;
|
optional PutSmallFileRequestProto putSmallFile = 16;
|
||||||
optional GetSmallFileRequestProto getSmallFile = 17;
|
optional GetSmallFileRequestProto getSmallFile = 17;
|
||||||
optional CloseContainerRequestProto closeContainer = 18;
|
optional CloseContainerRequestProto closeContainer = 18;
|
||||||
|
optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 19;
|
||||||
required string datanodeUuid = 19;
|
required string datanodeUuid = 20;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ContainerCommandResponseProto {
|
message ContainerCommandResponseProto {
|
||||||
@ -223,6 +223,7 @@ message ContainerCommandResponseProto {
|
|||||||
optional PutSmallFileResponseProto putSmallFile = 19;
|
optional PutSmallFileResponseProto putSmallFile = 19;
|
||||||
optional GetSmallFileResponseProto getSmallFile = 20;
|
optional GetSmallFileResponseProto getSmallFile = 20;
|
||||||
optional CloseContainerResponseProto closeContainer = 21;
|
optional CloseContainerResponseProto closeContainer = 21;
|
||||||
|
optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 22;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,6 +303,7 @@ message KeyData {
|
|||||||
optional int64 flags = 2; // for future use.
|
optional int64 flags = 2; // for future use.
|
||||||
repeated KeyValue metadata = 3;
|
repeated KeyValue metadata = 3;
|
||||||
repeated ChunkInfo chunks = 4;
|
repeated ChunkInfo chunks = 4;
|
||||||
|
optional int64 size = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Key Messages.
|
// Key Messages.
|
||||||
@ -325,6 +327,15 @@ message DeleteKeyRequestProto {
|
|||||||
required DatanodeBlockID blockID = 1;
|
required DatanodeBlockID blockID = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GetCommittedBlockLengthRequestProto {
|
||||||
|
required DatanodeBlockID blockID = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetCommittedBlockLengthResponseProto {
|
||||||
|
required DatanodeBlockID blockID = 1;
|
||||||
|
required int64 blockLength = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message DeleteKeyResponseProto {
|
message DeleteKeyResponseProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,6 +181,8 @@ private long getContainerID(ContainerCommandRequestProto request)
|
|||||||
.getContainerID();
|
.getContainerID();
|
||||||
case GetSmallFile:
|
case GetSmallFile:
|
||||||
return request.getGetSmallFile().getKey().getBlockID().getContainerID();
|
return request.getGetSmallFile().getKey().getBlockID().getContainerID();
|
||||||
|
case GetCommittedBlockLength:
|
||||||
|
return request.getGetCommittedBlockLength().getBlockID().getContainerID();
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new StorageContainerException(
|
throw new StorageContainerException(
|
||||||
|
@ -198,6 +198,8 @@ public ContainerCommandResponseProto handle(
|
|||||||
return handlePutSmallFile(request, kvContainer);
|
return handlePutSmallFile(request, kvContainer);
|
||||||
case GetSmallFile:
|
case GetSmallFile:
|
||||||
return handleGetSmallFile(request, kvContainer);
|
return handleGetSmallFile(request, kvContainer);
|
||||||
|
case GetCommittedBlockLength:
|
||||||
|
return handleGetCommittedBlockLength(request, kvContainer);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -443,6 +445,8 @@ private void commitPendingKeys(KeyValueContainer kvContainer)
|
|||||||
private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
|
private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkNotNull(keyData);
|
Preconditions.checkNotNull(keyData);
|
||||||
|
//sets the total size of the key before committing
|
||||||
|
keyData.computeSize();
|
||||||
keyManager.putKey(kvContainer, keyData);
|
keyManager.putKey(kvContainer, keyData);
|
||||||
//update the open key Map in containerManager
|
//update the open key Map in containerManager
|
||||||
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
|
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
|
||||||
@ -478,6 +482,35 @@ ContainerCommandResponseProto handleGetKey(
|
|||||||
return KeyUtils.getKeyDataResponse(request, responseData);
|
return KeyUtils.getKeyDataResponse(request, responseData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles GetCommittedBlockLength operation.
|
||||||
|
* Calls KeyManager to process the request.
|
||||||
|
*/
|
||||||
|
ContainerCommandResponseProto handleGetCommittedBlockLength(
|
||||||
|
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
|
||||||
|
if (!request.hasGetCommittedBlockLength()) {
|
||||||
|
LOG.debug("Malformed Get Key request. trace ID: {}",
|
||||||
|
request.getTraceID());
|
||||||
|
return ContainerUtils.malformedRequest(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
long blockLength;
|
||||||
|
try {
|
||||||
|
BlockID blockID = BlockID.getFromProtobuf(
|
||||||
|
request.getGetCommittedBlockLength().getBlockID());
|
||||||
|
blockLength = keyManager.getCommittedBlockLength(kvContainer, blockID);
|
||||||
|
|
||||||
|
} catch (StorageContainerException ex) {
|
||||||
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
return ContainerUtils.logAndReturnError(LOG,
|
||||||
|
new StorageContainerException("GetCommittedBlockLength failed", ex,
|
||||||
|
IO_EXCEPTION), request);
|
||||||
|
}
|
||||||
|
|
||||||
|
return KeyUtils.getBlockLengthResponse(request, blockLength);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle Delete Key operation. Calls KeyManager to process the request.
|
* Handle Delete Key operation. Calls KeyManager to process the request.
|
||||||
*/
|
*/
|
||||||
@ -665,6 +698,7 @@ ContainerCommandResponseProto handlePutSmallFile(
|
|||||||
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
||||||
chunks.add(chunkInfo.getProtoBufMessage());
|
chunks.add(chunkInfo.getProtoBufMessage());
|
||||||
keyData.setChunks(chunks);
|
keyData.setChunks(chunks);
|
||||||
|
keyData.computeSize();
|
||||||
keyManager.putKey(kvContainer, keyData);
|
keyManager.putKey(kvContainer, keyData);
|
||||||
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
||||||
|
|
||||||
|
@ -144,4 +144,24 @@ public static ContainerCommandResponseProto getKeyDataResponse(
|
|||||||
builder.setGetKey(getKey);
|
builder.setGetKey(getKey);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns successful getCommittedBlockLength Response.
|
||||||
|
* @param msg - Request.
|
||||||
|
* @return Response.
|
||||||
|
*/
|
||||||
|
public static ContainerProtos.ContainerCommandResponseProto
|
||||||
|
getBlockLengthResponse(ContainerProtos.
|
||||||
|
ContainerCommandRequestProto msg, long blockLength) {
|
||||||
|
ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
|
||||||
|
getCommittedBlockLengthResponseBuilder = ContainerProtos.
|
||||||
|
GetCommittedBlockLengthResponseProto.newBuilder();
|
||||||
|
getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength);
|
||||||
|
getCommittedBlockLengthResponseBuilder
|
||||||
|
.setBlockID(msg.getGetCommittedBlockLength().getBlockID());
|
||||||
|
ContainerProtos.ContainerCommandResponseProto.Builder builder =
|
||||||
|
ContainerUtils.getSuccessResponseBuilder(msg);
|
||||||
|
builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
}
|
}
|
@ -119,6 +119,32 @@ public KeyData getKey(Container container, BlockID blockID)
|
|||||||
return KeyData.getFromProtoBuf(keyData);
|
return KeyData.getFromProtoBuf(keyData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the length of the committed block.
|
||||||
|
*
|
||||||
|
* @param container - Container from which key need to be get.
|
||||||
|
* @param blockID - BlockID of the key.
|
||||||
|
* @return length of the block.
|
||||||
|
* @throws IOException in case, the block key does not exist in db.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long getCommittedBlockLength(Container container, BlockID blockID)
|
||||||
|
throws IOException {
|
||||||
|
KeyValueContainerData containerData = (KeyValueContainerData) container
|
||||||
|
.getContainerData();
|
||||||
|
MetadataStore db = KeyUtils.getDB(containerData, config);
|
||||||
|
// This is a post condition that acts as a hint to the user.
|
||||||
|
// Should never fail.
|
||||||
|
Preconditions.checkNotNull(db, "DB cannot be null here");
|
||||||
|
byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
|
||||||
|
if (kData == null) {
|
||||||
|
throw new StorageContainerException("Unable to find the key.",
|
||||||
|
NO_SUCH_KEY);
|
||||||
|
}
|
||||||
|
ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
|
||||||
|
return keyData.getSize();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes an existing Key.
|
* Deletes an existing Key.
|
||||||
*
|
*
|
||||||
@ -164,6 +190,7 @@ public void deleteKey(Container container, BlockID blockID) throws
|
|||||||
* @param count - Number of keys to return.
|
* @param count - Number of keys to return.
|
||||||
* @return List of Keys that match the criteria.
|
* @return List of Keys that match the criteria.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public List<KeyData> listKey(Container container, long startLocalID, int
|
public List<KeyData> listKey(Container container, long startLocalID, int
|
||||||
count) throws IOException {
|
count) throws IOException {
|
||||||
Preconditions.checkNotNull(container, "container cannot be null");
|
Preconditions.checkNotNull(container, "container cannot be null");
|
||||||
|
@ -69,6 +69,13 @@ public interface KeyManager {
|
|||||||
List<KeyData> listKey(Container container, long startLocalID, int count) throws
|
List<KeyData> listKey(Container container, long startLocalID, int count) throws
|
||||||
IOException;
|
IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the last committed block length for the block.
|
||||||
|
* @param blockID blockId
|
||||||
|
*/
|
||||||
|
long getCommittedBlockLength(Container container, BlockID blockID)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shutdown ContainerManager.
|
* Shutdown ContainerManager.
|
||||||
*/
|
*/
|
||||||
|
@ -0,0 +1,191 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.scm;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
|
import org.apache.hadoop.hdds.client.BlockID;
|
||||||
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
|
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||||
|
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||||
|
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers.
|
||||||
|
ContainerWithPipeline;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers.
|
||||||
|
StorageContainerException;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.placement.algorithms.
|
||||||
|
ContainerPlacementPolicy;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.placement.algorithms.
|
||||||
|
SCMContainerPlacementCapacity;
|
||||||
|
import org.apache.hadoop.hdds.scm.protocolPB.
|
||||||
|
StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
|
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Container calls.
|
||||||
|
*/
|
||||||
|
public class TestCommittedBlockLengthAPI {
|
||||||
|
|
||||||
|
private static MiniOzoneCluster cluster;
|
||||||
|
private static OzoneConfiguration ozoneConfig;
|
||||||
|
private static StorageContainerLocationProtocolClientSideTranslatorPB
|
||||||
|
storageContainerLocationClient;
|
||||||
|
private static XceiverClientManager xceiverClientManager;
|
||||||
|
private static String containerOwner = "OZONE";
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void init() throws Exception {
|
||||||
|
ozoneConfig = new OzoneConfiguration();
|
||||||
|
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
|
||||||
|
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
|
||||||
|
cluster =
|
||||||
|
MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
|
||||||
|
cluster.waitForClusterToBeReady();
|
||||||
|
storageContainerLocationClient =
|
||||||
|
cluster.getStorageContainerLocationClient();
|
||||||
|
xceiverClientManager = new XceiverClientManager(ozoneConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void shutdown() throws InterruptedException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void tesGetCommittedBlockLength() throws Exception {
|
||||||
|
ContainerProtos.GetCommittedBlockLengthResponseProto response;
|
||||||
|
String traceID = UUID.randomUUID().toString();
|
||||||
|
ContainerWithPipeline container = storageContainerLocationClient
|
||||||
|
.allocateContainer(xceiverClientManager.getType(),
|
||||||
|
HddsProtos.ReplicationFactor.ONE, containerOwner);
|
||||||
|
long containerID = container.getContainerInfo().getContainerID();
|
||||||
|
Pipeline pipeline = container.getPipeline();
|
||||||
|
XceiverClientSpi client =
|
||||||
|
xceiverClientManager.acquireClient(pipeline, containerID);
|
||||||
|
//create the container
|
||||||
|
ContainerProtocolCalls.createContainer(client, containerID, traceID);
|
||||||
|
|
||||||
|
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
|
||||||
|
byte[] data =
|
||||||
|
RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
|
||||||
|
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
|
||||||
|
ContainerTestHelper
|
||||||
|
.getWriteChunkRequest(container.getPipeline(), blockID,
|
||||||
|
data.length);
|
||||||
|
client.sendCommand(writeChunkRequest);
|
||||||
|
try {
|
||||||
|
// since there is neither explicit putKey request made for the block,
|
||||||
|
// nor the container is closed, GetCommittedBlockLength request
|
||||||
|
// should fail here.
|
||||||
|
response = ContainerProtocolCalls
|
||||||
|
.getCommittedBlockLength(client, blockID, traceID);
|
||||||
|
Assert.fail("Expected exception not thrown");
|
||||||
|
} catch (StorageContainerException sce) {
|
||||||
|
Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
|
||||||
|
}
|
||||||
|
// Now, explicitly make a putKey request for the block.
|
||||||
|
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
|
||||||
|
ContainerTestHelper
|
||||||
|
.getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
|
||||||
|
client.sendCommand(putKeyRequest);
|
||||||
|
response = ContainerProtocolCalls
|
||||||
|
.getCommittedBlockLength(client, blockID, traceID);
|
||||||
|
// make sure the block ids in the request and response are same.
|
||||||
|
Assert.assertTrue(
|
||||||
|
BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
|
||||||
|
Assert.assertTrue(response.getBlockLength() == data.length);
|
||||||
|
xceiverClientManager.releaseClient(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void tesGetCommittedBlockLengthWithClosedContainer()
|
||||||
|
throws Exception {
|
||||||
|
String traceID = UUID.randomUUID().toString();
|
||||||
|
ContainerWithPipeline container = storageContainerLocationClient
|
||||||
|
.allocateContainer(xceiverClientManager.getType(),
|
||||||
|
HddsProtos.ReplicationFactor.ONE, containerOwner);
|
||||||
|
long containerID = container.getContainerInfo().getContainerID();
|
||||||
|
Pipeline pipeline = container.getPipeline();
|
||||||
|
XceiverClientSpi client =
|
||||||
|
xceiverClientManager.acquireClient(pipeline, containerID);
|
||||||
|
// create the container
|
||||||
|
ContainerProtocolCalls.createContainer(client, containerID, traceID);
|
||||||
|
|
||||||
|
byte[] data =
|
||||||
|
RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
|
||||||
|
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
|
||||||
|
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
|
||||||
|
ContainerTestHelper
|
||||||
|
.getWriteChunkRequest(container.getPipeline(), blockID,
|
||||||
|
data.length);
|
||||||
|
client.sendCommand(writeChunkRequest);
|
||||||
|
// close the container
|
||||||
|
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
|
||||||
|
ContainerProtos.GetCommittedBlockLengthResponseProto response =
|
||||||
|
ContainerProtocolCalls
|
||||||
|
.getCommittedBlockLength(client, blockID, traceID);
|
||||||
|
// make sure the block ids in the request and response are same.
|
||||||
|
// This will also ensure that closing the container committed the block
|
||||||
|
// on the Datanodes.
|
||||||
|
Assert.assertTrue(
|
||||||
|
BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
|
||||||
|
Assert.assertTrue(response.getBlockLength() == data.length);
|
||||||
|
xceiverClientManager.releaseClient(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception {
|
||||||
|
String traceID = UUID.randomUUID().toString();
|
||||||
|
ContainerWithPipeline container = storageContainerLocationClient
|
||||||
|
.allocateContainer(xceiverClientManager.getType(),
|
||||||
|
HddsProtos.ReplicationFactor.ONE, containerOwner);
|
||||||
|
long containerID = container.getContainerInfo().getContainerID();
|
||||||
|
XceiverClientSpi client = xceiverClientManager
|
||||||
|
.acquireClient(container.getPipeline(), containerID);
|
||||||
|
ContainerProtocolCalls.createContainer(client, containerID, traceID);
|
||||||
|
|
||||||
|
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
|
||||||
|
// move the container to closed state
|
||||||
|
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
|
||||||
|
try {
|
||||||
|
// There is no block written inside the container. The request should
|
||||||
|
// fail.
|
||||||
|
ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
|
||||||
|
Assert.fail("Expected exception not thrown");
|
||||||
|
} catch (StorageContainerException sce) {
|
||||||
|
Assert.assertTrue(sce.getMessage().contains("Unable to find the key"));
|
||||||
|
}
|
||||||
|
xceiverClientManager.releaseClient(client);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user