HDDS-1403. KeyOutputStream writes fails after max retries while writing to a closed container (#753)
This commit is contained in:
parent
556eafd01a
commit
37582705fa
@ -27,7 +27,9 @@
|
|||||||
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
|
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
|
||||||
import org.apache.ratis.util.TimeDuration;
|
import org.apache.ratis.util.TimeDuration;
|
||||||
|
|
||||||
/**
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
* This class contains constants for configuration keys used in Ozone.
|
* This class contains constants for configuration keys used in Ozone.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@ -140,8 +142,11 @@ public final class OzoneConfigKeys {
|
|||||||
|
|
||||||
public static final String OZONE_CLIENT_MAX_RETRIES =
|
public static final String OZONE_CLIENT_MAX_RETRIES =
|
||||||
"ozone.client.max.retries";
|
"ozone.client.max.retries";
|
||||||
public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5;
|
public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 100;
|
||||||
|
public static final String OZONE_CLIENT_RETRY_INTERVAL =
|
||||||
|
"ozone.client.retry.interval";
|
||||||
|
public static final TimeDuration OZONE_CLIENT_RETRY_INTERVAL_DEFAULT =
|
||||||
|
TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
// This defines the overall connection limit for the connection pool used in
|
// This defines the overall connection limit for the connection pool used in
|
||||||
// RestClient.
|
// RestClient.
|
||||||
|
@ -429,12 +429,21 @@
|
|||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>ozone.client.max.retries</name>
|
<name>ozone.client.max.retries</name>
|
||||||
<value>5</value>
|
<value>100</value>
|
||||||
<tag>OZONE, CLIENT</tag>
|
<tag>OZONE, CLIENT</tag>
|
||||||
<description>Maximum number of retries by Ozone Client on encountering
|
<description>Maximum number of retries by Ozone Client on encountering
|
||||||
exception while writing a key.
|
exception while writing a key.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>ozone.client.retry.interval</name>
|
||||||
|
<value>0ms</value>
|
||||||
|
<tag>OZONE, CLIENT</tag>
|
||||||
|
<description>Indicates the time duration a client will wait before
|
||||||
|
retrying a write key request on encountering an exception. By default
|
||||||
|
there is no wait.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>ozone.client.protocol</name>
|
<name>ozone.client.protocol</name>
|
||||||
<value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
|
<value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
|
||||||
|
@ -127,10 +127,11 @@ public static KeyInfoDetails asKeyInfoDetails(OzoneKeyDetails key) {
|
|||||||
return keyInfo;
|
return keyInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RetryPolicy createRetryPolicy(int maxRetryCount) {
|
public static RetryPolicy createRetryPolicy(int maxRetryCount,
|
||||||
|
long retryInterval) {
|
||||||
// just retry without sleep
|
// just retry without sleep
|
||||||
RetryPolicy retryPolicy = RetryPolicies
|
RetryPolicy retryPolicy = RetryPolicies
|
||||||
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, 0,
|
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval,
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
return retryPolicy;
|
return retryPolicy;
|
||||||
}
|
}
|
||||||
|
@ -164,7 +164,8 @@ public KeyOutputStream(OpenKeySession handler,
|
|||||||
String requestId, ReplicationFactor factor, ReplicationType type,
|
String requestId, ReplicationFactor factor, ReplicationType type,
|
||||||
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
|
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
|
||||||
ChecksumType checksumType, int bytesPerChecksum,
|
ChecksumType checksumType, int bytesPerChecksum,
|
||||||
String uploadID, int partNumber, boolean isMultipart, int maxRetryCount) {
|
String uploadID, int partNumber, boolean isMultipart,
|
||||||
|
int maxRetryCount, long retryInterval) {
|
||||||
this.streamEntries = new ArrayList<>();
|
this.streamEntries = new ArrayList<>();
|
||||||
this.currentStreamIndex = 0;
|
this.currentStreamIndex = 0;
|
||||||
this.omClient = omClient;
|
this.omClient = omClient;
|
||||||
@ -199,7 +200,8 @@ public KeyOutputStream(OpenKeySession handler,
|
|||||||
this.bufferPool =
|
this.bufferPool =
|
||||||
new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
|
new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
|
||||||
this.excludeList = new ExcludeList();
|
this.excludeList = new ExcludeList();
|
||||||
this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount);
|
this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount,
|
||||||
|
retryInterval);
|
||||||
this.retryCount = 0;
|
this.retryCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -726,6 +728,7 @@ public static class Builder {
|
|||||||
private int multipartNumber;
|
private int multipartNumber;
|
||||||
private boolean isMultipartKey;
|
private boolean isMultipartKey;
|
||||||
private int maxRetryCount;
|
private int maxRetryCount;
|
||||||
|
private long retryInterval;
|
||||||
|
|
||||||
|
|
||||||
public Builder setMultipartUploadID(String uploadID) {
|
public Builder setMultipartUploadID(String uploadID) {
|
||||||
@ -814,12 +817,17 @@ public Builder setMaxRetryCount(int maxCount) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setRetryInterval(long retryIntervalInMS) {
|
||||||
|
this.retryInterval = retryIntervalInMS;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public KeyOutputStream build() throws IOException {
|
public KeyOutputStream build() throws IOException {
|
||||||
return new KeyOutputStream(openHandler, xceiverManager,
|
return new KeyOutputStream(openHandler, xceiverManager,
|
||||||
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
|
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
|
||||||
streamBufferMaxSize, blockSize, watchTimeout, checksumType,
|
streamBufferMaxSize, blockSize, watchTimeout, checksumType,
|
||||||
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
|
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
|
||||||
maxRetryCount);
|
maxRetryCount, retryInterval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,6 +85,7 @@
|
|||||||
import org.apache.hadoop.hdds.scm.protocolPB
|
import org.apache.hadoop.hdds.scm.protocolPB
|
||||||
.StorageContainerLocationProtocolPB;
|
.StorageContainerLocationProtocolPB;
|
||||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||||
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
@ -128,6 +129,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
|
|||||||
private final long watchTimeout;
|
private final long watchTimeout;
|
||||||
private final ClientId clientId = ClientId.randomId();
|
private final ClientId clientId = ClientId.randomId();
|
||||||
private final int maxRetryCount;
|
private final int maxRetryCount;
|
||||||
|
private final long retryInterval;
|
||||||
private Text dtService;
|
private Text dtService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -214,6 +216,9 @@ public RpcClient(Configuration conf) throws IOException {
|
|||||||
maxRetryCount =
|
maxRetryCount =
|
||||||
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
|
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
|
||||||
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
|
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
|
||||||
|
retryInterval = OzoneUtils.getTimeDurationInMS(conf,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT);
|
||||||
dtService =
|
dtService =
|
||||||
getOMProxyProvider().getProxy().getDelegationTokenService();
|
getOMProxyProvider().getProxy().getDelegationTokenService();
|
||||||
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
|
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
|
||||||
@ -861,6 +866,7 @@ public OzoneOutputStream createMultipartKey(String volumeName,
|
|||||||
.setMultipartUploadID(uploadID)
|
.setMultipartUploadID(uploadID)
|
||||||
.setIsMultipartKey(true)
|
.setIsMultipartKey(true)
|
||||||
.setMaxRetryCount(maxRetryCount)
|
.setMaxRetryCount(maxRetryCount)
|
||||||
|
.setRetryInterval(retryInterval)
|
||||||
.build();
|
.build();
|
||||||
keyOutputStream.addPreallocateBlocks(
|
keyOutputStream.addPreallocateBlocks(
|
||||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||||
@ -1022,7 +1028,9 @@ private OzoneOutputStream createOutputStream(OpenKeySession openKey,
|
|||||||
.setBlockSize(blockSize)
|
.setBlockSize(blockSize)
|
||||||
.setChecksumType(checksumType)
|
.setChecksumType(checksumType)
|
||||||
.setBytesPerChecksum(bytesPerChecksum)
|
.setBytesPerChecksum(bytesPerChecksum)
|
||||||
.setMaxRetryCount(maxRetryCount).build();
|
.setMaxRetryCount(maxRetryCount)
|
||||||
|
.setRetryInterval(retryInterval)
|
||||||
|
.build();
|
||||||
keyOutputStream
|
keyOutputStream
|
||||||
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
|
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
|
||||||
openKey.getOpenVersion());
|
openKey.getOpenVersion());
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -33,6 +34,7 @@
|
|||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.ratis.util.TimeDuration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set of Utility functions used in ozone.
|
* Set of Utility functions used in ozone.
|
||||||
@ -214,4 +216,24 @@ public static void verifyResourceName(String resName)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the TimeDuration configured for the given key. If not configured,
|
||||||
|
* return the default value.
|
||||||
|
*/
|
||||||
|
public static TimeDuration getTimeDuration(Configuration conf, String key,
|
||||||
|
TimeDuration defaultValue) {
|
||||||
|
TimeUnit defaultTimeUnit = defaultValue.getUnit();
|
||||||
|
long timeDurationInDefaultUnit = conf.getTimeDuration(key,
|
||||||
|
defaultValue.getDuration(), defaultTimeUnit);
|
||||||
|
return TimeDuration.valueOf(timeDurationInDefaultUnit, defaultTimeUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the time configured for the given key in milliseconds.
|
||||||
|
*/
|
||||||
|
public static long getTimeDurationInMS(Configuration conf, String key,
|
||||||
|
TimeDuration defaultValue) {
|
||||||
|
return getTimeDuration(conf, key, defaultValue)
|
||||||
|
.toLong(TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,6 +57,7 @@
|
|||||||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||||
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
||||||
import org.apache.hadoop.ozone.web.response.*;
|
import org.apache.hadoop.ozone.web.response.*;
|
||||||
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -90,6 +91,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||||||
private final int bytesPerChecksum;
|
private final int bytesPerChecksum;
|
||||||
private final boolean verifyChecksum;
|
private final boolean verifyChecksum;
|
||||||
private final int maxRetryCount;
|
private final int maxRetryCount;
|
||||||
|
private final long retryInterval;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new DistributedStorageHandler.
|
* Creates a new DistributedStorageHandler.
|
||||||
@ -159,6 +161,9 @@ public DistributedStorageHandler(OzoneConfiguration conf,
|
|||||||
this.maxRetryCount =
|
this.maxRetryCount =
|
||||||
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
|
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
|
||||||
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
|
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
|
||||||
|
this.retryInterval = OzoneUtils.getTimeDurationInMS(conf,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT);
|
||||||
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
|
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
|
||||||
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
|
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
|
||||||
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
|
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
|
||||||
@ -464,6 +469,7 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
|
|||||||
.setChecksumType(checksumType)
|
.setChecksumType(checksumType)
|
||||||
.setBytesPerChecksum(bytesPerChecksum)
|
.setBytesPerChecksum(bytesPerChecksum)
|
||||||
.setMaxRetryCount(maxRetryCount)
|
.setMaxRetryCount(maxRetryCount)
|
||||||
|
.setRetryInterval(retryInterval)
|
||||||
.build();
|
.build();
|
||||||
keyOutputStream.addPreallocateBlocks(
|
keyOutputStream.addPreallocateBlocks(
|
||||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||||
|
Loading…
Reference in New Issue
Block a user