HADOOP-14520. WASB: Block compaction for Azure Block Blobs.

Contributed by Georgi Chalakov
Steve Loughran 2017-09-07 18:35:03 +01:00
parent d77ed238a9
commit 13eda50003
14 changed files with 1391 additions and 657 deletions

View File

@ -202,6 +202,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
*/
private Set<String> pageBlobDirs;
/**
* Configuration key to indicate the set of directories in WASB where we
* should store files as block blobs with block compaction enabled.
*
* Entries can be directory paths relative to the container (e.g. "/path") or
* fully qualified wasb:// URIs (e.g.
* wasb://container@example.blob.core.windows.net/path)
*/
public static final String KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES =
"fs.azure.block.blob.with.compaction.dir";
/**
* The set of directories where we should store files as block blobs with
* block compaction enabled.
*/
private Set<String> blockBlobWithCompationDirs;
/**
* Configuration key to indicate the set of directories in WASB where
* we should do atomic folder rename synchronized with createNonRecursive.
@ -527,6 +544,12 @@ public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentati
// User-agent
userAgentId = conf.get(USER_AGENT_ID_KEY, USER_AGENT_ID_DEFAULT);
// Extract the directories that should contain block blobs with compaction
blockBlobWithCompationDirs = getDirectorySet(
KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES);
LOG.debug("Block blobs with compaction directories: {}",
setToString(blockBlobWithCompationDirs));
// Extract directories that should have atomic rename applied.
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES);
String hbaseRoot;
@ -1164,6 +1187,17 @@ public boolean isPageBlobKey(String key) {
return isKeyForDirectorySet(key, pageBlobDirs);
}
/**
* Checks if the given key in Azure Storage should be stored as a block blob
* with compaction enabled instead of a normal block blob.
*
* @param key blob name
* @return true if the file is in a directory with block compaction enabled.
*/
public boolean isBlockBlobWithCompactionKey(String key) {
return isKeyForDirectorySet(key, blockBlobWithCompationDirs);
}
/**
* Checks if the given key in Azure storage should have synchronized
* atomic folder rename createNonRecursive implemented.
@ -1356,7 +1390,9 @@ private BlobRequestOptions getDownloadOptions() {
}
@Override
public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
public DataOutputStream storefile(String keyEncoded,
PermissionStatus permissionStatus,
String key)
throws AzureException {
try {
@ -1417,12 +1453,26 @@ public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
// Get the blob reference from the store's container and
// return it.
CloudBlobWrapper blob = getBlobReference(key);
CloudBlobWrapper blob = getBlobReference(keyEncoded);
storePermissionStatus(blob, permissionStatus);
// Create the output stream for the Azure blob.
//
OutputStream outputStream = openOutputStream(blob);
OutputStream outputStream;
if (isBlockBlobWithCompactionKey(key)) {
BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
(CloudBlockBlobWrapper) blob,
keyEncoded,
this.uploadBlockSizeBytes,
true,
getInstrumentedContext());
outputStream = blockBlobOutputStream;
} else {
outputStream = openOutputStream(blob);
}
DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream);
return dataOutStream;
} catch (Exception e) {
@ -2869,10 +2919,21 @@ public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws
CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
appendStream.initialize();
OutputStream outputStream;
return new DataOutputStream(appendStream);
BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
(CloudBlockBlobWrapper) blob,
key,
bufferSize,
isBlockBlobWithCompactionKey(key),
getInstrumentedContext());
outputStream = blockBlobOutputStream;
DataOutputStream dataOutStream = new SyncableDataOutputStream(
outputStream);
return dataOutStream;
} catch(Exception ex) {
throw new AzureException(ex);
}

View File

@ -62,6 +62,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
@ -352,9 +354,9 @@ public String makeRenamePendingFileContents() {
}
/**
* This is an exact copy of org.codehaus.jettison.json.JSONObject.quote
* method.
*
* Produce a string in double quotes with backslash sequences in all the
* right places. A backslash will be inserted within </, allowing JSON
* text to be delivered in HTML. In JSON text, a string cannot contain a
@ -947,11 +949,11 @@ private void checkNotClosed() throws IOException {
}
}
private class NativeAzureFsOutputStream extends OutputStream {
// We should not override flush() to actually close current block and flush
// to DFS, this will break applications that assume flush() is a no-op.
// Applications are advised to use Syncable.hflush() for that purpose.
// NativeAzureFsOutputStream needs to implement Syncable if needed.
/**
* Azure output stream; wraps an inner stream of different types.
*/
public class NativeAzureFsOutputStream extends OutputStream
implements Syncable, StreamCapabilities {
private String key;
private String keyEncoded;
private OutputStream out;
@ -983,6 +985,48 @@ public NativeAzureFsOutputStream(OutputStream out, String aKey,
setEncodedKey(anEncodedKey);
}
/**
* Get a reference to the wrapped output stream.
*
* @return the underlying output stream
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
public OutputStream getOutStream() {
return out;
}
@Override // Syncable
public void hflush() throws IOException {
if (out instanceof Syncable) {
((Syncable) out).hflush();
} else {
flush();
}
}
@Override // Syncable
public void hsync() throws IOException {
if (out instanceof Syncable) {
((Syncable) out).hsync();
} else {
flush();
}
}
/**
* Propagate probe of stream capabilities to nested stream
* (if supported), else return false.
* @param capability string to query the stream support for.
* @return true if the nested stream supports the specific capability.
*/
@Override // StreamCapability
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
}
return false;
}
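For reference, a caller-side sketch of how this probe is typically used (illustrative only; the helper name and the `"hflush"` capability string are assumptions based on the Hadoop `StreamCapabilities` contract, not part of this patch):

```java
// Illustrative helper, not part of this patch: flush durably when the
// stream advertises the capability, otherwise fall back to flush().
static void durableFlush(org.apache.hadoop.fs.FSDataOutputStream out)
    throws java.io.IOException {
  if (out.hasCapability("hflush")) {
    out.hflush();   // data is persisted by the service before returning
  } else {
    out.flush();    // best-effort flush only
  }
}
```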
@Override
public synchronized void close() throws IOException {
if (out != null) {
@ -990,8 +1034,11 @@ public synchronized void close() throws IOException {
// before returning to the caller.
//
out.close();
restoreKey();
out = null;
try {
restoreKey();
} finally {
out = null;
}
}
}
@ -1045,10 +1092,10 @@ public void write(byte[] b) throws IOException {
/**
* Writes <code>len</code> from the specified byte array starting at offset
* <code>off</code> to the output stream. The general contract for write(b,
* off, len) is that some of the bytes in the array <code>
* b</code b> are written to the output stream in order; element
* <code>b[off]</code> is the first byte written and
* <code>b[off+len-1]</code> is the last byte written by this operation.
* off, len) is that some of the bytes in the array <code>b</code>
* are written to the output stream in order; element <code>b[off]</code>
* is the first byte written and <code>b[off+len-1]</code> is the last
* byte written by this operation.
*
* @param b
* Byte array to be written.
@ -1749,7 +1796,7 @@ private FSDataOutputStream create(Path f, FsPermission permission,
OutputStream bufOutStream;
if (store.isPageBlobKey(key)) {
// Store page blobs directly in-place without renames.
bufOutStream = store.storefile(key, permissionStatus);
bufOutStream = store.storefile(key, permissionStatus, key);
} else {
// This is a block blob, so open the output blob stream based on the
// encoded key.
@ -1777,7 +1824,7 @@ private FSDataOutputStream create(Path f, FsPermission permission,
// these
// blocks.
bufOutStream = new NativeAzureFsOutputStream(store.storefile(
keyEncoded, permissionStatus), key, keyEncoded);
keyEncoded, permissionStatus, key), key, keyEncoded);
}
// Construct the data output stream from the buffered output stream.
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);

View File

@ -50,8 +50,9 @@ void storeEmptyFolder(String key, PermissionStatus permissionStatus)
InputStream retrieve(String key, long byteRangeStart) throws IOException;
DataOutputStream storefile(String key, PermissionStatus permissionStatus)
throws AzureException;
DataOutputStream storefile(String keyEncoded,
PermissionStatus permissionStatus,
String key) throws AzureException;
boolean isPageBlobKey(String key);

View File

@ -519,7 +519,7 @@ public void downloadRange(long offset, long length, OutputStream outStream,
@Override
public SelfRenewingLease acquireLease() throws StorageException {
return new SelfRenewingLease(this);
return new SelfRenewingLease(this, false);
}
}
@ -557,10 +557,12 @@ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequest
}
@Override
public void uploadBlock(String blockId, InputStream sourceStream,
public void uploadBlock(String blockId, AccessCondition accessCondition,
InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length,
accessCondition, options, opContext);
}
@Override
@ -593,4 +595,4 @@ public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
null, options, opContext);
}
}
}
}

View File

@ -30,6 +30,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import static com.microsoft.azure.storage.StorageErrorCodeStrings.LEASE_ALREADY_PRESENT;
/**
* An Azure blob lease that automatically renews itself indefinitely
* using a background thread. Use it to synchronize distributed processes,
@ -66,7 +68,7 @@ public class SelfRenewingLease {
@VisibleForTesting
static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
public SelfRenewingLease(CloudBlobWrapper blobWrapper)
public SelfRenewingLease(CloudBlobWrapper blobWrapper, boolean throwIfPresent)
throws StorageException {
this.leaseFreed = false;
@ -79,10 +81,14 @@ public SelfRenewingLease(CloudBlobWrapper blobWrapper)
leaseID = blob.acquireLease(LEASE_TIMEOUT, null);
} catch (StorageException e) {
if (throwIfPresent && e.getErrorCode().equals(LEASE_ALREADY_PRESENT)) {
throw e;
}
// Throw again if we don't want to keep waiting.
// We expect it to be that the lease is already present,
// or in some cases that the blob does not exist.
if (!"LeaseAlreadyPresent".equals(e.getErrorCode())) {
if (!LEASE_ALREADY_PRESENT.equals(e.getErrorCode())) {
LOG.info(
"Caught exception when trying to get lease on blob "
+ blobWrapper.getUri().toString() + ". " + e.getMessage());

View File

@ -665,6 +665,7 @@ List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions
*
* @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
* the length of all Block IDs must be identical.
* @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
* @param sourceStream An {@link InputStream} object that represents the input stream to write to the
* block blob.
* @param length A long which represents the length, in bytes, of the stream data,
@ -678,7 +679,7 @@ List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions
* @throws IOException If an I/O error occurred.
* @throws StorageException If a storage service error occurred.
*/
void uploadBlock(String blockId, InputStream sourceStream,
void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException;

View File

@ -277,7 +277,7 @@ public CloudBlobWrapper getBlockBlobReference(String relativePath)
return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath));
}
@Override
public CloudBlobWrapper getPageBlobReference(String relativePath)
throws URISyntaxException, StorageException {
@ -286,7 +286,7 @@ public CloudBlobWrapper getPageBlobReference(String relativePath)
}
}
abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper {
private final CloudBlob blob;
@ -441,10 +441,10 @@ public void downloadRange(long offset, long length, OutputStream outStream,
@Override
public SelfRenewingLease acquireLease() throws StorageException {
return new SelfRenewingLease(this);
return new SelfRenewingLease(this, false);
}
}
//
// CloudBlockBlobWrapperImpl
@ -479,10 +479,10 @@ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequest
}
@Override
public void uploadBlock(String blockId, InputStream sourceStream,
public void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, accessCondition, options, opContext);
}
@Override

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Support the Syncable interface on top of a DataOutputStream.
@ -38,6 +39,16 @@ public SyncableDataOutputStream(OutputStream out) {
super(out);
}
/**
* Get a reference to the wrapped output stream.
*
* @return the underlying output stream
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
public OutputStream getOutStream() {
return out;
}
@Override
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {

View File

@ -153,6 +153,40 @@ line argument:
```
### Block Blob with Compaction Support and Configuration
Block blobs are the default kind of blob and are good for most big-data use
cases. However, block blobs have a strict limit of 50,000 blocks per blob.
To prevent reaching the limit, WASB, by default, does not upload a new block
to the service after every `hflush()` or `hsync()`.
For most cases, combining data from multiple `write()` calls into blocks of
4 MB is a good optimization. But in other cases, like HBase log files,
every call to `hflush()` or `hsync()` must upload the data to the service.
Block blobs with compaction upload the data to the cloud service after every
`hflush()`/`hsync()`. To mitigate the limit of 50,000 blocks,
`hflush()`/`hsync()` runs a compaction process once the number of blocks in
the blob exceeds 32,000.
Block compaction searches for and replaces a sequence of small blocks with one
big block. That means there is a cost associated with block compaction: reading
the small blocks back to the client and writing them again as one big block.
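As a rough illustration of the idea (a hypothetical sketch of the block-size arithmetic only, not the WASB implementation, which operates on Azure block lists and re-uploads the merged data):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the block-compaction idea, not the WASB code. */
public class CompactionSketch {
  /** Coalesce runs of adjacent small blocks, modelled here by size only. */
  static List<Long> compact(List<Long> blockSizes, long maxBlockSize) {
    List<Long> result = new ArrayList<>();
    long run = 0;                        // size of the current merged run
    for (long size : blockSizes) {
      if (run + size <= maxBlockSize) {  // keep merging while under the cap
        run += size;
      } else {
        if (run > 0) {
          result.add(run);               // emit the merged block
        }
        run = size;                      // start a new run
      }
    }
    if (run > 0) {
      result.add(run);
    }
    return result;
  }

  public static void main(String[] args) {
    // A 2-byte block followed by four 4-byte blocks, with a 16-byte cap,
    // coalesces into a 14-byte block and a trailing 4-byte block: [14, 4]
    System.out.println(compact(Arrays.asList(2L, 4L, 4L, 4L, 4L), 16L));
  }
}
```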
In order to have the files you create be block blobs with block compaction
enabled, the client must set the configuration variable
`fs.azure.block.blob.with.compaction.dir` to a comma-separated list of
folder names.
For example:
```xml
<property>
<name>fs.azure.block.blob.with.compaction.dir</name>
<value>/hbase/WALs,/data/myblobfiles</value>
</property>
```
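With that setting in place, files created under the listed folders get per-`hflush()` durability. A minimal usage sketch (the container URI, path, and payload are placeholders):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WasbCompactionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Files under these folders are created as block blobs with compaction.
    conf.set("fs.azure.block.blob.with.compaction.dir", "/hbase/WALs");

    // Placeholder account and container.
    FileSystem fs = FileSystem.get(
        URI.create("wasb://container@example.blob.core.windows.net/"), conf);

    try (FSDataOutputStream out =
             fs.create(new Path("/hbase/WALs/region-0001.log"))) {
      out.write("log record".getBytes("UTF-8"));
      out.hflush();   // each hflush() uploads a block to the service
    }
  }
}
```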
### Page Blob Support and Configuration
The Azure Blob Storage interface for Hadoop supports two kinds of blobs,

View File

@ -551,7 +551,8 @@ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequest
throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
}
@Override
public void uploadBlock(String blockId, InputStream sourceStream,
public void uploadBlock(String blockId, AccessCondition accessCondition,
InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");

View File

@ -107,7 +107,8 @@ public void run() {
//
outputStream = writerStorageAccount.getStore().storefile(
key,
new PermissionStatus("", "", FsPermission.getDefault()));
new PermissionStatus("", "", FsPermission.getDefault()),
key);
Arrays.fill(dataBlockWrite, (byte) (i % 256));
for (int j = 0; j < NUMBER_OF_BLOCKS; j++) {
@ -141,7 +142,8 @@ public void testReadOOBWrites() throws Exception {
// reading. This eliminates the race between the reader and writer threads.
OutputStream outputStream = testAccount.getStore().storefile(
"WASB_String.txt",
new PermissionStatus("", "", FsPermission.getDefault()));
new PermissionStatus("", "", FsPermission.getDefault()),
"WASB_String.txt");
Arrays.fill(dataBlockWrite, (byte) 255);
for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
outputStream.write(dataBlockWrite);

View File

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import com.microsoft.azure.storage.blob.BlockEntry;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
/**
* Test class that runs WASB block compaction process for block blobs.
*/
public class TestNativeAzureFileSystemBlockCompaction extends AbstractWasbTestBase {
private static final String TEST_FILE = "/user/active/test.dat";
private static final Path TEST_PATH = new Path(TEST_FILE);
private static final String TEST_FILE_NORMAL = "/user/normal/test.dat";
private static final Path TEST_PATH_NORMAL = new Path(TEST_FILE_NORMAL);
private AzureBlobStorageTestAccount testAccount = null;
@Before
public void setUp() throws Exception {
super.setUp();
testAccount = createTestAccount();
fs = testAccount.getFileSystem();
Configuration conf = fs.getConf();
conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
conf.set(AzureNativeFileSystemStore.KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES, "/user/active");
URI uri = fs.getUri();
fs.initialize(uri, conf);
}
/*
* Helper method that creates test data of size provided by the
* "size" parameter.
*/
private static byte[] getTestData(int size) {
byte[] testData = new byte[size];
System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
return testData;
}
@Override
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
return AzureBlobStorageTestAccount.create();
}
private BlockBlobAppendStream getBlockBlobAppendStream(FSDataOutputStream appendStream) {
SyncableDataOutputStream dataOutputStream = null;
if (appendStream.getWrappedStream() instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
(NativeAzureFileSystem.NativeAzureFsOutputStream) appendStream.getWrappedStream();
dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
}
if (appendStream.getWrappedStream() instanceof SyncableDataOutputStream) {
dataOutputStream = (SyncableDataOutputStream) appendStream.getWrappedStream();
}
Assert.assertNotNull("Did not recognize " + dataOutputStream,
dataOutputStream);
return (BlockBlobAppendStream) dataOutputStream.getOutStream();
}
private void verifyBlockList(BlockBlobAppendStream blockBlobStream,
int[] testData) throws Throwable {
List<BlockEntry> blockList = blockBlobStream.getBlockList();
Assert.assertEquals("Block list length", testData.length, blockList.size());
int i = 0;
for (BlockEntry block: blockList) {
Assert.assertTrue(block.getSize() == testData[i++]);
}
}
private void appendBlockList(FSDataOutputStream fsStream,
ByteArrayOutputStream memStream,
int[] testData) throws Throwable {
for (int d: testData) {
byte[] data = getTestData(d);
memStream.write(data);
fsStream.write(data);
}
fsStream.hflush();
}
@Test
public void testCompactionDisabled() throws Throwable {
try (FSDataOutputStream appendStream = fs.create(TEST_PATH_NORMAL)) {
// testing new file
SyncableDataOutputStream dataOutputStream = null;
OutputStream wrappedStream = appendStream.getWrappedStream();
if (wrappedStream instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
(NativeAzureFileSystem.NativeAzureFsOutputStream) wrappedStream;
dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
} else if (wrappedStream instanceof SyncableDataOutputStream) {
dataOutputStream = (SyncableDataOutputStream) wrappedStream;
} else {
Assert.fail("Unable to determine type of " + wrappedStream
+ " class of " + wrappedStream.getClass());
}
Assert.assertFalse("Data output stream is a BlockBlobAppendStream: "
+ dataOutputStream,
dataOutputStream.getOutStream() instanceof BlockBlobAppendStream);
}
}
@Test
public void testCompaction() throws Throwable {
final int n2 = 2;
final int n4 = 4;
final int n10 = 10;
final int n12 = 12;
final int n14 = 14;
final int n16 = 16;
final int maxBlockSize = 16;
final int compactionBlockCount = 4;
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
try (FSDataOutputStream appendStream = fs.create(TEST_PATH)) {
// test new file
BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream);
blockBlobStream.setMaxBlockSize(maxBlockSize);
blockBlobStream.setCompactionBlockCount(compactionBlockCount);
appendBlockList(appendStream, memStream, new int[]{n2});
verifyBlockList(blockBlobStream, new int[]{n2});
appendStream.hflush();
verifyBlockList(blockBlobStream, new int[]{n2});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream, new int[]{n2, n4});
appendStream.hsync();
verifyBlockList(blockBlobStream, new int[]{n2, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream, new int[]{n2, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream, new int[]{n2, n4, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream, new int[]{n14, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream, new int[]{n14, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream, new int[]{n14, n4, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n2, n4, n4});
verifyBlockList(blockBlobStream, new int[]{n14, n12, n10});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream, new int[]{n14, n12, n10, n4});
appendBlockList(appendStream, memStream,
new int[]{n4, n4, n4, n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16});
appendBlockList(appendStream, memStream,
new int[]{n4, n4, n4, n4, n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n4});
appendBlockList(appendStream, memStream,
new int[]{n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n4, n4});
appendBlockList(appendStream, memStream,
new int[]{n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n4, n4, n4});
appendBlockList(appendStream, memStream,
new int[]{n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n4, n4, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
appendStream.close();
ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray());
}
try (FSDataOutputStream appendStream = fs.append(TEST_PATH)) {
// test existing file
BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream);
blockBlobStream.setMaxBlockSize(maxBlockSize);
blockBlobStream.setCompactionBlockCount(compactionBlockCount);
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n16, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4, n4});
appendBlockList(appendStream, memStream, new int[]{n4});
verifyBlockList(blockBlobStream,
new int[]{n14, n12, n14, n16, n16, n16, n16, n4});
appendStream.close();
ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray());
}
}
}

View File

@ -23,3 +23,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG