HADOOP-12334. Change Mode Of Copy Operation of HBase WAL Archiving to bypass Azure Storage Throttling after retries. Contributed by Gaurav Kanade.
This commit is contained in:
parent
aea26bf4dd
commit
47641fcbc9
@ -1335,6 +1335,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HADOOP-12418. TestRPC.testRPCInterruptedSimple fails intermittently.
|
||||
(kihwal)
|
||||
|
||||
HADOOP-12334. Change Mode Of Copy Operation of HBase WAL Archiving to bypass
|
||||
Azure Storage Throttling after retries. (Gaurav Kanade via cnauroth)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -60,6 +60,7 @@
|
||||
import org.apache.hadoop.fs.azure.metrics.ResponseReceivedMetricUpdater;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -76,6 +77,7 @@
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.azure.storage.blob.BlobType;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CopyStatus;
|
||||
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
||||
@ -2373,6 +2375,9 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
||||
throw new IOException("Cannot acquire new lease if one already exists.");
|
||||
}
|
||||
|
||||
CloudBlobWrapper srcBlob = null;
|
||||
CloudBlobWrapper dstBlob = null;
|
||||
SelfRenewingLease lease = null;
|
||||
try {
|
||||
// Attempts rename may occur before opening any streams so first,
|
||||
// check if a session exists, if not create a session with the Azure
|
||||
@ -2388,8 +2393,8 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
||||
// Get the source blob and assert its existence. If the source key
|
||||
// needs to be normalized then normalize it.
|
||||
//
|
||||
CloudBlobWrapper srcBlob = getBlobReference(srcKey);
|
||||
|
||||
srcBlob = getBlobReference(srcKey);
|
||||
if (!srcBlob.exists(getInstrumentedContext())) {
|
||||
throw new AzureException ("Source blob " + srcKey +
|
||||
" does not exist.");
|
||||
@ -2406,7 +2411,6 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
||||
* when HBase runs on HDFS, where the region server recovers the lease
|
||||
* on a log file, to gain exclusive access to it, before it splits it.
|
||||
*/
|
||||
SelfRenewingLease lease = null;
|
||||
if (acquireLease) {
|
||||
lease = srcBlob.acquireLease();
|
||||
} else if (existingLease != null) {
|
||||
@ -2416,7 +2420,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
||||
// Get the destination blob. The destination key always needs to be
|
||||
// normalized.
|
||||
//
|
||||
CloudBlobWrapper dstBlob = getBlobReference(dstKey);
|
||||
dstBlob = getBlobReference(dstKey);
|
||||
|
||||
// Rename the source blob to the destination blob by copying it to
|
||||
// the destination blob then deleting it.
|
||||
@ -2429,7 +2433,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
||||
dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
|
||||
} catch (StorageException se) {
|
||||
if (se.getErrorCode().equals(
|
||||
StorageErrorCode.SERVER_BUSY.toString())) {
|
||||
StorageErrorCode.SERVER_BUSY.toString())) {
|
||||
int copyBlobMinBackoff = sessionConfiguration.getInt(
|
||||
KEY_COPYBLOB_MIN_BACKOFF_INTERVAL,
|
||||
DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
|
||||
@ -2449,7 +2453,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
||||
BlobRequestOptions options = new BlobRequestOptions();
|
||||
options.setRetryPolicyFactory(new RetryExponentialRetry(
|
||||
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
|
||||
copyBlobMaxRetries));
|
||||
copyBlobMaxRetries));
|
||||
dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
|
||||
} else {
|
||||
throw se;
|
||||
@ -2458,8 +2462,37 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
|
||||
waitForCopyToComplete(dstBlob, getInstrumentedContext());
|
||||
safeDelete(srcBlob, lease);
|
||||
} catch (StorageException e) {
|
||||
// Re-throw exception as an Azure storage exception.
|
||||
throw new AzureException(e);
|
||||
if (e.getErrorCode().equals(
|
||||
StorageErrorCode.SERVER_BUSY.toString())) {
|
||||
LOG.warn("Rename: CopyBlob: StorageException: ServerBusy: Retry complete, will attempt client side copy for page blob");
|
||||
InputStream ipStream = null;
|
||||
OutputStream opStream = null;
|
||||
try {
|
||||
if(srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
|
||||
ipStream = openInputStream(srcBlob);
|
||||
opStream = openOutputStream(dstBlob);
|
||||
byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
|
||||
int len;
|
||||
while ((len = ipStream.read(buffer)) != -1) {
|
||||
opStream.write(buffer, 0, len);
|
||||
}
|
||||
opStream.flush();
|
||||
opStream.close();
|
||||
ipStream.close();
|
||||
} else {
|
||||
throw new AzureException(e);
|
||||
}
|
||||
safeDelete(srcBlob, lease);
|
||||
} catch(StorageException se) {
|
||||
LOG.warn("Rename: CopyBlob: StorageException: Failed");
|
||||
throw new AzureException(se);
|
||||
} finally {
|
||||
IOUtils.closeStream(ipStream);
|
||||
IOUtils.closeStream(opStream);
|
||||
}
|
||||
} else {
|
||||
throw new AzureException(e);
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
// Re-throw exception as an Azure storage exception.
|
||||
throw new AzureException(e);
|
||||
|
Loading…
Reference in New Issue
Block a user