From 2c52d00a27f62912519b2a81d12b3c8d84be20f3 Mon Sep 17 00:00:00 2001
From: bshashikant
Date: Wed, 18 Sep 2019 22:30:41 +0530
Subject: [PATCH] HDDS-2032. Ozone client should retry writes in case of any
 ratis/stateMachine exceptions. Contributed by Shashikant Banerjee (#1420).

---
 .../hdds/scm/client/HddsClientUtils.java      |  9 +++---
 .../ozone/client/io/KeyOutputStream.java      | 29 ++++++++++---------
 2 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
index 78213ed447..4a3926d5c2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
@@ -28,7 +28,7 @@
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -86,7 +86,7 @@ private HddsClientUtils() {
   private static final List<Class<? extends Exception>> EXCEPTION_LIST =
       new ArrayList<Class<? extends Exception>>() {{
         add(TimeoutException.class);
-        add(ContainerNotOpenException.class);
+        add(StorageContainerException.class);
         add(RaftRetryFailureException.class);
         add(AlreadyClosedException.class);
         add(GroupMismatchException.class);
@@ -301,7 +301,7 @@ public static SCMSecurityProtocol getScmSecurityClient(
     return scmSecurityClient;
   }
 
-  public static Throwable checkForException(Exception e) throws IOException {
+  public static Throwable checkForException(Exception e) {
     Throwable t = e;
     while (t != null) {
       for (Class<? extends Exception> cls : getExceptionList()) {
@@ -311,8 +311,7 @@ public static Throwable checkForException(Exception e) throws IOException {
       }
       t = t.getCause();
     }
-
-    throw e instanceof IOException ? (IOException)e : new IOException(e);
+    return t;
   }
 
   public static RetryPolicy createRetryPolicy(int maxRetryCount,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 3a92cf475a..fd503c344d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -25,8 +25,8 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -37,8 +37,6 @@
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.ratis.protocol.AlreadyClosedException;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.NotReplicatedException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +47,6 @@
 import java.util.List;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -256,10 +253,11 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
   private void handleException(BlockOutputStreamEntry streamEntry,
       IOException exception) throws IOException {
     Throwable t = HddsClientUtils.checkForException(exception);
+    Preconditions.checkNotNull(t);
     boolean retryFailure = checkForRetryFailure(t);
-    boolean closedContainerException = false;
+    boolean containerExclusionException = false;
     if (!retryFailure) {
-      closedContainerException = checkIfContainerIsClosed(t);
+      containerExclusionException = checkIfContainerToExclude(t);
     }
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
@@ -267,7 +265,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
-    if (closedContainerException) {
+    if (containerExclusionException) {
       LOG.debug(
           "Encountered exception {}. The last committed block length is {}, "
               + "uncommitted data length is {} retry count {}", exception,
@@ -290,11 +288,12 @@ private void handleException(BlockOutputStreamEntry streamEntry,
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
     }
-    if (closedContainerException) {
+
+    // if the container needs to be excluded , add the container to the
+    // exclusion list , otherwise add the pipeline to the exclusion list
+    if (containerExclusionException) {
       excludeList.addConatinerId(ContainerID.valueof(containerId));
-    } else if (retryFailure || t instanceof TimeoutException
-        || t instanceof GroupMismatchException
-        || t instanceof NotReplicatedException) {
+    } else {
       excludeList.addPipeline(pipelineId);
     }
     // just clean up the current stream.
@@ -303,7 +302,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
     // discard all subsequent blocks the containers and pipelines which
     // are in the exclude list so that, the very next retry should never
     // write data on the closed container/pipeline
-    if (closedContainerException) {
+    if (containerExclusionException) {
       // discard subsequent pre allocated blocks from the streamEntries list
       // from the closed container
       blockOutputStreamEntryPool
@@ -386,8 +385,10 @@ private boolean checkForRetryFailure(Throwable t) {
         || t instanceof AlreadyClosedException;
   }
 
-  private boolean checkIfContainerIsClosed(Throwable t) {
-    return t instanceof ContainerNotOpenException;
+  // Every container specific exception from datatnode will be seen as
+  // StorageContainerException
+  private boolean checkIfContainerToExclude(Throwable t) {
+    return t instanceof StorageContainerException;
   }
 
   @Override
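
Note for reviewers, separate from the patch itself: the sketch below is a minimal, self-contained Java illustration of the classification rule this change introduces. Any container-specific exception surfaced by the datanode (now matched via the broader StorageContainerException rather than only ContainerNotOpenException) causes the container to be excluded on retry, while Ratis-level failures exclude the pipeline. The class RetryClassificationSketch, the Action enum, the classify helper and the stand-in exception types are hypothetical names used only so the example compiles on its own; they are not the real Ozone or Ratis classes, and the methods only mirror, in simplified form, what HddsClientUtils.checkForException and KeyOutputStream.handleException do after this patch.

// Minimal sketch with hypothetical names; not the Ozone implementation.
public class RetryClassificationSketch {

  // Stand-ins for the real exception types referenced by the patch.
  static class StorageContainerException extends java.io.IOException { }
  static class RaftRetryFailureException extends RuntimeException { }
  static class AlreadyClosedException extends RuntimeException { }

  enum Action { EXCLUDE_CONTAINER, EXCLUDE_PIPELINE }

  // Walk the cause chain the way checkForException now does: return the first
  // recognised cause instead of rethrowing, or null if nothing matches.
  static Throwable checkForException(Exception e) {
    Throwable t = e;
    while (t != null) {
      if (t instanceof StorageContainerException
          || t instanceof RaftRetryFailureException
          || t instanceof AlreadyClosedException) {
        return t;
      }
      t = t.getCause();
    }
    return t;
  }

  // Mirrors the decision in handleException: container-specific failures go to
  // the container exclude list, everything else excludes the pipeline.
  static Action classify(Exception e) {
    Throwable t = checkForException(e);
    java.util.Objects.requireNonNull(t, "no retriable cause found");
    return (t instanceof StorageContainerException)
        ? Action.EXCLUDE_CONTAINER
        : Action.EXCLUDE_PIPELINE;
  }

  public static void main(String[] args) {
    // A write that failed because of a container-level error on the datanode.
    Exception containerFailure = new Exception(new StorageContainerException());
    // A write that failed because Ratis gave up retrying on the pipeline.
    Exception pipelineFailure = new Exception(new RaftRetryFailureException());
    System.out.println(classify(containerFailure)); // EXCLUDE_CONTAINER
    System.out.println(classify(pipelineFailure));  // EXCLUDE_PIPELINE
  }
}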