diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java index 62b12e3e04..74e90e95da 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java @@ -20,8 +20,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import java.util.Objects; + /** - * BlockID of ozone (containerID + localID) + * BlockID of ozone (containerID localID) */ public class BlockID { private long containerID; @@ -68,4 +70,20 @@ public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) { blockID.getLocalID()); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BlockID blockID = (BlockID) o; + return containerID == blockID.containerID && localID == blockID.localID; + } + + @Override + public int hashCode() { + return Objects.hash(containerID, localID); + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index f4f14efa38..36cdfc995d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -98,6 +98,34 @@ public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient, return response.getGetKey(); } + /** + * Calls the container protocol to get the length of a committed block. + * + * @param xceiverClient client to perform call + * @param blockID blockId for the Block + * @param traceID container protocol call args + * @return container protocol getLastCommittedBlockLength response + * @throws IOException if there is an I/O error while performing the call + */ + public static ContainerProtos.GetCommittedBlockLengthResponseProto + getCommittedBlockLength( + XceiverClientSpi xceiverClient, BlockID blockID, String traceID) + throws IOException { + ContainerProtos.GetCommittedBlockLengthRequestProto.Builder + getBlockLengthRequestBuilder = + ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder(). + setBlockID(blockID.getDatanodeBlockIDProtobuf()); + String id = xceiverClient.getPipeline().getLeader().getUuidString(); + ContainerCommandRequestProto request = + ContainerCommandRequestProto.newBuilder() + .setCmdType(Type.GetCommittedBlockLength).setTraceID(traceID) + .setDatanodeUuid(id) + .setGetCommittedBlockLength(getBlockLengthRequestBuilder).build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response); + return response.getGetCommittedBlockLength(); + } + /** * Calls the container protocol to put a container key. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java index b63332fa10..1919ed9902 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java @@ -41,6 +41,11 @@ public class KeyData { */ private List chunks; + /** + * total size of the key. + */ + private long size; + /** * Constructs a KeyData Object. * @@ -49,6 +54,7 @@ public class KeyData { public KeyData(BlockID blockID) { this.blockID = blockID; this.metadata = new TreeMap<>(); + this.size = 0; } /** @@ -66,6 +72,9 @@ public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws data.getMetadata(x).getValue()); } keyData.setChunks(data.getChunksList()); + if (data.hasSize()) { + keyData.setSize(data.getSize()); + } return keyData; } @@ -84,6 +93,7 @@ public ContainerProtos.KeyData getProtoBufMessage() { builder.addMetadata(keyValBuilder.setKey(entry.getKey()) .setValue(entry.getValue()).build()); } + builder.setSize(size); return builder.build(); } @@ -182,11 +192,26 @@ public void setChunks(List chunks) { this.chunks = chunks; } + /** + * sets the total size of the block + * @param size size of the block + */ + public void setSize(long size) { + this.size = size; + } + /** * Get the total size of chunks allocated for the key. * @return total size of the key. */ public long getSize() { - return chunks.parallelStream().mapToLong(e->e.getLen()).sum(); + return size; + } + + /** + * computes the total size of chunks allocated for the key. + */ + public void computeSize() { + setSize(chunks.parallelStream().mapToLong(e -> e.getLen()).sum()); } } diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index ff1582e8c8..a3c44677d8 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -99,7 +99,7 @@ enum Type { PutSmallFile = 15; GetSmallFile = 16; CloseContainer = 17; - + GetCommittedBlockLength = 18; } @@ -193,8 +193,8 @@ message ContainerCommandRequestProto { optional PutSmallFileRequestProto putSmallFile = 16; optional GetSmallFileRequestProto getSmallFile = 17; optional CloseContainerRequestProto closeContainer = 18; - - required string datanodeUuid = 19; + optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 19; + required string datanodeUuid = 20; } message ContainerCommandResponseProto { @@ -223,6 +223,7 @@ message ContainerCommandResponseProto { optional PutSmallFileResponseProto putSmallFile = 19; optional GetSmallFileResponseProto getSmallFile = 20; optional CloseContainerResponseProto closeContainer = 21; + optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 22; } @@ -302,6 +303,7 @@ message KeyData { optional int64 flags = 2; // for future use. repeated KeyValue metadata = 3; repeated ChunkInfo chunks = 4; + optional int64 size = 5; } // Key Messages. @@ -325,6 +327,15 @@ message DeleteKeyRequestProto { required DatanodeBlockID blockID = 1; } +message GetCommittedBlockLengthRequestProto { + required DatanodeBlockID blockID = 1; +} + +message GetCommittedBlockLengthResponseProto { + required DatanodeBlockID blockID = 1; + required int64 blockLength = 2; +} + message DeleteKeyResponseProto { } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index bee84174a3..6d11abb29d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -181,6 +181,8 @@ private long getContainerID(ContainerCommandRequestProto request) .getContainerID(); case GetSmallFile: return request.getGetSmallFile().getKey().getBlockID().getContainerID(); + case GetCommittedBlockLength: + return request.getGetCommittedBlockLength().getBlockID().getContainerID(); } throw new StorageContainerException( diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index d3a1ca43c1..4123dc881e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -198,6 +198,8 @@ public ContainerCommandResponseProto handle( return handlePutSmallFile(request, kvContainer); case GetSmallFile: return handleGetSmallFile(request, kvContainer); + case GetCommittedBlockLength: + return handleGetCommittedBlockLength(request, kvContainer); } return null; } @@ -443,6 +445,8 @@ private void commitPendingKeys(KeyValueContainer kvContainer) private void commitKey(KeyData keyData, KeyValueContainer kvContainer) throws IOException { Preconditions.checkNotNull(keyData); + //sets the total size of the key before committing + keyData.computeSize(); keyManager.putKey(kvContainer, keyData); //update the open key Map in containerManager this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID()); @@ -478,6 +482,35 @@ ContainerCommandResponseProto handleGetKey( return KeyUtils.getKeyDataResponse(request, responseData); } + /** + * Handles GetCommittedBlockLength operation. + * Calls KeyManager to process the request. + */ + ContainerCommandResponseProto handleGetCommittedBlockLength( + ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + if (!request.hasGetCommittedBlockLength()) { + LOG.debug("Malformed Get Key request. trace ID: {}", + request.getTraceID()); + return ContainerUtils.malformedRequest(request); + } + + long blockLength; + try { + BlockID blockID = BlockID.getFromProtobuf( + request.getGetCommittedBlockLength().getBlockID()); + blockLength = keyManager.getCommittedBlockLength(kvContainer, blockID); + + } catch (StorageContainerException ex) { + return ContainerUtils.logAndReturnError(LOG, ex, request); + } catch (IOException ex) { + return ContainerUtils.logAndReturnError(LOG, + new StorageContainerException("GetCommittedBlockLength failed", ex, + IO_EXCEPTION), request); + } + + return KeyUtils.getBlockLengthResponse(request, blockLength); + } + /** * Handle Delete Key operation. Calls KeyManager to process the request. */ @@ -665,6 +698,7 @@ ContainerCommandResponseProto handlePutSmallFile( List chunks = new LinkedList<>(); chunks.add(chunkInfo.getProtoBufMessage()); keyData.setChunks(chunks); + keyData.computeSize(); keyManager.putKey(kvContainer, keyData); metrics.incContainerBytesStats(Type.PutSmallFile, data.length); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java index 5845fae65e..2be966d23b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java @@ -144,4 +144,24 @@ public static ContainerCommandResponseProto getKeyDataResponse( builder.setGetKey(getKey); return builder.build(); } + + /** + * Returns successful getCommittedBlockLength Response. + * @param msg - Request. + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getBlockLengthResponse(ContainerProtos. + ContainerCommandRequestProto msg, long blockLength) { + ContainerProtos.GetCommittedBlockLengthResponseProto.Builder + getCommittedBlockLengthResponseBuilder = ContainerProtos. + GetCommittedBlockLengthResponseProto.newBuilder(); + getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength); + getCommittedBlockLengthResponseBuilder + .setBlockID(msg.getGetCommittedBlockLength().getBlockID()); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder); + return builder.build(); + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java index 6a8897ae02..58bf1f8ed0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java @@ -119,6 +119,32 @@ public KeyData getKey(Container container, BlockID blockID) return KeyData.getFromProtoBuf(keyData); } + /** + * Returns the length of the committed block. + * + * @param container - Container from which key need to be get. + * @param blockID - BlockID of the key. + * @return length of the block. + * @throws IOException in case, the block key does not exist in db. + */ + @Override + public long getCommittedBlockLength(Container container, BlockID blockID) + throws IOException { + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + MetadataStore db = KeyUtils.getDB(containerData, config); + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID())); + if (kData == null) { + throw new StorageContainerException("Unable to find the key.", + NO_SUCH_KEY); + } + ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData); + return keyData.getSize(); + } + /** * Deletes an existing Key. * @@ -164,6 +190,7 @@ public void deleteKey(Container container, BlockID blockID) throws * @param count - Number of keys to return. * @return List of Keys that match the criteria. */ + @Override public List listKey(Container container, long startLocalID, int count) throws IOException { Preconditions.checkNotNull(container, "container cannot be null"); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java index 7a5d48b779..dad688e497 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java @@ -69,6 +69,13 @@ public interface KeyManager { List listKey(Container container, long startLocalID, int count) throws IOException; + /** + * Returns the last committed block length for the block. + * @param blockID blockId + */ + long getCommittedBlockLength(Container container, BlockID blockID) + throws IOException; + /** * Shutdown ContainerManager. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java new file mode 100644 index 0000000000..7e8aa5f1aa --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers. + ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers. + StorageContainerException; +import org.apache.hadoop.hdds.scm.container.placement.algorithms. + ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms. + SCMContainerPlacementCapacity; +import org.apache.hadoop.hdds.scm.protocolPB. + StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + + +import java.util.UUID; + +/** + * Test Container calls. + */ +public class TestCommittedBlockLengthAPI { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration ozoneConfig; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static XceiverClientManager xceiverClientManager; + private static String containerOwner = "OZONE"; + + @BeforeClass + public static void init() throws Exception { + ozoneConfig = new OzoneConfiguration(); + ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); + cluster = + MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build(); + cluster.waitForClusterToBeReady(); + storageContainerLocationClient = + cluster.getStorageContainerLocationClient(); + xceiverClientManager = new XceiverClientManager(ozoneConfig); + } + + @AfterClass + public static void shutdown() throws InterruptedException { + if (cluster != null) { + cluster.shutdown(); + } + IOUtils.cleanupWithLogger(null, storageContainerLocationClient); + } + + @Test + public void tesGetCommittedBlockLength() throws Exception { + ContainerProtos.GetCommittedBlockLengthResponseProto response; + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + Pipeline pipeline = container.getPipeline(); + XceiverClientSpi client = + xceiverClientManager.acquireClient(pipeline, containerID); + //create the container + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + byte[] data = + RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper + .getWriteChunkRequest(container.getPipeline(), blockID, + data.length); + client.sendCommand(writeChunkRequest); + try { + // since there is neither explicit putKey request made for the block, + // nor the container is closed, GetCommittedBlockLength request + // should fail here. + response = ContainerProtocolCalls + .getCommittedBlockLength(client, blockID, traceID); + Assert.fail("Expected exception not thrown"); + } catch (StorageContainerException sce) { + Assert.assertTrue(sce.getMessage().contains("Unable to find the key")); + } + // Now, explicitly make a putKey request for the block. + ContainerProtos.ContainerCommandRequestProto putKeyRequest = + ContainerTestHelper + .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk()); + client.sendCommand(putKeyRequest); + response = ContainerProtocolCalls + .getCommittedBlockLength(client, blockID, traceID); + // make sure the block ids in the request and response are same. + Assert.assertTrue( + BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); + Assert.assertTrue(response.getBlockLength() == data.length); + xceiverClientManager.releaseClient(client); + } + + @Test + public void tesGetCommittedBlockLengthWithClosedContainer() + throws Exception { + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + Pipeline pipeline = container.getPipeline(); + XceiverClientSpi client = + xceiverClientManager.acquireClient(pipeline, containerID); + // create the container + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + byte[] data = + RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper + .getWriteChunkRequest(container.getPipeline(), blockID, + data.length); + client.sendCommand(writeChunkRequest); + // close the container + ContainerProtocolCalls.closeContainer(client, containerID, traceID); + ContainerProtos.GetCommittedBlockLengthResponseProto response = + ContainerProtocolCalls + .getCommittedBlockLength(client, blockID, traceID); + // make sure the block ids in the request and response are same. + // This will also ensure that closing the container committed the block + // on the Datanodes. + Assert.assertTrue( + BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); + Assert.assertTrue(response.getBlockLength() == data.length); + xceiverClientManager.releaseClient(client); + } + + @Test + public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception { + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + XceiverClientSpi client = xceiverClientManager + .acquireClient(container.getPipeline(), containerID); + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + // move the container to closed state + ContainerProtocolCalls.closeContainer(client, containerID, traceID); + try { + // There is no block written inside the container. The request should + // fail. + ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID); + Assert.fail("Expected exception not thrown"); + } catch (StorageContainerException sce) { + Assert.assertTrue(sce.getMessage().contains("Unable to find the key")); + } + xceiverClientManager.releaseClient(client); + } +} \ No newline at end of file