From 873ef8ae81321325889c9d3a6939163e98fbf5bb Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Mon, 3 Sep 2018 12:26:34 +0530 Subject: [PATCH] HDDS-263. Add retries in Ozone Client to handle BlockNotCommitted Exception. Contributed by Shashikant Banerjee. --- .../helpers/BlockNotCommittedException.java | 36 ++++++++ .../scm/storage/ContainerProtocolCalls.java | 5 + .../apache/hadoop/ozone/OzoneConfigKeys.java | 8 ++ .../src/main/resources/ozone-default.xml | 16 ++++ .../hadoop/ozone/client/OzoneClientUtils.java | 28 ++++++ .../client/io/ChunkGroupOutputStream.java | 89 ++++++++++++++---- .../hadoop/ozone/client/rpc/RpcClient.java | 5 + .../TestCloseContainerHandlingByClient.java | 91 ++++++++++++++++--- .../storage/DistributedStorageHandler.java | 5 + 9 files changed, 254 insertions(+), 29 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java new file mode 100644 index 0000000000..86f5a66cf4 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.common.helpers; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; + +/** + * Exceptions thrown when a block is yet to be committed on the datanode. + */ +public class BlockNotCommittedException extends StorageContainerException { + + /** + * Constructs an {@code IOException} with the specified detail message. + * + * @param message The detail message (which is saved for later retrieval by + * the {@link #getMessage()} method) + */ + public BlockNotCommittedException(String message) { + super(message, ContainerProtos.Result.BLOCK_NOT_COMMITTED); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 1f2fafbc7d..1d6a89d73a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.storage; +import org.apache.hadoop.hdds.scm.container.common.helpers + .BlockNotCommittedException; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers @@ -420,6 +422,9 @@ private static void validateContainerResponse( ) throws StorageContainerException { if (response.getResult() == ContainerProtos.Result.SUCCESS) { return; + } else if (response.getResult() + == ContainerProtos.Result.BLOCK_NOT_COMMITTED) { + throw new BlockNotCommittedException(response.getMessage()); } throw new StorageContainerException( response.getMessage(), response.getResult()); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 6ad9085444..8f53da58b4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -194,6 +194,14 @@ public final class OzoneConfigKeys { public static final int OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10; + public static final String OZONE_CLIENT_MAX_RETRIES = + "ozone.client.max.retries"; + public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50; + + public static final String OZONE_CLIENT_RETRY_INTERVAL = + "ozone.client.retry.interval"; + public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms"; + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 6f296c61d0..a9fd10b21a 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -243,6 +243,22 @@ Connection timeout for Ozone client in milliseconds. + + ozone.client.max.retries + 50 + OZONE, CLIENT + Maximum number of retries by Ozone Client on encountering + exception while fetching committed block length. + + + + ozone.client.retry.interval + 200ms + OZONE, CLIENT + Interval between retries by Ozone Client on encountering + exception while fetching committed block length. + + ozone.client.protocol org.apache.hadoop.ozone.client.rpc.RpcClient diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 0aaee31ffb..5d57753220 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -17,14 +17,23 @@ */ package org.apache.hadoop.ozone.client; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.rest.response.BucketInfo; import org.apache.hadoop.ozone.client.rest.response.KeyInfo; import org.apache.hadoop.ozone.client.rest.response.VolumeInfo; import org.apache.hadoop.ozone.client.rest.response.VolumeOwner; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** A utility class for OzoneClient. */ public final class OzoneClientUtils { @@ -84,4 +93,23 @@ public static KeyInfo asKeyInfo(OzoneKey key) { keyInfo.setSize(key.getDataSize()); return keyInfo; } + + public static RetryPolicy createRetryPolicy(Configuration conf) { + int maxRetryCount = + conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys. + OZONE_CLIENT_MAX_RETRIES_DEFAULT); + long retryInterval = conf.getTimeDuration(OzoneConfigKeys. + OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys. + OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + RetryPolicy basePolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval, + TimeUnit.MILLISECONDS); + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy); + RetryPolicy retryPolicy = RetryPolicies + .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, + exceptionToPolicyMap); + return retryPolicy; + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index c632df6a6d..21406b52c9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -63,7 +65,7 @@ */ public class ChunkGroupOutputStream extends OutputStream { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(ChunkGroupOutputStream.class); // array list's get(index) is O(1) @@ -80,6 +82,7 @@ public class ChunkGroupOutputStream extends OutputStream { private final String requestID; private boolean closed; private List locationInfoList; + private final RetryPolicy retryPolicy; /** * A constructor for testing purpose only. */ @@ -95,6 +98,7 @@ public ChunkGroupOutputStream() { requestID = null; closed = false; locationInfoList = null; + retryPolicy = null; } /** @@ -124,7 +128,7 @@ public ChunkGroupOutputStream( StorageContainerLocationProtocolClientSideTranslatorPB scmClient, OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize, String requestId, ReplicationFactor factor, - ReplicationType type) throws IOException { + ReplicationType type, RetryPolicy retryPolicy) throws IOException { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.byteOffset = 0; @@ -143,6 +147,7 @@ public ChunkGroupOutputStream( this.chunkSize = chunkSize; this.requestID = requestId; this.locationInfoList = new ArrayList<>(); + this.retryPolicy = retryPolicy; LOG.debug("Expecting open key with one block, but got" + info.getKeyLocationVersions().size()); } @@ -305,6 +310,62 @@ private void handleWrite(byte[] b, int off, int len) throws IOException { } } + private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry) + throws IOException { + long blockLength; + ContainerProtos.GetCommittedBlockLengthResponseProto responseProto; + RetryPolicy.RetryAction action; + int numRetries = 0; + + // TODO : At this point of time, we also need to allocate new blocks + // from a different container and may need to nullify + // all the remaining pre-allocated blocks in case they were + // pre-allocated on the same container which got closed now.This needs + // caching the closed container list on the client itself. + while (true) { + try { + responseProto = ContainerProtocolCalls + .getCommittedBlockLength(streamEntry.xceiverClient, + streamEntry.blockID, requestID); + blockLength = responseProto.getBlockLength(); + return blockLength; + } catch (StorageContainerException sce) { + try { + action = retryPolicy.shouldRetry(sce, numRetries, 0, true); + } catch (Exception e) { + throw e instanceof IOException ? (IOException) e : new IOException(e); + } + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + if (action.reason != null) { + LOG.error( + "GetCommittedBlockLength request failed. " + action.reason, + sce); + } + throw sce; + } + + // Throw the exception if the thread is interrupted + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Interrupted while trying for connection"); + throw sce; + } + Preconditions.checkArgument( + action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException( + "Interrupted: action=" + action + ", retry policy=" + retryPolicy) + .initCause(e); + } + numRetries++; + LOG.trace("Retrying GetCommittedBlockLength request. Already tried " + + numRetries + " time(s); retry policy is " + retryPolicy); + continue; + } + } + } + /** * It performs following actions : * a. Updates the committed length at datanode for the current stream in @@ -317,15 +378,6 @@ private void handleWrite(byte[] b, int off, int len) throws IOException { */ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry, int streamIndex) throws IOException { - // TODO : If the block is still not committed and is in the - // pending openBlock Map, it will return BLOCK_NOT_COMMITTED - // exception. We should handle this by retrying the same operation - // n times and update the OzoneManager with the actual block length - // written. At this point of time, we also need to allocate new blocks - // from a different container and may need to nullify - // all the remaining pre-allocated blocks in case they were - // pre-allocated on the same container which got closed now.This needs - // caching the closed container list on the client itself. long committedLength = 0; ByteBuffer buffer = streamEntry.getBuffer(); if (buffer == null) { @@ -342,11 +394,7 @@ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry, // for this block associated with the stream here. if (streamEntry.currentPosition >= chunkSize || streamEntry.currentPosition != buffer.position()) { - ContainerProtos.GetCommittedBlockLengthResponseProto responseProto = - ContainerProtocolCalls - .getCommittedBlockLength(streamEntry.xceiverClient, - streamEntry.blockID, requestID); - committedLength = responseProto.getBlockLength(); + committedLength = getCommittedBlockLength(streamEntry); // update the length of the current stream locationInfoList.get(streamIndex).setLength(committedLength); } @@ -481,6 +529,7 @@ public static class Builder { private String requestID; private ReplicationType type; private ReplicationFactor factor; + private RetryPolicy retryPolicy; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -526,8 +575,14 @@ public Builder setFactor(ReplicationFactor replicationFactor) { public ChunkGroupOutputStream build() throws IOException { return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - omClient, chunkSize, requestID, factor, type); + omClient, chunkSize, requestID, factor, type, retryPolicy); } + + public Builder setRetryPolicy(RetryPolicy rPolicy) { + this.retryPolicy = rPolicy; + return this; + } + } private static class ChunkOutputStreamEntry extends OutputStream { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index fc70514453..387f41fe6a 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -37,6 +38,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.VolumeArgs; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream; @@ -97,6 +99,7 @@ public class RpcClient implements ClientProtocol { private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; + private final RetryPolicy retryPolicy; /** * Creates RpcClient instance with the given configuration. @@ -137,6 +140,7 @@ public RpcClient(Configuration conf) throws IOException { Client.getRpcTimeout(conf))); this.xceiverClientManager = new XceiverClientManager(conf); + retryPolicy = OzoneClientUtils.createRetryPolicy(conf); int configuredChunkSize = conf.getInt( ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, @@ -469,6 +473,7 @@ public OzoneOutputStream createKey( .setRequestID(requestId) .setType(HddsProtos.ReplicationType.valueOf(type.toString())) .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) + .setRetryPolicy(retryPolicy) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 50d7ec54dd..9f12633381 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -22,6 +22,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.hdds.scm.container.common.helpers. + StorageContainerException; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -45,6 +49,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.event.Level; import java.io.IOException; import java.security.MessageDigest; @@ -52,6 +57,7 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.Random; /** * Tests Close Container Exception handling by Ozone Client. @@ -67,6 +73,7 @@ public class TestCloseContainerHandlingByClient { private static String volumeName; private static String bucketName; private static String keyString; + private static int maxRetries; /** * Create a MiniDFSCluster for testing. @@ -78,6 +85,9 @@ public class TestCloseContainerHandlingByClient { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + // generate a no between 1 to 10 + maxRetries = new Random().nextInt(10); + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries); chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize); @@ -286,17 +296,8 @@ private void waitForContainerClose(String keyName, ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) outputStream.getOutputStream(); - long clientId = groupOutputStream.getOpenID(); - OMMetadataManager metadataManager = - cluster.getOzoneManager().getMetadataManager(); - byte[] openKey = - metadataManager.getOpenKeyBytes( - volumeName, bucketName, keyName, clientId); - byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey); - OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf( - OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData)); List locationInfoList = - keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + getLocationInfos(groupOutputStream, keyName); List containerIdList = new ArrayList<>(); List pipelineList = new ArrayList<>(); for (OmKeyLocationInfo info : locationInfoList) { @@ -318,7 +319,6 @@ private void waitForContainerClose(String keyName, new CloseContainerCommand(containerID, type, pipeline.getId())); } } - int index = 0; for (long containerID : containerIdList) { Pipeline pipeline = pipelineList.get(index); @@ -333,7 +333,6 @@ private void waitForContainerClose(String keyName, } index++; } - } private OzoneOutputStream createKey(String keyName, ReplicationType type, @@ -345,6 +344,20 @@ private OzoneOutputStream createKey(String keyName, ReplicationType type, .createKey(keyName, size, type, factor); } + private List getLocationInfos( + ChunkGroupOutputStream groupOutputStream, String keyName) + throws IOException { + long clientId = groupOutputStream.getOpenID(); + OMMetadataManager metadataManager = + cluster.getOzoneManager().getMetadataManager(); + byte[] openKey = metadataManager + .getOpenKeyBytes(volumeName, bucketName, keyName, clientId); + byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey); + OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf( + OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData)); + return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + } + private void validateData(String keyName, byte[] data) throws Exception { byte[] readData = new byte[data.length]; OzoneInputStream is = @@ -399,4 +412,58 @@ public void testBlockWriteViaRatis() throws Exception { dataString.concat(dataString); validateData(keyName, dataString.getBytes()); } + + @Test + public void testRetriesOnBlockNotCommittedException() throws Exception { + String keyName = "blockcommitexceptiontest"; + OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE); + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + String dataString = fixedLengthString(keyString, (3 * chunkSize)); + key.write(dataString.getBytes()); + List locationInfos = + getLocationInfos(groupOutputStream, keyName); + long containerID = locationInfos.get(0).getContainerID(); + List datanodes = + cluster.getStorageContainerManager().getScmContainerManager() + .getContainerWithPipeline(containerID).getPipeline().getMachines(); + Assert.assertEquals(1, datanodes.size()); + // move the container on the datanode to Closing state, this will ensure + // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying + // to fetch the committed length + for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) { + if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) { + datanodeService.getDatanodeStateMachine().getContainer() + .getContainerSet().getContainer(containerID).getContainerData() + .setState(ContainerProtos.ContainerLifeCycleState.CLOSING); + } + } + dataString = fixedLengthString(keyString, (chunkSize * 1 / 2)); + key.write(dataString.getBytes()); + try { + key.close(); + Assert.fail("Expected Exception not thrown"); + } catch (IOException ioe) { + Assert.assertTrue(ioe instanceof StorageContainerException); + Assert.assertTrue(((StorageContainerException) ioe).getResult() + == ContainerProtos.Result.BLOCK_NOT_COMMITTED); + } + // It should retry only for max retries times + for (int i = 1; i <= maxRetries; i++) { + Assert.assertTrue(logCapturer.getOutput() + .contains("Retrying GetCommittedBlockLength request")); + Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i)); + } + Assert.assertTrue(logCapturer.getOutput() + .contains("GetCommittedBlockLength request failed.")); + Assert.assertTrue(logCapturer.getOutput().contains( + "retries get failed due to exceeded maximum allowed retries number" + + ": " + maxRetries)); + logCapturer.stopCapturing(); + } } diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index ec33990de4..0d62432e3c 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -21,6 +21,8 @@ import com.google.common.base.Strings; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.client.io.LengthInputStream; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -85,6 +87,7 @@ public final class DistributedStorageHandler implements StorageHandler { private final boolean useRatis; private final HddsProtos.ReplicationType type; private final HddsProtos.ReplicationFactor factor; + private final RetryPolicy retryPolicy; /** * Creates a new DistributedStorageHandler. @@ -119,6 +122,7 @@ public DistributedStorageHandler(OzoneConfiguration conf, OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT); groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS, OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT); + retryPolicy = OzoneClientUtils.createRetryPolicy(conf); if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) { LOG.warn("The chunk size ({}) is not allowed to be more than" + " the maximum size ({})," @@ -420,6 +424,7 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException, .setRequestID(args.getRequestID()) .setType(xceiverClientManager.getType()) .setFactor(xceiverClientManager.getFactor()) + .setRetryPolicy(retryPolicy) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(),