HDDS-1561: Mark OPEN containers as QUASI_CLOSED as part of Ratis groupRemove (#1401)

This commit is contained in:
Lokesh Jain 2019-09-06 13:15:49 +05:30 committed by Nanda kumar
parent 494d75eb2b
commit 6e4cdf89ef
13 changed files with 189 additions and 70 deletions

View File

@ -284,6 +284,14 @@ public synchronized boolean isQuasiClosed() {
return ContainerDataProto.State.QUASI_CLOSED == state;
}
/**
* checks if the container is unhealthy.
* @return - boolean
*/
public synchronized boolean isUnhealthy() {
return ContainerDataProto.State.UNHEALTHY == state;
}
/**
* Marks this container as quasi closed.
*/

View File

@ -86,37 +86,38 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
return;
}
if (container.getContainerState() ==
ContainerProtos.ContainerDataProto.State.CLOSED) {
// Closing a container is an idempotent operation.
return;
}
// Move the container to CLOSING state
// move the container to CLOSING if in OPEN state
controller.markContainerForClose(containerId);
switch (container.getContainerState()) {
case OPEN:
case CLOSING:
// If the container is part of open pipeline, close it via write channel
if (ozoneContainer.getWriteChannel()
.isExist(closeCommand.getPipelineID())) {
if (closeCommand.getForce()) {
LOG.warn("Cannot force close a container when the container is" +
" part of an active pipeline.");
return;
}
ContainerCommandRequestProto request =
getContainerCommandRequestProto(datanodeDetails,
closeCommand.getContainerID());
ozoneContainer.getWriteChannel().submitRequest(
request, closeCommand.getPipelineID());
return;
}
// If we reach here, there is no active pipeline for this container.
if (!closeCommand.getForce()) {
// QUASI_CLOSE the container.
controller.quasiCloseContainer(containerId);
ozoneContainer.getWriteChannel()
.submitRequest(request, closeCommand.getPipelineID());
} else {
// SCM told us to force close the container.
// Container should not exist in CLOSING state without a pipeline
controller.markContainerUnhealthy(containerId);
}
break;
case QUASI_CLOSED:
if (closeCommand.getForce()) {
controller.closeContainer(containerId);
break;
}
case CLOSED:
break;
case UNHEALTHY:
case INVALID:
LOG.debug("Cannot close the container #{}, the container is"
+ " in {} state.", containerId, container.getContainerState());
default:
break;
}
} catch (NotLeaderException e) {
LOG.debug("Follower cannot close container #{}.", containerId);

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroupId;
@ -138,6 +139,7 @@ public class ContainerStateMachine extends BaseStateMachine {
new SimpleStateMachineStorage();
private final RaftGroupId gid;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private ThreadPoolExecutor chunkExecutor;
private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long,
@ -160,11 +162,13 @@ public class ContainerStateMachine extends BaseStateMachine {
@SuppressWarnings("parameternumber")
public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
long expiryInterval, boolean isBlockTokenEnabled,
TokenVerifier tokenVerifier, Configuration conf) {
ContainerController containerController, ThreadPoolExecutor chunkExecutor,
XceiverServerRatis ratisServer, long expiryInterval,
boolean isBlockTokenEnabled, TokenVerifier tokenVerifier,
Configuration conf) {
this.gid = gid;
this.dispatcher = dispatcher;
this.containerController = containerController;
this.chunkExecutor = chunkExecutor;
this.ratisServer = ratisServer;
metrics = CSMMetrics.create(gid);
@ -215,6 +219,7 @@ public void initialize(
throws IOException {
super.initialize(server, id, raftStorage);
storage.init(raftStorage);
ratisServer.notifyGroupAdd(gid);
loadSnapshot(storage.getLatestSnapshot());
}
@ -800,6 +805,21 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
return future;
}
@Override
public void notifyGroupRemove() {
ratisServer.notifyGroupRemove(gid);
// Make best effort to quasi-close all the containers on group removal.
// Containers already in terminal state like CLOSED or UNHEALTHY will not
// be affected.
for (Long cid : createContainerSet) {
try {
containerController.markContainerForClose(cid);
containerController.quasiCloseContainer(cid);
} catch (IOException e) {
}
}
}
@Override
public void close() throws IOException {
evictStateMachineCache();

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import io.opentracing.Scope;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ratis.RaftConfigKeys;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.ratis.conf.RaftProperties;
@ -63,9 +64,11 @@
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
@ -90,6 +93,7 @@ private static long nextCallId() {
private final RaftServer server;
private ThreadPoolExecutor chunkExecutor;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private ClientId clientId = ClientId.randomId();
private final StateContext context;
private final ReplicationLevel replicationLevel;
@ -98,10 +102,15 @@ private static long nextCallId() {
private boolean isStarted = false;
private DatanodeDetails datanodeDetails;
private final Configuration conf;
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
private final Set<RaftGroupId> raftGids = new HashSet<>();
@SuppressWarnings("parameternumber")
private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext
context, GrpcTlsConfig tlsConfig, CertificateClient caClient)
ContainerDispatcher dispatcher, ContainerController containerController,
StateContext context, GrpcTlsConfig tlsConfig, CertificateClient caClient,
Configuration conf)
throws IOException {
super(conf, caClient);
this.conf = conf;
@ -127,6 +136,7 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.dispatcher = dispatcher;
this.containerController = containerController;
RaftServer.Builder builder =
RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd))
@ -139,9 +149,10 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
}
private ContainerStateMachine getStateMachine(RaftGroupId gid) {
return new ContainerStateMachine(gid, dispatcher, chunkExecutor, this,
cacheEntryExpiryInteval, getSecurityConfig().isBlockTokenEnabled(),
getBlockTokenVerifier(), conf);
return new ContainerStateMachine(gid, dispatcher, containerController,
chunkExecutor, this, cacheEntryExpiryInteval,
getSecurityConfig().isBlockTokenEnabled(), getBlockTokenVerifier(),
conf);
}
private RaftProperties newRaftProperties() {
@ -258,7 +269,7 @@ private void setNodeFailureTimeout(RaftProperties properties) {
.getDuration(), timeUnit);
final TimeDuration nodeFailureTimeout =
TimeDuration.valueOf(duration, timeUnit);
RaftServerConfigKeys.setLeaderElectionTimeout(properties,
RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
nodeFailureTimeout);
RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
nodeFailureTimeout);
@ -367,8 +378,8 @@ private RpcType setRpcType(RaftProperties properties) {
public static XceiverServerRatis newXceiverServerRatis(
DatanodeDetails datanodeDetails, Configuration ozoneConf,
ContainerDispatcher dispatcher, StateContext context,
CertificateClient caClient) throws IOException {
ContainerDispatcher dispatcher, ContainerController containerController,
CertificateClient caClient, StateContext context) throws IOException {
int localPort = ozoneConf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
@ -383,8 +394,8 @@ public static XceiverServerRatis newXceiverServerRatis(
GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig(
new SecurityConfig(ozoneConf));
return new XceiverServerRatis(datanodeDetails, localPort,
dispatcher, ozoneConf, context, tlsConfig, caClient);
return new XceiverServerRatis(datanodeDetails, localPort, dispatcher,
containerController, context, tlsConfig, caClient, ozoneConf);
}
@Override
@ -561,13 +572,8 @@ private void triggerPipelineClose(RaftGroupId groupId, String detail,
@Override
public boolean isExist(HddsProtos.PipelineID pipelineId) {
for (RaftGroupId groupId : server.getGroupIds()) {
if (PipelineID.valueOf(groupId.getUuid()).getProtobuf()
.equals(pipelineId)) {
return true;
}
}
return false;
return raftGids.contains(
RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()));
}
@Override
@ -658,4 +664,12 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos());
return minIndex == null ? -1 : minIndex.longValue();
}
void notifyGroupRemove(RaftGroupId gid) {
raftGids.remove(gid);
}
void notifyGroupAdd(RaftGroupId gid) {
raftGids.add(gid);
}
}

View File

@ -113,7 +113,8 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
*/
this.controller = new ContainerController(containerSet, handlers);
this.writeChannel = XceiverServerRatis.newXceiverServerRatis(
datanodeDetails, config, hddsDispatcher, context, certClient);
datanodeDetails, config, hddsDispatcher, controller, certClient,
context);
this.readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, certClient,
createReplicationService());

