HDDS-339. Add block length and blockId in PutKeyResponse. Contributed by Shashikant Banerjee.

This commit is contained in:
Mukul Kumar Singh 2018-08-10 23:45:56 +05:30
parent 15241c6349
commit 398d895543
6 changed files with 98 additions and 18 deletions

View File

@ -308,6 +308,7 @@ message PutKeyRequestProto {
} }
message PutKeyResponseProto { message PutKeyResponseProto {
required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
} }
message GetKeyRequestProto { message GetKeyRequestProto {

View File

@ -421,6 +421,7 @@ ContainerCommandResponseProto handleCloseContainer(
ContainerCommandResponseProto handlePutKey( ContainerCommandResponseProto handlePutKey(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) { ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
long blockLength;
if (!request.hasPutKey()) { if (!request.hasPutKey()) {
LOG.debug("Malformed Put Key request. trace ID: {}", LOG.debug("Malformed Put Key request. trace ID: {}",
request.getTraceID()); request.getTraceID());
@ -433,7 +434,7 @@ ContainerCommandResponseProto handlePutKey(
KeyData keyData = KeyData.getFromProtoBuf( KeyData keyData = KeyData.getFromProtoBuf(
request.getPutKey().getKeyData()); request.getPutKey().getKeyData());
long numBytes = keyData.getProtoBufMessage().toByteArray().length; long numBytes = keyData.getProtoBufMessage().toByteArray().length;
commitKey(keyData, kvContainer); blockLength = commitKey(keyData, kvContainer);
metrics.incContainerBytesStats(Type.PutKey, numBytes); metrics.incContainerBytesStats(Type.PutKey, numBytes);
} catch (StorageContainerException ex) { } catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request); return ContainerUtils.logAndReturnError(LOG, ex, request);
@ -443,7 +444,7 @@ ContainerCommandResponseProto handlePutKey(
request); request);
} }
return KeyUtils.getKeyResponseSuccess(request); return KeyUtils.putKeyResponseSuccess(request, blockLength);
} }
private void commitPendingKeys(KeyValueContainer kvContainer) private void commitPendingKeys(KeyValueContainer kvContainer)
@ -456,12 +457,13 @@ private void commitPendingKeys(KeyValueContainer kvContainer)
} }
} }
private void commitKey(KeyData keyData, KeyValueContainer kvContainer) private long commitKey(KeyData keyData, KeyValueContainer kvContainer)
throws IOException { throws IOException {
Preconditions.checkNotNull(keyData); Preconditions.checkNotNull(keyData);
keyManager.putKey(kvContainer, keyData); long length = keyManager.putKey(kvContainer, keyData);
//update the open key Map in containerManager //update the open key Map in containerManager
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID()); this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
return length;
} }
/** /**
* Handle Get Key operation. Calls KeyManager to process the request. * Handle Get Key operation. Calls KeyManager to process the request.
@ -662,8 +664,12 @@ ContainerCommandResponseProto handleWriteChunk(
request.getWriteChunk().getStage() == Stage.COMBINED) { request.getWriteChunk().getStage() == Stage.COMBINED) {
metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
.getChunkData().getLen()); .getChunkData().getLen());
// the openContainerBlockMap should be updated only while writing data }
// not during COMMIT_STAGE of handling write chunk request.
if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA
|| request.getWriteChunk().getStage() == Stage.COMBINED) {
// the openContainerBlockMap should be updated only during
// COMMIT_STAGE of handling write chunk request.
openContainerBlockMap.addChunk(blockID, chunkInfoProto); openContainerBlockMap.addChunk(blockID, chunkInfoProto);
} }
} catch (StorageContainerException ex) { } catch (StorageContainerException ex) {

View File

@ -27,6 +27,10 @@
.ContainerCommandResponseProto; .ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetKeyResponseProto; .GetKeyResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
GetCommittedBlockLengthResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
PutKeyResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException; .StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@ -122,6 +126,26 @@ public static KeyData getKeyData(byte[] bytes) throws IOException {
} }
} }
/**
* Returns putKey response success.
* @param msg - Request.
* @return Response.
*/
public static ContainerCommandResponseProto putKeyResponseSuccess(
ContainerCommandRequestProto msg, long blockLength) {
GetCommittedBlockLengthResponseProto.Builder
committedBlockLengthResponseBuilder =
getCommittedBlockLengthResponseBuilder(blockLength,
msg.getPutKey().getKeyData().getBlockID());
PutKeyResponseProto.Builder putKeyResponse =
PutKeyResponseProto.newBuilder();
putKeyResponse
.setCommittedBlockLength(committedBlockLengthResponseBuilder);
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setPutKey(putKeyResponse);
return builder.build();
}
/** /**
* Returns successful keyResponse. * Returns successful keyResponse.
* @param msg - Request. * @param msg - Request.
@ -150,18 +174,26 @@ public static ContainerCommandResponseProto getKeyDataResponse(
* @param msg - Request. * @param msg - Request.
* @return Response. * @return Response.
*/ */
public static ContainerProtos.ContainerCommandResponseProto public static ContainerCommandResponseProto getBlockLengthResponse(
getBlockLengthResponse(ContainerProtos. ContainerCommandRequestProto msg, long blockLength) {
ContainerCommandRequestProto msg, long blockLength) { GetCommittedBlockLengthResponseProto.Builder
committedBlockLengthResponseBuilder =
getCommittedBlockLengthResponseBuilder(blockLength,
msg.getGetCommittedBlockLength().getBlockID());
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder);
return builder.build();
}
private static GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder(
long blockLength, ContainerProtos.DatanodeBlockID blockID) {
ContainerProtos.GetCommittedBlockLengthResponseProto.Builder ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
getCommittedBlockLengthResponseBuilder = ContainerProtos. getCommittedBlockLengthResponseBuilder = ContainerProtos.
GetCommittedBlockLengthResponseProto.newBuilder(); GetCommittedBlockLengthResponseProto.newBuilder();
getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength); getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength);
getCommittedBlockLengthResponseBuilder getCommittedBlockLengthResponseBuilder.setBlockID(blockID);
.setBlockID(msg.getGetCommittedBlockLength().getBlockID()); return getCommittedBlockLengthResponseBuilder;
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getSuccessResponseBuilder(msg);
builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder);
return builder.build();
} }
} }

