HDDS-709. Modify Close Container handling sequence on datanodes. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
1f9c4f32e8
commit
f944f33832
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container.common.helpers;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
|
||||
/**
|
||||
* Exceptions thrown when a write/update opearation is done on non-open
|
||||
* container.
|
||||
*/
|
||||
public class ContainerNotOpenException extends StorageContainerException {
|
||||
|
||||
/**
|
||||
* Constructs an {@code IOException} with the specified detail message.
|
||||
*
|
||||
* @param message The detail message (which is saved for later retrieval by
|
||||
* the {@link #getMessage()} method)
|
||||
*/
|
||||
public ContainerNotOpenException(String message) {
|
||||
super(message, ContainerProtos.Result.CONTAINER_NOT_OPEN);
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container.common.helpers;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
|
||||
/**
|
||||
* Exceptions thrown when a container is in invalid state while doing a I/O.
|
||||
*/
|
||||
public class InvalidContainerStateException extends StorageContainerException {
|
||||
|
||||
/**
|
||||
* Constructs an {@code IOException} with the specified detail message.
|
||||
*
|
||||
* @param message The detail message (which is saved for later retrieval by
|
||||
* the {@link #getMessage()} method)
|
||||
*/
|
||||
public InvalidContainerStateException(String message) {
|
||||
super(message, ContainerProtos.Result.INVALID_CONTAINER_STATE);
|
||||
}
|
||||
}
|
@ -142,6 +142,7 @@ enum Result {
|
||||
CONTAINER_UNHEALTHY = 36;
|
||||
UNKNOWN_BCSID = 37;
|
||||
BCSID_MISMATCH = 38;
|
||||
CONTAINER_NOT_OPEN = 39;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -28,6 +28,8 @@
|
||||
.ContainerDataProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerAction;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
@ -42,6 +44,9 @@
|
||||
.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerType;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||
ContainerDataProto.State;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -99,6 +104,25 @@ public void shutdown() {
|
||||
volumeSet.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true for exceptions which can be ignored for marking the container
|
||||
* unhealthy.
|
||||
* @param result ContainerCommandResponse error code.
|
||||
* @return true if exception can be ignored, false otherwise.
|
||||
*/
|
||||
private boolean canIgnoreException(Result result) {
|
||||
switch (result) {
|
||||
case SUCCESS:
|
||||
case CONTAINER_UNHEALTHY:
|
||||
case CLOSED_CONTAINER_IO:
|
||||
case DELETE_ON_OPEN_CONTAINER:
|
||||
case ERROR_CONTAINER_NOT_EMPTY:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerCommandResponseProto dispatch(
|
||||
ContainerCommandRequestProto msg) {
|
||||
@ -160,20 +184,31 @@ public ContainerCommandResponseProto dispatch(
|
||||
// failed. All subsequent transactions on the container should fail and
|
||||
// hence replica will be marked unhealthy here. In this case, a close
|
||||
// container action will be sent to SCM to close the container.
|
||||
if (!HddsUtils.isReadOnly(msg)
|
||||
&& responseProto.getResult() != ContainerProtos.Result.SUCCESS) {
|
||||
// If the container is open and the container operation has failed,
|
||||
// it should be first marked unhealthy and the initiate the close
|
||||
// container action. This also implies this is the first transaction
|
||||
// which has failed, so the container is marked unhealthy right here.
|
||||
|
||||
// ApplyTransaction called on closed Container will fail with Closed
|
||||
// container exception. In such cases, ignore the exception here
|
||||
// If the container is already marked unhealthy, no need to change the
|
||||
// state here.
|
||||
|
||||
Result result = responseProto.getResult();
|
||||
if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
|
||||
// If the container is open/closing and the container operation
|
||||
// has failed, it should be first marked unhealthy and the initiate the
|
||||
// close container action. This also implies this is the first
|
||||
// transaction which has failed, so the container is marked unhealthy
|
||||
// right here.
|
||||
// Once container is marked unhealthy, all the subsequent write
|
||||
// transactions will fail with UNHEALTHY_CONTAINER exception.
|
||||
if (container.getContainerState() == ContainerDataProto.State.OPEN) {
|
||||
|
||||
// For container to be moved to unhealthy state here, the container can
|
||||
// only be in open or closing state.
|
||||
State containerState = container.getContainerData().getState();
|
||||
Preconditions.checkState(
|
||||
containerState == State.OPEN || containerState == State.CLOSING);
|
||||
container.getContainerData()
|
||||
.setState(ContainerDataProto.State.UNHEALTHY);
|
||||
sendCloseContainerActionIfNeeded(container);
|
||||
}
|
||||
}
|
||||
return responseProto;
|
||||
} else {
|
||||
return ContainerUtils.unsupportedRequest(msg);
|
||||
@ -206,6 +241,54 @@ private void createContainer(ContainerCommandRequestProto containerRequest) {
|
||||
handler.handle(requestBuilder.build(), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called as a part of creating the log entry during
|
||||
* startTransaction in Ratis on the leader node. In such cases, if the
|
||||
* container is not in open state for writing we should just fail.
|
||||
* Leader will propagate the exception to client.
|
||||
* @param msg container command proto
|
||||
* @throws StorageContainerException In case container state is open for write
|
||||
* requests and in invalid state for read requests.
|
||||
*/
|
||||
@Override
|
||||
public void validateContainerCommand(
|
||||
ContainerCommandRequestProto msg) throws StorageContainerException {
|
||||
ContainerType containerType = msg.getCreateContainer().getContainerType();
|
||||
Handler handler = getHandler(containerType);
|
||||
if (handler == null) {
|
||||
StorageContainerException ex = new StorageContainerException(
|
||||
"Invalid " + "ContainerType " + containerType,
|
||||
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
|
||||
throw ex;
|
||||
}
|
||||
ContainerProtos.Type cmdType = msg.getCmdType();
|
||||
long containerID = msg.getContainerID();
|
||||
Container container;
|
||||
container = getContainer(containerID);
|
||||
if (container != null) {
|
||||
State containerState = container.getContainerState();
|
||||
if (!HddsUtils.isReadOnly(msg) && containerState != State.OPEN) {
|
||||
switch (cmdType) {
|
||||
case CreateContainer:
|
||||
// Create Container is idempotent. There is nothing to validate.
|
||||
break;
|
||||
case CloseContainer:
|
||||
// If the container is unhealthy, closeContainer will be rejected
|
||||
// while execution. Nothing to validate here.
|
||||
break;
|
||||
default:
|
||||
// if the container is not open, no updates can happen. Just throw
|
||||
// an exception
|
||||
throw new ContainerNotOpenException(
|
||||
"Container " + containerID + " in " + containerState + " state");
|
||||
}
|
||||
} else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) {
|
||||
throw new InvalidContainerStateException(
|
||||
"Container " + containerID + " in " + containerState + " state");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the container usage reaches the close threshold or the container is
|
||||
* marked unhealthy we send Close ContainerAction to SCM.
|
||||
@ -264,7 +347,6 @@ public void setScmId(String scmId) {
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Container getContainer(long containerID) {
|
||||
return containerSet.getContainer(containerID);
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
.ContainerCommandRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
|
||||
/**
|
||||
* Dispatcher acts as the bridge between the transport layer and
|
||||
@ -40,6 +41,15 @@ public interface ContainerDispatcher {
|
||||
*/
|
||||
ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg);
|
||||
|
||||
/**
|
||||
* Validates whether the container command should be executed on the pipeline
|
||||
* or not. Will be invoked by the leader node in the Ratis pipeline
|
||||
* @param msg containerCommand
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
void validateContainerCommand(
|
||||
ContainerCommandRequestProto msg) throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* Initialize the Dispatcher.
|
||||
*/
|
||||
|
@ -24,6 +24,9 @@
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||
ContainerDataProto.State;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.SCMConnectionManager;
|
||||
@ -84,8 +87,18 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
|
||||
cmdExecuted = false;
|
||||
return;
|
||||
}
|
||||
if (!container.getContainerData().isClosed()) {
|
||||
ContainerData containerData = container.getContainerData();
|
||||
State containerState = container.getContainerData().getState();
|
||||
if (containerState != State.CLOSED) {
|
||||
LOG.debug("Closing container {}.", containerID);
|
||||
// when a closeContainerCommand arrives at a Datanode and if the
|
||||
// container is open, each replica will be moved to closing state first.
|
||||
if (containerState == State.OPEN) {
|
||||
containerData.setState(State.CLOSING);
|
||||
}
|
||||
|
||||
// if the container is already closed, it will be just ignored.
|
||||
// ICR will get triggered to change the replica state in SCM.
|
||||
HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
|
||||
HddsProtos.ReplicationType replicationType =
|
||||
closeContainerProto.getReplicationType();
|
||||
@ -100,14 +113,13 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
|
||||
request.setDatanodeUuid(
|
||||
context.getParent().getDatanodeDetails().getUuidString());
|
||||
// submit the close container request for the XceiverServer to handle
|
||||
ozoneContainer.submitContainerRequest(
|
||||
request.build(), replicationType, pipelineID);
|
||||
ozoneContainer.submitContainerRequest(request.build(), replicationType,
|
||||
pipelineID);
|
||||
// Since the container is closed, we trigger an ICR
|
||||
IncrementalContainerReportProto icr = IncrementalContainerReportProto
|
||||
.newBuilder()
|
||||
.addReport(ozoneContainer.getContainerSet()
|
||||
.getContainer(containerID).getContainerReport())
|
||||
.build();
|
||||
IncrementalContainerReportProto icr =
|
||||
IncrementalContainerReportProto.newBuilder().addReport(
|
||||
ozoneContainer.getContainerSet().getContainer(containerID)
|
||||
.getContainerReport()).build();
|
||||
context.addReport(icr);
|
||||
context.getParent().triggerHeartbeat();
|
||||
}
|
||||
|
@ -205,6 +205,17 @@ public TransactionContext startTransaction(RaftClientRequest request)
|
||||
final ContainerCommandRequestProto proto =
|
||||
getRequestProto(request.getMessage().getContent());
|
||||
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
|
||||
try {
|
||||
dispatcher.validateContainerCommand(proto);
|
||||
} catch (IOException ioe) {
|
||||
TransactionContext ctxt = TransactionContext.newBuilder()
|
||||
.setClientRequest(request)
|
||||
.setStateMachine(this)
|
||||
.setServerRole(RaftPeerRole.LEADER)
|
||||
.build();
|
||||
ctxt.setException(ioe);
|
||||
return ctxt;
|
||||
}
|
||||
if (proto.getCmdType() == Type.WriteChunk) {
|
||||
final WriteChunkRequestProto write = proto.getWriteChunk();
|
||||
// create the state machine data proto
|
||||
|
@ -386,28 +386,26 @@ ContainerCommandResponseProto handleCloseContainer(
|
||||
}
|
||||
|
||||
long containerID = kvContainer.getContainerData().getContainerID();
|
||||
ContainerDataProto.State containerState = kvContainer.getContainerState();
|
||||
|
||||
try {
|
||||
if (containerState == ContainerDataProto.State .CLOSED) {
|
||||
LOG.debug("Container {} is already closed.", containerID);
|
||||
return ContainerUtils.getSuccessResponse(request);
|
||||
} else if (containerState == ContainerDataProto.State .INVALID) {
|
||||
LOG.debug("Invalid container data. ContainerID: {}", containerID);
|
||||
throw new StorageContainerException("Invalid container data. " +
|
||||
"ContainerID: " + containerID, INVALID_CONTAINER_STATE);
|
||||
}
|
||||
|
||||
checkContainerOpen(kvContainer);
|
||||
KeyValueContainerData kvData = kvContainer.getContainerData();
|
||||
|
||||
// remove the container from open block map once, all the blocks
|
||||
// have been committed and the container is closed
|
||||
kvData.setState(ContainerDataProto.State.CLOSING);
|
||||
commitPendingBlocks(kvContainer);
|
||||
|
||||
// TODO : The close command should move the container to either quasi
|
||||
// closed/closed depending upon how the closeContainer gets executed.
|
||||
// If it arrives by Standalone, it will be moved to Quasi Closed or
|
||||
// otherwise moved to Closed state if it gets executed via Ratis.
|
||||
kvContainer.close();
|
||||
// make sure the the container open keys from BlockMap gets removed
|
||||
openContainerBlockMap.removeContainer(kvData.getContainerID());
|
||||
} catch (StorageContainerException ex) {
|
||||
if (ex.getResult() == CLOSED_CONTAINER_IO) {
|
||||
LOG.debug("Container {} is already closed.", containerID);
|
||||
return ContainerUtils.getSuccessResponse(request);
|
||||
}
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||
} catch (IOException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG,
|
||||
@ -799,14 +797,21 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
|
||||
|
||||
ContainerDataProto.State containerState = kvContainer.getContainerState();
|
||||
|
||||
if (containerState == ContainerDataProto.State.OPEN) {
|
||||
/**
|
||||
* In a closing state, follower will receive transactions from leader.
|
||||
* Once the leader is put to closing state, it will reject further requests
|
||||
* from clients. Only the transactions which happened before the container
|
||||
* in the leader goes to closing state, will arrive here even the container
|
||||
* might already be in closing state here.
|
||||
*/
|
||||
if (containerState == ContainerDataProto.State.OPEN
|
||||
|| containerState == ContainerDataProto.State.CLOSING) {
|
||||
return;
|
||||
} else {
|
||||
String msg = "Requested operation not allowed as ContainerState is " +
|
||||
containerState;
|
||||
ContainerProtos.Result result = null;
|
||||
switch (containerState) {
|
||||
case CLOSING:
|
||||
case CLOSED:
|
||||
result = CLOSED_CONTAINER_IO;
|
||||
break;
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
@ -429,13 +430,24 @@ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
|
||||
}
|
||||
|
||||
private boolean checkIfContainerIsClosed(IOException ioe) {
|
||||
return Optional.of(ioe.getCause())
|
||||
return checkIfContainerNotOpenException(ioe) || Optional.of(ioe.getCause())
|
||||
.filter(e -> e instanceof StorageContainerException)
|
||||
.map(e -> (StorageContainerException) e)
|
||||
.filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
|
||||
.isPresent();
|
||||
}
|
||||
|
||||
private boolean checkIfContainerNotOpenException(IOException ioe) {
|
||||
Throwable t = ioe.getCause();
|
||||
while (t != null) {
|
||||
if (t instanceof ContainerNotOpenException) {
|
||||
return true;
|
||||
}
|
||||
t = t.getCause();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private long getKeyLength() {
|
||||
return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
|
||||
.sum();
|
||||
|
@ -559,7 +559,7 @@ public void testRetriesOnBlockNotCommittedException() throws Exception {
|
||||
if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) {
|
||||
datanodeService.getDatanodeStateMachine().getContainer()
|
||||
.getContainerSet().getContainer(containerID).getContainerData()
|
||||
.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
|
||||
.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
|
||||
}
|
||||
}
|
||||
dataString = fixedLengthString(keyString, (chunkSize * 1 / 2));
|
||||
|
@ -162,9 +162,9 @@ public void testContainerStateMachineFailures() throws Exception {
|
||||
key.close();
|
||||
Assert.fail("Expected exception not thrown");
|
||||
} catch (IOException ioe) {
|
||||
Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
|
||||
Assert.assertTrue(((StorageContainerException) ioe.getCause()).getResult()
|
||||
== ContainerProtos.Result.CONTAINER_UNHEALTHY);
|
||||
Assert.assertTrue(ioe instanceof StorageContainerException);
|
||||
Assert.assertTrue(((StorageContainerException) ioe).getResult()
|
||||
== ContainerProtos.Result.BLOCK_NOT_COMMITTED);
|
||||
}
|
||||
}
|
||||
}
|
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdds.scm.*;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
@ -172,6 +173,11 @@ public ContainerCommandResponseProto dispatch(
|
||||
return ContainerTestHelper.getCreateContainerResponse(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateContainerCommand(
|
||||
ContainerCommandRequestProto msg) throws StorageContainerException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.container.server;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
|
||||
@ -228,6 +229,11 @@ private static class TestContainerDispatcher implements ContainerDispatcher {
|
||||
public void init() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateContainerCommand(
|
||||
ContainerCommandRequestProto msg) throws StorageContainerException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user