View File

@ -119,8 +119,10 @@ public void closeContainerWithoutPipeline() throws IOException {
.markContainerForClose(container);
verify(writeChannel, never())
.submitRequest(any(), any());
// Container in CLOSING state is moved to UNHEALTHY if pipeline does not
// exist. Container should not exist in CLOSING state without a pipeline.
verify(containerHandler)
.quasiCloseContainer(container);
.markContainerUnhealthy(container);
}
@Test
@ -144,8 +146,10 @@ public void forceCloseOpenContainer() throws Exception {
verify(writeChannel, never())
.submitRequest(any(), any());
// Container in CLOSING state is moved to UNHEALTHY if pipeline does not
// exist. Container should not exist in CLOSING state without a pipeline.
verify(containerHandler)
.closeContainer(container);
.markContainerUnhealthy(container);
}
@Test
@ -155,7 +159,7 @@ public void forceCloseOpenContainerWithPipeline() throws Exception {
verify(containerHandler)
.markContainerForClose(container);
verify(writeChannel, never())
verify(writeChannel)
.submitRequest(any(), any());
verify(containerHandler, never())
.quasiCloseContainer(container);

View File

@ -48,7 +48,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.5.0-SNAPSHOT</hdds.version>
<!-- Apache Ratis version -->
<ratis.version>0.4.0-2337318-SNAPSHOT</ratis.version>
<ratis.version>0.4.0-78e95b9-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version>

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@ -26,7 +26,6 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
@ -97,10 +96,11 @@ public static void shutdown() {
@Test
public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
String keyName = "testIfCloseContainerCommandHandlerIsInvoked";
OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
.createKey("standalone", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
key.write("standalone".getBytes());
.createKey(keyName, 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
new HashMap<>());
key.write(keyName.getBytes());
key.close();
//get the name of a valid container
@ -108,7 +108,7 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
.setKeyName("standalone").setRefreshPipeline(true).build();
.setKeyName(keyName).setRefreshPipeline(true).build();
OmKeyLocationInfo omKeyLocationInfo =
cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
.get(0).getBlocksLatestVersionOnly().get(0);
@ -186,16 +186,21 @@ public void testCloseContainerViaStandAlone()
// the container will not be closed via RATIS
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID, PipelineID.randomId()));
new CloseContainerCommand(containerID, pipeline.getId()));
//double check if it's really closed (waitFor also throws an exception)
// TODO: change the below line after implementing QUASI_CLOSED to CLOSED
// logic. The container will be QUASI closed as of now
GenericTestUtils
.waitFor(() -> isContainerQuasiClosed(
cluster, containerID, datanodeDetails), 500, 5 * 1000);
Assert.assertTrue(
isContainerQuasiClosed(cluster, containerID, datanodeDetails));
.waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
500, 5 * 1000);
Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
cluster.getStorageContainerManager().getPipelineManager()
.finalizeAndDestroyPipeline(pipeline, false);
Thread.sleep(5000);
// Pipeline close should not affect a container in CLOSED state
Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
}
@Test
@ -258,6 +263,65 @@ public void testCloseContainerViaRatis() throws IOException,
}
}
@Test
public void testQuasiCloseTransitionViaRatis()
throws IOException, TimeoutException, InterruptedException {
String keyName = "testQuasiCloseTransitionViaRatis";
OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
.createKey(keyName, 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
key.write(keyName.getBytes());
key.close();
OmKeyArgs keyArgs =
new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
OmKeyLocationInfo omKeyLocationInfo =
cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
.get(0).getBlocksLatestVersionOnly().get(0);
long containerID = omKeyLocationInfo.getContainerID();
ContainerInfo container = cluster.getStorageContainerManager()
.getContainerManager().getContainer(ContainerID.valueof(containerID));
Pipeline pipeline = cluster.getStorageContainerManager()
.getPipelineManager().getPipeline(container.getPipelineID());
List<DatanodeDetails> datanodes = pipeline.getNodes();
Assert.assertEquals(datanodes.size(), 1);
DatanodeDetails datanodeDetails = datanodes.get(0);
Assert
.assertFalse(isContainerClosed(cluster, containerID, datanodeDetails));
// close the pipeline
cluster.getStorageContainerManager()
.getPipelineManager().finalizeAndDestroyPipeline(pipeline, false);
// All the containers in OPEN or CLOSING state should transition to
// QUASI-CLOSED after pipeline close
GenericTestUtils.waitFor(
() -> isContainerQuasiClosed(cluster, containerID, datanodeDetails),
500, 5 * 1000);
Assert.assertTrue(
isContainerQuasiClosed(cluster, containerID, datanodeDetails));
// Send close container command from SCM to datanode with forced flag as
// true
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID, pipeline.getId(), true));
GenericTestUtils
.waitFor(() -> isContainerClosed(
cluster, containerID, datanodeDetails), 500, 5 * 1000);
Assert.assertTrue(
isContainerClosed(cluster, containerID, datanodeDetails));
}
private Boolean isContainerClosed(MiniOzoneCluster ozoneCluster,
long containerID,
DatanodeDetails datanode) {

View File

@ -27,6 +27,7 @@
import java.util.List;
import java.util.ArrayList;
import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@ -41,10 +42,12 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -185,6 +188,7 @@ static XceiverServerRatis newXceiverServerRatis(
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
new ContainerController(new ContainerSet(), Maps.newHashMap()),
null, null);
}

