HADOOP-17058. ABFS: Support for AppendBlob in Hadoop ABFS Driver

- Contributed by Ishani Ahuja
This commit is contained in:
ishaniahuja 2020-07-05 01:55:14 +05:30 committed by Thomas Marquardt
parent 7c9b459786
commit f24e2ec487
No known key found for this signature in database
GPG Key ID: AEB30C9E78868287
21 changed files with 714 additions and 42 deletions

View File

@ -173,6 +173,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
private String azureAtomicDirs; private String azureAtomicDirs;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization; private boolean createRemoteFileSystemDuringInitialization;
@ -561,6 +565,10 @@ public String getAzureAtomicRenameDirs() {
return this.azureAtomicDirs; return this.azureAtomicDirs;
} }
public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}
public boolean getCreateRemoteFileSystemDuringInitialization() { public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS // we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization return this.createRemoteFileSystemDuringInitialization

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@ -146,6 +147,11 @@ public class AzureBlobFileSystemStore implements Closeable {
private final IdentityTransformerInterface identityTransformer; private final IdentityTransformerInterface identityTransformer;
private final AbfsPerfTracker abfsPerfTracker; private final AbfsPerfTracker abfsPerfTracker;
/**
* The set of directories where we should store files as append blobs.
*/
private Set<String> appendBlobDirSet;
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
Configuration configuration, Configuration configuration,
AbfsCounters abfsCounters) throws IOException { AbfsCounters abfsCounters) throws IOException {
@ -197,6 +203,23 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
throw new IOException(e); throw new IOException(e);
} }
LOG.trace("IdentityTransformer init complete"); LOG.trace("IdentityTransformer init complete");
// Extract the directories that should contain append blobs
String appendBlobDirs = abfsConfiguration.getAppendBlobDirs();
if (appendBlobDirs.trim().isEmpty()) {
this.appendBlobDirSet = new HashSet<String>();
} else {
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
}
}
/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
*/
public boolean isAppendBlobKey(String key) {
return isKeyForDirectorySet(key, appendBlobDirSet);
} }
/** /**
@ -432,10 +455,15 @@ public OutputStream createFile(final Path path,
isNamespaceEnabled); isNamespaceEnabled);
String relativePath = getRelativePath(path); String relativePath = getRelativePath(path);
boolean isAppendBlob = false;
if (isAppendBlobKey(path.toString())) {
isAppendBlob = true;
}
final AbfsRestOperation op = client.createPath(relativePath, true, overwrite, final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null); isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob);
perfInfo.registerResult(op.getResult()).registerSuccess(true); perfInfo.registerResult(op.getResult()).registerSuccess(true);
return new AbfsOutputStream( return new AbfsOutputStream(
@ -443,16 +471,21 @@ public OutputStream createFile(final Path path,
statistics, statistics,
relativePath, relativePath,
0, 0,
populateAbfsOutputStreamContext()); populateAbfsOutputStreamContext(isAppendBlob));
} }
} }
private AbfsOutputStreamContext populateAbfsOutputStreamContext() { private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
}
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) .withWriteBufferSize(bufferSize)
.enableFlush(abfsConfiguration.isFlushEnabled()) .enableFlush(abfsConfiguration.isFlushEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
.build(); .build();
} }
@ -469,7 +502,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina
final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true, final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null); isNamespaceEnabled ? getOctalNotation(umask) : null, false);
perfInfo.registerResult(op.getResult()).registerSuccess(true); perfInfo.registerResult(op.getResult()).registerSuccess(true);
} }
} }
@ -545,12 +578,17 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
perfInfo.registerSuccess(true); perfInfo.registerSuccess(true);
boolean isAppendBlob = false;
if (isAppendBlobKey(path.toString())) {
isAppendBlob = true;
}
return new AbfsOutputStream( return new AbfsOutputStream(
client, client,
statistics, statistics,
relativePath, relativePath,
offset, offset,
populateAbfsOutputStreamContext()); populateAbfsOutputStreamContext(isAppendBlob));
} }
} }

View File

@ -40,6 +40,7 @@ public final class AbfsHttpConstants {
public static final String CHECK_ACCESS = "checkAccess"; public static final String CHECK_ACCESS = "checkAccess";
public static final String GET_STATUS = "getStatus"; public static final String GET_STATUS = "getStatus";
public static final String DEFAULT_TIMEOUT = "90"; public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2"; public static final String TOKEN_VERSION = "2";
public static final String JAVA_VENDOR = "java.vendor"; public static final String JAVA_VENDOR = "java.vendor";

View File

@ -65,6 +65,9 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
/** Provides a config control to enable or disable ABFS Flush operations - /** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/ * HFlush and HSync. Default is true. **/