View File

@ -67,9 +67,10 @@ public KeyManagerImpl(Configuration conf) {
* *
* @param container - Container for which key need to be added. * @param container - Container for which key need to be added.
* @param data - Key Data. * @param data - Key Data.
* @return length of the key.
* @throws IOException * @throws IOException
*/ */
public void putKey(Container container, KeyData data) throws IOException { public long putKey(Container container, KeyData data) throws IOException {
Preconditions.checkNotNull(data, "KeyData cannot be null for put " + Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
"operation."); "operation.");
Preconditions.checkState(data.getContainerID() >= 0, "Container Id " + Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
@ -87,6 +88,7 @@ public void putKey(Container container, KeyData data) throws IOException {
// Increment keycount here // Increment keycount here
container.getContainerData().incrKeyCount(); container.getContainerData().incrKeyCount();
return data.getSize();
} }
/** /**

View File

@ -35,9 +35,10 @@ public interface KeyManager {
* *
* @param container - Container for which key need to be added. * @param container - Container for which key need to be added.
* @param data - Key Data. * @param data - Key Data.
* @return length of the Key.
* @throws IOException * @throws IOException
*/ */
void putKey(Container container, KeyData data) throws IOException; long putKey(Container container, KeyData data) throws IOException;
/** /**
* Gets an existing key. * Gets an existing key.

View File

@ -52,7 +52,7 @@
/** /**
* Test Container calls. * Test Container calls.
*/ */
public class TestCommittedBlockLengthAPI { public class TestGetCommittedBlockLengthAndPutKey {
private static MiniOzoneCluster cluster; private static MiniOzoneCluster cluster;
private static OzoneConfiguration ozoneConfig; private static OzoneConfiguration ozoneConfig;
@ -213,4 +213,42 @@ public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
Assert.assertTrue(response.getBlockLength() == 1024); Assert.assertTrue(response.getBlockLength() == 1024);
xceiverClientManager.releaseClient(client); xceiverClientManager.releaseClient(client);
} }
@Test
public void tesPutKeyResposne() throws Exception {
ContainerProtos.PutKeyResponseProto response;
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID();
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client =
xceiverClientManager.acquireClient(pipeline, containerID);
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =
RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper
.getWriteChunkRequest(container.getPipeline(), blockID,
data.length);
client.sendCommand(writeChunkRequest);
// Now, explicitly make a putKey request for the block.
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper
.getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(putKeyRequest).getPutKey();
// make sure the block ids in the request and response are same.
// This will also ensure that closing the container committed the block
// on the Datanodes.
Assert.assertEquals(BlockID
.getFromProtobuf(response.getCommittedBlockLength().getBlockID()),
blockID);
Assert.assertEquals(
response.getCommittedBlockLength().getBlockLength(), data.length);
xceiverClientManager.releaseClient(client);
}
} }