HDDS-579. ContainerStateMachine should fail subsequent transactions per container in case one fails. Contributed by Shashikant Banerjee.

This commit is contained in:
Jitendra Pandey 2018-10-13 19:15:01 -07:00
parent 5209c7503b
commit 603649d3a9
5 changed files with 242 additions and 31 deletions

View File

@ -139,6 +139,7 @@ enum Result {
CONTAINER_CHECKSUM_ERROR = 33; CONTAINER_CHECKSUM_ERROR = 33;
UNKNOWN_CONTAINER_TYPE = 34; UNKNOWN_CONTAINER_TYPE = 34;
BLOCK_NOT_COMMITTED = 35; BLOCK_NOT_COMMITTED = 35;
CONTAINER_UNHEALTHY = 36;
} }
/** /**
@ -161,7 +162,8 @@ enum ContainerLifeCycleState {
OPEN = 1; OPEN = 1;
CLOSING = 2; CLOSING = 2;
CLOSED = 3; CLOSED = 3;
INVALID = 4; UNHEALTHY = 4;
INVALID = 5;
} }
message ContainerCommandRequestProto { message ContainerCommandRequestProto {

View File

@ -142,6 +142,26 @@ public ContainerCommandResponseProto dispatch(
responseProto = handler.handle(msg, container); responseProto = handler.handle(msg, container);
if (responseProto != null) { if (responseProto != null) {
metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime); metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime);
// If the request is of Write Type and the container operation
// is unsuccessful, it implies the applyTransaction on the container
// failed. All subsequent transactions on the container should fail and
// hence replica will be marked unhealthy here. In this case, a close
// container action will be sent to SCM to close the container.
if (!HddsUtils.isReadOnly(msg)
&& responseProto.getResult() != ContainerProtos.Result.SUCCESS) {
// If the container is open and the container operation has failed,
// it should be first marked unhealthy and the initiate the close
// container action. This also implies this is the first transaction
// which has failed, so the container is marked unhealthy right here.
// Once container is marked unhealthy, all the subsequent write
// transactions will fail with UNHEALTHY_CONTAINER exception.
if (container.getContainerState() == ContainerLifeCycleState.OPEN) {
container.getContainerData()
.setState(ContainerLifeCycleState.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
}
}
return responseProto; return responseProto;
} else { } else {
return ContainerUtils.unsupportedRequest(msg); return ContainerUtils.unsupportedRequest(msg);
@ -149,31 +169,46 @@ public ContainerCommandResponseProto dispatch(
} }
/** /**
* If the container usage reaches the close threshold we send Close * If the container usage reaches the close threshold or the container is
* ContainerAction to SCM. * marked unhealthy we send Close ContainerAction to SCM.
*
* @param container current state of container * @param container current state of container
*/ */
private void sendCloseContainerActionIfNeeded(Container container) { private void sendCloseContainerActionIfNeeded(Container container) {
// We have to find a more efficient way to close a container. // We have to find a more efficient way to close a container.
Boolean isOpen = Optional.ofNullable(container) boolean isSpaceFull = isContainerFull(container);
boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
if (shouldClose) {
ContainerData containerData = container.getContainerData();
ContainerAction.Reason reason =
isSpaceFull ? ContainerAction.Reason.CONTAINER_FULL :
ContainerAction.Reason.CONTAINER_UNHEALTHY;
ContainerAction action = ContainerAction.newBuilder()
.setContainerID(containerData.getContainerID())
.setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
context.addContainerActionIfAbsent(action);
}
}
private boolean isContainerFull(Container container) {
boolean isOpen = Optional.ofNullable(container)
.map(cont -> cont.getContainerState() == ContainerLifeCycleState.OPEN) .map(cont -> cont.getContainerState() == ContainerLifeCycleState.OPEN)
.orElse(Boolean.FALSE); .orElse(Boolean.FALSE);
if (isOpen) { if (isOpen) {
ContainerData containerData = container.getContainerData(); ContainerData containerData = container.getContainerData();
double containerUsedPercentage = 1.0f * containerData.getBytesUsed() / double containerUsedPercentage =
containerData.getMaxSize(); 1.0f * containerData.getBytesUsed() / containerData.getMaxSize();
if (containerUsedPercentage >= containerCloseThreshold) { return containerUsedPercentage >= containerCloseThreshold;
ContainerAction action = ContainerAction.newBuilder() } else {
.setContainerID(containerData.getContainerID()) return false;
.setAction(ContainerAction.Action.CLOSE)
.setReason(ContainerAction.Reason.CONTAINER_FULL)
.build();
context.addContainerActionIfAbsent(action);
}
} }
} }
private boolean isContainerUnhealthy(Container container) {
return Optional.ofNullable(container).map(
cont -> (cont.getContainerState() == ContainerLifeCycleState.UNHEALTHY))
.orElse(Boolean.FALSE);
}
@Override @Override
public Handler getHandler(ContainerProtos.ContainerType containerType) { public Handler getHandler(ContainerProtos.ContainerType containerType) {
return handlers.get(containerType); return handlers.get(containerType);

View File

@ -79,22 +79,7 @@
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import static org.apache.hadoop.hdds.HddsConfigKeys import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_DATANODE_VOLUME_CHOOSING_POLICY; .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
.Result.BLOCK_NOT_COMMITTED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.DELETE_ON_OPEN_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Stage; .Stage;
import static org.apache.hadoop.ozone.OzoneConfigKeys import static org.apache.hadoop.ozone.OzoneConfigKeys
@ -819,6 +804,9 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
case CLOSED: case CLOSED:
result = CLOSED_CONTAINER_IO; result = CLOSED_CONTAINER_IO;
break; break;
case UNHEALTHY:
result = CONTAINER_UNHEALTHY;
break;
case INVALID: case INVALID:
result = INVALID_CONTAINER_STATE; result = INVALID_CONTAINER_STATE;
break; break;

View File

@ -159,6 +159,7 @@ message ContainerAction {
enum Reason { enum Reason {
CONTAINER_FULL = 1; CONTAINER_FULL = 1;
CONTAINER_UNHEALTHY = 2;
} }
required int64 containerID = 1; required int64 containerID = 1;

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ContainerAction.Action;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ContainerAction.Reason;
import org.apache.hadoop.hdds.scm.container.
common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.
HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests the containerStateMachine failure handling.
*/
public class TestContainerStateMachineFailures {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static OzoneClient client;
private static ObjectStore objectStore;
private static String volumeName;
private static String bucketName;
private static String path;
private static int chunkSize;
/**
* Create a MiniDFSCluster for testing.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
path = GenericTestUtils
.getTempPath(TestContainerStateMachineFailures.class.getSimpleName());
File baseDir = new File(path);
baseDir.mkdirs();
chunkSize = (int) OzoneConsts.MB;
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setQuietMode(false);
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
volumeName = "testcontainerstatemachinefailures";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testContainerStateMachineFailures() throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE);
key.write("ratis".getBytes());
//get the name of a valid container
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
.build();
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
long containerID = omKeyLocationInfo.getContainerID();
// delete the container dir
FileUtil.fullyDelete(new File(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
.getContainerPath()));
try {
// flush will throw an exception
key.flush();
Assert.fail("Expected exception not thrown");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
}
// Make sure the container is marked unhealthy
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID())
.getContainerState()
== ContainerProtos.ContainerLifeCycleState.UNHEALTHY);
try {
// subsequent requests will fail with unhealthy container exception
key.close();
Assert.fail("Expected exception not thrown");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
Assert.assertTrue(((StorageContainerException) ioe.getCause()).getResult()
== ContainerProtos.Result.CONTAINER_UNHEALTHY);
}
StorageContainerDatanodeProtocolProtos.ContainerAction action =
StorageContainerDatanodeProtocolProtos.ContainerAction.newBuilder()
.setContainerID(containerID).setAction(Action.CLOSE)
.setReason(Reason.CONTAINER_UNHEALTHY)
.build();
// Make sure the container close action is initiated to SCM.
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext()
.getAllPendingContainerActions().contains(action));
}
}