HADOOP-13354. Update WASB driver to use the latest version (4.2.0) of SDK for Microsoft Azure Storage Clients. Contributed by Sivaguru Sankaridurg.

This commit is contained in:
Chris Nauroth 2016-07-27 15:50:28 -07:00
parent eb7ff0c992
commit b43de80031
4 changed files with 85 additions and 20 deletions

View File

@ -996,7 +996,7 @@
<dependency> <dependency>
<groupId>com.microsoft.azure</groupId> <groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId> <artifactId>azure-storage</artifactId>
<version>2.2.0</version> <version>4.2.0</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -28,6 +28,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Locale; import java.util.Locale;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.Random; import java.util.Random;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -98,6 +99,12 @@ public class BlockBlobAppendStream extends OutputStream {
*/ */
private long nextBlockCount = UNSET_BLOCKS_COUNT; private long nextBlockCount = UNSET_BLOCKS_COUNT;
/**
* Variable to hold the block id prefix to be used for azure
* storage blocks from azure-storage-java sdk version 4.2.0 onwards
*/
private String blockIdPrefix = null;
private final Random sequenceGenerator = new Random(); private final Random sequenceGenerator = new Random();
/** /**
@ -180,7 +187,8 @@ public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
this.key = aKey; this.key = aKey;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.threadSequenceNumber = new AtomicInteger(0); this.threadSequenceNumber = new AtomicInteger(0);
setBlocksCount(); this.blockIdPrefix = null;
setBlocksCountAndBlockIdPrefix();
this.outBuffer = new ByteArrayOutputStream(bufferSize); this.outBuffer = new ByteArrayOutputStream(bufferSize);
this.uncommittedBlockEntries = new ArrayList<BlockEntry>(); this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
@ -433,22 +441,41 @@ private synchronized void commitAppendBlocks() throws IOException {
* Helper method used to generate the blockIDs. The algorithm used is similar to the Azure * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
* storage SDK. * storage SDK.
*/ */
private void setBlocksCount() throws IOException { private void setBlocksCountAndBlockIdPrefix() throws IOException {
try { try {
if (nextBlockCount == UNSET_BLOCKS_COUNT) { if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) {
nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
List<BlockEntry> blockEntries = List<BlockEntry> blockEntries =
blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext); blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
nextBlockCount += blockEntries.size(); String blockZeroBlockId = (blockEntries.size() > 0) ? blockEntries.get(0).getId() : "";
String prefix = UUID.randomUUID().toString() + "-";
String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 0);
if (blockEntries.size() > 0 && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
// If blob has already been created with 2.2.0, append subsequent blocks with older version (2.2.0) blockId
// compute nextBlockCount, the way it was done before; and don't use blockIdPrefix
this.blockIdPrefix = "";
nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
nextBlockCount += blockEntries.size();
} else {
// If there are no existing blocks, create the first block with newer version (4.2.0) blockId
// If blob has already been created with 4.2.0, append subsequent blocks with newer version (4.2.0) blockId
this.blockIdPrefix = prefix;
nextBlockCount = blockEntries.size();
}
} }
} catch (StorageException ex) { } catch (StorageException ex) {
LOG.debug("Encountered storage exception during setting next Block Count." LOG.debug("Encountered storage exception during setting next Block Count and BlockId prefix."
+ " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode()); + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
throw new IOException(ex); throw new IOException(ex);
} }
@ -465,7 +492,40 @@ private String generateBlockId() throws IOException {
throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly"); throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
} }
byte[] blockIdInBytes = getBytesFromLong(nextBlockCount); if (this.blockIdPrefix == null) {
throw new IOException("Append Stream in invalid state. blockIdPrefix not set correctly");
}
if (!this.blockIdPrefix.equals("")) {
return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++);
} else {
return generateOlderVersionBlockId(nextBlockCount++);
}
}
/**
* Helper method that generates an older (2.2.0) version blockId
* @return String representing the block ID generated.
*/
private String generateOlderVersionBlockId(long id) {
byte[] blockIdInBytes = getBytesFromLong(id);
return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
}
/**
* Helper method that generates an newer (4.2.0) version blockId
* @return String representing the block ID generated.
*/
private String generateNewerVersionBlockId(String prefix, long id) {
String blockIdSuffix = String.format("%06d", id);
byte[] blockIdInBytes = (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
} }
@ -481,28 +541,33 @@ private String generateBlockId() throws IOException {
* @return A byte array that represents the data of the specified <code>long</code> value. * @return A byte array that represents the data of the specified <code>long</code> value.
*/ */
private static byte[] getBytesFromLong(final long value) { private static byte[] getBytesFromLong(final long value) {
final byte[] tempArray = new byte[8];
for (int m = 0; m < 8; m++) { final byte[] tempArray = new byte[8];
tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
}
return tempArray; for (int m = 0; m < 8; m++) {
tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
}
return tempArray;
} }
/** /**
* Helper method that creates a thread to upload a block to azure storage. * Helper method that creates a thread to upload a block to azure storage.
* @param payload * @param payload
* @throws IOException * @throws IOException
*/ */
private synchronized void uploadBlockToStorage(byte[] payload) throws IOException { private synchronized void uploadBlockToStorage(byte[] payload)
throws IOException {
// upload payload to azure storage // upload payload to azure storage
nextBlockCount++;
String blockId = generateBlockId(); String blockId = generateBlockId();
// Since uploads of the Azure storage are done in parallel threads, we go ahead // Since uploads of the Azure storage are done in parallel threads, we go ahead
// add the blockId in the uncommitted list. If the upload of the block fails // add the blockId in the uncommitted list. If the upload of the block fails
// we don't commit the blockIds. // we don't commit the blockIds.
uncommittedBlockEntries.add(new BlockEntry(blockId)); BlockEntry blockEntry = new BlockEntry(blockId);
blockEntry.setSize(payload.length);
uncommittedBlockEntries.add(blockEntry);
ioThreadPool.execute(new WriteRequest(payload, blockId)); ioThreadPool.execute(new WriteRequest(payload, blockId));
} }

View File

@ -147,7 +147,7 @@ && isOutOfBandIoAllowed()) {
try { try {
// Sign the request. GET's have no payload so the content length is // Sign the request. GET's have no payload so the content length is
// zero. // zero.
StorageCredentialsHelper.signBlobAndQueueRequest(getCredentials(), StorageCredentialsHelper.signBlobQueueAndFileRequest(getCredentials(),
urlConnection, -1L, getOperationContext()); urlConnection, -1L, getOperationContext());
} catch (InvalidKeyException e) { } catch (InvalidKeyException e) {
// Log invalid key exception to track signing error before the send // Log invalid key exception to track signing error before the send

View File

@ -404,7 +404,7 @@ public CopyState getCopyState() {
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext) OperationContext opContext)
throws StorageException, URISyntaxException { throws StorageException, URISyntaxException {
getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob, getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
null, null, options, opContext); null, null, options, opContext);
} }