HDFS-11581. Ozone: Support force delete a container. Contributed by Yuanbo Liu.

This commit is contained in:
Anu Engineer 2017-04-17 17:58:54 -07:00 committed by Owen O'Malley
parent d3f93d55d9
commit 6c4bd1647c
7 changed files with 204 additions and 111 deletions

View File

@ -126,6 +126,7 @@ enum Result {
CLOSED_CONTAINER_IO = 22;
ERROR_CONTAINER_NOT_EMPTY = 23;
ERROR_IN_COMPACT_DB = 24;
UNCLOSED_CONTAINER_IO = 25;
}
message ContainerCommandRequestProto {
@ -244,6 +245,7 @@ message UpdateContainerResponseProto {
message DeleteContainerRequestProto {
required Pipeline pipeline = 1;
required string name = 2;
optional bool forceDelete = 3 [default = false];
}
message DeleteContainerResponseProto {

View File

@ -338,15 +338,19 @@ public static Path getDataDirectory(ContainerData cData)
* we created on the data location.
*
* @param containerData - Data of the container to remove.
* @param conf - configuration of the cluster.
* @param forceDelete - whether this container should be deleted forcibly.
* @throws IOException
*/
public static void removeContainer(ContainerData containerData,
Configuration conf) throws IOException {
Configuration conf, boolean forceDelete) throws IOException {
Preconditions.checkNotNull(containerData);
Path dbPath = Paths.get(containerData.getDBPath());
LevelDBStore db = KeyUtils.getDB(containerData, conf);
if(!db.isEmpty()) {
// If the container is not empty and cannot be deleted forcibly,
// then throw a SCE to stop deleting.
if(!forceDelete && !db.isEmpty()) {
throw new StorageContainerException(
"Container cannot be deleted because it is not empty.",
ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);

View File

@ -51,6 +51,7 @@
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
@ -362,11 +363,22 @@ private ContainerCommandResponseProto handleDeleteContainer(
}
String name = msg.getDeleteContainer().getName();
boolean forceDelete = msg.getDeleteContainer().getForceDelete();
Pipeline pipeline = Pipeline.getFromProtoBuf(
msg.getDeleteContainer().getPipeline());
Preconditions.checkNotNull(pipeline);
this.containerManager.deleteContainer(pipeline, name);
// If this cmd requires force deleting, then we should
// make sure the container is in the state of closed,
// otherwise, stop deleting container.
if (forceDelete) {
if (this.containerManager.isOpen(pipeline.getContainerName())) {
throw new StorageContainerException("Attempting to force delete "
+ "an open container.", UNCLOSED_CONTAINER_IO);
}
}
this.containerManager.deleteContainer(pipeline, name, forceDelete);
return ContainerUtils.getContainerResponse(msg);
}

View File

@ -64,10 +64,11 @@ void createContainer(Pipeline pipeline, ContainerData containerData)
*
* @param pipeline - nodes that make this container.
* @param containerName - name of the container.
* @param forceDelete - whether this container should be deleted forcibly.
* @throws StorageContainerException
*/
void deleteContainer(Pipeline pipeline, String containerName)
throws StorageContainerException;
void deleteContainer(Pipeline pipeline, String containerName,
boolean forceDelete) throws StorageContainerException;
/**
* Update an existing container.

View File

@ -508,4 +508,19 @@ public static ContainerCommandRequestProto getCloseContainer(
return cmd;
}
/**
* Returns a delete container request.
* @param pipeline - pipeline
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getDeleteContainer(
Pipeline pipeline, boolean forceDelete) {
Preconditions.checkNotNull(pipeline);
ContainerProtos.DeleteContainerRequestProto deleteRequest =
ContainerProtos.DeleteContainerRequestProto.newBuilder().setName(
pipeline.getContainerName()).setPipeline(
pipeline.getProtobufMessage()).setForceDelete(forceDelete).build();
return ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
.Type.DeleteContainer).setDeleteContainer(deleteRequest).build();
}
}

View File

@ -219,7 +219,7 @@ public void testDeleteContainer() throws Exception {
.containsKey(containerName2));
containerManager.deleteContainer(createSingleNodePipeline(containerName1),
containerName1);
containerName1, false);
Assert.assertFalse(containerManager.getContainerMap()
.containsKey(containerName1));
@ -251,7 +251,7 @@ public void testDeleteContainer() throws Exception {
"Container cannot be deleted because it is not empty.");
containerManager.deleteContainer(
createSingleNodePipeline(containerName1),
containerName1);
containerName1, false);
Assert.assertTrue(containerManager.getContainerMap()
.containsKey(containerName1));
}

View File

@ -124,7 +124,7 @@ private static void runTestOzoneContainerViaDataNodeRatis(
pipeline, conf);
try {
runTestOzoneContainerViaDataNode(containerName, pipeline, client);
runTestOzoneContainerViaDataNode(containerName, client);
} finally {
cluster.shutdown();
}
@ -170,7 +170,7 @@ public void testOzoneContainerViaDataNode() throws Exception {
// This client talks to ozone container via datanode.
XceiverClient client = new XceiverClient(pipeline, conf);
runTestOzoneContainerViaDataNode(containerName, pipeline, client);
runTestOzoneContainerViaDataNode(containerName, client);
} finally {
if (cluster != null) {
cluster.shutdown();
@ -178,30 +178,19 @@ public void testOzoneContainerViaDataNode() throws Exception {
}
}
static void runTestOzoneContainerViaDataNode(
String containerName, Pipeline pipeline, XceiverClientSpi client)
throws Exception {
static void runTestOzoneContainerViaDataNode(String containerName,
XceiverClientSpi client) throws Exception {
ContainerProtos.ContainerCommandRequestProto
request, writeChunkRequest, putKeyRequest,
updateRequest1, updateRequest2;
ContainerProtos.ContainerCommandResponseProto response,
updateResponse1, updateResponse2;
try {
client.connect();
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Write Chunk
final String keyName = OzoneUtils.getRequestID();
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
keyName, 1024);
response = client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
createContainerForTesting(client, containerName);
writeChunkRequest = writeChunkForContainer(client, containerName, 1024);
// Read Chunk
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
@ -213,8 +202,7 @@ static void runTestOzoneContainerViaDataNode(
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Put Key
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper.getPutKeyRequest(writeChunkRequest
putKeyRequest = ContainerTestHelper.getPutKeyRequest(writeChunkRequest
.getWriteChunk());
@ -249,21 +237,17 @@ static void runTestOzoneContainerViaDataNode(
//Update an existing container
Map<String, String> containerUpdate = new HashMap<String, String>();
containerUpdate.put("container_updated_key", "container_updated_value");
ContainerProtos.ContainerCommandRequestProto updateRequest1 =
ContainerTestHelper.getUpdateContainerRequest(
updateRequest1 = ContainerTestHelper.getUpdateContainerRequest(
containerName, containerUpdate);
ContainerProtos.ContainerCommandResponseProto updateResponse1 =
client.sendCommand(updateRequest1);
updateResponse1 = client.sendCommand(updateRequest1);
Assert.assertNotNull(updateResponse1);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
//Update an non-existing container
ContainerProtos.ContainerCommandRequestProto updateRequest2 =
ContainerTestHelper.getUpdateContainerRequest(
updateRequest2 = ContainerTestHelper.getUpdateContainerRequest(
"non_exist_container", containerUpdate);
ContainerProtos.ContainerCommandResponseProto updateResponse2 =
client.sendCommand(updateRequest2);
updateResponse2 = client.sendCommand(updateRequest2);
Assert.assertEquals(ContainerProtos.Result.CONTAINER_NOT_FOUND,
updateResponse2.getResult());
} finally {
@ -277,52 +261,31 @@ static void runTestOzoneContainerViaDataNode(
public void testBothGetandPutSmallFile() throws Exception {
MiniOzoneCluster cluster = null;
XceiverClient client = null;
ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto
smallFileRequest, getSmallFileRequest;
try {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container Via Datanode create.
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
client = new XceiverClient(pipeline, conf);
client.connect();
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto smallFileRequest =
ContainerTestHelper.getWriteSmallFileRequest(pipeline, containerName,
keyName, 1024);
String containerName = client.getPipeline().getContainerName();
createContainerForTesting(client, containerName);
smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
client.getPipeline(), containerName, keyName, 1024);
response = client.sendCommand(smallFileRequest);
Assert.assertNotNull(response);
Assert.assertTrue(smallFileRequest.getTraceID()
.equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto getSmallFileRequest =
getSmallFileRequest =
ContainerTestHelper.getReadSmallFileRequest(smallFileRequest
.getPutSmallFile().getKey());
response = client.sendCommand(getSmallFileRequest);
@ -343,58 +306,25 @@ public void testBothGetandPutSmallFile() throws Exception {
public void testCloseContainer() throws Exception {
MiniOzoneCluster cluster = null;
XceiverClient client = null;
ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto
writeChunkRequest, putKeyRequest, request;
try {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container Via Datanode create.
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
client = new XceiverClient(pipeline, conf);
client.connect();
String containerName = client.getPipeline().getContainerName();
createContainerForTesting(client, containerName);
writeChunkRequest = writeChunkForContainer(client, containerName, 1024);
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
keyName, 1024);
// Write Chunk before closing
response = client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(writeChunkRequest.getTraceID().equals(response
.getTraceID()));
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper.getPutKeyRequest(writeChunkRequest
putKeyRequest = ContainerTestHelper.getPutKeyRequest(writeChunkRequest
.getWriteChunk());
// Put key before closing.
response = client.sendCommand(putKeyRequest);
@ -405,7 +335,7 @@ public void testCloseContainer() throws Exception {
putKeyRequest.getTraceID().equals(response.getTraceID()));
// Close the contianer.
request = ContainerTestHelper.getCloseContainer(pipeline);
request = ContainerTestHelper.getCloseContainer(client.getPipeline());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
@ -460,4 +390,133 @@ public void testCloseContainer() throws Exception {
}
}
}
@Test
public void testDeleteContainer() throws Exception {
MiniOzoneCluster cluster = null;
XceiverClient client = null;
ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto request,
writeChunkRequest, putKeyRequest;
try {
OzoneConfiguration conf = new OzoneConfiguration();
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType("distributed").build();
client.connect();
String containerName = client.getPipeline().getContainerName();
createContainerForTesting(client, containerName);
writeChunkRequest = writeChunkForContainer(client, containerName, 1024);
putKeyRequest = ContainerTestHelper.getPutKeyRequest(writeChunkRequest
.getWriteChunk());
// Put key before deleting.
response = client.sendCommand(putKeyRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(
putKeyRequest.getTraceID().equals(response.getTraceID()));
// Container cannot be deleted because the container is not empty.
request = ContainerTestHelper.getDeleteContainer(
client.getPipeline(), false);
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Container cannot be deleted forcibly because
// the container is not closed.
request = ContainerTestHelper.getDeleteContainer(
client.getPipeline(), true);
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.UNCLOSED_CONTAINER_IO,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Close the container.
request = ContainerTestHelper.getCloseContainer(client.getPipeline());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Container can be deleted forcibly because
// it has been closed.
request = ContainerTestHelper.getDeleteContainer(
client.getPipeline(), true);
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
} finally {
if (client != null) {
client.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
private XceiverClient createClientForTesting(OzoneConfiguration conf)
throws Exception {
String containerName = OzoneUtils.getRequestID();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container Via Datanode create.
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
// This client talks to ozone container via datanode.
return new XceiverClient(pipeline, conf);
}
private static void createContainerForTesting(XceiverClientSpi client,
String containerName) throws Exception {
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
}
private static ContainerProtos.ContainerCommandRequestProto
writeChunkForContainer(XceiverClientSpi client,
String containerName, int dataLen) throws Exception {
// Write Chunk
final String keyName = OzoneUtils.getRequestID();
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(client.getPipeline(),
containerName, keyName, dataLen);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(response.getTraceID().equals(response.getTraceID()));
return writeChunkRequest;
}
}