diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index 4547163d5d..4e881c434c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.storage; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; @@ -57,7 +58,7 @@ */ public class ChunkOutputStream extends OutputStream { - private final BlockID blockID; + private BlockID blockID; private final String key; private final String traceID; private final BlockData.Builder containerBlockData; @@ -67,7 +68,6 @@ public class ChunkOutputStream extends OutputStream { private final String streamId; private int chunkIndex; private int chunkSize; - private long blockCommitSequenceId; /** * Creates a new ChunkOutputStream. 
@@ -96,15 +96,14 @@ public ChunkOutputStream(BlockID blockID, String key, this.buffer = ByteBuffer.allocate(chunkSize); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; - blockCommitSequenceId = 0; } public ByteBuffer getBuffer() { return buffer; } - public long getBlockCommitSequenceId() { - return blockCommitSequenceId; + public BlockID getBlockID() { + return blockID; } @Override @@ -165,8 +164,12 @@ public void close() throws IOException { try { ContainerProtos.PutBlockResponseProto responseProto = putBlock(xceiverClient, containerBlockData.build(), traceID); - blockCommitSequenceId = - responseProto.getCommittedBlockLength().getBlockCommitSequenceId(); + BlockID responseBlockID = BlockID.getFromProtobuf( + responseProto.getCommittedBlockLength().getBlockID()); + Preconditions.checkState(blockID.getContainerBlockID() + .equals(responseBlockID.getContainerBlockID())); + // updates the bcsId of the block + blockID = responseBlockID; } catch (IOException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java index 81497406d6..a863437891 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java @@ -23,52 +23,88 @@ import java.util.Objects; /** - * BlockID of ozone (containerID localID). + * BlockID of Ozone (containerID + localID + blockCommitSequenceId). 
*/ + public class BlockID { - private long containerID; - private long localID; + + private ContainerBlockID containerBlockID; + private long blockCommitSequenceId; public BlockID(long containerID, long localID) { - this.containerID = containerID; - this.localID = localID; + this(containerID, localID, 0); + } + + private BlockID(long containerID, long localID, long bcsID) { + containerBlockID = new ContainerBlockID(containerID, localID); + blockCommitSequenceId = bcsID; + } + + public BlockID(ContainerBlockID containerBlockID) { + this(containerBlockID, 0); + } + + private BlockID(ContainerBlockID containerBlockID, long bcsId) { + this.containerBlockID = containerBlockID; + blockCommitSequenceId = bcsId; } public long getContainerID() { - return containerID; + return containerBlockID.getContainerID(); } public long getLocalID() { - return localID; + return containerBlockID.getLocalID(); + } + + public long getBlockCommitSequenceId() { + return blockCommitSequenceId; + } + + public void setBlockCommitSequenceId(long blockCommitSequenceId) { + this.blockCommitSequenceId = blockCommitSequenceId; + } + + public ContainerBlockID getContainerBlockID() { + return containerBlockID; + } + + public void setContainerBlockID(ContainerBlockID containerBlockID) { + this.containerBlockID = containerBlockID; } @Override public String toString() { - return new ToStringBuilder(this). - append("containerID", containerID). - append("localID", localID). - toString(); - } - - public HddsProtos.BlockID getProtobuf() { - return HddsProtos.BlockID.newBuilder(). 
- setContainerID(containerID).setLocalID(localID).build(); - } - - public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) { - return new BlockID(blockID.getContainerID(), - blockID.getLocalID()); + return new ToStringBuilder(this) + .append("containerID", containerBlockID.getContainerID()) + .append("localID", containerBlockID.getLocalID()) + .append("blockCommitSequenceId", blockCommitSequenceId) + .toString(); } public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() { return ContainerProtos.DatanodeBlockID.newBuilder(). - setContainerID(containerID).setLocalID(localID).build(); + setContainerID(containerBlockID.getContainerID()) + .setLocalID(containerBlockID.getLocalID()) + .setBlockCommitSequenceId(blockCommitSequenceId).build(); } public static BlockID getFromProtobuf( ContainerProtos.DatanodeBlockID blockID) { return new BlockID(blockID.getContainerID(), - blockID.getLocalID()); + blockID.getLocalID(), blockID.getBlockCommitSequenceId()); + } + + public HddsProtos.BlockID getProtobuf() { + return HddsProtos.BlockID.newBuilder() + .setContainerBlockID(containerBlockID.getProtobuf()) + .setBlockCommitSequenceId(blockCommitSequenceId).build(); + } + + public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) { + return new BlockID( + ContainerBlockID.getFromProtobuf(blockID.getContainerBlockID()), + blockID.getBlockCommitSequenceId()); } @Override @@ -80,11 +116,14 @@ public boolean equals(Object o) { return false; } BlockID blockID = (BlockID) o; - return containerID == blockID.containerID && localID == blockID.localID; + return containerBlockID.equals(blockID.getContainerBlockID()) + && blockCommitSequenceId == blockID.getBlockCommitSequenceId(); } @Override public int hashCode() { - return Objects.hash(containerID, localID); + return Objects + .hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(), + blockCommitSequenceId); } } diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ContainerBlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ContainerBlockID.java new file mode 100644 index 0000000000..82084f22a9 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ContainerBlockID.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.client; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.util.Objects; + +/** + * BlockID returned by SCM during allocation of block (containerID + localID). + */ +public class ContainerBlockID { + private long containerID; + private long localID; + + public ContainerBlockID(long containerID, long localID) { + this.containerID = containerID; + this.localID = localID; + } + + public long getContainerID() { + return containerID; + } + + public long getLocalID() { + return localID; + } + + @Override + public String toString() { + return new ToStringBuilder(this). + append("containerID", containerID). + append("localID", localID). + toString(); + } + + public HddsProtos.ContainerBlockID getProtobuf() { + return HddsProtos.ContainerBlockID.newBuilder(). 
+ setContainerID(containerID).setLocalID(localID).build(); + } + + public static ContainerBlockID getFromProtobuf( + HddsProtos.ContainerBlockID containerBlockID) { + return new ContainerBlockID(containerBlockID.getContainerID(), + containerBlockID.getLocalID()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ContainerBlockID blockID = (ContainerBlockID) o; + return containerID == blockID.containerID && localID == blockID.localID; + } + + @Override + public int hashCode() { + return Objects.hash(containerID, localID); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java index f657b7459b..93af56dd77 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hdds.scm.container.common.helpers; +import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.client.BlockID; /** * Allocated block wraps the result returned from SCM#allocateBlock which @@ -27,7 +27,7 @@ */ public final class AllocatedBlock { private Pipeline pipeline; - private BlockID blockID; + private ContainerBlockID containerBlockID; // Indicates whether the client should create container before writing block. 
private boolean shouldCreateContainer; @@ -36,7 +36,7 @@ public final class AllocatedBlock { */ public static class Builder { private Pipeline pipeline; - private BlockID blockID; + private ContainerBlockID containerBlockID; private boolean shouldCreateContainer; public Builder setPipeline(Pipeline p) { @@ -44,8 +44,8 @@ public Builder setPipeline(Pipeline p) { return this; } - public Builder setBlockID(BlockID blockId) { - this.blockID = blockId; + public Builder setContainerBlockID(ContainerBlockID blockId) { + this.containerBlockID = blockId; return this; } @@ -55,14 +55,15 @@ public Builder setShouldCreateContainer(boolean shouldCreate) { } public AllocatedBlock build() { - return new AllocatedBlock(pipeline, blockID, shouldCreateContainer); + return new AllocatedBlock(pipeline, containerBlockID, + shouldCreateContainer); } } - private AllocatedBlock(Pipeline pipeline, BlockID blockID, + private AllocatedBlock(Pipeline pipeline, ContainerBlockID containerBlockID, boolean shouldCreateContainer) { this.pipeline = pipeline; - this.blockID = blockID; + this.containerBlockID = containerBlockID; this.shouldCreateContainer = shouldCreateContainer; } @@ -70,8 +71,8 @@ public Pipeline getPipeline() { return pipeline; } - public BlockID getBlockID() { - return blockID; + public ContainerBlockID getBlockID() { + return containerBlockID; } public boolean getCreateContainer() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index e684ae3947..f868209179 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -20,7 +20,7 @@ import 
com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -102,7 +102,8 @@ public AllocatedBlock allocateBlock(long size, response.getErrorMessage() : "Allocate block failed."); } AllocatedBlock.Builder builder = new AllocatedBlock.Builder() - .setBlockID(BlockID.getFromProtobuf(response.getBlockID())) + .setContainerBlockID( + ContainerBlockID.getFromProtobuf(response.getContainerBlockID())) .setPipeline(Pipeline.getFromProtobuf(response.getPipeline())) .setShouldCreateContainer(response.getCreateContainer()); return builder.build(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index df1467b792..150b1d6bfe 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -81,17 +81,14 @@ private ContainerProtocolCalls() { * @param xceiverClient client to perform call * @param datanodeBlockID blockID to identify container * @param traceID container protocol call args - * @param blockCommitSequenceId latest commit Id of the block * @return container protocol get block response * @throws IOException if there is an I/O error while performing the call */ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - DatanodeBlockID datanodeBlockID, String traceID, - long blockCommitSequenceId) throws IOException { + DatanodeBlockID datanodeBlockID, String traceID) throws IOException { 
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto .newBuilder() - .setBlockID(datanodeBlockID) - .setBlockCommitSequenceId(blockCommitSequenceId); + .setBlockID(datanodeBlockID); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java index 7a5403f290..2f6030fdc8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java @@ -61,7 +61,8 @@ public KeyBlocks getProto() { public static BlockGroup getFromProto(KeyBlocks proto) { List blockIDs = new ArrayList<>(); for (HddsProtos.BlockID block : proto.getBlocksList()) { - blockIDs.add(new BlockID(block.getContainerID(), block.getLocalID())); + blockIDs.add(new BlockID(block.getContainerBlockID().getContainerID(), + block.getContainerBlockID().getLocalID())); } return BlockGroup.newBuilder().setKeyName(proto.getKey()) .addAllBlockIDs(blockIDs).build(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java index 87cf82460e..3b9e57c8c9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java @@ -34,7 +34,6 @@ public class BlockData { private final BlockID blockID; private final Map metadata; - private long blockCommitSequenceId; /** * Represent a list of chunks. 
@@ -65,15 +64,14 @@ public BlockData(BlockID blockID) { this.blockID = blockID; this.metadata = new TreeMap<>(); this.size = 0; - blockCommitSequenceId = 0; } public long getBlockCommitSequenceId() { - return blockCommitSequenceId; + return blockID.getBlockCommitSequenceId(); } public void setBlockCommitSequenceId(long blockCommitSequenceId) { - this.blockCommitSequenceId = blockCommitSequenceId; + this.blockID.setBlockCommitSequenceId(blockCommitSequenceId); } /** @@ -95,7 +93,6 @@ public static BlockData getFromProtoBuf(ContainerProtos.BlockData data) throws if (data.hasSize()) { Preconditions.checkArgument(data.getSize() == blockData.getSize()); } - blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId()); return blockData; } @@ -115,7 +112,6 @@ public ContainerProtos.BlockData getProtoBufMessage() { } builder.addAllChunks(getChunks()); builder.setSize(size); - builder.setBlockCommitSequenceId(blockCommitSequenceId); return builder.build(); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java index 37a13095f6..2ecf1f4239 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -76,7 +76,7 @@ public AllocateScmBlockResponseProto allocateScmBlock( if (allocatedBlock != null) { return AllocateScmBlockResponseProto.newBuilder() - .setBlockID(allocatedBlock.getBlockID().getProtobuf()) + .setContainerBlockID(allocatedBlock.getBlockID().getProtobuf()) .setPipeline(allocatedBlock.getPipeline().getProtobufMessage()) .setCreateContainer(allocatedBlock.getCreateContainer()) .setErrorCode(AllocateScmBlockResponseProto.Error.success) diff --git 
a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index f9262ba1bd..318ec09028 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -150,6 +150,7 @@ enum Result { message DatanodeBlockID { required int64 containerID = 1; required int64 localID = 2; + optional uint64 blockCommitSequenceId = 3 [default = 0]; } message KeyValue { @@ -303,7 +304,6 @@ message BlockData { repeated KeyValue metadata = 3; repeated ChunkInfo chunks = 4; optional int64 size = 5; - optional uint64 blockCommitSequenceId = 6; } // Block Messages. @@ -317,7 +317,6 @@ message PutBlockResponseProto { message GetBlockRequestProto { required DatanodeBlockID blockID = 1; - optional uint64 blockCommitSequenceId = 2 [default = 0]; } message GetBlockResponseProto { @@ -336,7 +335,6 @@ message GetCommittedBlockLengthRequestProto { message GetCommittedBlockLengthResponseProto { required DatanodeBlockID blockID = 1; required int64 blockLength = 2; - optional uint64 blockCommitSequenceId = 3 [default = 0]; } message DeleteBlockResponseProto { diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto index 01a0ddea38..dc68481faf 100644 --- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -103,7 +103,7 @@ message AllocateScmBlockResponseProto { unknownFailure = 4; } required Error errorCode = 1; - optional BlockID blockID = 2; + optional ContainerBlockID containerBlockID = 2; optional hadoop.hdds.Pipeline pipeline = 3; optional bool createContainer = 4; optional string errorMessage = 5; diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 9e813afc2e..62b4833f0c 100644 --- 
a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -186,7 +186,12 @@ enum ScmOps { /** * Block ID that uniquely identify a block by SCM. */ -message BlockID { +message ContainerBlockID { required int64 containerID = 1; required int64 localID = 2; } + +message BlockID { + required ContainerBlockID containerBlockID = 1; + optional uint64 blockCommitSequenceId = 2 [default = 0]; +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index de72d254cb..b0bc08bd78 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -483,8 +483,7 @@ ContainerCommandResponseProto handleGetBlock( try { BlockID blockID = BlockID.getFromProtobuf( request.getGetBlock().getBlockID()); - responseData = blockManager.getBlock(kvContainer, blockID, - request.getGetBlock().getBlockCommitSequenceId()); + responseData = blockManager.getBlock(kvContainer, blockID); long numBytes = responseData.getProtoBufMessage().toByteArray().length; metrics.incContainerBytesStats(Type.GetBlock, numBytes); @@ -759,7 +758,7 @@ ContainerCommandResponseProto handleGetSmallFile( .getBlockID()); // TODO: add bcsId as a part of getSmallFile transaction // by default its 0 - BlockData responseData = blockManager.getBlock(kvContainer, blockID, 0); + BlockData responseData = blockManager.getBlock(kvContainer, blockID); ContainerProtos.ChunkInfo chunkInfo = null; ByteString dataBuf = ByteString.EMPTY; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java index d6cadc8156..e085fb072a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java @@ -139,8 +139,6 @@ public static ContainerCommandResponseProto putBlockResponseSuccess( committedBlockLengthResponseBuilder = getCommittedBlockLengthResponseBuilder(blockLength, blockData.getBlockID()); - committedBlockLengthResponseBuilder - .setBlockCommitSequenceId(blockData.getBlockCommitSequenceId()); PutBlockResponseProto.Builder putKeyResponse = PutBlockResponseProto.newBuilder(); putKeyResponse diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index 31761891a8..e2e57006e7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -129,13 +129,13 @@ public long putBlock(Container container, BlockData data) throws IOException { * * @param container - Container from which block need to be fetched. * @param blockID - BlockID of the block. - * @param bcsId latest commit Id of the block * @return Key Data. 
* @throws IOException */ @Override - public BlockData getBlock(Container container, BlockID blockID, long bcsId) + public BlockData getBlock(Container container, BlockID blockID) throws IOException { + long bcsId = blockID.getBlockCommitSequenceId(); Preconditions.checkNotNull(blockID, "BlockID cannot be null in GetBlock request"); Preconditions.checkNotNull(blockID.getContainerID(), @@ -162,7 +162,7 @@ public BlockData getBlock(Container container, BlockID blockID, long bcsId) } ContainerProtos.BlockData blockData = ContainerProtos.BlockData.parseFrom(kData); - long id = blockData.getBlockCommitSequenceId(); + long id = blockData.getBlockID().getBlockCommitSequenceId(); if (id < bcsId) { throw new StorageContainerException( "bcsId " + bcsId + " mismatches with existing block Id " diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index 8c865835b4..6812b0d8ff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -45,11 +45,10 @@ public interface BlockManager { * * @param container - Container from which block need to be get. * @param blockID - BlockID of the Block. - * @param bcsId latest commit id of the block * @return Block Data. 
* @throws IOException */ - BlockData getBlock(Container container, BlockID blockID, long bcsId) + BlockData getBlock(Container container, BlockID blockID) throws IOException; /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java index 65477d8135..6fe6d81ee4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestBlockManagerImpl.java @@ -113,7 +113,7 @@ public void testPutAndGetBlock() throws Exception { assertEquals(1, keyValueContainer.getContainerData().getKeyCount()); //Get Block BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer, - blockData.getBlockID(), 0); + blockData.getBlockID()); assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID()); assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID()); @@ -139,7 +139,7 @@ public void testDeleteBlock() throws Exception { assertEquals(0, keyValueContainer.getContainerData().getKeyCount()); try { - blockManager.getBlock(keyValueContainer, blockID, 0); + blockManager.getBlock(keyValueContainer, blockID); fail("testDeleteBlock"); } catch (StorageContainerException ex) { GenericTestUtils.assertExceptionContains( @@ -197,7 +197,7 @@ public void testGetNoSuchBlock() throws Exception { keyValueContainer.getContainerData().getKeyCount()); try { //Since the block has been deleted, we should not be able to find it - blockManager.getBlock(keyValueContainer, blockID, 0); + blockManager.getBlock(keyValueContainer, blockID); fail("testGetNoSuchBlock failed"); } catch (StorageContainerException ex) { GenericTestUtils.assertExceptionContains( diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 681d021ec1..c878d9766a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmUtils; @@ -318,7 +319,7 @@ private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline, AllocatedBlock.Builder abb = new AllocatedBlock.Builder() - .setBlockID(new BlockID(containerID, localID)) + .setContainerBlockID(new ContainerBlockID(containerID, localID)) .setPipeline(containerWithPipeline.getPipeline()) .setShouldCreateContainer(createContainer); LOG.trace("New block allocated : {} Container ID: {}", localID, diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java index 0c09fc823c..56327566ae 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -297,8 +297,7 @@ public static LengthInputStream getFromOmKeyInfo( ContainerProtos.DatanodeBlockID datanodeBlockID = blockID .getDatanodeBlockIDProtobuf(); ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClient, datanodeBlockID, requestId, - omKeyLocationInfo.getBlockCommitSequenceId()); + 
.getBlock(xceiverClient, datanodeBlockID, requestId); List chunks = response.getBlockData().getChunksList(); for (ContainerProtos.ChunkInfo chunk : chunks) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 6d13bb2b96..78d69c1d49 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -128,7 +128,6 @@ public List getLocationInfoList() throws IOException { new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) .setShouldCreateContainer(false) .setLength(streamEntry.currentPosition).setOffset(0) - .setBlockCommitSequenceId(streamEntry.getBlockCommitSequenceId()) .build(); locationInfoList.add(info); } @@ -614,7 +613,7 @@ public Builder setRetryPolicy(RetryPolicy rPolicy) { private static class ChunkOutputStreamEntry extends OutputStream { private OutputStream outputStream; - private final BlockID blockID; + private BlockID blockID; private final String key; private final XceiverClientManager xceiverClientManager; private final XceiverClientSpi xceiverClient; @@ -700,6 +699,11 @@ public void flush() throws IOException { public void close() throws IOException { if (this.outputStream != null) { this.outputStream.close(); + // after closing the chunkOutPutStream, blockId would have been + // reconstructed with updated bcsId + if (this.outputStream instanceof ChunkOutputStream) { + this.blockID = ((ChunkOutputStream) outputStream).getBlockID(); + } } } @@ -711,19 +715,6 @@ ByteBuffer getBuffer() throws IOException { throw new IOException("Invalid Output Stream for Key: " + key); } - long getBlockCommitSequenceId() throws IOException { - if (this.outputStream instanceof ChunkOutputStream) { - ChunkOutputStream out = (ChunkOutputStream) 
this.outputStream; - return out.getBlockCommitSequenceId(); - } else if (outputStream == null) { - // For a pre allocated block for which no write has been initiated, - // the OutputStream will be null here. - // In such cases, the default blockCommitSequenceId will be 0 - return 0; - } - throw new IOException("Invalid Output Stream for Key: " + key); - } - public void cleanup() { checkStream(); if (this.outputStream instanceof ChunkOutputStream) { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index 9d54cea92a..d86153d7d8 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -31,15 +31,13 @@ public final class OmKeyLocationInfo { private final long offset; // the version number indicating when this block was added private long createVersion; - private final long blockCommitSequenceId; private OmKeyLocationInfo(BlockID blockID, boolean shouldCreateContainer, - long length, long offset, long blockCommitSequenceId) { + long length, long offset) { this.blockID = blockID; this.shouldCreateContainer = shouldCreateContainer; this.length = length; this.offset = offset; - this.blockCommitSequenceId = blockCommitSequenceId; } public void setCreateVersion(long version) { @@ -79,7 +77,7 @@ public long getOffset() { } public long getBlockCommitSequenceId() { - return blockCommitSequenceId; + return blockID.getBlockCommitSequenceId(); } /** @@ -90,7 +88,6 @@ public static class Builder { private boolean shouldCreateContainer; private long length; private long offset; - private long blockCommitSequenceId; public Builder setBlockID(BlockID blockId) { this.blockID = blockId; @@ -112,14 +109,9 @@ public Builder setOffset(long off) { return this; } - public Builder 
setBlockCommitSequenceId(long sequenceId) { - this.blockCommitSequenceId = sequenceId; - return this; - } - public OmKeyLocationInfo build() { return new OmKeyLocationInfo(blockID, - shouldCreateContainer, length, offset, blockCommitSequenceId); + shouldCreateContainer, length, offset); } } @@ -130,7 +122,6 @@ public KeyLocation getProtobuf() { .setLength(length) .setOffset(offset) .setCreateVersion(createVersion) - .setBlockCommitSequenceId(blockCommitSequenceId) .build(); } @@ -139,8 +130,7 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { BlockID.getFromProtobuf(keyLocation.getBlockID()), keyLocation.getShouldCreateContainer(), keyLocation.getLength(), - keyLocation.getOffset(), - keyLocation.getBlockCommitSequenceId()); + keyLocation.getOffset()); info.setCreateVersion(keyLocation.getCreateVersion()); return info; } @@ -152,7 +142,6 @@ public String toString() { ", shouldCreateContainer=" + shouldCreateContainer + ", length=" + length + ", offset=" + offset + - ", blockCommitSequenceId=" + blockCommitSequenceId + ", createVersion=" + createVersion + '}'; } } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index d36caceb11..8c4c40903b 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -252,7 +252,6 @@ message KeyLocation { required uint64 length = 4; // indicated at which version this block gets created. 
optional uint64 createVersion = 5; - optional uint64 blockCommitSequenceId = 6; } message KeyLocationList { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java index 1789e557f2..5153b41fe3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java @@ -151,7 +151,7 @@ public void testContainerReplication() throws Exception { .getHandler(ContainerType.KeyValueContainer); BlockData key = handler.getBlockManager() - .getBlock(container, BlockID.getFromProtobuf(blockID), 0); + .getBlock(container, BlockID.getFromProtobuf(blockID)); Assert.assertNotNull(key); Assert.assertEquals(1, key.getChunks().size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java index 360b6830df..0ae63e3101 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java @@ -256,6 +256,6 @@ public void testCloseContainer() throws Exception { openContainerBlockMap.getBlockDataMap(testContainerID)); // Make sure the key got committed Assert.assertNotNull(handler.getBlockManager() - .getBlock(container, blockID, 0)); + .getBlock(container, blockID)); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 293aac8587..f81ee577e5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -558,7 +558,7 @@ public void testPutBlock() throws IOException, NoSuchAlgorithmException { blockData.setChunks(chunkList); blockManager.putBlock(container, blockData); BlockData readBlockData = blockManager. - getBlock(container, blockData.getBlockID(), 0); + getBlock(container, blockData.getBlockID()); ChunkInfo readChunk = ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0)); Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); @@ -596,25 +596,27 @@ public void testPutBlockWithInvalidBCSId() blockManager.putBlock(container, blockData); BlockData readBlockData; try { + blockID1.setBlockCommitSequenceId(5); // read with bcsId higher than container bcsId blockManager. - getBlock(container, blockID1, 5); + getBlock(container, blockID1); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert.assertTrue(sce.getResult() == UNKNOWN_BCSID); } try { + blockID1.setBlockCommitSequenceId(4); // read with bcsId lower than container bcsId but greater than committed // bcsId. blockManager. - getBlock(container, blockID1, 4); + getBlock(container, blockID1); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert.assertTrue(sce.getResult() == BCSID_MISMATCH); } readBlockData = blockManager. 
- getBlock(container, blockData.getBlockID(), 4); + getBlock(container, blockData.getBlockID()); ChunkInfo readChunk = ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0)); Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); @@ -666,7 +668,7 @@ public void testPutBlockWithLotsOfChunks() throws IOException, blockData.setChunks(chunkProtoList); blockManager.putBlock(container, blockData); BlockData readBlockData = blockManager. - getBlock(container, blockData.getBlockID(), 0); + getBlock(container, blockData.getBlockID()); ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1); ChunkInfo readChunk = ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData @@ -694,7 +696,7 @@ public void testDeleteBlock() throws IOException, NoSuchAlgorithmException { blockManager.deleteBlock(container, blockID); exception.expect(StorageContainerException.class); exception.expectMessage("Unable to find the block."); - blockManager.getBlock(container, blockData.getBlockID(), 0); + blockManager.getBlock(container, blockData.getBlockID()); } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index 834dff0375..974bb97f05 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -239,16 +239,18 @@ public void tesPutKeyResposne() throws Exception { ContainerTestHelper .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk()); response = client.sendCommand(putKeyRequest).getPutBlock(); + Assert.assertEquals( + response.getCommittedBlockLength().getBlockLength(), data.length); + Assert.assertTrue(response.getCommittedBlockLength().getBlockID() + 
.getBlockCommitSequenceId() > 0); + BlockID responseBlockID = BlockID + .getFromProtobuf(response.getCommittedBlockLength().getBlockID()); + blockID + .setBlockCommitSequenceId(responseBlockID.getBlockCommitSequenceId()); // make sure the block ids in the request and response are same. // This will also ensure that closing the container committed the block // on the Datanodes. - Assert.assertEquals(BlockID - .getFromProtobuf(response.getCommittedBlockLength().getBlockID()), - blockID); - Assert.assertEquals( - response.getCommittedBlockLength().getBlockLength(), data.length); - Assert.assertTrue( - response.getCommittedBlockLength().getBlockCommitSequenceId() > 0); + Assert.assertEquals(responseBlockID, blockID); xceiverClientManager.releaseClient(client); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java index 08905ebe0b..d83d9a3261 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java @@ -700,7 +700,7 @@ public void testDeleteKey() throws Exception { KeyValueContainer container = (KeyValueContainer) cm.getContainerSet() .getContainer(location.getBlockID().getContainerID()); BlockData blockInfo = keyValueHandler.getBlockManager() - .getBlock(container, location.getBlockID(), 0); + .getBlock(container, location.getBlockID()); KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); File dataDir = new File(containerData.getChunksPath()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 340197f588..733ed85173 100644 --- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -169,10 +170,9 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID) throw ex; } OmKeyLocationInfo info = new OmKeyLocationInfo.Builder() - .setBlockID(allocatedBlock.getBlockID()) + .setBlockID(new BlockID(allocatedBlock.getBlockID())) .setShouldCreateContainer(allocatedBlock.getCreateContainer()) .setLength(scmBlockSize) - .setBlockCommitSequenceId(0) .setOffset(0) .build(); // current version not committed, so new blocks coming now are added to @@ -234,10 +234,9 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { throw ex; } OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder() - .setBlockID(allocatedBlock.getBlockID()) + .setBlockID(new BlockID(allocatedBlock.getBlockID())) .setShouldCreateContainer(allocatedBlock.getCreateContainer()) .setLength(allocateSize) - .setBlockCommitSequenceId(0) .setOffset(0) .build(); locations.add(subKeyInfo); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java index 259f8420ee..2076ced9ee 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils; import 
org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmInfo; @@ -120,7 +121,7 @@ public AllocatedBlock allocateBlock(long size, long localID = Time.monotonicNow(); AllocatedBlock.Builder abb = new AllocatedBlock.Builder() - .setBlockID(new BlockID(containerID, localID)) + .setContainerBlockID(new ContainerBlockID(containerID, localID)) .setPipeline(pipeline) .setShouldCreateContainer(false); return abb.build();