View File

@ -55,6 +55,7 @@ public final class FileSystemConfigurations {
// Default upload and download buffer size // Default upload and download buffer size
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB
@ -69,6 +70,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_ENABLE_FLUSH = true; public static final boolean DEFAULT_ENABLE_FLUSH = true;

View File

@ -38,6 +38,7 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData"; public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
public static final String QUERY_PARAM_CLOSE = "close"; public static final String QUERY_PARAM_CLOSE = "close";
public static final String QUERY_PARAM_UPN = "upn"; public static final String QUERY_PARAM_UPN = "upn";
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
private HttpQueryParams() {} private HttpQueryParams() {}
} }

View File

@ -272,7 +272,8 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException
} }
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
final String permission, final String umask) throws AzureBlobFileSystemException { final String permission, final String umask,
final boolean isAppendBlob) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (!overwrite) { if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@ -288,6 +289,9 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
if (isAppendBlob) {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
}
String operation = isFile String operation = isFile
? SASTokenProvider.CREATE_FILE_OPERATION ? SASTokenProvider.CREATE_FILE_OPERATION
@ -380,7 +384,7 @@ public AbfsRestOperation renameIdempotencyCheckOp(
} }
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset, public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
final int length, final String cachedSasToken) throws AzureBlobFileSystemException { final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use // JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header. // PUT and specify the real method in the X-Http-Method-Override header.
@ -401,10 +405,46 @@ public AbfsRestOperation append(final String path, final long position, final by
HTTP_METHOD_PUT, HTTP_METHOD_PUT,
url, url,
requestHeaders, buffer, offset, length, sasTokenForReuse); requestHeaders, buffer, offset, length, sasTokenForReuse);
try {
op.execute(); op.execute();
} catch (AzureBlobFileSystemException e) {
if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
final AbfsRestOperation successOp = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
requestHeaders, buffer, offset, length, sasTokenForReuse);
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
throw e;
}
return op; return op;
} }
// For AppendBlob its possible that the append succeeded in the backend but the request failed.
// However a retry would fail with an InvalidQueryParameterValue
// (as the current offset would be unacceptable).
// Hence, we pass/succeed the appendblob append call
// in case we are doing a retry after checking the length of the file
public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
final long length) throws AzureBlobFileSystemException {
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_BAD_REQUEST)) {
final AbfsRestOperation destStatusOp = getPathStatus(path, false);
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
String fileLength = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (length <= Long.parseLong(fileLength)) {
return true;
}
}
}
return false;
}
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
boolean isClose, final String cachedSasToken) boolean isClose, final String cachedSasToken)
throws AzureBlobFileSystemException { throws AzureBlobFileSystemException {

View File

@ -69,6 +69,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
private String storageErrorMessage = ""; private String storageErrorMessage = "";
private String clientRequestId = ""; private String clientRequestId = "";
private String requestId = ""; private String requestId = "";
private String expectedAppendPos = "";
private ListResultSchema listResultSchema = null; private ListResultSchema listResultSchema = null;
// metrics // metrics
@ -126,6 +127,10 @@ public String getClientRequestId() {
return clientRequestId; return clientRequestId;
} }
public String getExpectedAppendPos() {
return expectedAppendPos;
}
public String getRequestId() { public String getRequestId() {
return requestId; return requestId;
} }
@ -154,6 +159,8 @@ public String toString() {
sb.append(statusCode); sb.append(statusCode);
sb.append(","); sb.append(",");
sb.append(storageErrorCode); sb.append(storageErrorCode);
sb.append(",");
sb.append(expectedAppendPos);
sb.append(",cid="); sb.append(",cid=");
sb.append(clientRequestId); sb.append(clientRequestId);
sb.append(",rid="); sb.append(",rid=");
@ -449,6 +456,9 @@ private void processStorageErrorResponse() {
case "message": case "message":
storageErrorMessage = fieldValue; storageErrorMessage = fieldValue;
break; break;
case "ExpectedAppendPos":
expectedAppendPos = fieldValue;
break;
default: default:
break; break;
} }

View File

@ -60,6 +60,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private boolean closed; private boolean closed;
private boolean supportFlush; private boolean supportFlush;
private boolean disableOutputStreamFlush; private boolean disableOutputStreamFlush;
private boolean isAppendBlob;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
@ -106,6 +107,7 @@ public AbfsOutputStream(
this.supportFlush = abfsOutputStreamContext.isEnableFlush(); this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.disableOutputStreamFlush = abfsOutputStreamContext this.disableOutputStreamFlush = abfsOutputStreamContext
.isDisableOutputStreamFlush(); .isDisableOutputStreamFlush();
this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
@ -114,8 +116,11 @@ public AbfsOutputStream(
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
if (this.isAppendBlob) {
this.maxConcurrentRequestCount = 1;
} else {
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
}
this.threadExecutor this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount, = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount, maxConcurrentRequestCount,
@ -309,7 +314,50 @@ private synchronized void flushInternalAsync() throws IOException {
flushWrittenBytesToServiceAsync(); flushWrittenBytesToServiceAsync();
} }
private void writeAppendBlobCurrentBufferToService() throws IOException {
if (bufferIndex == 0) {
return;
}
outputStreamStatistics.writeCurrentBuffer();
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
outputStreamStatistics.bytesToUpload(bytesLength);
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
position += bytesLength;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AbfsRestOperation op = client.append(path, offset, bytes, 0,
bytesLength, cachedSasToken.get(), this.isAppendBlob);
cachedSasToken.update(op.getSasToken());
outputStreamStatistics.uploadSuccessful(bytesLength);
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return;
} catch (Exception ex) {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
if (ex instanceof AzureBlobFileSystemException) {
ex = (AzureBlobFileSystemException) ex;
}
lastError = new IOException(ex);
throw lastError;
}
}
private synchronized void writeCurrentBufferToService() throws IOException { private synchronized void writeCurrentBufferToService() throws IOException {
if (this.isAppendBlob) {
writeAppendBlobCurrentBufferToService();
return;
}
if (bufferIndex == 0) { if (bufferIndex == 0) {
return; return;
} }
@ -336,7 +384,7 @@ public Void call() throws Exception {
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) { "writeCurrentBufferToService", "append")) {
AbfsRestOperation op = client.append(path, offset, bytes, 0, AbfsRestOperation op = client.append(path, offset, bytes, 0,
bytesLength, cachedSasToken.get()); bytesLength, cachedSasToken.get(), false);
cachedSasToken.update(op.getSasToken()); cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult()); perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
@ -389,6 +437,11 @@ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
private synchronized void flushWrittenBytesToServiceInternal(final long offset, private synchronized void flushWrittenBytesToServiceInternal(final long offset,
final boolean retainUncommitedData, final boolean isClose) throws IOException { final boolean retainUncommitedData, final boolean isClose) throws IOException {
// flush is called for appendblob only on close
if (this.isAppendBlob && !isClose) {
return;
}
AbfsPerfTracker tracker = client.getAbfsPerfTracker(); AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) { "flushWrittenBytesToServiceInternal", "flush")) {
@ -434,6 +487,10 @@ private void waitForTaskToComplete() throws IOException {
for (completed = false; completionService.poll() != null; completed = true) { for (completed = false; completionService.poll() != null; completed = true) {
// keep polling until there is no data // keep polling until there is no data
} }
// for AppendBLob, jobs are not submitted to completion service
if (isAppendBlob) {
completed = true;
}
if (!completed) { if (!completed) {
try { try {

View File

@ -31,6 +31,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private AbfsOutputStreamStatistics streamStatistics; private AbfsOutputStreamStatistics streamStatistics;
private boolean isAppendBlob;
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds); super(sasTokenRenewPeriodForStreamsInSeconds);
} }
@ -58,6 +60,12 @@ public AbfsOutputStreamContext withStreamStatistics(
return this; return this;
} }
public AbfsOutputStreamContext withAppendBlob(
final boolean isAppendBlob) {
this.isAppendBlob = isAppendBlob;
return this;
}
public AbfsOutputStreamContext build() { public AbfsOutputStreamContext build() {
// Validation of parameters to be done here. // Validation of parameters to be done here.
return this; return this;
@ -78,4 +86,8 @@ public boolean isDisableOutputStreamFlush() {
public AbfsOutputStreamStatistics getStreamStatistics() { public AbfsOutputStreamStatistics getStreamStatistics() {
return streamStatistics; return streamStatistics;
} }
public boolean isAppendBlob() {
return isAppendBlob;
}
} }

View File

@ -122,6 +122,10 @@ protected AbstractAbfsIntegrationTest() throws Exception {
this.testUrl = defaultUri.toString(); this.testUrl = defaultUri.toString();
abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString()); abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
if (abfsConfig.get(FS_AZURE_TEST_APPENDBLOB_ENABLED) == "true") {
String appendblobDirs = this.testUrl + "," + abfsConfig.get(FS_AZURE_CONTRACT_TEST_URI);
rawConfig.set(FS_AZURE_APPEND_BLOB_KEY, appendblobDirs);
}
// For testing purposes, an IP address and port may be provided to override // For testing purposes, an IP address and port may be provided to override
// the host specified in the FileSystem URI. Also note that the format of // the host specified in the FileSystem URI. Also note that the format of
// the Azure Storage Service URI changes from // the Azure Storage Service URI changes from

View File

@ -81,10 +81,18 @@ public void testAbfsHttpSendStatistics() throws IOException {
* *
* bytes_sent : bytes wrote in AbfsOutputStream. * bytes_sent : bytes wrote in AbfsOutputStream.
*/ */
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
// no network calls are made for hflush in case of appendblob
connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
5, metricMap);
requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 3,
metricMap);
} else {
connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
6, metricMap); 6, metricMap);
requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4, requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
metricMap); metricMap);
}
bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT, bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
testNetworkStatsString.getBytes().length, metricMap); testNetworkStatsString.getBytes().length, metricMap);
@ -125,10 +133,18 @@ public void testAbfsHttpSendStatistics() throws IOException {
* wrote each time). * wrote each time).
* *
*/ */
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
// no network calls are made for hflush in case of appendblob
assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
requestsSent + 1 + LARGE_OPERATIONS, metricMap);
} else {
assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap); connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap); requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
}
assertAbfsStatistics(AbfsStatistic.BYTES_SENT, assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
metricMap); metricMap);
@ -183,8 +199,14 @@ public void testAbfsHttpResponseStatistics() throws IOException {
* *
* bytes_received - This should be equal to bytes sent earlier. * bytes_received - This should be equal to bytes sent earlier.
*/ */
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
//for appendBlob hflush is a no-op
getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 7,
metricMap);
} else {
getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8, getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
metricMap); metricMap);
}
// Testing that bytes received is equal to bytes sent. // Testing that bytes received is equal to bytes sent.
long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName()); long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
@ -242,8 +264,14 @@ public void testAbfsHttpResponseStatistics() throws IOException {
assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length), bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
metricMap); metricMap);
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
// no network calls are made for hflush in case of appendblob
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
getResponses + 3 + LARGE_OPERATIONS, metricMap);
} else {
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap); getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
}
} finally { } finally {
IOUtils.cleanupWithLogger(LOG, out, in); IOUtils.cleanupWithLogger(LOG, out, in);

View File

@ -113,6 +113,10 @@ public void testAbfsOutputStreamQueueShrink() throws IOException {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();
Path queueShrinkFilePath = path(getMethodName()); Path queueShrinkFilePath = path(getMethodName());
String testQueueShrink = "testQueue"; String testQueueShrink = "testQueue";
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(queueShrinkFilePath).toString())) {
// writeOperationsQueue is not used for appendBlob, hence queueShrink is 0
return;
}
try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled(
fs, queueShrinkFilePath)) { fs, queueShrinkFilePath)) {

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
@ -46,6 +47,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
public static Iterable<Object[]> sizes() { public static Iterable<Object[]> sizes() {
return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE}, return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE},
{DEFAULT_READ_BUFFER_SIZE}, {DEFAULT_READ_BUFFER_SIZE},
{APPENDBLOB_MAX_WRITE_BUFFER_SIZE},
{MAX_BUFFER_SIZE}}); {MAX_BUFFER_SIZE}});
} }
@ -70,6 +72,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
final byte[] b = new byte[2 * bufferSize]; final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b); new Random().nextBytes(b);
try (FSDataOutputStream stream = fs.create(TEST_PATH)) { try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
stream.write(b); stream.write(b);
} }

