HDDS-2032. Ozone client should retry writes in case of any ratis/stateMachine exceptions. Contributed by Shashikant Banerjee (#1420).

This commit is contained in:
bshashikant 2019-09-18 22:30:41 +05:30 committed by GitHub
parent 5dd859a8a0
commit 2c52d00a27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 19 deletions

View File

@ -28,7 +28,7 @@
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
@ -86,7 +86,7 @@ private HddsClientUtils() {
private static final List<Class<? extends Exception>> EXCEPTION_LIST =
new ArrayList<Class<? extends Exception>>() {{
add(TimeoutException.class);
add(ContainerNotOpenException.class);
add(StorageContainerException.class);
add(RaftRetryFailureException.class);
add(AlreadyClosedException.class);
add(GroupMismatchException.class);
@ -301,7 +301,7 @@ public static SCMSecurityProtocol getScmSecurityClient(
return scmSecurityClient;
}
public static Throwable checkForException(Exception e) throws IOException {
public static Throwable checkForException(Exception e) {
Throwable t = e;
while (t != null) {
for (Class<? extends Exception> cls : getExceptionList()) {
@ -311,8 +311,7 @@ public static Throwable checkForException(Exception e) throws IOException {
}
t = t.getCause();
}
throw e instanceof IOException ? (IOException)e : new IOException(e);
return t;
}
public static RetryPolicy createRetryPolicy(int maxRetryCount,

View File

@ -25,8 +25,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.io.retry.RetryPolicies;
@ -37,8 +37,6 @@
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +47,6 @@
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -256,10 +253,11 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
private void handleException(BlockOutputStreamEntry streamEntry,
IOException exception) throws IOException {
Throwable t = HddsClientUtils.checkForException(exception);
Preconditions.checkNotNull(t);
boolean retryFailure = checkForRetryFailure(t);
boolean closedContainerException = false;
boolean containerExclusionException = false;
if (!retryFailure) {
closedContainerException = checkIfContainerIsClosed(t);
containerExclusionException = checkIfContainerToExclude(t);
}
Pipeline pipeline = streamEntry.getPipeline();
PipelineID pipelineId = pipeline.getId();
@ -267,7 +265,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
//set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
if (closedContainerException) {
if (containerExclusionException) {
LOG.debug(
"Encountered exception {}. The last committed block length is {}, "
+ "uncommitted data length is {} retry count {}", exception,
@ -290,11 +288,12 @@ private void handleException(BlockOutputStreamEntry streamEntry,
if (!failedServers.isEmpty()) {
excludeList.addDatanodes(failedServers);
}
if (closedContainerException) {
// if the container needs to be excluded , add the container to the
// exclusion list , otherwise add the pipeline to the exclusion list
if (containerExclusionException) {
excludeList.addConatinerId(ContainerID.valueof(containerId));
} else if (retryFailure || t instanceof TimeoutException
|| t instanceof GroupMismatchException
|| t instanceof NotReplicatedException) {
} else {
excludeList.addPipeline(pipelineId);
}
// just clean up the current stream.
@ -303,7 +302,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
// discard all subsequent blocks the containers and pipelines which
// are in the exclude list so that, the very next retry should never
// write data on the closed container/pipeline
if (closedContainerException) {
if (containerExclusionException) {
// discard subsequent pre allocated blocks from the streamEntries list
// from the closed container
blockOutputStreamEntryPool
@ -386,8 +385,10 @@ private boolean checkForRetryFailure(Throwable t) {
|| t instanceof AlreadyClosedException;
}
private boolean checkIfContainerIsClosed(Throwable t) {
return t instanceof ContainerNotOpenException;
// Every container specific exception from datatnode will be seen as
// StorageContainerException
private boolean checkIfContainerToExclude(Throwable t) {
return t instanceof StorageContainerException;
}
@Override