From 7367ff333bf332b300e0acd6e7501ce8139a1998 Mon Sep 17 00:00:00 2001 From: Nanda kumar Date: Tue, 9 Oct 2018 18:07:01 +0530 Subject: [PATCH] HDDS-450. Generate BlockCommitSequenceId in ContainerStateMachine for every commit operation in Ratis. Contributed by Shashikant Banerjee. --- .../hdds/scm/storage/ChunkOutputStream.java | 12 ++++++- .../scm/storage/ContainerProtocolCalls.java | 25 +++++++-------- .../container/common/helpers/BlockData.java | 12 +++++++ .../proto/DatanodeContainerProtocol.proto | 2 ++ .../server/ratis/ContainerStateMachine.java | 31 ++++++++++++++++--- .../keyvalue/helpers/BlockUtils.java | 5 ++- .../keyvalue/impl/BlockManagerImpl.java | 1 - .../client/io/ChunkGroupOutputStream.java | 16 +++++++--- .../ozone/om/helpers/OmKeyLocationInfo.java | 17 ++++++++-- .../src/main/proto/OzoneManagerProtocol.proto | 1 + .../TestGetCommittedBlockLengthAndPutKey.java | 4 ++- .../hadoop/ozone/om/KeyManagerImpl.java | 2 ++ 12 files changed, 99 insertions(+), 29 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index 10b3bb5670..cc1ea8dbe9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.storage; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -65,6 +66,7 @@ public class ChunkOutputStream extends OutputStream { private final String streamId; private int chunkIndex; private int chunkSize; + private long blockCommitSequenceId; /** * Creates a new ChunkOutputStream. @@ -93,12 +95,17 @@ public ChunkOutputStream(BlockID blockID, String key, this.buffer = ByteBuffer.allocate(chunkSize); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; + blockCommitSequenceId = 0; } public ByteBuffer getBuffer() { return buffer; } + public long getBlockCommitSequenceId() { + return blockCommitSequenceId; + } + @Override public void write(int b) throws IOException { checkOpen(); @@ -155,7 +162,10 @@ public void close() throws IOException { writeChunkToContainer(); } try { - putBlock(xceiverClient, containerBlockData.build(), traceID); + ContainerProtos.PutBlockResponseProto responseProto = + putBlock(xceiverClient, containerBlockData.build(), traceID); + blockCommitSequenceId = + responseProto.getCommittedBlockLength().getBlockCommitSequenceId(); } catch (IOException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 6b7a328472..1df50b1650 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -141,24 +141,23 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, * @param xceiverClient client to perform call * @param containerBlockData block data to identify container * @param traceID container protocol call args + * @return putBlockResponse * @throws IOException if there is an I/O error while performing the call */ - public static void putBlock(XceiverClientSpi xceiverClient, - BlockData containerBlockData, String traceID) throws IOException { - PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto - .newBuilder() - .setBlockData(containerBlockData); + public static ContainerProtos.PutBlockResponseProto putBlock( + XceiverClientSpi xceiverClient, BlockData containerBlockData, + String traceID) throws IOException { + PutBlockRequestProto.Builder createBlockRequest = + PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); String id = xceiverClient.getPipeline().getLeader().getUuidString(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.PutBlock) - .setContainerID(containerBlockData.getBlockID().getContainerID()) - .setTraceID(traceID) - .setDatanodeUuid(id) - .setPutBlock(createBlockRequest) - .build(); + ContainerCommandRequestProto request = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) + .setContainerID(containerBlockData.getBlockID().getContainerID()) + .setTraceID(traceID).setDatanodeUuid(id) + .setPutBlock(createBlockRequest).build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); + return response.getPutBlock(); } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java index 0c1d427775..87cf82460e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java @@ -34,6 +34,7 @@ public class BlockData { private final BlockID blockID; private final Map metadata; + private long blockCommitSequenceId; /** * Represent a list of chunks. @@ -64,6 +65,15 @@ public BlockData(BlockID blockID) { this.blockID = blockID; this.metadata = new TreeMap<>(); this.size = 0; + blockCommitSequenceId = 0; + } + + public long getBlockCommitSequenceId() { + return blockCommitSequenceId; + } + + public void setBlockCommitSequenceId(long blockCommitSequenceId) { + this.blockCommitSequenceId = blockCommitSequenceId; } /** @@ -85,6 +95,7 @@ public static BlockData getFromProtoBuf(ContainerProtos.BlockData data) throws if (data.hasSize()) { Preconditions.checkArgument(data.getSize() == blockData.getSize()); } + blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); return blockData; } @@ -104,6 +115,7 @@ public ContainerProtos.BlockData getProtoBufMessage() { } builder.addAllChunks(getChunks()); builder.setSize(size); + builder.setBlockCommitSequenceId(blockCommitSequenceId); return builder.build(); } diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 7be8a62c29..456775019c 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -300,6 +300,7 @@ message BlockData { repeated KeyValue metadata = 3; repeated ChunkInfo chunks = 4; optional int64 size = 5; + optional uint64 blockCommitSequenceId = 6; } // Block Messages. @@ -331,6 +332,7 @@ message GetCommittedBlockLengthRequestProto { message GetCommittedBlockLengthResponseProto { required DatanodeBlockID blockID = 1; required int64 blockLength = 2; + optional uint64 blockCommitSequenceId = 3; } message DeleteBlockResponseProto { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index a7bef86b22..f07c95ff05 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; @@ -506,17 +508,36 @@ CompletableFuture handleStateMachineData( // on a container private CompletableFuture handlePutBlock( - ContainerCommandRequestProto requestProto) { + ContainerCommandRequestProto requestProto, long index) { List> futureList = new ArrayList<>(); - long localId = - requestProto.getPutBlock().getBlockData().getBlockID().getLocalID(); + BlockData blockData = null; + ContainerProtos.BlockData blockDataProto = + requestProto.getPutBlock().getBlockData(); + + // set the blockCommitSequenceId + try { + blockData = BlockData.getFromProtoBuf(blockDataProto); + } catch (IOException ioe) { + LOG.error("unable to retrieve blockData info for Block {}", + blockDataProto.getBlockID()); + return completeExceptionally(ioe); + } + blockData.setBlockCommitSequenceId(index); + final ContainerProtos.PutBlockRequestProto putBlockRequestProto = + ContainerProtos.PutBlockRequestProto + .newBuilder(requestProto.getPutBlock()) + .setBlockData(blockData.getProtoBufMessage()).build(); + ContainerCommandRequestProto containerCommandRequestProto = + ContainerCommandRequestProto.newBuilder(requestProto) + .setPutBlock(putBlockRequestProto).build(); + long localId = blockDataProto.getBlockID().getLocalID(); // Need not wait for create container future here as it has already // finished. if (block2ChunkMap.get(localId) != null) { futureList.addAll(block2ChunkMap.get(localId).getAll()); } CompletableFuture effectiveFuture = - runCommandAfterFutures(futureList, requestProto); + runCommandAfterFutures(futureList, containerCommandRequestProto); CompletableFuture putBlockFuture = effectiveFuture.thenApply(message -> { @@ -616,7 +637,7 @@ CompletableFuture executeContainerCommand( case CloseContainer: return handleCloseContainer(requestProto); case PutBlock: - return handlePutBlock(requestProto); + return handlePutBlock(requestProto, index); case CreateContainer: return handleCreateContainer(requestProto); default: diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java index f5cc847774..b25fec40c3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java @@ -133,10 +133,13 @@ public static BlockData getBlockData(byte[] bytes) throws IOException { */ public static ContainerCommandResponseProto putBlockResponseSuccess( ContainerCommandRequestProto msg, long blockLength) { + ContainerProtos.BlockData blockData = msg.getPutBlock().getBlockData(); GetCommittedBlockLengthResponseProto.Builder committedBlockLengthResponseBuilder = getCommittedBlockLengthResponseBuilder(blockLength, - msg.getPutBlock().getBlockData().getBlockID()); + blockData.getBlockID()); + committedBlockLengthResponseBuilder + .setBlockCommitSequenceId(blockData.getBlockCommitSequenceId()); PutBlockResponseProto.Builder putKeyResponse = PutBlockResponseProto.newBuilder(); putKeyResponse diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index 54c15fb268..0dd87392ee 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -85,7 +85,6 @@ public long putBlock(Container container, BlockData data) throws IOException { Preconditions.checkNotNull(db, "DB cannot be null here"); db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage() .toByteArray()); - // Increment keycount here container.getContainerData().incrKeyCount(); return data.getSize(); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 3742a9a5d1..6580c2c9f1 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -117,13 +117,15 @@ public List getStreamEntries() { return streamEntries; } - public List getLocationInfoList() { + public List getLocationInfoList() throws IOException { List locationInfoList = new ArrayList<>(); for (ChunkOutputStreamEntry streamEntry : streamEntries) { OmKeyLocationInfo info = new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) .setShouldCreateContainer(false) - .setLength(streamEntry.currentPosition).setOffset(0).build(); + .setLength(streamEntry.currentPosition).setOffset(0) + .setBlockCommitSequenceId(streamEntry.getBlockCommitSequenceId()) + .build(); locationInfoList.add(info); } return locationInfoList; @@ -153,8 +155,6 @@ public ChunkGroupOutputStream( this.chunkSize = chunkSize; this.requestID = requestId; this.retryPolicy = retryPolicy; - LOG.debug("Expecting open key with one block, but got" + - info.getKeyLocationVersions().size()); } /** @@ -708,6 +708,14 @@ ByteBuffer getBuffer() throws IOException { throw new IOException("Invalid Output Stream for Key: " + key); } + long getBlockCommitSequenceId() throws IOException { + if (this.outputStream instanceof ChunkOutputStream) { + ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + return out.getBlockCommitSequenceId(); + } + throw new IOException("Invalid Output Stream for Key: " + key); + } + public void cleanup() { checkStream(); if (this.outputStream instanceof ChunkOutputStream) { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index 79b3c82b2d..ada3567530 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -31,13 +31,15 @@ public final class OmKeyLocationInfo { private final long offset; // the version number indicating when this block was added private long createVersion; + private final long blockCommitSequenceId; private OmKeyLocationInfo(BlockID blockID, boolean shouldCreateContainer, - long length, long offset) { + long length, long offset, long blockCommitSequenceId) { this.blockID = blockID; this.shouldCreateContainer = shouldCreateContainer; this.length = length; this.offset = offset; + this.blockCommitSequenceId = blockCommitSequenceId; } public void setCreateVersion(long version) { @@ -84,6 +86,7 @@ public static class Builder { private boolean shouldCreateContainer; private long length; private long offset; + private long blockCommitSequenceId; public Builder setBlockID(BlockID blockId) { this.blockID = blockId; @@ -105,9 +108,14 @@ public Builder setOffset(long off) { return this; } + public Builder setBlockCommitSequenceId(long sequenceId) { + this.blockCommitSequenceId = sequenceId; + return this; + } + public OmKeyLocationInfo build() { return new OmKeyLocationInfo(blockID, - shouldCreateContainer, length, offset); + shouldCreateContainer, length, offset, blockCommitSequenceId); } } @@ -118,6 +126,7 @@ public KeyLocation getProtobuf() { .setLength(length) .setOffset(offset) .setCreateVersion(createVersion) + .setBlockCommitSequenceId(blockCommitSequenceId) .build(); } @@ -126,7 +135,8 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { BlockID.getFromProtobuf(keyLocation.getBlockID()), keyLocation.getShouldCreateContainer(), keyLocation.getLength(), - keyLocation.getOffset()); + keyLocation.getOffset(), + keyLocation.getBlockCommitSequenceId()); info.setCreateVersion(keyLocation.getCreateVersion()); return info; } @@ -138,6 +148,7 @@ public String toString() { ", shouldCreateContainer=" + shouldCreateContainer + ", length=" + length + ", offset=" + offset + + ", blockCommitSequenceId=" + blockCommitSequenceId + ", createVersion=" + createVersion + '}'; } } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 975c790f78..823f36bb86 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -244,6 +244,7 @@ message KeyLocation { required uint64 length = 4; // indicated at which version this block gets created. optional uint64 createVersion = 5; + optional uint64 blockCommitSequenceId = 6; } message KeyLocationList { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index 08e780855d..42047aaaec 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -219,7 +219,7 @@ public void tesPutKeyResposne() throws Exception { ContainerProtos.PutBlockResponseProto response; String traceID = UUID.randomUUID().toString(); ContainerWithPipeline container = storageContainerLocationClient - .allocateContainer(xceiverClientManager.getType(), + .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE, containerOwner); long containerID = container.getContainerInfo().getContainerID(); Pipeline pipeline = container.getPipeline(); @@ -249,6 +249,8 @@ public void tesPutKeyResposne() throws Exception { blockID); Assert.assertEquals( response.getCommittedBlockLength().getBlockLength(), data.length); + Assert.assertTrue( + response.getCommittedBlockLength().getBlockCommitSequenceId() > 0); xceiverClientManager.releaseClient(client); } } \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 41b391a492..e035eb21ef 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -172,6 +172,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) .setBlockID(allocatedBlock.getBlockID()) .setShouldCreateContainer(allocatedBlock.getCreateContainer()) .setLength(scmBlockSize) + .setBlockCommitSequenceId(0) .setOffset(0) .build(); // current version not committed, so new blocks coming now are added to @@ -236,6 +237,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { .setBlockID(allocatedBlock.getBlockID()) .setShouldCreateContainer(allocatedBlock.getCreateContainer()) .setLength(allocateSize) + .setBlockCommitSequenceId(0) .setOffset(0) .build(); locations.add(subKeyInfo);