HDDS-749. Restructure BlockId class in Ozone. Contributed by Shashikant Banerjee.

This commit is contained in:
Shashikant Banerjee 2018-10-30 14:15:27 +05:30
parent 486b9a4a75
commit 7757331dbc
29 changed files with 229 additions and 130 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.storage; package org.apache.hadoop.hdds.scm.storage;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
@ -57,7 +58,7 @@
*/ */
public class ChunkOutputStream extends OutputStream { public class ChunkOutputStream extends OutputStream {
private final BlockID blockID; private BlockID blockID;
private final String key; private final String key;
private final String traceID; private final String traceID;
private final BlockData.Builder containerBlockData; private final BlockData.Builder containerBlockData;
@ -67,7 +68,6 @@ public class ChunkOutputStream extends OutputStream {
private final String streamId; private final String streamId;
private int chunkIndex; private int chunkIndex;
private int chunkSize; private int chunkSize;
private long blockCommitSequenceId;
/** /**
* Creates a new ChunkOutputStream. * Creates a new ChunkOutputStream.
@ -96,15 +96,14 @@ public ChunkOutputStream(BlockID blockID, String key,
this.buffer = ByteBuffer.allocate(chunkSize); this.buffer = ByteBuffer.allocate(chunkSize);
this.streamId = UUID.randomUUID().toString(); this.streamId = UUID.randomUUID().toString();
this.chunkIndex = 0; this.chunkIndex = 0;
blockCommitSequenceId = 0;
} }
public ByteBuffer getBuffer() { public ByteBuffer getBuffer() {
return buffer; return buffer;
} }
public long getBlockCommitSequenceId() { public BlockID getBlockID() {
return blockCommitSequenceId; return blockID;
} }
@Override @Override
@ -165,8 +164,12 @@ public void close() throws IOException {
try { try {
ContainerProtos.PutBlockResponseProto responseProto = ContainerProtos.PutBlockResponseProto responseProto =
putBlock(xceiverClient, containerBlockData.build(), traceID); putBlock(xceiverClient, containerBlockData.build(), traceID);
blockCommitSequenceId = BlockID responseBlockID = BlockID.getFromProtobuf(
responseProto.getCommittedBlockLength().getBlockCommitSequenceId(); responseProto.getCommittedBlockLength().getBlockID());
Preconditions.checkState(blockID.getContainerBlockID()
.equals(responseBlockID.getContainerBlockID()));
// updates the bcsId of the block
blockID = responseBlockID;
} catch (IOException e) { } catch (IOException e) {
throw new IOException( throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e); "Unexpected Storage Container Exception: " + e.toString(), e);

View File

@ -23,52 +23,88 @@
import java.util.Objects; import java.util.Objects;
/** /**
* BlockID of ozone (containerID localID). * BlockID of Ozone (containerID + localID + blockCommitSequenceId).
*/ */
public class BlockID { public class BlockID {
private long containerID;
private long localID; private ContainerBlockID containerBlockID;
private long blockCommitSequenceId;
public BlockID(long containerID, long localID) { public BlockID(long containerID, long localID) {
this.containerID = containerID; this(containerID, localID, 0);
this.localID = localID; }
private BlockID(long containerID, long localID, long bcsID) {
containerBlockID = new ContainerBlockID(containerID, localID);
blockCommitSequenceId = bcsID;
}
public BlockID(ContainerBlockID containerBlockID) {
this(containerBlockID, 0);
}
private BlockID(ContainerBlockID containerBlockID, long bcsId) {
this.containerBlockID = containerBlockID;
blockCommitSequenceId = bcsId;
} }
public long getContainerID() { public long getContainerID() {
return containerID; return containerBlockID.getContainerID();
} }
public long getLocalID() { public long getLocalID() {
return localID; return containerBlockID.getLocalID();
}
public long getBlockCommitSequenceId() {
return blockCommitSequenceId;
}
public void setBlockCommitSequenceId(long blockCommitSequenceId) {
this.blockCommitSequenceId = blockCommitSequenceId;
}
public ContainerBlockID getContainerBlockID() {
return containerBlockID;
}
public void setContainerBlockID(ContainerBlockID containerBlockID) {
this.containerBlockID = containerBlockID;
} }
@Override @Override
public String toString() { public String toString() {
return new ToStringBuilder(this). return new ToStringBuilder(this)
append("containerID", containerID). .append("containerID", containerBlockID.getContainerID())
append("localID", localID). .append("localID", containerBlockID.getLocalID())
toString(); .append("blockCommitSequenceId", blockCommitSequenceId)
} .toString();
public HddsProtos.BlockID getProtobuf() {
return HddsProtos.BlockID.newBuilder().
setContainerID(containerID).setLocalID(localID).build();
}
public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
return new BlockID(blockID.getContainerID(),
blockID.getLocalID());
} }
public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() { public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() {
return ContainerProtos.DatanodeBlockID.newBuilder(). return ContainerProtos.DatanodeBlockID.newBuilder().
setContainerID(containerID).setLocalID(localID).build(); setContainerID(containerBlockID.getContainerID())
.setLocalID(containerBlockID.getLocalID())
.setBlockCommitSequenceId(blockCommitSequenceId).build();
} }
public static BlockID getFromProtobuf( public static BlockID getFromProtobuf(
ContainerProtos.DatanodeBlockID blockID) { ContainerProtos.DatanodeBlockID blockID) {
return new BlockID(blockID.getContainerID(), return new BlockID(blockID.getContainerID(),
blockID.getLocalID()); blockID.getLocalID(), blockID.getBlockCommitSequenceId());
}
public HddsProtos.BlockID getProtobuf() {
return HddsProtos.BlockID.newBuilder()
.setContainerBlockID(containerBlockID.getProtobuf())
.setBlockCommitSequenceId(blockCommitSequenceId).build();
}
public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
return new BlockID(
ContainerBlockID.getFromProtobuf(blockID.getContainerBlockID()),
blockID.getBlockCommitSequenceId());
} }
@Override @Override
@ -80,11 +116,14 @@ public boolean equals(Object o) {
return false; return false;
} }
BlockID blockID = (BlockID) o; BlockID blockID = (BlockID) o;
return containerID == blockID.containerID && localID == blockID.localID; return containerBlockID.equals(blockID.getContainerBlockID())
&& blockCommitSequenceId == blockID.getBlockCommitSequenceId();
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(containerID, localID); return Objects
.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
blockCommitSequenceId);
} }
} }

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.client;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.util.Objects;
/**
* BlockID returned by SCM during allocation of block (containerID + localID).
*/
public class ContainerBlockID {
private long containerID;
private long localID;
public ContainerBlockID(long containerID, long localID) {
this.containerID = containerID;
this.localID = localID;
}
public long getContainerID() {
return containerID;
}
public long getLocalID() {
return localID;
}
@Override
public String toString() {
return new ToStringBuilder(this).
append("containerID", containerID).
append("localID", localID).
toString();
}
public HddsProtos.ContainerBlockID getProtobuf() {
return HddsProtos.ContainerBlockID.newBuilder().
setContainerID(containerID).setLocalID(localID).build();
}
public static ContainerBlockID getFromProtobuf(
HddsProtos.ContainerBlockID containerBlockID) {
return new ContainerBlockID(containerBlockID.getContainerID(),
containerBlockID.getLocalID());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerBlockID blockID = (ContainerBlockID) o;
return containerID == blockID.containerID && localID == blockID.localID;
}
@Override
public int hashCode() {
return Objects.hash(containerID, localID);
}
}

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hdds.scm.container.common.helpers; package org.apache.hadoop.hdds.scm.container.common.helpers;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.client.BlockID;
/** /**
* Allocated block wraps the result returned from SCM#allocateBlock which * Allocated block wraps the result returned from SCM#allocateBlock which
@ -27,7 +27,7 @@
*/ */
public final class AllocatedBlock { public final class AllocatedBlock {
private Pipeline pipeline; private Pipeline pipeline;
private BlockID blockID; private ContainerBlockID containerBlockID;
// Indicates whether the client should create container before writing block. // Indicates whether the client should create container before writing block.
private boolean shouldCreateContainer; private boolean shouldCreateContainer;
@ -36,7 +36,7 @@ public final class AllocatedBlock {
*/ */
public static class Builder { public static class Builder {
private Pipeline pipeline; private Pipeline pipeline;
private BlockID blockID; private ContainerBlockID containerBlockID;
private boolean shouldCreateContainer; private boolean shouldCreateContainer;
public Builder setPipeline(Pipeline p) { public Builder setPipeline(Pipeline p) {
@ -44,8 +44,8 @@ public Builder setPipeline(Pipeline p) {
return this; return this;
} }
public Builder setBlockID(BlockID blockId) { public Builder setContainerBlockID(ContainerBlockID blockId) {
this.blockID = blockId; this.containerBlockID = blockId;
return this; return this;
} }
@ -55,14 +55,15 @@ public Builder setShouldCreateContainer(boolean shouldCreate) {
} }
public AllocatedBlock build() { public AllocatedBlock build() {
return new AllocatedBlock(pipeline, blockID, shouldCreateContainer); return new AllocatedBlock(pipeline, containerBlockID,
shouldCreateContainer);
} }
} }
private AllocatedBlock(Pipeline pipeline, BlockID blockID, private AllocatedBlock(Pipeline pipeline, ContainerBlockID containerBlockID,
boolean shouldCreateContainer) { boolean shouldCreateContainer) {
this.pipeline = pipeline; this.pipeline = pipeline;
this.blockID = blockID; this.containerBlockID = containerBlockID;
this.shouldCreateContainer = shouldCreateContainer; this.shouldCreateContainer = shouldCreateContainer;
} }
@ -70,8 +71,8 @@ public Pipeline getPipeline() {
return pipeline; return pipeline;
} }
public BlockID getBlockID() { public ContainerBlockID getBlockID() {
return blockID; return containerBlockID;
} }
public boolean getCreateContainer() { public boolean getCreateContainer() {

View File

@ -20,7 +20,7 @@
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@ -102,7 +102,8 @@ public AllocatedBlock allocateBlock(long size,
response.getErrorMessage() : "Allocate block failed."); response.getErrorMessage() : "Allocate block failed.");
} }
AllocatedBlock.Builder builder = new AllocatedBlock.Builder() AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
.setBlockID(BlockID.getFromProtobuf(response.getBlockID())) .setContainerBlockID(
ContainerBlockID.getFromProtobuf(response.getContainerBlockID()))
.setPipeline(Pipeline.getFromProtobuf(response.getPipeline())) .setPipeline(Pipeline.getFromProtobuf(response.getPipeline()))
.setShouldCreateContainer(response.getCreateContainer()); .setShouldCreateContainer(response.getCreateContainer());
return builder.build(); return builder.build();

View File

@ -81,17 +81,14 @@ private ContainerProtocolCalls() {
* @param xceiverClient client to perform call * @param xceiverClient client to perform call
* @param datanodeBlockID blockID to identify container * @param datanodeBlockID blockID to identify container
* @param traceID container protocol call args * @param traceID container protocol call args
* @param blockCommitSequenceId latest commit Id of the block
* @return container protocol get block response * @return container protocol get block response
* @throws IOException if there is an I/O error while performing the call * @throws IOException if there is an I/O error while performing the call
*/ */
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
DatanodeBlockID datanodeBlockID, String traceID, DatanodeBlockID datanodeBlockID, String traceID) throws IOException {
long blockCommitSequenceId) throws IOException {
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder() .newBuilder()
.setBlockID(datanodeBlockID) .setBlockID(datanodeBlockID);
.setBlockCommitSequenceId(blockCommitSequenceId);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto ContainerCommandRequestProto request = ContainerCommandRequestProto

View File

@ -61,7 +61,8 @@ public KeyBlocks getProto() {
public static BlockGroup getFromProto(KeyBlocks proto) { public static BlockGroup getFromProto(KeyBlocks proto) {
List<BlockID> blockIDs = new ArrayList<>(); List<BlockID> blockIDs = new ArrayList<>();
for (HddsProtos.BlockID block : proto.getBlocksList()) { for (HddsProtos.BlockID block : proto.getBlocksList()) {
blockIDs.add(new BlockID(block.getContainerID(), block.getLocalID())); blockIDs.add(new BlockID(block.getContainerBlockID().getContainerID(),
block.getContainerBlockID().getLocalID()));
} }
return BlockGroup.newBuilder().setKeyName(proto.getKey()) return BlockGroup.newBuilder().setKeyName(proto.getKey())
.addAllBlockIDs(blockIDs).build(); .addAllBlockIDs(blockIDs).build();

View File

@ -34,7 +34,6 @@
public class BlockData { public class BlockData {
private final BlockID blockID; private final BlockID blockID;
private final Map<String, String> metadata; private final Map<String, String> metadata;
private long blockCommitSequenceId;
/** /**
* Represent a list of chunks. * Represent a list of chunks.
@ -65,15 +64,14 @@ public BlockData(BlockID blockID) {
this.blockID = blockID; this.blockID = blockID;
this.metadata = new TreeMap<>(); this.metadata = new TreeMap<>();
this.size = 0; this.size = 0;
blockCommitSequenceId = 0;
} }
public long getBlockCommitSequenceId() { public long getBlockCommitSequenceId() {
return blockCommitSequenceId; return blockID.getBlockCommitSequenceId();
} }
public void setBlockCommitSequenceId(long blockCommitSequenceId) { public void setBlockCommitSequenceId(long blockCommitSequenceId) {
this.blockCommitSequenceId = blockCommitSequenceId; this.blockID.setBlockCommitSequenceId(blockCommitSequenceId);
} }
/** /**
@ -95,7 +93,6 @@ public static BlockData getFromProtoBuf(ContainerProtos.BlockData data) throws
if (data.hasSize()) { if (data.hasSize()) {
Preconditions.checkArgument(data.getSize() == blockData.getSize()); Preconditions.checkArgument(data.getSize() == blockData.getSize());
} }
blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
return blockData; return blockData;
} }
@ -115,7 +112,6 @@ public ContainerProtos.BlockData getProtoBufMessage() {
} }
builder.addAllChunks(getChunks()); builder.addAllChunks(getChunks());
builder.setSize(size); builder.setSize(size);
builder.setBlockCommitSequenceId(blockCommitSequenceId);
return builder.build(); return builder.build();
} }

View File

@ -76,7 +76,7 @@ public AllocateScmBlockResponseProto allocateScmBlock(
if (allocatedBlock != null) { if (allocatedBlock != null) {
return return
AllocateScmBlockResponseProto.newBuilder() AllocateScmBlockResponseProto.newBuilder()
.setBlockID(allocatedBlock.getBlockID().getProtobuf()) .setContainerBlockID(allocatedBlock.getBlockID().getProtobuf())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage()) .setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setCreateContainer(allocatedBlock.getCreateContainer()) .setCreateContainer(allocatedBlock.getCreateContainer())
.setErrorCode(AllocateScmBlockResponseProto.Error.success) .setErrorCode(AllocateScmBlockResponseProto.Error.success)

View File

@ -150,6 +150,7 @@ enum Result {
message DatanodeBlockID { message DatanodeBlockID {
required int64 containerID = 1; required int64 containerID = 1;
required int64 localID = 2; required int64 localID = 2;
optional uint64 blockCommitSequenceId = 3 [default = 0];
} }
message KeyValue { message KeyValue {
@ -303,7 +304,6 @@ message BlockData {
repeated KeyValue metadata = 3; repeated KeyValue metadata = 3;
repeated ChunkInfo chunks = 4; repeated ChunkInfo chunks = 4;
optional int64 size = 5; optional int64 size = 5;
optional uint64 blockCommitSequenceId = 6;
} }
// Block Messages. // Block Messages.
@ -317,7 +317,6 @@ message PutBlockResponseProto {
message GetBlockRequestProto { message GetBlockRequestProto {
required DatanodeBlockID blockID = 1; required DatanodeBlockID blockID = 1;
optional uint64 blockCommitSequenceId = 2 [default = 0];
} }
message GetBlockResponseProto { message GetBlockResponseProto {
@ -336,7 +335,6 @@ message GetCommittedBlockLengthRequestProto {
message GetCommittedBlockLengthResponseProto { message GetCommittedBlockLengthResponseProto {
required DatanodeBlockID blockID = 1; required DatanodeBlockID blockID = 1;
required int64 blockLength = 2; required int64 blockLength = 2;
optional uint64 blockCommitSequenceId = 3 [default = 0];
} }
message DeleteBlockResponseProto { message DeleteBlockResponseProto {

View File

@ -103,7 +103,7 @@ message AllocateScmBlockResponseProto {
unknownFailure = 4; unknownFailure = 4;
} }
required Error errorCode = 1; required Error errorCode = 1;
optional BlockID blockID = 2; optional ContainerBlockID containerBlockID = 2;
optional hadoop.hdds.Pipeline pipeline = 3; optional hadoop.hdds.Pipeline pipeline = 3;
optional bool createContainer = 4; optional bool createContainer = 4;
optional string errorMessage = 5; optional string errorMessage = 5;

View File

@ -186,7 +186,12 @@ enum ScmOps {
/** /**
* Block ID that uniquely identify a block by SCM. * Block ID that uniquely identify a block by SCM.
*/ */
message BlockID { message ContainerBlockID {
required int64 containerID = 1; required int64 containerID = 1;
required int64 localID = 2; required int64 localID = 2;
} }
message BlockID {
required ContainerBlockID containerBlockID = 1;
optional uint64 blockCommitSequenceId = 2 [default = 0];
}

View File

@ -483,8 +483,7 @@ ContainerCommandResponseProto handleGetBlock(
try { try {
BlockID blockID = BlockID.getFromProtobuf( BlockID blockID = BlockID.getFromProtobuf(
request.getGetBlock().getBlockID()); request.getGetBlock().getBlockID());
responseData = blockManager.getBlock(kvContainer, blockID, responseData = blockManager.getBlock(kvContainer, blockID);
request.getGetBlock().getBlockCommitSequenceId());
long numBytes = responseData.getProtoBufMessage().toByteArray().length; long numBytes = responseData.getProtoBufMessage().toByteArray().length;
metrics.incContainerBytesStats(Type.GetBlock, numBytes); metrics.incContainerBytesStats(Type.GetBlock, numBytes);
@ -759,7 +758,7 @@ ContainerCommandResponseProto handleGetSmallFile(
.getBlockID()); .getBlockID());
// TODO: add bcsId as a part of getSmallFile transaction // TODO: add bcsId as a part of getSmallFile transaction
// by default its 0 // by default its 0
BlockData responseData = blockManager.getBlock(kvContainer, blockID, 0); BlockData responseData = blockManager.getBlock(kvContainer, blockID);
ContainerProtos.ChunkInfo chunkInfo = null; ContainerProtos.ChunkInfo chunkInfo = null;
ByteString dataBuf = ByteString.EMPTY; ByteString dataBuf = ByteString.EMPTY;

View File

@ -139,8 +139,6 @@ public static ContainerCommandResponseProto putBlockResponseSuccess(
committedBlockLengthResponseBuilder = committedBlockLengthResponseBuilder =
getCommittedBlockLengthResponseBuilder(blockLength, getCommittedBlockLengthResponseBuilder(blockLength,
blockData.getBlockID()); blockData.getBlockID());
committedBlockLengthResponseBuilder
.setBlockCommitSequenceId(blockData.getBlockCommitSequenceId());
PutBlockResponseProto.Builder putKeyResponse = PutBlockResponseProto.Builder putKeyResponse =
PutBlockResponseProto.newBuilder(); PutBlockResponseProto.newBuilder();
putKeyResponse putKeyResponse

View File

@ -129,13 +129,13 @@ public long putBlock(Container container, BlockData data) throws IOException {
* *
* @param container - Container from which block need to be fetched. * @param container - Container from which block need to be fetched.
* @param blockID - BlockID of the block. * @param blockID - BlockID of the block.
* @param bcsId latest commit Id of the block
* @return Key Data. * @return Key Data.
* @throws IOException * @throws IOException
*/ */
@Override @Override
public BlockData getBlock(Container container, BlockID blockID, long bcsId) public BlockData getBlock(Container container, BlockID blockID)
throws IOException { throws IOException {
long bcsId = blockID.getBlockCommitSequenceId();
Preconditions.checkNotNull(blockID, Preconditions.checkNotNull(blockID,
"BlockID cannot be null in GetBlock request"); "BlockID cannot be null in GetBlock request");
Preconditions.checkNotNull(blockID.getContainerID(), Preconditions.checkNotNull(blockID.getContainerID(),
@ -162,7 +162,7 @@ public BlockData getBlock(Container container, BlockID blockID, long bcsId)
} }
ContainerProtos.BlockData blockData = ContainerProtos.BlockData blockData =
ContainerProtos.BlockData.parseFrom(kData); ContainerProtos.BlockData.parseFrom(kData);
long id = blockData.getBlockCommitSequenceId(); long id = blockData.getBlockID().getBlockCommitSequenceId();
if (id < bcsId) { if (id < bcsId) {
throw new StorageContainerException( throw new StorageContainerException(
"bcsId " + bcsId + " mismatches with existing block Id " "bcsId " + bcsId + " mismatches with existing block Id "

View File

@ -45,11 +45,10 @@ public interface BlockManager {
* *
* @param container - Container from which block need to be get. * @param container - Container from which block need to be get.
* @param blockID - BlockID of the Block. * @param blockID - BlockID of the Block.
* @param bcsId latest commit id of the block
* @return Block Data. * @return Block Data.
* @throws IOException * @throws IOException
*/ */
BlockData getBlock(Container container, BlockID blockID, long bcsId) BlockData getBlock(Container container, BlockID blockID)
throws IOException; throws IOException;
/** /**

View File

@ -113,7 +113,7 @@ public void testPutAndGetBlock() throws Exception {
assertEquals(1, keyValueContainer.getContainerData().getKeyCount()); assertEquals(1, keyValueContainer.getContainerData().getKeyCount());
//Get Block //Get Block
BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer, BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer,
blockData.getBlockID(), 0); blockData.getBlockID());
assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID()); assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID());
assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID()); assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID());
@ -139,7 +139,7 @@ public void testDeleteBlock() throws Exception {
assertEquals(0, assertEquals(0,
keyValueContainer.getContainerData().getKeyCount()); keyValueContainer.getContainerData().getKeyCount());
try { try {
blockManager.getBlock(keyValueContainer, blockID, 0); blockManager.getBlock(keyValueContainer, blockID);
fail("testDeleteBlock"); fail("testDeleteBlock");
} catch (StorageContainerException ex) { } catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
@ -197,7 +197,7 @@ public void testGetNoSuchBlock() throws Exception {
keyValueContainer.getContainerData().getKeyCount()); keyValueContainer.getContainerData().getKeyCount());
try { try {
//Since the block has been deleted, we should not be able to find it //Since the block has been deleted, we should not be able to find it
blockManager.getBlock(keyValueContainer, blockID, 0); blockManager.getBlock(keyValueContainer, blockID);
fail("testGetNoSuchBlock failed"); fail("testGetNoSuchBlock failed");
} catch (StorageContainerException ex) { } catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.ScmUtils;
@ -318,7 +319,7 @@ private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline,
AllocatedBlock.Builder abb = AllocatedBlock.Builder abb =
new AllocatedBlock.Builder() new AllocatedBlock.Builder()
.setBlockID(new BlockID(containerID, localID)) .setContainerBlockID(new ContainerBlockID(containerID, localID))
.setPipeline(containerWithPipeline.getPipeline()) .setPipeline(containerWithPipeline.getPipeline())
.setShouldCreateContainer(createContainer); .setShouldCreateContainer(createContainer);
LOG.trace("New block allocated : {} Container ID: {}", localID, LOG.trace("New block allocated : {} Container ID: {}", localID,

View File

@ -297,8 +297,7 @@ public static LengthInputStream getFromOmKeyInfo(
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf(); .getDatanodeBlockIDProtobuf();
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, requestId, .getBlock(xceiverClient, datanodeBlockID, requestId);
omKeyLocationInfo.getBlockCommitSequenceId());
List<ContainerProtos.ChunkInfo> chunks = List<ContainerProtos.ChunkInfo> chunks =
response.getBlockData().getChunksList(); response.getBlockData().getChunksList();
for (ContainerProtos.ChunkInfo chunk : chunks) { for (ContainerProtos.ChunkInfo chunk : chunks) {

View File

@ -128,7 +128,6 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
.setShouldCreateContainer(false) .setShouldCreateContainer(false)
.setLength(streamEntry.currentPosition).setOffset(0) .setLength(streamEntry.currentPosition).setOffset(0)
.setBlockCommitSequenceId(streamEntry.getBlockCommitSequenceId())
.build(); .build();
locationInfoList.add(info); locationInfoList.add(info);
} }
@ -614,7 +613,7 @@ public Builder setRetryPolicy(RetryPolicy rPolicy) {
private static class ChunkOutputStreamEntry extends OutputStream { private static class ChunkOutputStreamEntry extends OutputStream {
private OutputStream outputStream; private OutputStream outputStream;
private final BlockID blockID; private BlockID blockID;
private final String key; private final String key;
private final XceiverClientManager xceiverClientManager; private final XceiverClientManager xceiverClientManager;
private final XceiverClientSpi xceiverClient; private final XceiverClientSpi xceiverClient;
@ -700,6 +699,11 @@ public void flush() throws IOException {
public void close() throws IOException { public void close() throws IOException {
if (this.outputStream != null) { if (this.outputStream != null) {
this.outputStream.close(); this.outputStream.close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
if (this.outputStream instanceof ChunkOutputStream) {
this.blockID = ((ChunkOutputStream) outputStream).getBlockID();
}
} }
} }
@ -711,19 +715,6 @@ ByteBuffer getBuffer() throws IOException {
throw new IOException("Invalid Output Stream for Key: " + key); throw new IOException("Invalid Output Stream for Key: " + key);
} }
long getBlockCommitSequenceId() throws IOException {
if (this.outputStream instanceof ChunkOutputStream) {
ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
return out.getBlockCommitSequenceId();
} else if (outputStream == null) {
// For a pre allocated block for which no write has been initiated,
// the OutputStream will be null here.
// In such cases, the default blockCommitSequenceId will be 0
return 0;
}
throw new IOException("Invalid Output Stream for Key: " + key);
}
public void cleanup() { public void cleanup() {
checkStream(); checkStream();
if (this.outputStream instanceof ChunkOutputStream) { if (this.outputStream instanceof ChunkOutputStream) {

View File

@ -31,15 +31,13 @@ public final class OmKeyLocationInfo {
private final long offset; private final long offset;
// the version number indicating when this block was added // the version number indicating when this block was added
private long createVersion; private long createVersion;
private final long blockCommitSequenceId;
private OmKeyLocationInfo(BlockID blockID, boolean shouldCreateContainer, private OmKeyLocationInfo(BlockID blockID, boolean shouldCreateContainer,
long length, long offset, long blockCommitSequenceId) { long length, long offset) {
this.blockID = blockID; this.blockID = blockID;
this.shouldCreateContainer = shouldCreateContainer; this.shouldCreateContainer = shouldCreateContainer;
this.length = length; this.length = length;
this.offset = offset; this.offset = offset;
this.blockCommitSequenceId = blockCommitSequenceId;
} }
public void setCreateVersion(long version) { public void setCreateVersion(long version) {
@ -79,7 +77,7 @@ public long getOffset() {
} }
public long getBlockCommitSequenceId() { public long getBlockCommitSequenceId() {
return blockCommitSequenceId; return blockID.getBlockCommitSequenceId();
} }
/** /**
@ -90,7 +88,6 @@ public static class Builder {
private boolean shouldCreateContainer; private boolean shouldCreateContainer;
private long length; private long length;
private long offset; private long offset;
private long blockCommitSequenceId;
public Builder setBlockID(BlockID blockId) { public Builder setBlockID(BlockID blockId) {
this.blockID = blockId; this.blockID = blockId;
@ -112,14 +109,9 @@ public Builder setOffset(long off) {
return this; return this;
} }
public Builder setBlockCommitSequenceId(long sequenceId) {
this.blockCommitSequenceId = sequenceId;
return this;
}
public OmKeyLocationInfo build() { public OmKeyLocationInfo build() {
return new OmKeyLocationInfo(blockID, return new OmKeyLocationInfo(blockID,
shouldCreateContainer, length, offset, blockCommitSequenceId); shouldCreateContainer, length, offset);
} }
} }
@ -130,7 +122,6 @@ public KeyLocation getProtobuf() {
.setLength(length) .setLength(length)
.setOffset(offset) .setOffset(offset)
.setCreateVersion(createVersion) .setCreateVersion(createVersion)
.setBlockCommitSequenceId(blockCommitSequenceId)
.build(); .build();
} }
@ -139,8 +130,7 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
BlockID.getFromProtobuf(keyLocation.getBlockID()), BlockID.getFromProtobuf(keyLocation.getBlockID()),
keyLocation.getShouldCreateContainer(), keyLocation.getShouldCreateContainer(),
keyLocation.getLength(), keyLocation.getLength(),
keyLocation.getOffset(), keyLocation.getOffset());
keyLocation.getBlockCommitSequenceId());
info.setCreateVersion(keyLocation.getCreateVersion()); info.setCreateVersion(keyLocation.getCreateVersion());
return info; return info;
} }
@ -152,7 +142,6 @@ public String toString() {
", shouldCreateContainer=" + shouldCreateContainer + ", shouldCreateContainer=" + shouldCreateContainer +
", length=" + length + ", length=" + length +
", offset=" + offset + ", offset=" + offset +
", blockCommitSequenceId=" + blockCommitSequenceId +
", createVersion=" + createVersion + '}'; ", createVersion=" + createVersion + '}';
} }
} }

View File

@ -252,7 +252,6 @@ message KeyLocation {
required uint64 length = 4; required uint64 length = 4;
// indicated at which version this block gets created. // indicated at which version this block gets created.
optional uint64 createVersion = 5; optional uint64 createVersion = 5;
optional uint64 blockCommitSequenceId = 6;
} }
message KeyLocationList { message KeyLocationList {

View File

@ -151,7 +151,7 @@ public void testContainerReplication() throws Exception {
.getHandler(ContainerType.KeyValueContainer); .getHandler(ContainerType.KeyValueContainer);
BlockData key = handler.getBlockManager() BlockData key = handler.getBlockManager()
.getBlock(container, BlockID.getFromProtobuf(blockID), 0); .getBlock(container, BlockID.getFromProtobuf(blockID));
Assert.assertNotNull(key); Assert.assertNotNull(key);
Assert.assertEquals(1, key.getChunks().size()); Assert.assertEquals(1, key.getChunks().size());

View File

@ -256,6 +256,6 @@ public void testCloseContainer() throws Exception {
openContainerBlockMap.getBlockDataMap(testContainerID)); openContainerBlockMap.getBlockDataMap(testContainerID));
// Make sure the key got committed // Make sure the key got committed
Assert.assertNotNull(handler.getBlockManager() Assert.assertNotNull(handler.getBlockManager()
.getBlock(container, blockID, 0)); .getBlock(container, blockID));
} }
} }

View File

@ -558,7 +558,7 @@ public void testPutBlock() throws IOException, NoSuchAlgorithmException {
blockData.setChunks(chunkList); blockData.setChunks(chunkList);
blockManager.putBlock(container, blockData); blockManager.putBlock(container, blockData);
BlockData readBlockData = blockManager. BlockData readBlockData = blockManager.
getBlock(container, blockData.getBlockID(), 0); getBlock(container, blockData.getBlockID());
ChunkInfo readChunk = ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0)); ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
@ -596,25 +596,27 @@ public void testPutBlockWithInvalidBCSId()
blockManager.putBlock(container, blockData); blockManager.putBlock(container, blockData);
BlockData readBlockData; BlockData readBlockData;
try { try {
blockID1.setBlockCommitSequenceId(5);
// read with bcsId higher than container bcsId // read with bcsId higher than container bcsId
blockManager. blockManager.
getBlock(container, blockID1, 5); getBlock(container, blockID1);
Assert.fail("Expected exception not thrown"); Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) { } catch (StorageContainerException sce) {
Assert.assertTrue(sce.getResult() == UNKNOWN_BCSID); Assert.assertTrue(sce.getResult() == UNKNOWN_BCSID);
} }
try { try {
blockID1.setBlockCommitSequenceId(4);
// read with bcsId lower than container bcsId but greater than committed // read with bcsId lower than container bcsId but greater than committed
// bcsId. // bcsId.
blockManager. blockManager.
getBlock(container, blockID1, 4); getBlock(container, blockID1);
Assert.fail("Expected exception not thrown"); Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) { } catch (StorageContainerException sce) {
Assert.assertTrue(sce.getResult() == BCSID_MISMATCH); Assert.assertTrue(sce.getResult() == BCSID_MISMATCH);
} }
readBlockData = blockManager. readBlockData = blockManager.
getBlock(container, blockData.getBlockID(), 4); getBlock(container, blockData.getBlockID());
ChunkInfo readChunk = ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0)); ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
@ -666,7 +668,7 @@ public void testPutBlockWithLotsOfChunks() throws IOException,
blockData.setChunks(chunkProtoList); blockData.setChunks(chunkProtoList);
blockManager.putBlock(container, blockData); blockManager.putBlock(container, blockData);
BlockData readBlockData = blockManager. BlockData readBlockData = blockManager.
getBlock(container, blockData.getBlockID(), 0); getBlock(container, blockData.getBlockID());
ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1); ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1);
ChunkInfo readChunk = ChunkInfo readChunk =
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
@ -694,7 +696,7 @@ public void testDeleteBlock() throws IOException, NoSuchAlgorithmException {
blockManager.deleteBlock(container, blockID); blockManager.deleteBlock(container, blockID);
exception.expect(StorageContainerException.class); exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the block."); exception.expectMessage("Unable to find the block.");
blockManager.getBlock(container, blockData.getBlockID(), 0); blockManager.getBlock(container, blockData.getBlockID());
} }
/** /**

View File

@ -239,16 +239,18 @@ public void tesPutKeyResposne() throws Exception {
ContainerTestHelper ContainerTestHelper
.getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk()); .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
response = client.sendCommand(putKeyRequest).getPutBlock(); response = client.sendCommand(putKeyRequest).getPutBlock();
Assert.assertEquals(
response.getCommittedBlockLength().getBlockLength(), data.length);
Assert.assertTrue(response.getCommittedBlockLength().getBlockID()
.getBlockCommitSequenceId() > 0);
BlockID responseBlockID = BlockID
.getFromProtobuf(response.getCommittedBlockLength().getBlockID());
blockID
.setBlockCommitSequenceId(responseBlockID.getBlockCommitSequenceId());
// make sure the block ids in the request and response are same. // make sure the block ids in the request and response are same.
// This will also ensure that closing the container committed the block // This will also ensure that closing the container committed the block
// on the Datanodes. // on the Datanodes.
Assert.assertEquals(BlockID Assert.assertEquals(responseBlockID, blockID);
.getFromProtobuf(response.getCommittedBlockLength().getBlockID()),
blockID);
Assert.assertEquals(
response.getCommittedBlockLength().getBlockLength(), data.length);
Assert.assertTrue(
response.getCommittedBlockLength().getBlockCommitSequenceId() > 0);
xceiverClientManager.releaseClient(client); xceiverClientManager.releaseClient(client);
} }
} }

View File

@ -700,7 +700,7 @@ public void testDeleteKey() throws Exception {
KeyValueContainer container = (KeyValueContainer) cm.getContainerSet() KeyValueContainer container = (KeyValueContainer) cm.getContainerSet()
.getContainer(location.getBlockID().getContainerID()); .getContainer(location.getBlockID().getContainerID());
BlockData blockInfo = keyValueHandler.getBlockManager() BlockData blockInfo = keyValueHandler.getBlockManager()
.getBlock(container, location.getBlockID(), 0); .getBlock(container, location.getBlockID());
KeyValueContainerData containerData = KeyValueContainerData containerData =
(KeyValueContainerData) container.getContainerData(); (KeyValueContainerData) container.getContainerData();
File dataDir = new File(containerData.getChunksPath()); File dataDir = new File(containerData.getChunksPath());

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om; package org.apache.hadoop.ozone.om;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@ -169,10 +170,9 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
throw ex; throw ex;
} }
OmKeyLocationInfo info = new OmKeyLocationInfo.Builder() OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
.setBlockID(allocatedBlock.getBlockID()) .setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setShouldCreateContainer(allocatedBlock.getCreateContainer()) .setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(scmBlockSize) .setLength(scmBlockSize)
.setBlockCommitSequenceId(0)
.setOffset(0) .setOffset(0)
.build(); .build();
// current version not committed, so new blocks coming now are added to // current version not committed, so new blocks coming now are added to
@ -234,10 +234,9 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
throw ex; throw ex;
} }
OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder() OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder()
.setBlockID(allocatedBlock.getBlockID()) .setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setShouldCreateContainer(allocatedBlock.getCreateContainer()) .setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(allocateSize) .setLength(allocateSize)
.setBlockCommitSequenceId(0)
.setOffset(0) .setOffset(0)
.build(); .build();
locations.add(subKeyInfo); locations.add(subKeyInfo);

View File

@ -21,6 +21,7 @@
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.ScmInfo;
@ -120,7 +121,7 @@ public AllocatedBlock allocateBlock(long size,
long localID = Time.monotonicNow(); long localID = Time.monotonicNow();
AllocatedBlock.Builder abb = AllocatedBlock.Builder abb =
new AllocatedBlock.Builder() new AllocatedBlock.Builder()
.setBlockID(new BlockID(containerID, localID)) .setContainerBlockID(new ContainerBlockID(containerID, localID))
.setPipeline(pipeline) .setPipeline(pipeline)
.setShouldCreateContainer(false); .setShouldCreateContainer(false);
return abb.build(); return abb.build();