From 47641fcbc9c41f4a338d8899501e4a310d2e81ad Mon Sep 17 00:00:00 2001
From: cnauroth
Date: Thu, 22 Oct 2015 12:21:32 -0700
Subject: [PATCH] HADOOP-12334. Change Mode Of Copy Operation of HBase WAL
 Archiving to bypass Azure Storage Throttling after retries. Contributed by
 Gaurav Kanade.

---
 .../hadoop-common/CHANGES.txt                 |  3 ++
 .../fs/azure/AzureNativeFileSystemStore.java  | 47 ++++++++++++++++---
 2 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index ce2a2a7153..74c62cb0d9 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1335,6 +1335,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12418. TestRPC.testRPCInterruptedSimple fails intermittently.
     (kihwal)
 
+    HADOOP-12334. Change Mode Of Copy Operation of HBase WAL Archiving to bypass
+    Azure Storage Throttling after retries. (Gaurav Kanade via cnauroth)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 679413afb5..8a337421c1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.fs.azure.metrics.ResponseReceivedMetricUpdater;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.io.IOUtils;
 import org.mortbay.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +77,7 @@
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlobType;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CopyStatus;
 import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
@@ -2373,6 +2375,9 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
       throw new IOException("Cannot acquire new lease if one already exists.");
     }
 
+    CloudBlobWrapper srcBlob = null;
+    CloudBlobWrapper dstBlob = null;
+    SelfRenewingLease lease = null;
     try {
       // Attempts rename may occur before opening any streams so first,
       // check if a session exists, if not create a session with the Azure
@@ -2388,8 +2393,8 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
       // Get the source blob and assert its existence. If the source key
       // needs to be normalized then normalize it.
       //
-      CloudBlobWrapper srcBlob = getBlobReference(srcKey);
 
+      srcBlob = getBlobReference(srcKey);
       if (!srcBlob.exists(getInstrumentedContext())) {
         throw new AzureException ("Source blob " + srcKey
             + " does not exist.");
@@ -2406,7 +2411,6 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
        * when HBase runs on HDFS, where the region server recovers the lease
        * on a log file, to gain exclusive access to it, before it splits it.
        */
-      SelfRenewingLease lease = null;
       if (acquireLease) {
         lease = srcBlob.acquireLease();
       } else if (existingLease != null) {
@@ -2416,7 +2420,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
       // Get the destination blob. The destination key always needs to be
       // normalized.
       //
-      CloudBlobWrapper dstBlob = getBlobReference(dstKey);
+      dstBlob = getBlobReference(dstKey);
 
       // Rename the source blob to the destination blob by copying it to
       // the destination blob then deleting it.
@@ -2429,7 +2433,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
         dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
       } catch (StorageException se) {
         if (se.getErrorCode().equals(
-            StorageErrorCode.SERVER_BUSY.toString())) {
+          StorageErrorCode.SERVER_BUSY.toString())) {
           int copyBlobMinBackoff = sessionConfiguration.getInt(
               KEY_COPYBLOB_MIN_BACKOFF_INTERVAL,
               DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
@@ -2449,7 +2453,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
           BlobRequestOptions options = new BlobRequestOptions();
           options.setRetryPolicyFactory(new RetryExponentialRetry(
               copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
-              copyBlobMaxRetries));
+            copyBlobMaxRetries));
           dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
         } else {
           throw se;
@@ -2458,8 +2462,37 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
       waitForCopyToComplete(dstBlob, getInstrumentedContext());
       safeDelete(srcBlob, lease);
     } catch (StorageException e) {
-      // Re-throw exception as an Azure storage exception.
-      throw new AzureException(e);
+      if (e.getErrorCode().equals(
+          StorageErrorCode.SERVER_BUSY.toString())) {
+        LOG.warn("Rename: CopyBlob: StorageException: ServerBusy: Retry complete, will attempt client side copy for page blob");
+        InputStream ipStream = null;
+        OutputStream opStream = null;
+        try {
+          if(srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
+            ipStream = openInputStream(srcBlob);
+            opStream = openOutputStream(dstBlob);
+            byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
+            int len;
+            while ((len = ipStream.read(buffer)) != -1) {
+              opStream.write(buffer, 0, len);
+            }
+            opStream.flush();
+            opStream.close();
+            ipStream.close();
+          } else {
+            throw new AzureException(e);
+          }
+          safeDelete(srcBlob, lease);
+        } catch(StorageException se) {
+          LOG.warn("Rename: CopyBlob: StorageException: Failed");
+          throw new AzureException(se);
+        } finally {
+          IOUtils.closeStream(ipStream);
+          IOUtils.closeStream(opStream);
+        }
+      } else {
+        throw new AzureException(e);
+      }
     } catch (URISyntaxException e) {
       // Re-throw exception as an Azure storage exception.
       throw new AzureException(e);
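Reviewer note: the core of this patch is a two-step copy strategy. The rename
first attempts a server-side copy (startCopyFromBlob) with an exponential-backoff
retry policy; only if Azure Storage still returns SERVER_BUSY after the retries
are exhausted does it fall back to a client-side copy, and then only when the
source is a BlobType.PAGE_BLOB (the HBase WAL case). Below is a minimal,
self-contained sketch of that pattern. It is illustrative only: the BlobStore
interface and its methods (serverSideCopy, isServerBusy, openRead, openWrite)
are hypothetical stand-ins for the CloudBlobWrapper APIs used in the patch, not
real Azure SDK calls.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public final class ThrottleAwareCopySketch {

  /** Hypothetical blob-store abstraction, for illustration only. */
  public interface BlobStore {
    void serverSideCopy(String srcKey, String dstKey) throws IOException;
    boolean isServerBusy(IOException e);
    InputStream openRead(String key) throws IOException;
    OutputStream openWrite(String key) throws IOException;
  }

  // Mirrors the patch's buffer size, PageBlobFormatHelpers.PAGE_SIZE (512 bytes).
  private static final int PAGE_SIZE = 512;

  public static void copyWithFallback(BlobStore store, String srcKey,
      String dstKey) throws IOException {
    try {
      // Preferred path: a copy performed inside the storage service.
      // In the patch this call already carries a RetryExponentialRetry policy.
      store.serverSideCopy(srcKey, dstKey);
    } catch (IOException e) {
      if (!store.isServerBusy(e)) {
        throw e; // Only throttling errors trigger the client-side fallback.
      }
      // Fallback path: read the source and re-write it to the destination,
      // paying the bandwidth cost on the client to bypass copy throttling.
      try (InputStream in = store.openRead(srcKey);
           OutputStream out = store.openWrite(dstKey)) {
        byte[] buffer = new byte[PAGE_SIZE];
        int len;
        while ((len = in.read(buffer)) != -1) {
          out.write(buffer, 0, len);
        }
      }
    }
  }
}

In the patch itself, any non-page-blob source that reaches the fallback re-throws
the original StorageException as an AzureException, and the source blob is deleted
(safeDelete) only after the client-side copy succeeds, preserving rename semantics.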