diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index 534919e13f..f76b44a094 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -481,6 +481,9 @@ public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentati } } + // Configure Azure storage session. + configureAzureStorageSession(); + // Start an Azure storage session. // createAzureStorageSession(); @@ -792,9 +795,6 @@ private void connectUsingAnonymousCredentials(final URI uri) // Accessing the storage server unauthenticated using // anonymous credentials. isAnonymousCredentials = true; - - // Configure Azure storage session. - configureAzureStorageSession(); } private void connectUsingCredentials(String accountName, @@ -820,9 +820,6 @@ private void connectUsingCredentials(String accountName, // Can only create container if using account key credentials canCreateOrModifyContainer = credentials instanceof StorageCredentialsAccountAndKey; - - // Configure Azure storage session. - configureAzureStorageSession(); } /** @@ -848,8 +845,6 @@ private void connectToAzureStorageInSecureMode(String accountName, rootDirectory = container.getDirectoryReference(""); canCreateOrModifyContainer = true; - - configureAzureStorageSession(); tolerateOobAppends = false; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java index 650149af6a..810aacfa0f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java @@ -69,6 +69,8 @@ public class SecureStorageInterfaceImpl extends StorageInterface { public static final String SAS_ERROR_CODE = "SAS Error"; private SASKeyGeneratorInterface sasKeyGenerator; private String storageAccount; + private RetryPolicyFactory retryPolicy; + private int timeoutIntervalInMs; public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode, Configuration conf) throws SecureModeException { @@ -90,10 +92,12 @@ public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode, @Override public void setTimeoutInMs(int timeoutInMs) { + timeoutIntervalInMs = timeoutInMs; } @Override public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) { + retryPolicy = retryPolicyFactory; } @Override @@ -133,9 +137,15 @@ public CloudBlobContainerWrapper getContainerReference(String name) throws URISyntaxException, StorageException { try { - return new SASCloudBlobContainerWrapperImpl(storageAccount, - new CloudBlobContainer(sasKeyGenerator.getContainerSASUri( - storageAccount, name)), sasKeyGenerator); + CloudBlobContainer container = new CloudBlobContainer(sasKeyGenerator.getContainerSASUri( + storageAccount, name)); + if (retryPolicy != null) { + container.getServiceClient().getDefaultRequestOptions().setRetryPolicyFactory(retryPolicy); + } + if (timeoutIntervalInMs > 0) { + container.getServiceClient().getDefaultRequestOptions().setTimeoutIntervalInMs(timeoutIntervalInMs); + } + return new SASCloudBlobContainerWrapperImpl(storageAccount, container, sasKeyGenerator); } catch (SASKeyGenerationException sasEx) { String errorMsg = "Encountered SASKeyGeneration exception while " + "generating SAS Key for container : " + name @@ -216,9 +226,12 @@ public CloudBlobDirectoryWrapper getDirectoryReference(String relativePath) public CloudBlobWrapper getBlockBlobReference(String relativePath) throws URISyntaxException, StorageException { try { + CloudBlockBlob blob = new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri( + storageAccount, getName(), relativePath)); + blob.getServiceClient().setDefaultRequestOptions( + container.getServiceClient().getDefaultRequestOptions()); return new SASCloudBlockBlobWrapperImpl( - new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri( - storageAccount, getName(), relativePath))); + blob); } catch (SASKeyGenerationException sasEx) { String errorMsg = "Encountered SASKeyGeneration exception while " + "generating SAS Key for relativePath : " + relativePath @@ -232,9 +245,12 @@ public CloudBlobWrapper getBlockBlobReference(String relativePath) public CloudBlobWrapper getPageBlobReference(String relativePath) throws URISyntaxException, StorageException { try { + CloudPageBlob blob = new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri( + storageAccount, getName(), relativePath)); + blob.getServiceClient().setDefaultRequestOptions( + container.getServiceClient().getDefaultRequestOptions()); return new SASCloudPageBlobWrapperImpl( - new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri( - storageAccount, getName(), relativePath))); + blob); } catch (SASKeyGenerationException sasEx) { String errorMsg = "Encountered SASKeyGeneration exception while " + "generating SAS Key for relativePath : " + relativePath diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index 367cd04455..491a0d09c9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -60,32 +60,50 @@ @InterfaceAudience.Private class StorageInterfaceImpl extends StorageInterface { private CloudBlobClient serviceClient; + private RetryPolicyFactory retryPolicyFactory; + private int timeoutIntervalInMs; + + private void updateRetryPolicy() { + if (serviceClient != null && retryPolicyFactory != null) { + serviceClient.getDefaultRequestOptions().setRetryPolicyFactory(retryPolicyFactory); + } + } + + private void updateTimeoutInMs() { + if (serviceClient != null && timeoutIntervalInMs > 0) { + serviceClient.getDefaultRequestOptions().setTimeoutIntervalInMs(timeoutIntervalInMs); + } + } @Override public void setRetryPolicyFactory(final RetryPolicyFactory retryPolicyFactory) { - serviceClient.getDefaultRequestOptions().setRetryPolicyFactory( - retryPolicyFactory); + this.retryPolicyFactory = retryPolicyFactory; + updateRetryPolicy(); } @Override public void setTimeoutInMs(int timeoutInMs) { - serviceClient.getDefaultRequestOptions().setTimeoutIntervalInMs( - timeoutInMs); + timeoutIntervalInMs = timeoutInMs; + updateTimeoutInMs(); } @Override public void createBlobClient(CloudStorageAccount account) { serviceClient = account.createCloudBlobClient(); + updateRetryPolicy(); + updateTimeoutInMs(); } @Override public void createBlobClient(URI baseUri) { - serviceClient = new CloudBlobClient(baseUri); + createBlobClient(baseUri, (StorageCredentials)null); } @Override public void createBlobClient(URI baseUri, StorageCredentials credentials) { serviceClient = new CloudBlobClient(baseUri, credentials); + updateRetryPolicy(); + updateTimeoutInMs(); } @Override