From 398d89554398a38ffa1347524286cd437f94f3ae Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Fri, 10 Aug 2018 23:45:56 +0530 Subject: [PATCH] HDDS-339. Add block length and blockId in PutKeyResponse. Contributed by Shashikant Banerjee. --- .../proto/DatanodeContainerProtocol.proto | 1 + .../container/keyvalue/KeyValueHandler.java | 18 ++++--- .../container/keyvalue/helpers/KeyUtils.java | 50 +++++++++++++++---- .../keyvalue/impl/KeyManagerImpl.java | 4 +- .../keyvalue/interfaces/KeyManager.java | 3 +- ...TestGetCommittedBlockLengthAndPutKey.java} | 40 ++++++++++++++- 6 files changed, 98 insertions(+), 18 deletions(-) rename hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/{TestCommittedBlockLengthAPI.java => TestGetCommittedBlockLengthAndPutKey.java} (83%) diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index af0634672e..930f314515 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -308,6 +308,7 @@ message PutKeyRequestProto { } message PutKeyResponseProto { + required GetCommittedBlockLengthResponseProto committedBlockLength = 1; } message GetKeyRequestProto { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index f4699dde8c..8364a77508 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -421,6 +421,7 @@ ContainerCommandResponseProto handleCloseContainer( ContainerCommandResponseProto handlePutKey( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + long blockLength; if (!request.hasPutKey()) { LOG.debug("Malformed Put Key request. trace ID: {}", request.getTraceID()); @@ -433,7 +434,7 @@ ContainerCommandResponseProto handlePutKey( KeyData keyData = KeyData.getFromProtoBuf( request.getPutKey().getKeyData()); long numBytes = keyData.getProtoBufMessage().toByteArray().length; - commitKey(keyData, kvContainer); + blockLength = commitKey(keyData, kvContainer); metrics.incContainerBytesStats(Type.PutKey, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -443,7 +444,7 @@ ContainerCommandResponseProto handlePutKey( request); } - return KeyUtils.getKeyResponseSuccess(request); + return KeyUtils.putKeyResponseSuccess(request, blockLength); } private void commitPendingKeys(KeyValueContainer kvContainer) @@ -456,12 +457,13 @@ private void commitPendingKeys(KeyValueContainer kvContainer) } } - private void commitKey(KeyData keyData, KeyValueContainer kvContainer) + private long commitKey(KeyData keyData, KeyValueContainer kvContainer) throws IOException { Preconditions.checkNotNull(keyData); - keyManager.putKey(kvContainer, keyData); + long length = keyManager.putKey(kvContainer, keyData); //update the open key Map in containerManager this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID()); + return length; } /** * Handle Get Key operation. Calls KeyManager to process the request. @@ -662,8 +664,12 @@ ContainerCommandResponseProto handleWriteChunk( request.getWriteChunk().getStage() == Stage.COMBINED) { metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() .getChunkData().getLen()); - // the openContainerBlockMap should be updated only while writing data - // not during COMMIT_STAGE of handling write chunk request. + } + + if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA + || request.getWriteChunk().getStage() == Stage.COMBINED) { + // the openContainerBlockMap should be updated only during + // COMMIT_STAGE of handling write chunk request. openContainerBlockMap.addChunk(blockID, chunkInfoProto); } } catch (StorageContainerException ex) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java index 2be966d23b..a83d298d77 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java @@ -27,6 +27,10 @@ .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .GetKeyResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + GetCommittedBlockLengthResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + PutKeyResponseProto; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; @@ -122,6 +126,26 @@ public static KeyData getKeyData(byte[] bytes) throws IOException { } } + /** + * Returns putKey response success. + * @param msg - Request. + * @return Response. + */ + public static ContainerCommandResponseProto putKeyResponseSuccess( + ContainerCommandRequestProto msg, long blockLength) { + GetCommittedBlockLengthResponseProto.Builder + committedBlockLengthResponseBuilder = + getCommittedBlockLengthResponseBuilder(blockLength, + msg.getPutKey().getKeyData().getBlockID()); + PutKeyResponseProto.Builder putKeyResponse = + PutKeyResponseProto.newBuilder(); + putKeyResponse + .setCommittedBlockLength(committedBlockLengthResponseBuilder); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setPutKey(putKeyResponse); + return builder.build(); + } /** * Returns successful keyResponse. * @param msg - Request. @@ -150,18 +174,26 @@ public static ContainerCommandResponseProto getKeyDataResponse( * @param msg - Request. * @return Response. */ - public static ContainerProtos.ContainerCommandResponseProto - getBlockLengthResponse(ContainerProtos. - ContainerCommandRequestProto msg, long blockLength) { + public static ContainerCommandResponseProto getBlockLengthResponse( + ContainerCommandRequestProto msg, long blockLength) { + GetCommittedBlockLengthResponseProto.Builder + committedBlockLengthResponseBuilder = + getCommittedBlockLengthResponseBuilder(blockLength, + msg.getGetCommittedBlockLength().getBlockID()); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder); + return builder.build(); + } + + private static GetCommittedBlockLengthResponseProto.Builder + getCommittedBlockLengthResponseBuilder( + long blockLength, ContainerProtos.DatanodeBlockID blockID) { ContainerProtos.GetCommittedBlockLengthResponseProto.Builder getCommittedBlockLengthResponseBuilder = ContainerProtos. GetCommittedBlockLengthResponseProto.newBuilder(); getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength); - getCommittedBlockLengthResponseBuilder - .setBlockID(msg.getGetCommittedBlockLength().getBlockID()); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder); - return builder.build(); + getCommittedBlockLengthResponseBuilder.setBlockID(blockID); + return getCommittedBlockLengthResponseBuilder; } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java index 58bf1f8ed0..6370f8eca4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java @@ -67,9 +67,10 @@ public KeyManagerImpl(Configuration conf) { * * @param container - Container for which key need to be added. * @param data - Key Data. + * @return length of the key. * @throws IOException */ - public void putKey(Container container, KeyData data) throws IOException { + public long putKey(Container container, KeyData data) throws IOException { Preconditions.checkNotNull(data, "KeyData cannot be null for put " + "operation."); Preconditions.checkState(data.getContainerID() >= 0, "Container Id " + @@ -87,6 +88,7 @@ public void putKey(Container container, KeyData data) throws IOException { // Increment keycount here container.getContainerData().incrKeyCount(); + return data.getSize(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java index dad688e497..37871be92b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java @@ -35,9 +35,10 @@ public interface KeyManager { * * @param container - Container for which key need to be added. * @param data - Key Data. + * @return length of the Key. * @throws IOException */ - void putKey(Container container, KeyData data) throws IOException; + long putKey(Container container, KeyData data) throws IOException; /** * Gets an existing key. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java similarity index 83% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index 3c6479f5a3..f82b0d389d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -52,7 +52,7 @@ /** * Test Container calls. */ -public class TestCommittedBlockLengthAPI { +public class TestGetCommittedBlockLengthAndPutKey { private static MiniOzoneCluster cluster; private static OzoneConfiguration ozoneConfig; @@ -213,4 +213,42 @@ public void testGetCommittedBlockLengthForOpenBlock() throws Exception { Assert.assertTrue(response.getBlockLength() == 1024); xceiverClientManager.releaseClient(client); } + + @Test + public void tesPutKeyResposne() throws Exception { + ContainerProtos.PutKeyResponseProto response; + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + Pipeline pipeline = container.getPipeline(); + XceiverClientSpi client = + xceiverClientManager.acquireClient(pipeline, containerID); + //create the container + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + byte[] data = + RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper + .getWriteChunkRequest(container.getPipeline(), blockID, + data.length); + client.sendCommand(writeChunkRequest); + // Now, explicitly make a putKey request for the block. + ContainerProtos.ContainerCommandRequestProto putKeyRequest = + ContainerTestHelper + .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk()); + response = client.sendCommand(putKeyRequest).getPutKey(); + // make sure the block ids in the request and response are same. + // This will also ensure that closing the container committed the block + // on the Datanodes. + Assert.assertEquals(BlockID + .getFromProtobuf(response.getCommittedBlockLength().getBlockID()), + blockID); + Assert.assertEquals( + response.getCommittedBlockLength().getBlockLength(), data.length); + xceiverClientManager.releaseClient(client); + } } \ No newline at end of file