HADOOP-11959. WASB should configure client side socket timeout in storage client blob request options. Contributed by Ivan Mitic.
This commit is contained in:
parent
7ebe80ec12
commit
94e7d49a6d
@ -791,6 +791,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HADOOP-11930. test-patch in offline mode should tell maven to be in
|
HADOOP-11930. test-patch in offline mode should tell maven to be in
|
||||||
offline mode (Sean Busbey via aw)
|
offline mode (Sean Busbey via aw)
|
||||||
|
|
||||||
|
HADOOP-11959. WASB should configure client side socket timeout in storage
|
||||||
|
client blob request options. (Ivan Mitic via cnauroth)
|
||||||
|
|
||||||
Release 2.7.1 - UNRELEASED
|
Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -920,7 +920,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.microsoft.azure</groupId>
|
<groupId>com.microsoft.azure</groupId>
|
||||||
<artifactId>azure-storage</artifactId>
|
<artifactId>azure-storage</artifactId>
|
||||||
<version>2.0.0</version>
|
<version>2.2.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -2434,15 +2434,6 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
|||||||
//
|
//
|
||||||
CloudBlobWrapper dstBlob = getBlobReference(dstKey);
|
CloudBlobWrapper dstBlob = getBlobReference(dstKey);
|
||||||
|
|
||||||
// TODO: Remove at the time when we move to Azure Java SDK 1.2+.
|
|
||||||
// This is the workaround provided by Azure Java SDK team to
|
|
||||||
// mitigate the issue with un-encoded x-ms-copy-source HTTP
|
|
||||||
// request header. Azure sdk version before 1.2+ does not encode this
|
|
||||||
// header what causes all URIs that have special (category "other")
|
|
||||||
// characters in the URI not to work with startCopyFromBlob when
|
|
||||||
// specified as source (requests fail with HTTP 403).
|
|
||||||
URI srcUri = new URI(srcBlob.getUri().toASCIIString());
|
|
||||||
|
|
||||||
// Rename the source blob to the destination blob by copying it to
|
// Rename the source blob to the destination blob by copying it to
|
||||||
// the destination blob then deleting it.
|
// the destination blob then deleting it.
|
||||||
//
|
//
|
||||||
@ -2451,7 +2442,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
|||||||
// a more intensive exponential retry policy when the cluster is getting
|
// a more intensive exponential retry policy when the cluster is getting
|
||||||
// throttled.
|
// throttled.
|
||||||
try {
|
try {
|
||||||
dstBlob.startCopyFromBlob(srcUri, null, getInstrumentedContext());
|
dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
|
||||||
} catch (StorageException se) {
|
} catch (StorageException se) {
|
||||||
if (se.getErrorCode().equals(
|
if (se.getErrorCode().equals(
|
||||||
StorageErrorCode.SERVER_BUSY.toString())) {
|
StorageErrorCode.SERVER_BUSY.toString())) {
|
||||||
@ -2475,7 +2466,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
|||||||
options.setRetryPolicyFactory(new RetryExponentialRetry(
|
options.setRetryPolicyFactory(new RetryExponentialRetry(
|
||||||
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
|
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
|
||||||
copyBlobMaxRetries));
|
copyBlobMaxRetries));
|
||||||
dstBlob.startCopyFromBlob(srcUri, options, getInstrumentedContext());
|
dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
|
||||||
} else {
|
} else {
|
||||||
throw se;
|
throw se;
|
||||||
}
|
}
|
||||||
|
@ -381,8 +381,8 @@ public interface CloudBlobWrapper extends ListBlobItem {
|
|||||||
* Copies an existing blob's contents, properties, and metadata to this instance of the <code>CloudBlob</code>
|
* Copies an existing blob's contents, properties, and metadata to this instance of the <code>CloudBlob</code>
|
||||||
* class, using the specified operation context.
|
* class, using the specified operation context.
|
||||||
*
|
*
|
||||||
* @param source
|
* @param sourceBlob
|
||||||
* A <code>java.net.URI</code> The URI of a source blob.
|
* A <code>CloudBlob</code> object that represents the source blob to copy.
|
||||||
* @param options
|
* @param options
|
||||||
* A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
|
* A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
|
||||||
* <code>null</code> will use the default request options from the associated service client (
|
* <code>null</code> will use the default request options from the associated service client (
|
||||||
@ -397,7 +397,7 @@ public interface CloudBlobWrapper extends ListBlobItem {
|
|||||||
* @throws URISyntaxException
|
* @throws URISyntaxException
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract void startCopyFromBlob(URI source,
|
public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
|
||||||
BlobRequestOptions options, OperationContext opContext)
|
BlobRequestOptions options, OperationContext opContext)
|
||||||
throws StorageException, URISyntaxException;
|
throws StorageException, URISyntaxException;
|
||||||
|
|
||||||
|
@ -393,10 +393,10 @@ public CopyState getCopyState() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startCopyFromBlob(URI source, BlobRequestOptions options,
|
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
|
||||||
OperationContext opContext)
|
OperationContext opContext)
|
||||||
throws StorageException, URISyntaxException {
|
throws StorageException, URISyntaxException {
|
||||||
getBlob().startCopyFromBlob(source,
|
getBlob().startCopyFromBlob(((CloudBlobWrapperImpl)sourceBlob).blob,
|
||||||
null, null, options, opContext);
|
null, null, options, opContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -429,9 +429,9 @@ public void setMetadata(HashMap<String, String> metadata) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startCopyFromBlob(URI source, BlobRequestOptions options,
|
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
|
||||||
OperationContext opContext) throws StorageException, URISyntaxException {
|
OperationContext opContext) throws StorageException, URISyntaxException {
|
||||||
backingStore.copy(convertUriToDecodedString(source), convertUriToDecodedString(uri));
|
backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
|
||||||
//TODO: set the backingStore.properties.CopyState and
|
//TODO: set the backingStore.properties.CopyState and
|
||||||
// update azureNativeFileSystemStore.waitForCopyToComplete
|
// update azureNativeFileSystemStore.waitForCopyToComplete
|
||||||
}
|
}
|
||||||
|
@ -205,6 +205,7 @@ public void testTransientErrorOnCommitBlockList() throws Exception {
|
|||||||
@Override
|
@Override
|
||||||
public boolean isTargetConnection(HttpURLConnection connection) {
|
public boolean isTargetConnection(HttpURLConnection connection) {
|
||||||
return connection.getRequestMethod().equals("PUT")
|
return connection.getRequestMethod().equals("PUT")
|
||||||
|
&& connection.getURL().getQuery() != null
|
||||||
&& connection.getURL().getQuery().contains("blocklist");
|
&& connection.getURL().getQuery().contains("blocklist");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -191,6 +191,7 @@ private void checkObtainedMd5(String obtainedMd5) {
|
|||||||
|
|
||||||
private static boolean isPutBlock(HttpURLConnection connection) {
|
private static boolean isPutBlock(HttpURLConnection connection) {
|
||||||
return connection.getRequestMethod().equals("PUT")
|
return connection.getRequestMethod().equals("PUT")
|
||||||
|
&& connection.getURL().getQuery() != null
|
||||||
&& connection.getURL().getQuery().contains("blockid");
|
&& connection.getURL().getQuery().contains("blockid");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user