View File

@ -136,8 +136,15 @@ public void testAbfsStreamOps() throws Exception {
testReadWriteOps.getBytes().length); testReadWriteOps.getBytes().length);
} }
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(largeOperationsFile).toString())) {
// for appendblob data is already flushed, so there is more data to read.
assertTrue(String.format("The actual value of %d was not equal to the "
+ "expected value", statistics.getReadOps()),
statistics.getReadOps() == (largeValue + 3) || statistics.getReadOps() == (largeValue + 4));
} else {
//Test for 1000000 read operations //Test for 1000000 read operations
assertReadWriteOps("read", largeValue, statistics.getReadOps()); assertReadWriteOps("read", largeValue, statistics.getReadOps());
}
} finally { } finally {
IOUtils.cleanupWithLogger(LOG, inForLargeOperations, IOUtils.cleanupWithLogger(LOG, inForLargeOperations,

View File

@ -145,6 +145,9 @@ public void testTryWithResources() throws Throwable {
out.hsync(); out.hsync();
fail("Expected a failure"); fail("Expected a failure");
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
//appendblob outputStream does not generate suppressed exception on close as it is
//single threaded code
if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testPath).toString())) {
// the exception raised in close() must be in the caught exception's // the exception raised in close() must be in the caught exception's
// suppressed list // suppressed list
Throwable[] suppressed = fnfe.getSuppressed(); Throwable[] suppressed = fnfe.getSuppressed();
@ -156,6 +159,7 @@ public void testTryWithResources() throws Throwable {
GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner); GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner);
} }
} }
}
/** /**
* Attempts to write to the azure stream after it is closed will raise * Attempts to write to the azure stream after it is closed will raise

View File

@ -203,6 +203,9 @@ public void testWriteWithFileNotFoundException() throws Exception {
public void testFlushWithFileNotFoundException() throws Exception { public void testFlushWithFileNotFoundException() throws Exception {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();
final Path testFilePath = new Path(methodName.getMethodName()); final Path testFilePath = new Path(methodName.getMethodName());
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
return;
}
FSDataOutputStream stream = fs.create(testFilePath); FSDataOutputStream stream = fs.create(testFilePath);
assertTrue(fs.exists(testFilePath)); assertTrue(fs.exists(testFilePath));

View File

@ -49,7 +49,8 @@
public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
private static final int BASE_SIZE = 1024; private static final int BASE_SIZE = 1024;
private static final int ONE_THOUSAND = 1000; private static final int ONE_THOUSAND = 1000;
private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE; //3000 KB to support appenblob too
private static final int TEST_BUFFER_SIZE = 3 * ONE_THOUSAND * BASE_SIZE;
private static final int ONE_MB = 1024 * 1024; private static final int ONE_MB = 1024 * 1024;
private static final int FLUSH_TIMES = 200; private static final int FLUSH_TIMES = 200;
private static final int THREAD_SLEEP_TIME = 1000; private static final int THREAD_SLEEP_TIME = 1000;
@ -226,11 +227,15 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
final Path testFilePath = path(methodName.getMethodName()); final Path testFilePath = path(methodName.getMethodName());
byte[] buffer = getRandomBytesArray(); byte[] buffer = getRandomBytesArray();
// The test case must write "fs.azure.write.request.size" bytes // The test case must write "fs.azure.write.request.size" bytes
// to the stream in order for the data to be uploaded to storage. // to the stream in order for the data to be uploaded to storage.
assertEquals(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(), assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize()
buffer.length); <= buffer.length);
boolean isAppendBlob = true;
if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
isAppendBlob = false;
}
try (FSDataOutputStream stream = fs.create(testFilePath)) { try (FSDataOutputStream stream = fs.create(testFilePath)) {
stream.write(buffer); stream.write(buffer);
@ -245,7 +250,8 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
// Verify that the data can be read if disableOutputStreamFlush is // Verify that the data can be read if disableOutputStreamFlush is
// false; and otherwise cannot be read. // false; and otherwise cannot be read.
validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush); /* For Appendlob flush is not needed to update data on server */
validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush || isAppendBlob);
} }
} }
@ -267,10 +273,15 @@ public void testHflushWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem(); final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray(); byte[] buffer = getRandomBytesArray();
final Path testFilePath = path(methodName.getMethodName()); final Path testFilePath = path(methodName.getMethodName());
boolean isAppendBlob = false;
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
isAppendBlob = true;
}
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) { try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
stream.hflush(); stream.hflush();
validate(fs, testFilePath, buffer, false); /* For Appendlob flush is not needed to update data on server */
validate(fs, testFilePath, buffer, isAppendBlob);
} }
} }
@ -322,9 +333,14 @@ public void testHsyncWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem(); final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray(); byte[] buffer = getRandomBytesArray();
final Path testFilePath = path(methodName.getMethodName()); final Path testFilePath = path(methodName.getMethodName());
boolean isAppendBlob = false;
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
isAppendBlob = true;
}
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) { try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
stream.hsync(); stream.hsync();
validate(fs, testFilePath, buffer, false); /* For Appendlob flush is not needed to update data on server */
validate(fs, testFilePath, buffer, isAppendBlob);
} }
} }

