HDFS-11491. Ozone: SCM: Add close container RPC. Contributed by Anu Engineer.
This commit is contained in:
parent
932423211f
commit
88371ff07f
@ -67,6 +67,12 @@ import "hdfs.proto";
|
||||
* 13. ListChunk - Given a Container/Key returns the list of Chunks.
|
||||
*
|
||||
* 14. CompactChunk - Re-writes a chunk based on Offsets.
|
||||
*
|
||||
* 15. PutSmallFile - A single RPC that combines both putKey and WriteChunk.
|
||||
*
|
||||
* 16. GetSmallFile - A single RPC that combines both getKey and ReadChunk.
|
||||
*
|
||||
* 17. CloseContainer - Closes an open container and makes it immutable.
|
||||
*/
|
||||
|
||||
enum Type {
|
||||
@ -90,6 +96,7 @@ enum Type {
|
||||
/** Combines Key and Chunk Operation into Single RPC. */
|
||||
PutSmallFile = 15;
|
||||
GetSmallFile = 16;
|
||||
CloseContainer = 17;
|
||||
|
||||
}
|
||||
|
||||
@ -116,6 +123,7 @@ enum Result {
|
||||
INVALID_ARGUMENT = 19;
|
||||
PUT_SMALL_FILE_ERROR = 20;
|
||||
GET_SMALL_FILE_ERROR = 21;
|
||||
CLOSED_CONTAINER_IO = 22;
|
||||
}
|
||||
|
||||
message ContainerCommandRequestProto {
|
||||
@ -147,6 +155,7 @@ message ContainerCommandRequestProto {
|
||||
|
||||
optional PutSmallFileRequestProto putSmallFile = 16;
|
||||
optional GetSmallFileRequestProto getSmallFile = 17;
|
||||
optional CloseContainerRequestProto closeContainer = 18;
|
||||
}
|
||||
|
||||
message ContainerCommandResponseProto {
|
||||
@ -174,6 +183,7 @@ message ContainerCommandResponseProto {
|
||||
|
||||
optional PutSmallFileResponseProto putSmallFile = 19;
|
||||
optional GetSmallFileResponseProto getSmallFile = 20;
|
||||
optional CloseContainerResponseProto closeContainer = 21;
|
||||
|
||||
}
|
||||
|
||||
@ -194,6 +204,8 @@ message ContainerData {
|
||||
repeated KeyValue metadata = 2;
|
||||
optional string dbPath = 3;
|
||||
optional string containerPath = 4;
|
||||
optional bool open = 5 [default = true];
|
||||
optional string hash = 6;
|
||||
}
|
||||
|
||||
message ContainerMeta {
|
||||
@ -246,6 +258,14 @@ message ListContainerResponseProto {
|
||||
repeated ContainerData containerData = 1;
|
||||
}
|
||||
|
||||
message CloseContainerRequestProto {
|
||||
required Pipeline pipeline = 1;
|
||||
}
|
||||
|
||||
message CloseContainerResponseProto {
|
||||
optional Pipeline pipeline = 1;
|
||||
optional string hash = 2;
|
||||
}
|
||||
|
||||
message KeyData {
|
||||
required string containerName = 1;
|
||||
|
@ -38,6 +38,8 @@ public class ContainerData {
|
||||
private String dbPath; // Path to Level DB Store.
|
||||
// Path to Physical file system where container and checksum are stored.
|
||||
private String containerFilePath;
|
||||
private boolean open;
|
||||
private String hash;
|
||||
|
||||
/**
|
||||
* Constructs a ContainerData Object.
|
||||
@ -71,6 +73,15 @@ public static ContainerData getFromProtBuf(
|
||||
data.setDBPath(protoData.getDbPath());
|
||||
}
|
||||
|
||||
if (protoData.hasOpen()) {
|
||||
data.setOpen(protoData.getOpen());
|
||||
} else {
|
||||
data.setOpen(true);
|
||||
}
|
||||
|
||||
if(protoData.hasHash()) {
|
||||
data.setHash(protoData.getHash());
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
@ -98,6 +109,8 @@ public ContainerProtos.ContainerData getProtoBufMessage() {
|
||||
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
|
||||
.setValue(entry.getValue()).build());
|
||||
}
|
||||
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@ -196,4 +209,41 @@ public void setContainerPath(String containerPath) {
|
||||
this.containerFilePath = containerPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* checks if the container is open.
|
||||
* @return - boolean
|
||||
*/
|
||||
public boolean isOpen() {
|
||||
return open;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks this container as closed.
|
||||
*/
|
||||
public void closeContainer() {
|
||||
this.open = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Final hash for this container.
|
||||
* @return - Hash
|
||||
*/
|
||||
public String getHash() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
public void setHash(String hash) {
|
||||
this.hash = hash;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Sets the open or closed values.
|
||||
* @param open
|
||||
*/
|
||||
public void setOpen(boolean open) {
|
||||
this.open = open;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -43,13 +43,14 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.Result.PUT_SMALL_FILE_ERROR;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.Result.GET_SMALL_FILE_ERROR;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
|
||||
|
||||
/**
|
||||
* Ozone Container dispatcher takes a call from the netty server and routes it
|
||||
@ -97,8 +98,9 @@ public ContainerCommandResponseProto dispatch(
|
||||
(cmdType == Type.DeleteContainer) ||
|
||||
(cmdType == Type.ReadContainer) ||
|
||||
(cmdType == Type.ListContainer) ||
|
||||
(cmdType == Type.UpdateContainer)) {
|
||||
resp = containerProcessHandler(msg);
|
||||
(cmdType == Type.UpdateContainer) ||
|
||||
(cmdType == Type.CloseContainer)) {
|
||||
return containerProcessHandler(msg);
|
||||
}
|
||||
|
||||
if ((cmdType == Type.PutKey) ||
|
||||
@ -167,6 +169,9 @@ private ContainerCommandResponseProto containerProcessHandler(
|
||||
case ReadContainer:
|
||||
return handleReadContainer(msg);
|
||||
|
||||
case CloseContainer:
|
||||
return handleCloseContainer(msg);
|
||||
|
||||
default:
|
||||
return ContainerUtils.unsupportedRequest(msg);
|
||||
}
|
||||
@ -274,6 +279,12 @@ private ContainerCommandResponseProto chunkProcessHandler(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch calls to small file hanlder.
|
||||
* @param msg - request
|
||||
* @return response
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
private ContainerCommandResponseProto smallFileHandler(
|
||||
ContainerCommandRequestProto msg) throws StorageContainerException {
|
||||
switch (msg.getCmdType()) {
|
||||
@ -349,16 +360,46 @@ private ContainerCommandResponseProto handleCreateContainer(
|
||||
}
|
||||
ContainerData cData = ContainerData.getFromProtBuf(
|
||||
msg.getCreateContainer().getContainerData());
|
||||
Preconditions.checkNotNull(cData);
|
||||
Preconditions.checkNotNull(cData, "Container data is null");
|
||||
|
||||
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||
msg.getCreateContainer().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
|
||||
|
||||
this.containerManager.createContainer(pipeline, cData);
|
||||
return ContainerUtils.getContainerResponse(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* closes an open container.
|
||||
*
|
||||
* @param msg -
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private ContainerCommandResponseProto handleCloseContainer(
|
||||
ContainerCommandRequestProto msg) throws IOException {
|
||||
try {
|
||||
if (!msg.hasCloseContainer()) {
|
||||
LOG.debug("Malformed close Container request. trace ID: {}",
|
||||
msg.getTraceID());
|
||||
return ContainerUtils.malformedRequest(msg);
|
||||
}
|
||||
Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer()
|
||||
.getPipeline());
|
||||
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
|
||||
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
|
||||
throw new StorageContainerException("Attempting to close a closed " +
|
||||
"container.", CLOSED_CONTAINER_IO);
|
||||
}
|
||||
this.containerManager.closeContainer(pipeline.getContainerName());
|
||||
return ContainerUtils.getContainerResponse(msg);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new StorageContainerException("No such Algorithm", e,
|
||||
NO_SUCH_ALGORITHM);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls into chunk manager to write a chunk.
|
||||
*
|
||||
@ -373,11 +414,14 @@ private ContainerCommandResponseProto handleWriteChunk(
|
||||
msg.getTraceID());
|
||||
return ContainerUtils.malformedRequest(msg);
|
||||
}
|
||||
|
||||
String keyName = msg.getWriteChunk().getKeyName();
|
||||
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||
msg.getWriteChunk().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
|
||||
throw new StorageContainerException("Write to closed container.",
|
||||
CLOSED_CONTAINER_IO);
|
||||
}
|
||||
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
|
||||
.getChunkData());
|
||||
@ -437,7 +481,10 @@ private ContainerCommandResponseProto handleDeleteChunk(
|
||||
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||
msg.getDeleteChunk().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
|
||||
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
|
||||
throw new StorageContainerException("Write to closed container.",
|
||||
CLOSED_CONTAINER_IO);
|
||||
}
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
|
||||
.getChunkData());
|
||||
Preconditions.checkNotNull(chunkInfo);
|
||||
@ -463,6 +510,10 @@ private ContainerCommandResponseProto handlePutKey(
|
||||
}
|
||||
Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
|
||||
throw new StorageContainerException("Write to closed container.",
|
||||
CLOSED_CONTAINER_IO);
|
||||
}
|
||||
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
|
||||
Preconditions.checkNotNull(keyData);
|
||||
this.containerManager.getKeyManager().putKey(pipeline, keyData);
|
||||
@ -508,10 +559,13 @@ private ContainerCommandResponseProto handleDeleteKey(
|
||||
msg.getTraceID());
|
||||
return ContainerUtils.malformedRequest(msg);
|
||||
}
|
||||
|
||||
Pipeline pipeline =
|
||||
Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
|
||||
throw new StorageContainerException("Write to closed container.",
|
||||
CLOSED_CONTAINER_IO);
|
||||
}
|
||||
String keyName = msg.getDeleteKey().getName();
|
||||
Preconditions.checkNotNull(keyName);
|
||||
Preconditions.checkState(!keyName.isEmpty());
|
||||
@ -541,6 +595,10 @@ private ContainerCommandResponseProto handlePutSmallFile(
|
||||
.getKey().getPipeline());
|
||||
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
|
||||
throw new StorageContainerException("Write to closed container.",
|
||||
CLOSED_CONTAINER_IO);
|
||||
}
|
||||
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
|
||||
.getKeyData());
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -92,6 +93,24 @@ void listContainer(String prefix, long count, String prevKey,
|
||||
ContainerData readContainer(String containerName)
|
||||
throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* Closes a open container, if it is already closed or does not exist a
|
||||
* StorageContainerException is thrown.
|
||||
* @param containerName - Name of the container.
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
void closeContainer(String containerName)
|
||||
throws StorageContainerException, NoSuchAlgorithmException;
|
||||
|
||||
/**
|
||||
* Checks if a container exists.
|
||||
* @param containerName - Name of the container.
|
||||
* @return true if the container is open false otherwise.
|
||||
* @throws StorageContainerException - Throws Exception if we are not
|
||||
* able to find the container.
|
||||
*/
|
||||
boolean isOpen(String containerName) throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* Supports clean shutdown of container.
|
||||
*
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.container;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
@ -394,4 +395,21 @@ public static ContainerCommandRequestProto getDeleteKeyRequest(
|
||||
return request.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a close container request.
|
||||
* @param pipeline - pipeline
|
||||
* @return ContainerCommandRequestProto.
|
||||
*/
|
||||
public static ContainerCommandRequestProto getCloseContainer(
|
||||
Pipeline pipeline) {
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
ContainerProtos.CloseContainerRequestProto closeReqeuest =
|
||||
ContainerProtos.CloseContainerRequestProto.newBuilder().setPipeline(
|
||||
pipeline.getProtobufMessage()).build();
|
||||
ContainerProtos.ContainerCommandRequestProto cmd =
|
||||
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
|
||||
.Type.CloseContainer).setCloseContainer(closeReqeuest).build();
|
||||
return cmd;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ public void testCreateOzoneContainer() throws Exception {
|
||||
OzoneContainer container = null;
|
||||
MiniOzoneCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
.setHandlerType("distributed").build();
|
||||
// We don't start Ozone Container via data node, we will do it
|
||||
// independently in our test path.
|
||||
@ -79,7 +79,7 @@ public void testCreateOzoneContainer() throws Exception {
|
||||
if (container != null) {
|
||||
container.stop();
|
||||
}
|
||||
if(cluster != null) {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
@ -250,4 +250,119 @@ public void testBothGetandPutSmallFile() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
private void testCloseContainer() throws Exception {
|
||||
MiniOzoneCluster cluster = null;
|
||||
XceiverClient client = null;
|
||||
try {
|
||||
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
URL p = conf.getClass().getResource("");
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
|
||||
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
|
||||
|
||||
// Start ozone container Via Datanode create.
|
||||
|
||||
Pipeline pipeline =
|
||||
ContainerTestHelper.createSingleNodePipeline(containerName);
|
||||
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
|
||||
pipeline.getLeader().getContainerPort());
|
||||
|
||||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
.setHandlerType("distributed").build();
|
||||
|
||||
// This client talks to ozone container via datanode.
|
||||
client = new XceiverClient(pipeline, conf);
|
||||
client.connect();
|
||||
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
|
||||
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
|
||||
keyName, 1024);
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto request;
|
||||
ContainerProtos.ContainerCommandResponseProto response;
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
|
||||
ContainerTestHelper.getPutKeyRequest(writeChunkRequest
|
||||
.getWriteChunk());
|
||||
|
||||
// Write Chunk before closing
|
||||
response = client.sendCommand(writeChunkRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
|
||||
response.getResult());
|
||||
Assert.assertTrue(writeChunkRequest.getTraceID().equals(response
|
||||
.getTraceID()));
|
||||
|
||||
|
||||
// Put key before closing.
|
||||
response = client.sendCommand(putKeyRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
|
||||
response.getResult());
|
||||
Assert.assertTrue(
|
||||
putKeyRequest.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Close the contianer.
|
||||
request = ContainerTestHelper.getCloseContainer(pipeline);
|
||||
response = client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
|
||||
// Assert that none of the write operations are working after close.
|
||||
|
||||
// Write chunks should fail now.
|
||||
|
||||
response = client.sendCommand(writeChunkRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
|
||||
response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Read chunk must work on a closed container.
|
||||
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
|
||||
.getWriteChunk());
|
||||
response = client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
|
||||
// Put key will fail on a closed container.
|
||||
response = client.sendCommand(putKeyRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
|
||||
response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Get key must work on the closed container.
|
||||
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
|
||||
response = client.sendCommand(request);
|
||||
ContainerTestHelper.verifyGetKey(request, response);
|
||||
|
||||
// Delete Key must fail on a closed container.
|
||||
request =
|
||||
ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey());
|
||||
response = client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
|
||||
response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
} finally {
|
||||
if (client != null) {
|
||||
client.close();
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user