HDDS-853. Option to force close a container in Datanode.
Contributed by Nanda kumar.
This commit is contained in:
parent
892b33e054
commit
ebb9245366
hadoop-hdds/container-service/src
main
java/org/apache/hadoop/ozone
container/common/statemachine/commandhandler
protocol/commands
proto
test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler
@ -79,8 +79,6 @@ public class CloseContainerCommandHandler implements CommandHandler {
|
||||
final ContainerController controller = ozoneContainer.getController();
|
||||
final long containerId = closeCommand.getContainerID();
|
||||
try {
|
||||
// TODO: Closing of QUASI_CLOSED container.
|
||||
|
||||
final Container container = controller.getContainer(containerId);
|
||||
|
||||
if (container == null) {
|
||||
@ -95,6 +93,11 @@ public class CloseContainerCommandHandler implements CommandHandler {
|
||||
// If the container is part of open pipeline, close it via write channel
|
||||
if (ozoneContainer.getWriteChannel()
|
||||
.isExist(closeCommand.getPipelineID())) {
|
||||
if (closeCommand.getForce()) {
|
||||
LOG.warn("Cannot force close a container when the container is" +
|
||||
" part of an active pipeline.");
|
||||
return;
|
||||
}
|
||||
ContainerCommandRequestProto request =
|
||||
getContainerCommandRequestProto(datanodeDetails,
|
||||
closeCommand.getContainerID());
|
||||
@ -102,10 +105,14 @@ public class CloseContainerCommandHandler implements CommandHandler {
|
||||
request, closeCommand.getPipelineID());
|
||||
return;
|
||||
}
|
||||
|
||||
// The container is not part of any open pipeline.
|
||||
// QUASI_CLOSE the container using ContainerController.
|
||||
controller.quasiCloseContainer(containerId);
|
||||
// If we reach here, there is no active pipeline for this container.
|
||||
if (!closeCommand.getForce()) {
|
||||
// QUASI_CLOSE the container.
|
||||
controller.quasiCloseContainer(containerId);
|
||||
} else {
|
||||
// SCM told us to force close the container.
|
||||
controller.closeContainer(containerId);
|
||||
}
|
||||
} catch (NotLeaderException e) {
|
||||
LOG.debug("Follower cannot close container #{}.", containerId);
|
||||
} catch (IOException e) {
|
||||
|
@ -30,11 +30,18 @@ public class CloseContainerCommand
|
||||
extends SCMCommand<CloseContainerCommandProto> {
|
||||
|
||||
private final PipelineID pipelineID;
|
||||
private boolean force;
|
||||
|
||||
public CloseContainerCommand(final long containerID,
|
||||
final PipelineID pipelineID) {
|
||||
this(containerID, pipelineID, false);
|
||||
}
|
||||
|
||||
public CloseContainerCommand(final long containerID,
|
||||
final PipelineID pipelineID, boolean force) {
|
||||
super(containerID);
|
||||
this.pipelineID = pipelineID;
|
||||
this.force = force;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -62,6 +69,7 @@ public class CloseContainerCommand
|
||||
.setContainerID(getId())
|
||||
.setCmdId(getId())
|
||||
.setPipelineID(pipelineID.getProtobuf())
|
||||
.setForce(force)
|
||||
.build();
|
||||
}
|
||||
|
||||
@ -69,7 +77,8 @@ public class CloseContainerCommand
|
||||
CloseContainerCommandProto closeContainerProto) {
|
||||
Preconditions.checkNotNull(closeContainerProto);
|
||||
return new CloseContainerCommand(closeContainerProto.getCmdId(),
|
||||
PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()));
|
||||
PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()),
|
||||
closeContainerProto.getForce());
|
||||
}
|
||||
|
||||
public long getContainerID() {
|
||||
|
@ -293,6 +293,8 @@ message CloseContainerCommandProto {
|
||||
required PipelineID pipelineID = 2;
|
||||
// cmdId will be removed
|
||||
required int64 cmdId = 3;
|
||||
// Force will be used when closing a container out side of ratis.
|
||||
optional bool force = 4 [default = false];
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,6 +17,7 @@
|
||||
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
@ -24,7 +25,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.DatanodeStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
|
||||
@ -44,6 +47,7 @@ import org.mockito.Mockito;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
@ -52,8 +56,156 @@ import java.util.UUID;
|
||||
public class TestCloseContainerCommandHandler {
|
||||
|
||||
private final StateContext context = Mockito.mock(StateContext.class);
|
||||
private final Random random = new Random();
|
||||
private static File testDir;
|
||||
|
||||
@Test
|
||||
public void testCloseContainerViaRatis()
|
||||
throws Exception {
|
||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
|
||||
final OzoneContainer ozoneContainer =
|
||||
getOzoneContainer(conf, datanodeDetails);
|
||||
ozoneContainer.start();
|
||||
try {
|
||||
final Container container =
|
||||
createContainer(conf, datanodeDetails, ozoneContainer);
|
||||
final long containerId = container.getContainerData().getContainerID();
|
||||
final PipelineID pipelineId = PipelineID.valueOf(UUID.fromString(
|
||||
container.getContainerData().getOriginPipelineId()));
|
||||
|
||||
// We have created a container via ratis.
|
||||
// Now close the container on ratis.
|
||||
final CloseContainerCommandHandler closeHandler =
|
||||
new CloseContainerCommandHandler();
|
||||
final CloseContainerCommand command = new CloseContainerCommand(
|
||||
containerId, pipelineId);
|
||||
|
||||
closeHandler.handle(command, ozoneContainer, context, null);
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
ozoneContainer.getContainerSet().getContainer(containerId)
|
||||
.getContainerState());
|
||||
|
||||
Mockito.verify(context.getParent(),
|
||||
Mockito.times(2)).triggerHeartbeat();
|
||||
} finally {
|
||||
ozoneContainer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseContainerViaStandalone()
|
||||
throws Exception {
|
||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
|
||||
final OzoneContainer ozoneContainer =
|
||||
getOzoneContainer(conf, datanodeDetails);
|
||||
ozoneContainer.start();
|
||||
try {
|
||||
final Container container =
|
||||
createContainer(conf, datanodeDetails, ozoneContainer);
|
||||
final long containerId = container.getContainerData().getContainerID();
|
||||
// To quasi close specify a pipeline which doesn't exist in the datanode.
|
||||
final PipelineID pipelineId = PipelineID.randomId();
|
||||
|
||||
// We have created a container via ratis. Now quasi close it.
|
||||
final CloseContainerCommandHandler closeHandler =
|
||||
new CloseContainerCommandHandler();
|
||||
final CloseContainerCommand command = new CloseContainerCommand(
|
||||
containerId, pipelineId);
|
||||
|
||||
closeHandler.handle(command, ozoneContainer, context, null);
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED,
|
||||
ozoneContainer.getContainerSet().getContainer(containerId)
|
||||
.getContainerState());
|
||||
|
||||
Mockito.verify(context.getParent(),
|
||||
Mockito.times(2)).triggerHeartbeat();
|
||||
} finally {
|
||||
ozoneContainer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQuasiCloseToClose() throws Exception {
|
||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
|
||||
final OzoneContainer ozoneContainer =
|
||||
getOzoneContainer(conf, datanodeDetails);
|
||||
ozoneContainer.start();
|
||||
try {
|
||||
final Container container =
|
||||
createContainer(conf, datanodeDetails, ozoneContainer);
|
||||
final long containerId = container.getContainerData().getContainerID();
|
||||
// A pipeline which doesn't exist in the datanode.
|
||||
final PipelineID pipelineId = PipelineID.randomId();
|
||||
|
||||
// We have created a container via ratis. Now quasi close it.
|
||||
final CloseContainerCommandHandler closeHandler =
|
||||
new CloseContainerCommandHandler();
|
||||
final CloseContainerCommand command = new CloseContainerCommand(
|
||||
containerId, pipelineId);
|
||||
|
||||
closeHandler.handle(command, ozoneContainer, context, null);
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED,
|
||||
ozoneContainer.getContainerSet().getContainer(containerId)
|
||||
.getContainerState());
|
||||
|
||||
Mockito.verify(context.getParent(),
|
||||
Mockito.times(2)).triggerHeartbeat();
|
||||
|
||||
// The container is quasi closed. Force close the container now.
|
||||
final CloseContainerCommand closeCommand = new CloseContainerCommand(
|
||||
containerId, pipelineId, true);
|
||||
|
||||
closeHandler.handle(closeCommand, ozoneContainer, context, null);
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
ozoneContainer.getContainerSet().getContainer(containerId)
|
||||
.getContainerState());
|
||||
|
||||
Mockito.verify(context.getParent(),
|
||||
Mockito.times(3)).triggerHeartbeat();
|
||||
} finally {
|
||||
ozoneContainer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForceCloseOpenContainer() throws Exception {
|
||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
|
||||
final OzoneContainer ozoneContainer =
|
||||
getOzoneContainer(conf, datanodeDetails);
|
||||
ozoneContainer.start();
|
||||
try {
|
||||
final Container container =
|
||||
createContainer(conf, datanodeDetails, ozoneContainer);
|
||||
final long containerId = container.getContainerData().getContainerID();
|
||||
// A pipeline which doesn't exist in the datanode.
|
||||
final PipelineID pipelineId = PipelineID.randomId();
|
||||
|
||||
final CloseContainerCommandHandler closeHandler =
|
||||
new CloseContainerCommandHandler();
|
||||
|
||||
final CloseContainerCommand closeCommand = new CloseContainerCommand(
|
||||
containerId, pipelineId, true);
|
||||
|
||||
closeHandler.handle(closeCommand, ozoneContainer, context, null);
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
ozoneContainer.getContainerSet().getContainer(containerId)
|
||||
.getContainerState());
|
||||
|
||||
Mockito.verify(context.getParent(),
|
||||
Mockito.times(2)).triggerHeartbeat();
|
||||
} finally {
|
||||
ozoneContainer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private OzoneContainer getOzoneContainer(final OzoneConfiguration conf,
|
||||
final DatanodeDetails datanodeDetails) throws IOException {
|
||||
@ -67,19 +219,15 @@ public class TestCloseContainerCommandHandler {
|
||||
Mockito.when(datanodeStateMachine.getDatanodeDetails())
|
||||
.thenReturn(datanodeDetails);
|
||||
Mockito.when(context.getParent()).thenReturn(datanodeStateMachine);
|
||||
return new OzoneContainer(datanodeDetails, conf, context);
|
||||
final OzoneContainer ozoneContainer = new OzoneContainer(
|
||||
datanodeDetails, conf, context);
|
||||
ozoneContainer.getDispatcher().setScmId(UUID.randomUUID().toString());
|
||||
return ozoneContainer;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCloseContainerViaRatis()
|
||||
throws IOException, InterruptedException {
|
||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
|
||||
final OzoneContainer container = getOzoneContainer(conf, datanodeDetails);
|
||||
container.getDispatcher().setScmId(UUID.randomUUID().toString());
|
||||
container.start();
|
||||
// Give some time for ratis for leader election.
|
||||
private Container createContainer(final Configuration conf,
|
||||
final DatanodeDetails datanodeDetails,
|
||||
final OzoneContainer ozoneContainer) throws Exception {
|
||||
final PipelineID pipelineID = PipelineID.randomId();
|
||||
final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
|
||||
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
|
||||
@ -88,9 +236,10 @@ public class TestCloseContainerCommandHandler {
|
||||
Collections.singleton(datanodeDetails));
|
||||
final RaftClient client = RatisHelper.newRaftClient(
|
||||
SupportedRpcType.GRPC, peer, retryPolicy);
|
||||
System.out.println(client.groupAdd(group, peer.getId()).isSuccess());
|
||||
Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess());
|
||||
Thread.sleep(2000);
|
||||
final ContainerID containerId = ContainerID.valueof(1);
|
||||
final ContainerID containerId = ContainerID.valueof(
|
||||
random.nextLong() & Long.MAX_VALUE);
|
||||
ContainerProtos.ContainerCommandRequestProto.Builder request =
|
||||
ContainerProtos.ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.CreateContainer);
|
||||
@ -99,79 +248,14 @@ public class TestCloseContainerCommandHandler {
|
||||
ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(datanodeDetails.getUuidString());
|
||||
container.getWriteChannel().submitRequest(
|
||||
ozoneContainer.getWriteChannel().submitRequest(
|
||||
request.build(), pipelineID.getProtobuf());
|
||||
|
||||
final Container container = ozoneContainer.getContainerSet().getContainer(
|
||||
containerId.getId());
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
|
||||
container.getContainerSet().getContainer(
|
||||
containerId.getId()).getContainerState());
|
||||
|
||||
// We have created a container via ratis. Now close the container on ratis.
|
||||
final CloseContainerCommandHandler closeHandler =
|
||||
new CloseContainerCommandHandler();
|
||||
final CloseContainerCommand command = new CloseContainerCommand(
|
||||
containerId.getId(), pipelineID);
|
||||
|
||||
closeHandler.handle(command, container, context, null);
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
|
||||
container.getContainerSet().getContainer(
|
||||
containerId.getId()).getContainerState());
|
||||
|
||||
Mockito.verify(context.getParent(), Mockito.times(2)).triggerHeartbeat();
|
||||
container.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseContainerViaStandalone()
|
||||
throws IOException, InterruptedException {
|
||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
|
||||
final OzoneContainer container = getOzoneContainer(conf, datanodeDetails);
|
||||
container.getDispatcher().setScmId(UUID.randomUUID().toString());
|
||||
container.start();
|
||||
// Give some time for ratis for leader election.
|
||||
final PipelineID pipelineID = PipelineID.randomId();
|
||||
final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
|
||||
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
|
||||
final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails);
|
||||
final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
|
||||
Collections.singleton(datanodeDetails));
|
||||
final RaftClient client = RatisHelper.newRaftClient(
|
||||
SupportedRpcType.GRPC, peer, retryPolicy);
|
||||
System.out.println(client.groupAdd(group, peer.getId()).isSuccess());
|
||||
Thread.sleep(2000);
|
||||
final ContainerID containerId = ContainerID.valueof(2);
|
||||
ContainerProtos.ContainerCommandRequestProto.Builder request =
|
||||
ContainerProtos.ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.CreateContainer);
|
||||
request.setContainerID(containerId.getId());
|
||||
request.setCreateContainer(
|
||||
ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(datanodeDetails.getUuidString());
|
||||
container.getWriteChannel().submitRequest(
|
||||
request.build(), pipelineID.getProtobuf());
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN,
|
||||
container.getContainerSet().getContainer(
|
||||
containerId.getId()).getContainerState());
|
||||
|
||||
// We have created a container via ratis. Now quasi close it
|
||||
final CloseContainerCommandHandler closeHandler =
|
||||
new CloseContainerCommandHandler();
|
||||
// Specify a pipeline which doesn't exist in the datanode.
|
||||
final CloseContainerCommand command = new CloseContainerCommand(
|
||||
containerId.getId(), PipelineID.randomId());
|
||||
|
||||
closeHandler.handle(command, container, context, null);
|
||||
|
||||
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED,
|
||||
container.getContainerSet().getContainer(
|
||||
containerId.getId()).getContainerState());
|
||||
|
||||
Mockito.verify(context.getParent(), Mockito.times(2)).triggerHeartbeat();
|
||||
container.stop();
|
||||
container.getContainerState());
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user