View File

@ -27,6 +27,7 @@ public final class TestConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key"; public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs"; public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled"; public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled";
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id"; public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret"; public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret";

View File

@ -0,0 +1,430 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.conf.Configuration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.anyLong;
import static org.assertj.core.api.Assertions.assertThat;
public final class TestAbfsOutputStream {
private static final int BUFFER_SIZE = 4096;
private static final int WRITE_SIZE = 1000;
private static final String PATH = "~/testpath";
private final String globalKey = "fs.azure.configuration";
private final String accountName1 = "account1";
private final String accountKey1 = globalKey + "." + accountName1;
private final String accountValue1 = "one";
private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
boolean isFlushEnabled,
boolean disableOutputStreamFlush,
boolean isAppendBlob) {
return new AbfsOutputStreamContext(2)
.withWriteBufferSize(writeBufferSize)
.enableFlush(isFlushEnabled)
.disableOutputStreamFlush(disableOutputStreamFlush)
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
.build();
}
/**
* The test verifies OutputStream shortwrite case(2000bytes write followed by flush, hflush, hsync) is making correct HTTP calls to the server
*/
@Test
public void verifyShortWriteRequest() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
final byte[] b = new byte[WRITE_SIZE];
new Random().nextBytes(b);
out.write(b);
out.hsync();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
final byte[] b1 = new byte[2*WRITE_SIZE];
new Random().nextBytes(b1);
out.write(b1);
out.flush();
out.hflush();
out.hsync();
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acSASToken.capture(), acAppendBlobAppend.capture());
assertThat(Arrays.asList(PATH, PATH)).describedAs("Path of the requests").isEqualTo(acString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(WRITE_SIZE))).describedAs("Write Position").isEqualTo(acLong.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(WRITE_SIZE, 2*WRITE_SIZE)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues());
}
/**
* The test verifies OutputStream Write of WRITE_SIZE(1000 bytes) followed by a close is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequest() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
final byte[] b = new byte[WRITE_SIZE];
new Random().nextBytes(b);
for (int i = 0; i < 5; i++) {
out.write(b);
}
out.close();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acSASToken.capture(), acAppendBlobAppend.capture());
assertThat(Arrays.asList(PATH, PATH)).describedAs("Path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
acLong.getAllValues()));
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(new HashSet<Integer>(Arrays.asList(BUFFER_SIZE, 5*WRITE_SIZE-BUFFER_SIZE))).describedAs("Buffer Length").isEqualTo(new HashSet<Integer>(
acBufferLength.getAllValues()));
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
}
/**
* The test verifies OutputStream Write of BUFFER_SIZE(4KB) followed by a close is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
out.close();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acSASToken.capture(), acAppendBlobAppend.capture());
assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
acLong.getAllValues()));
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
}
/**
* The test verifies OutputStream Write of BUFFER_SIZE(4KB) is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSize() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
Thread.sleep(1000);
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acSASToken.capture(), acAppendBlobAppend.capture());
assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position in file").isEqualTo(
new HashSet<Long>(acLong.getAllValues()));
assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues());
}
/**
* The test verifies OutputStream Write of BUFFER_SIZE(4KB) on a AppendBlob based stream is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
Thread.sleep(1000);
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acSASToken.capture(), acAppendBlobAppend.capture());
assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE))).describedAs("File Position").isEqualTo(acLong.getAllValues());
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
assertThat(Arrays.asList(true, true)).describedAs("is AppendBlob Append").isEqualTo(acAppendBlobAppend.getAllValues());
}
/**
* The test verifies OutputStream Write of BUFFER_SIZE(4KB) followed by a hflush call is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
out.hflush();
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acSASToken.capture(), acAppendBlobAppend.capture());
assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("File Position").isEqualTo(
new HashSet<Long>(acLong.getAllValues()));
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
acFlushSASToken.capture());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
}
/**
* The test verifies OutputStream Write of BUFFER_SIZE(4KB) followed by a flush call is making correct HTTP calls to the server
*/
@Test
public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
AbfsClient client = mock(AbfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsConfiguration abfsConf;
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
for (int i = 0; i < 2; i++) {
out.write(b);
}
Thread.sleep(1000);
out.flush();
Thread.sleep(1000);
ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
acSASToken.capture(), acAppendBlobAppend.capture());
assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(
new HashSet<Long>(acLong.getAllValues()));
assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
}
}