diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 150b1d6bfe..c1d90a5c0b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -59,6 +59,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + PutSmallFileResponseProto; import org.apache.hadoop.hdds.client.BlockID; import java.io.IOException; @@ -231,10 +233,11 @@ public final class ContainerProtocolCalls { * @param blockID - ID of the block * @param data - Data to be written into the container. * @param traceID - Trace ID for logging purpose. 
+ * @return container protocol writeSmallFile response * @throws IOException */ - public static void writeSmallFile(XceiverClientSpi client, - BlockID blockID, byte[] data, String traceID) + public static PutSmallFileResponseProto writeSmallFile( + XceiverClientSpi client, BlockID blockID, byte[] data, String traceID) throws IOException { BlockData containerBlockData = @@ -268,6 +271,7 @@ public final class ContainerProtocolCalls { .build(); ContainerCommandResponseProto response = client.sendCommand(request); validateContainerResponse(response); + return response.getPutSmallFile(); } /** diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 1700e236b1..df26f24320 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -413,7 +413,7 @@ message PutSmallFileRequestProto { message PutSmallFileResponseProto { - + required GetCommittedBlockLengthResponseProto committedBlockLength = 1; } message GetSmallFileRequestProto { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index ac0833bb9a..d5762bcbbf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -481,10 +481,11 @@ public class ContainerStateMachine extends BaseStateMachine { getRequestProto(trx.getStateMachineLogEntry().getLogData()); Type cmdType = requestProto.getCmdType(); CompletableFuture future; - if (cmdType == Type.PutBlock) { + if (cmdType == 
Type.PutBlock || cmdType == Type.PutSmallFile) { BlockData blockData; - ContainerProtos.BlockData blockDataProto = - requestProto.getPutBlock().getBlockData(); + ContainerProtos.BlockData blockDataProto = cmdType == Type.PutBlock ? + requestProto.getPutBlock().getBlockData() : + requestProto.getPutSmallFile().getBlock().getBlockData(); // set the blockCommitSequenceId try { @@ -499,9 +500,20 @@ public class ContainerStateMachine extends BaseStateMachine { ContainerProtos.PutBlockRequestProto .newBuilder(requestProto.getPutBlock()) .setBlockData(blockData.getProtoBufMessage()).build(); - ContainerCommandRequestProto containerCommandRequestProto = - ContainerCommandRequestProto.newBuilder(requestProto) - .setPutBlock(putBlockRequestProto).build(); + ContainerCommandRequestProto containerCommandRequestProto; + if (cmdType == Type.PutSmallFile) { + ContainerProtos.PutSmallFileRequestProto smallFileRequestProto = + ContainerProtos.PutSmallFileRequestProto + .newBuilder(requestProto.getPutSmallFile()) + .setBlock(putBlockRequestProto).build(); + containerCommandRequestProto = + ContainerCommandRequestProto.newBuilder(requestProto) + .setPutSmallFile(smallFileRequestProto).build(); + } else { + containerCommandRequestProto = + ContainerCommandRequestProto.newBuilder(requestProto) + .setPutBlock(putBlockRequestProto).build(); + } future = CompletableFuture .supplyAsync(() -> runCommand(containerCommandRequestProto), getCommandExecutor(requestProto)); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 7aaa5e670d..4cb23ed418 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -756,8 +756,6 @@ public class 
KeyValueHandler extends Handler { try { BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock() .getBlockID()); - // TODO: add bcsId as a part of getSmallFile transaction - // by default its 0 BlockData responseData = blockManager.getBlock(kvContainer, blockID); ContainerProtos.ChunkInfo chunkInfo = null; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java index e085fb072a..667e66de71 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java @@ -188,7 +188,7 @@ public final class BlockUtils { return builder.build(); } - private static GetCommittedBlockLengthResponseProto.Builder + public static GetCommittedBlockLengthResponseProto.Builder getCommittedBlockLengthResponseBuilder(long blockLength, ContainerProtos.DatanodeBlockID blockID) { ContainerProtos.GetCommittedBlockLengthResponseProto.Builder diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java index dee0c11b97..e91c8a67fa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java @@ -47,6 +47,13 @@ public final class SmallFileUtils { ContainerCommandRequestProto msg) { ContainerProtos.PutSmallFileResponseProto.Builder getResponse = ContainerProtos.PutSmallFileResponseProto.newBuilder(); + ContainerProtos.BlockData blockData = + 
msg.getPutSmallFile().getBlock().getBlockData(); + ContainerProtos.GetCommittedBlockLengthResponseProto.Builder + committedBlockLengthResponseBuilder = BlockUtils + .getCommittedBlockLengthResponseBuilder(blockData.getSize(), + blockData.getBlockID()); + getResponse.setCommittedBlockLength(committedBlockLengthResponseBuilder); ContainerCommandResponseProto.Builder builder = ContainerUtils.getSuccessResponseBuilder(msg); builder.setCmdType(ContainerProtos.Type.PutSmallFile); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index b3f0be70e4..0640649790 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -23,16 +23,9 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.ContainerAction.Action; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.ContainerAction.Reason; import org.apache.hadoop.hdds.scm.container. 
common.helpers.StorageContainerException; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; @@ -88,7 +81,6 @@ public class TestContainerStateMachineFailures { File baseDir = new File(path); baseDir.mkdirs(); - chunkSize = (int) OzoneConsts.MB; conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); @@ -140,7 +132,6 @@ public class TestContainerStateMachineFailures { Assert.assertEquals(1, locationInfoList.size()); OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); - long containerID = omKeyLocationInfo.getContainerID(); // delete the container dir FileUtil.fullyDelete(new File( cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() @@ -171,15 +162,5 @@ public class TestContainerStateMachineFailures { Assert.assertTrue(((StorageContainerException) ioe.getCause()).getResult() == ContainerProtos.Result.CONTAINER_UNHEALTHY); } - StorageContainerDatanodeProtocolProtos.ContainerAction action = - StorageContainerDatanodeProtocolProtos.ContainerAction.newBuilder() - .setContainerID(containerID).setAction(Action.CLOSE) - .setReason(Reason.CONTAINER_UNHEALTHY) - .build(); - - // Make sure the container close action is initiated to SCM. 
- Assert.assertTrue( - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext() - .getAllPendingContainerActions().contains(action)); } -} +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index 90dc2c47b0..ecf0d846b1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -152,7 +152,58 @@ public class TestContainerSmallFile { xceiverClientManager.releaseClient(client); } + @Test + public void testReadWriteWithBCSId() throws Exception { + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = + storageContainerLocationClient.allocateContainer( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, containerOwner); + XceiverClientSpi client = xceiverClientManager + .acquireClient(container.getPipeline()); + ContainerProtocolCalls.createContainer(client, + container.getContainerInfo().getContainerID(), traceID); + BlockID blockID1 = ContainerTestHelper.getTestBlockID( + container.getContainerInfo().getContainerID()); + ContainerProtos.PutSmallFileResponseProto responseProto = + ContainerProtocolCalls + .writeSmallFile(client, blockID1, "data123".getBytes(), traceID); + long bcsId = responseProto.getCommittedBlockLength().getBlockID() + .getBlockCommitSequenceId(); + try { + blockID1.setBlockCommitSequenceId(bcsId + 1); + //read a file with higher bcsId than the container bcsId + ContainerProtocolCalls + .readSmallFile(client, blockID1, traceID); + Assert.fail("Expected exception not thrown"); + } catch (StorageContainerException sce) { + Assert + .assertTrue(sce.getResult() == ContainerProtos.Result.UNKNOWN_BCSID); + } + + // write a new block again
to bump up the container bcsId + BlockID blockID2 = ContainerTestHelper + .getTestBlockID(container.getContainerInfo().getContainerID()); + ContainerProtocolCalls + .writeSmallFile(client, blockID2, "data123".getBytes(), traceID); + + try { + blockID1.setBlockCommitSequenceId(bcsId + 1); + //read a file with higher bcsId than the committed bcsId for the block + ContainerProtocolCalls.readSmallFile(client, blockID1, traceID); + Assert.fail("Expected exception not thrown"); + } catch (StorageContainerException sce) { + Assert + .assertTrue(sce.getResult() == ContainerProtos.Result.BCSID_MISMATCH); + } + blockID1.setBlockCommitSequenceId(bcsId); + ContainerProtos.GetSmallFileResponseProto response = + ContainerProtocolCalls.readSmallFile(client, blockID1, traceID); + String readData = response.getData().getData().toStringUtf8(); + Assert.assertEquals("data123", readData); + xceiverClientManager.releaseClient(client); + } }