View File

@ -141,8 +141,9 @@ static XceiverServerRatis newXceiverServerRatis(
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis
.newXceiverServerRatis(dn, conf, dispatcher, null, caClient);
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
new ContainerController(new ContainerSet(), Maps.newHashMap()),
caClient, null);
}
static void runTestClientServerRatis(RpcType rpc, int numNodes)

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.server;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
@ -149,8 +150,9 @@ static XceiverServerRatis newXceiverServerRatis(
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis
.newXceiverServerRatis(dn, conf, dispatcher, null, caClient);
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
new ContainerController(new ContainerSet(), Maps.newHashMap()),
caClient, null);
}
static void runTestClientServerRatis(RpcType rpc, int numNodes)

View File

@ -489,7 +489,7 @@ private RaftProperties newRaftProperties(Configuration conf) {
.getDuration(), nodeFailureTimeoutUnit);
final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
RaftServerConfigKeys.setLeaderElectionTimeout(properties,
RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
nodeFailureTimeout);
RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
nodeFailureTimeout);

View File

@ -29,7 +29,7 @@
<hadoop.version>3.2.0</hadoop.version>
<hdds.version>0.5.0-SNAPSHOT</hdds.version>
<ozone.version>0.5.0-SNAPSHOT</ozone.version>
<ratis.version>0.4.0-2337318-SNAPSHOT</ratis.version>
<ratis.version>0.4.0-78e95b9-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version>
<ozone.release>Crater Lake</ozone.release>
<declared.ozone.version>${ozone.version}</declared.ozone.version>