From 88371ff07fdddd29b9f064d14c544cdd3427dc86 Mon Sep 17 00:00:00 2001
From: Xiaoyu Yao
Date: Tue, 14 Mar 2017 21:28:23 -0700
Subject: [PATCH] HDFS-11491. Ozone: SCM: Add close container RPC. Contributed
 by Anu Engineer.

---
 .../proto/DatanodeContainerProtocol.proto     |  20 +++
 .../common/helpers/ContainerData.java         |  50 ++++++++
 .../container/common/impl/Dispatcher.java     |  80 ++++++++++--
 .../common/interfaces/ContainerManager.java   |  19 +++
 .../ozone/container/ContainerTestHelper.java  |  18 +++
 .../ozoneimpl/TestOzoneContainer.java         | 119 +++++++++++++++++-
 6 files changed, 293 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto
index 6566a71e1d..dfd4bc58bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto
@@ -67,6 +67,12 @@ import "hdfs.proto";
  * 13. ListChunk - Given a Container/Key returns the list of Chunks.
  *
  * 14. CompactChunk - Re-writes a chunk based on Offsets.
+ *
+ * 15. PutSmallFile - A single RPC that combines both putKey and WriteChunk.
+ *
+ * 16. GetSmallFile - A single RPC that combines both getKey and ReadChunk.
+ *
+ * 17. CloseContainer - Closes an open container and makes it immutable.
  */

 enum Type {
@@ -90,6 +96,7 @@ enum Type {
   /** Combines Key and Chunk Operation into Single RPC. */
   PutSmallFile = 15;
   GetSmallFile = 16;
+  CloseContainer = 17;

 }

@@ -116,6 +123,7 @@ enum Result {
   INVALID_ARGUMENT = 19;
   PUT_SMALL_FILE_ERROR = 20;
   GET_SMALL_FILE_ERROR = 21;
+  CLOSED_CONTAINER_IO = 22;
 }

 message ContainerCommandRequestProto {
@@ -147,6 +155,7 @@ message ContainerCommandRequestProto {

   optional PutSmallFileRequestProto putSmallFile = 16;
   optional GetSmallFileRequestProto getSmallFile = 17;
+  optional CloseContainerRequestProto closeContainer = 18;
 }

 message ContainerCommandResponseProto {
@@ -174,6 +183,7 @@ message ContainerCommandResponseProto {

   optional PutSmallFileResponseProto putSmallFile = 19;
   optional GetSmallFileResponseProto getSmallFile = 20;
+  optional CloseContainerResponseProto closeContainer = 21;

 }

@@ -194,6 +204,8 @@ message ContainerData {
   repeated KeyValue metadata = 2;
   optional string dbPath = 3;
   optional string containerPath = 4;
+  optional bool open = 5 [default = true];
+  optional string hash = 6;
 }

 message ContainerMeta {
@@ -246,6 +258,14 @@ message ListContainerResponseProto {
   repeated ContainerData containerData = 1;
 }

+message CloseContainerRequestProto {
+  required Pipeline pipeline = 1;
+}
+
+message CloseContainerResponseProto {
+  optional Pipeline pipeline = 1;
+  optional string hash = 2;
+}

 message KeyData {
   required string containerName = 1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
index c6c432bb58..91f7cbe550 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
@@ -38,6 +38,8 @@ public class ContainerData {
   private String dbPath;  // Path to Level DB Store.
   // Path to Physical file system where container and checksum are stored.
   private String containerFilePath;
+  private boolean open;
+  private String hash;

   /**
    * Constructs a ContainerData Object.
@@ -71,6 +73,15 @@ public static ContainerData getFromProtBuf(
       data.setDBPath(protoData.getDbPath());
     }

+    if (protoData.hasOpen()) {
+      data.setOpen(protoData.getOpen());
+    } else {
+      data.setOpen(true);
+    }
+
+    if (protoData.hasHash()) {
+      data.setHash(protoData.getHash());
+    }
     return data;
   }

@@ -98,6 +109,11 @@ public ContainerProtos.ContainerData getProtoBufMessage() {
       builder.addMetadata(keyValBuilder.setKey(entry.getKey())
           .setValue(entry.getValue()).build());
     }
+
+    // Serialize the new open flag and optional hash so they round-trip
+    // through the protobuf message.
+    builder.setOpen(this.isOpen());
+    if (this.getHash() != null) {
+      builder.setHash(this.getHash());
+    }

     return builder.build();
   }

@@ -196,4 +209,39 @@ public void setContainerPath(String containerPath) {
     this.containerFilePath = containerPath;
   }

+  /**
+   * Checks if the container is open.
+   * @return true if the container is open, false otherwise.
+   */
+  public boolean isOpen() {
+    return open;
+  }
+
+  /**
+   * Marks this container as closed.
+   */
+  public void closeContainer() {
+    this.open = false;
+  }
+
+  /**
+   * Final hash for this container.
+   * @return - Hash
+   */
+  public String getHash() {
+    return hash;
+  }
+
+  public void setHash(String hash) {
+    this.hash = hash;
+  }
+
+  /**
+   * Sets the open or closed status of the container.
+   * @param open - true if the container is open, false otherwise.
+   */
+  public void setOpen(boolean open) {
+    this.open = open;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index ac13176357..93c9da69cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -43,13 +43,14 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
 import java.util.LinkedList;
 import java.util.List;

-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.PUT_SMALL_FILE_ERROR;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.GET_SMALL_FILE_ERROR;
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;

 /**
  * Ozone Container dispatcher takes a call from the netty server and routes it
@@ -97,8 +98,9 @@ public ContainerCommandResponseProto dispatch(
         (cmdType == Type.DeleteContainer) ||
         (cmdType == Type.ReadContainer) ||
         (cmdType == Type.ListContainer) ||
-        (cmdType == Type.UpdateContainer)) {
-      resp = containerProcessHandler(msg);
+        (cmdType == Type.UpdateContainer) ||
+        (cmdType == Type.CloseContainer)) {
+      return containerProcessHandler(msg);
     }

     if ((cmdType == Type.PutKey) ||
@@ -167,6 +169,9 @@ private ContainerCommandResponseProto containerProcessHandler(
       case ReadContainer:
         return handleReadContainer(msg);

+      case CloseContainer:
+        return handleCloseContainer(msg);
+
       default:
         return ContainerUtils.unsupportedRequest(msg);
     }
@@ -274,6 +279,12 @@ private ContainerCommandResponseProto chunkProcessHandler(
     }
   }

+  /**
+   * Dispatches calls to the small file handler.
+   * @param msg - request
+   * @return response
+   * @throws StorageContainerException
+   */
   private ContainerCommandResponseProto smallFileHandler(
       ContainerCommandRequestProto msg) throws StorageContainerException {
     switch (msg.getCmdType()) {
@@ -349,16 +360,46 @@ private ContainerCommandResponseProto handleCreateContainer(
     }
     ContainerData cData = ContainerData.getFromProtBuf(
         msg.getCreateContainer().getContainerData());
-    Preconditions.checkNotNull(cData);
+    Preconditions.checkNotNull(cData, "Container data is null");

     Pipeline pipeline = Pipeline.getFromProtoBuf(
         msg.getCreateContainer().getPipeline());
-    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");

     this.containerManager.createContainer(pipeline, cData);
     return ContainerUtils.getContainerResponse(msg);
   }

+  /**
+   * Closes an open container.
+   *
+   * @param msg - close container request
+   * @return ContainerCommandResponseProto
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleCloseContainer(
+      ContainerCommandRequestProto msg) throws IOException {
+    try {
+      if (!msg.hasCloseContainer()) {
+        LOG.debug("Malformed close Container request. trace ID: {}",
+            msg.getTraceID());
+        return ContainerUtils.malformedRequest(msg);
+      }
+      Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer()
+          .getPipeline());
+      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+      if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+        throw new StorageContainerException("Attempting to close a closed " +
+            "container.", CLOSED_CONTAINER_IO);
+      }
+      this.containerManager.closeContainer(pipeline.getContainerName());
+      return ContainerUtils.getContainerResponse(msg);
+    } catch (NoSuchAlgorithmException e) {
+      throw new StorageContainerException("No such algorithm", e,
+          NO_SUCH_ALGORITHM);
+    }
+  }
+
   /**
    * Calls into chunk manager to write a chunk.
    *
@@ -373,11 +414,14 @@ private ContainerCommandResponseProto handleWriteChunk(
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-
     String keyName = msg.getWriteChunk().getKeyName();
     Pipeline pipeline = Pipeline.getFromProtoBuf(
         msg.getWriteChunk().getPipeline());
     Preconditions.checkNotNull(pipeline);
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }

     ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
         .getChunkData());
@@ -437,7 +481,10 @@ private ContainerCommandResponseProto handleDeleteChunk(
     Pipeline pipeline = Pipeline.getFromProtoBuf(
         msg.getDeleteChunk().getPipeline());
     Preconditions.checkNotNull(pipeline);
-
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
     ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
         .getChunkData());
     Preconditions.checkNotNull(chunkInfo);
@@ -463,6 +510,10 @@ private ContainerCommandResponseProto handlePutKey(
     }
     Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
     Preconditions.checkNotNull(pipeline);
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
     KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
     Preconditions.checkNotNull(keyData);
     this.containerManager.getKeyManager().putKey(pipeline, keyData);
@@ -508,10 +559,13 @@ private ContainerCommandResponseProto handleDeleteKey(
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-
     Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getDeleteKey()
         .getPipeline());
     Preconditions.checkNotNull(pipeline);
+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
     String keyName = msg.getDeleteKey().getName();
     Preconditions.checkNotNull(keyName);
     Preconditions.checkState(!keyName.isEmpty());
@@ -541,6 +595,10 @@ private ContainerCommandResponseProto handlePutSmallFile(
         .getKey().getPipeline());
     Preconditions.checkNotNull(pipeline);

+    if (!this.containerManager.isOpen(pipeline.getContainerName())) {
+      throw new StorageContainerException("Write to closed container.",
+          CLOSED_CONTAINER_IO);
+    }
     KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
         .getKeyData());
     ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index f3ae10516d..19ce6598ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;

 import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
 import java.util.List;

 /**
@@ -92,6 +93,24 @@ void listContainer(String prefix, long count, String prevKey,
   ContainerData readContainer(String containerName)
       throws StorageContainerException;

+  /**
+   * Closes an open container; if the container is
+   * already closed or does not exist, a StorageContainerException is thrown.
+   * @param containerName - Name of the container.
+   * @throws StorageContainerException
+   */
+  void closeContainer(String containerName)
+      throws StorageContainerException, NoSuchAlgorithmException;
+
+  /**
+   * Checks if a container is open.
+   * @param containerName - Name of the container.
+   * @return true if the container is open, false otherwise.
+   * @throws StorageContainerException - Thrown if the container cannot
+   * be found.
+   */
+  boolean isOpen(String containerName) throws StorageContainerException;
+
   /**
    * Supports clean shutdown of container.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 6c24527aec..df96db57fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -18,6 +18,7 @@

 package org.apache.hadoop.ozone.container;

+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
@@ -394,4 +395,21 @@ public static ContainerCommandRequestProto getDeleteKeyRequest(
     return request.build();
   }

+  /**
+   * Returns a close container request.
+   * @param pipeline - pipeline
+   * @return ContainerCommandRequestProto.
+   */
+  public static ContainerCommandRequestProto getCloseContainer(
+      Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    ContainerProtos.CloseContainerRequestProto closeRequest =
+        ContainerProtos.CloseContainerRequestProto.newBuilder().setPipeline(
+            pipeline.getProtobufMessage()).build();
+    ContainerProtos.ContainerCommandRequestProto cmd =
+        ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
+            .Type.CloseContainer).setCloseContainer(closeRequest).build();
+    return cmd;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index f5f1de4e5d..11d57cef9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -56,7 +56,7 @@ public void testCreateOzoneContainer() throws Exception {
     OzoneContainer container = null;
     MiniOzoneCluster cluster = null;
     try {
-      cluster =  new MiniOzoneCluster.Builder(conf)
+      cluster = new MiniOzoneCluster.Builder(conf)
           .setHandlerType("distributed").build();
       // We don't start Ozone Container via data node, we will do it
       // independently in our test path.
@@ -79,7 +79,7 @@ public void testCreateOzoneContainer() throws Exception {
       if (container != null) {
         container.stop();
       }
-      if(cluster != null) {
+      if (cluster != null) {
         cluster.shutdown();
       }
     }
@@ -250,4 +250,117 @@ public void testBothGetandPutSmallFile() throws Exception {
     }
   }

+  @Test
+  public void testCloseContainer() throws Exception {
+    MiniOzoneCluster cluster = null;
+    XceiverClient client = null;
+    try {
+      String keyName = OzoneUtils.getRequestID();
+      String containerName = OzoneUtils.getRequestID();
+      OzoneConfiguration conf = new OzoneConfiguration();
+      URL p = conf.getClass().getResource("");
+      String path = p.getPath().concat(
+          TestOzoneContainer.class.getSimpleName());
+      path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
+          OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
+      conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
+
+      // Start ozone container via datanode create.
+      Pipeline pipeline =
+          ContainerTestHelper.createSingleNodePipeline(containerName);
+      conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+          pipeline.getLeader().getContainerPort());
+
+      cluster = new MiniOzoneCluster.Builder(conf)
+          .setHandlerType("distributed").build();
+
+      // This client talks to the ozone container via the datanode.
+      client = new XceiverClient(pipeline, conf);
+      client.connect();
+
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
+              keyName, 1024);
+
+      ContainerProtos.ContainerCommandRequestProto request;
+      ContainerProtos.ContainerCommandResponseProto response;
+
+      ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+          ContainerTestHelper.getPutKeyRequest(writeChunkRequest
+              .getWriteChunk());
+
+      // Write chunk before closing.
+      response = client.sendCommand(writeChunkRequest);
+      Assert.assertNotNull(response);
+      Assert.assertEquals(ContainerProtos.Result.SUCCESS,
+          response.getResult());
+      Assert.assertTrue(writeChunkRequest.getTraceID().equals(response
+          .getTraceID()));
+
+      // Put key before closing.
+      response = client.sendCommand(putKeyRequest);
+      Assert.assertNotNull(response);
+      Assert.assertEquals(ContainerProtos.Result.SUCCESS,
+          response.getResult());
+      Assert.assertTrue(
+          putKeyRequest.getTraceID().equals(response.getTraceID()));
+
+      // Close the container.
+      request = ContainerTestHelper.getCloseContainer(pipeline);
+      response = client.sendCommand(request);
+      Assert.assertNotNull(response);
+      Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+
+      // Assert that none of the write operations work after close.
+
+      // Write chunks should fail now.
+      response = client.sendCommand(writeChunkRequest);
+      Assert.assertNotNull(response);
+      Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
+          response.getResult());
+      Assert.assertTrue(writeChunkRequest.getTraceID().equals(response
+          .getTraceID()));
+
+      // Read chunk must work on a closed container.
+      request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
+          .getWriteChunk());
+      response = client.sendCommand(request);
+      Assert.assertNotNull(response);
+      Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+
+      // Put key will fail on a closed container.
+      response = client.sendCommand(putKeyRequest);
+      Assert.assertNotNull(response);
+      Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
+          response.getResult());
+      Assert.assertTrue(putKeyRequest.getTraceID().equals(response
+          .getTraceID()));
+
+      // Get key must work on the closed container.
+      request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
+      response = client.sendCommand(request);
+      ContainerTestHelper.verifyGetKey(request, response);
+
+      // Delete key must fail on a closed container.
+      request =
+          ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey());
+      response = client.sendCommand(request);
+      Assert.assertNotNull(response);
+      Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
+          response.getResult());
+      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
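--

Reviewer note (not part of the patch): a minimal sketch of how a client could
exercise the new CloseContainer RPC outside of the test helper, using only the
types introduced or touched by this change. It assumes "pipeline" is a
Pipeline for the target container and "client" is an already-connected
XceiverClient; trace IDs and retry logic are elided.

    // Build the close-container request and wrap it in the generic
    // container command envelope, mirroring ContainerTestHelper.
    ContainerProtos.CloseContainerRequestProto closeRequest =
        ContainerProtos.CloseContainerRequestProto.newBuilder()
            .setPipeline(pipeline.getProtobufMessage())
            .build();
    ContainerProtos.ContainerCommandRequestProto cmd =
        ContainerProtos.ContainerCommandRequestProto.newBuilder()
            .setCmdType(ContainerProtos.Type.CloseContainer)
            .setCloseContainer(closeRequest)
            .build();

    // Send the command. A SUCCESS result means the container is now
    // immutable: closing it again, or issuing any write (WriteChunk,
    // PutKey, DeleteKey, PutSmallFile) afterwards, surfaces
    // CLOSED_CONTAINER_IO, while reads keep working.
    ContainerProtos.ContainerCommandResponseProto reply =
        client.sendCommand(cmd);
    if (reply.getResult() != ContainerProtos.Result.SUCCESS) {
      throw new IOException("CloseContainer failed: " + reply.getResult());
    }

Note the design choice visible in Dispatcher.java: the open/closed check is
enforced centrally in each handler before delegating to the key and chunk
managers, so every write path rejects closed containers with the same
CLOSED_CONTAINER_IO result instead of each manager re-implementing the check.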