HADOOP-17770 WASB : Support disabling buffered reads in positional reads (#3149)

This commit is contained in:
Anoop Sam John 2021-07-13 10:37:12 +05:30 committed by GitHub
parent c81f82e21d
commit 177d906a67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 200 additions and 11 deletions

View File

@ -41,6 +41,7 @@
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -241,6 +242,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
*/ */
public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable"; public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";
/**
* Optional config to enable a lock free pread which will bypass buffer in
* BlockBlobInputStream.
* This is not a config which can be set at cluster level. It can be used as
* an option on FutureDataInputStreamBuilder.
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE =
"fs.azure.block.blob.buffered.pread.disable";
/** /**
* The set of directories where we should apply atomic folder rename * The set of directories where we should apply atomic folder rename
* synchronized with createNonRecursive. * synchronized with createNonRecursive.
@ -1591,8 +1602,8 @@ private OutputStream openOutputStream(final CloudBlobWrapper blob)
* Opens a new input stream for the given blob (page or block blob) * Opens a new input stream for the given blob (page or block blob)
* to read its data. * to read its data.
*/ */
private InputStream openInputStream(CloudBlobWrapper blob) private InputStream openInputStream(CloudBlobWrapper blob,
throws StorageException, IOException { Optional<Configuration> options) throws StorageException, IOException {
if (blob instanceof CloudBlockBlobWrapper) { if (blob instanceof CloudBlockBlobWrapper) {
LOG.debug("Using stream seek algorithm {}", inputStreamVersion); LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
switch(inputStreamVersion) { switch(inputStreamVersion) {
@ -1600,9 +1611,13 @@ private InputStream openInputStream(CloudBlobWrapper blob)
return blob.openInputStream(getDownloadOptions(), return blob.openInputStream(getDownloadOptions(),
getInstrumentedContext(isConcurrentOOBAppendAllowed())); getInstrumentedContext(isConcurrentOOBAppendAllowed()));
case 2: case 2:
boolean bufferedPreadDisabled = options.map(c -> c
.getBoolean(FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
return new BlockBlobInputStream((CloudBlockBlobWrapper) blob, return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
getDownloadOptions(), getDownloadOptions(),
getInstrumentedContext(isConcurrentOOBAppendAllowed())); getInstrumentedContext(isConcurrentOOBAppendAllowed()),
bufferedPreadDisabled);
default: default:
throw new IOException("Unknown seek algorithm: " + inputStreamVersion); throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
} }
@ -2290,6 +2305,12 @@ public InputStream retrieve(String key) throws AzureException, IOException {
@Override @Override
public InputStream retrieve(String key, long startByteOffset) public InputStream retrieve(String key, long startByteOffset)
throws AzureException, IOException { throws AzureException, IOException {
return retrieve(key, startByteOffset, Optional.empty());
}
@Override
public InputStream retrieve(String key, long startByteOffset,
Optional<Configuration> options) throws AzureException, IOException {
try { try {
// Check if a session exists, if not create a session with the // Check if a session exists, if not create a session with the
// Azure storage server. // Azure storage server.
@ -2301,7 +2322,7 @@ public InputStream retrieve(String key, long startByteOffset)
} }
checkContainer(ContainerAccessType.PureRead); checkContainer(ContainerAccessType.PureRead);
InputStream inputStream = openInputStream(getBlobReference(key)); InputStream inputStream = openInputStream(getBlobReference(key), options);
if (startByteOffset > 0) { if (startByteOffset > 0) {
// Skip bytes and ignore return value. This is okay // Skip bytes and ignore return value. This is okay
// because if you try to skip too far you will be positioned // because if you try to skip too far you will be positioned
@ -2852,7 +2873,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
OutputStream opStream = null; OutputStream opStream = null;
try { try {
if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){ if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
ipStream = openInputStream(srcBlob); ipStream = openInputStream(srcBlob, Optional.empty());
opStream = openOutputStream(dstBlob); opStream = openOutputStream(dstBlob);
byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE]; byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
int len; int len;

View File

@ -28,7 +28,7 @@
import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.BlobRequestOptions;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
/** /**
@ -36,10 +36,11 @@
* random access and seek. Random access performance is improved by several * random access and seek. Random access performance is improved by several
* orders of magnitude. * orders of magnitude.
*/ */
final class BlockBlobInputStream extends InputStream implements Seekable { final class BlockBlobInputStream extends FSInputStream {
private final CloudBlockBlobWrapper blob; private final CloudBlockBlobWrapper blob;
private final BlobRequestOptions options; private final BlobRequestOptions options;
private final OperationContext opContext; private final OperationContext opContext;
private final boolean bufferedPreadDisabled;
private InputStream blobInputStream = null; private InputStream blobInputStream = null;
private int minimumReadSizeInBytes = 0; private int minimumReadSizeInBytes = 0;
private long streamPositionAfterLastRead = -1; private long streamPositionAfterLastRead = -1;
@ -62,12 +63,13 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
* @param opContext the blob operation context. * @param opContext the blob operation context.
* @throws IOException IO failure * @throws IOException IO failure
*/ */
BlockBlobInputStream(CloudBlockBlobWrapper blob, BlockBlobInputStream(CloudBlockBlobWrapper blob, BlobRequestOptions options,
BlobRequestOptions options, OperationContext opContext, boolean bufferedPreadDisabled)
OperationContext opContext) throws IOException { throws IOException {
this.blob = blob; this.blob = blob;
this.options = options; this.options = options;
this.opContext = opContext; this.opContext = opContext;
this.bufferedPreadDisabled = bufferedPreadDisabled;
this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes(); this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
@ -263,6 +265,39 @@ private int doNetworkRead(byte[] buffer, int offset, int len)
} }
} }
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
synchronized (this) {
checkState();
}
if (!bufferedPreadDisabled) {
// This will do a seek + read in which the streamBuffer will get used.
return super.read(position, buffer, offset, length);
}
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
if (position >= streamLength) {
throw new EOFException("position is beyond stream capacity");
}
MemoryOutputStream os = new MemoryOutputStream(buffer, offset, length);
long bytesToRead = Math.min(minimumReadSizeInBytes,
Math.min(os.capacity(), streamLength - position));
try {
blob.downloadRange(position, bytesToRead, os, options, opContext);
} catch (StorageException e) {
throw new IOException(e);
}
int bytesRead = os.size();
if (bytesRead == 0) {
// This may happen if the blob was modified after the length was obtained.
throw new EOFException("End of stream reached unexpectedly.");
}
return bytesRead;
}
/** /**
* Reads up to <code>len</code> bytes of data from the input stream into an * Reads up to <code>len</code> bytes of data from the input stream into an
* array of bytes. * array of bytes.

View File

@ -33,11 +33,14 @@
import java.util.EnumSet; import java.util.EnumSet;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Stack; import java.util.Stack;
import java.util.HashMap; import java.util.HashMap;
@ -61,6 +64,7 @@
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
@ -70,6 +74,8 @@
import org.apache.hadoop.fs.azure.security.Constants; import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager; import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager; import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -79,6 +85,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -915,6 +922,43 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
} }
} }
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
// SpotBugs reports bug type IS2_INCONSISTENT_SYNC here.
// This report is not valid here.
// 'this.in' is instance of BlockBlobInputStream and read(long, byte[], int, int)
// calls it's Super class method when 'fs.azure.block.blob.buffered.pread.disable'
// is configured false. Super class FSInputStream's implementation is having
// proper synchronization.
// When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a lock free
// implementation of blob read. Here we don't use any of the InputStream's
// shared resource (buffer) and also don't change any cursor position etc.
// So its safe to go with unsynchronized way of read.
if (in instanceof PositionedReadable) {
try {
int result = ((PositionedReadable) this.in).read(position, buffer,
offset, length);
if (null != statistics && result > 0) {
statistics.incrementBytesRead(result);
}
return result;
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
LOG.error("Encountered Storage Exception for read on Blob : {}"
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
throw e;
}
}
return super.read(position, buffer, offset, length);
}
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (!closed) { if (!closed) {
@ -3043,6 +3087,12 @@ public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws I
@Override @Override
public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException { public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
return open(f, bufferSize, Optional.empty());
}
private FSDataInputStream open(Path f, int bufferSize,
Optional<Configuration> options)
throws FileNotFoundException, IOException {
LOG.debug("Opening file: {}", f.toString()); LOG.debug("Opening file: {}", f.toString());
@ -3077,7 +3127,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
InputStream inputStream; InputStream inputStream;
try { try {
inputStream = store.retrieve(key); inputStream = store.retrieve(key, 0, options);
} catch(Exception ex) { } catch(Exception ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
@ -3094,6 +3144,18 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize)); new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
} }
@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(),
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
open(path, parameters.getBufferSize(), Optional.of(parameters.getOptions())));
}
@Override @Override
public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException { public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {

View File

@ -23,6 +23,7 @@
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.util.Date; import java.util.Date;
import java.util.Optional;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -50,6 +51,9 @@ void storeEmptyFolder(String key, PermissionStatus permissionStatus)
InputStream retrieve(String key, long byteRangeStart) throws IOException; InputStream retrieve(String key, long byteRangeStart) throws IOException;
InputStream retrieve(String key, long byteRangeStart,
Optional<Configuration> options) throws IOException;
DataOutputStream storefile(String keyEncoded, DataOutputStream storefile(String keyEncoded,
PermissionStatus permissionStatus, PermissionStatus permissionStatus,
String key) throws AzureException; String key) throws AzureException;

View File

@ -545,6 +545,17 @@ The maximum number of entries that that cache can hold can be customized using t
</property> </property>
``` ```
### Performance optimization configurations
`fs.azure.block.blob.buffered.pread.disable`: By default the positional read API will do a
seek and read on input stream. This read will fill the buffer cache in
BlockBlobInputStream. If this configuration is true it will skip usage of buffer and do a
lock free call for reading from blob. This optimization is very much helpful for HBase kind
of short random read over a shared InputStream instance.
Note: This is not a config which can be set at cluster level. It can be used as
an option on FutureDataInputStreamBuilder.
See FileSystem#openFile(Path path)
## Further Reading ## Further Reading
* [Testing the Azure WASB client](testing_azure.html). * [Testing the Azure WASB client](testing_azure.html).

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest; import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.fs.azure.integration.AzureTestUtils; import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
@ -306,6 +307,61 @@ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2); assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
} }
@Test
public void test_202_PosReadTest() throws Exception {
assumeHugeFileExists();
FutureDataInputStreamBuilder builder = accountUsingInputStreamV2
.getFileSystem().openFile(TEST_FILE_PATH);
builder.opt(AzureNativeFileSystemStore.FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, true);
try (
FSDataInputStream inputStreamV1
= accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
FSDataInputStream inputStreamV2
= accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
FSDataInputStream inputStreamV2NoBuffer = builder.build().get();
) {
final int bufferSize = 4 * KILOBYTE;
byte[] bufferV1 = new byte[bufferSize];
byte[] bufferV2 = new byte[bufferSize];
byte[] bufferV2NoBuffer = new byte[bufferSize];
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, 0,
bufferV1, bufferV2, bufferV2NoBuffer);
int pos = 2 * KILOBYTE;
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
bufferV1, bufferV2, bufferV2NoBuffer);
pos = 10 * KILOBYTE;
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
bufferV1, bufferV2, bufferV2NoBuffer);
pos = 4100 * KILOBYTE;
verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
bufferV1, bufferV2, bufferV2NoBuffer);
}
}
private void verifyConsistentReads(FSDataInputStream inputStreamV1,
FSDataInputStream inputStreamV2, FSDataInputStream inputStreamV2NoBuffer,
int pos, byte[] bufferV1, byte[] bufferV2, byte[] bufferV2NoBuffer)
throws IOException {
int size = bufferV1.length;
int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
int numBytesReadV2NoBuffer = inputStreamV2NoBuffer.read(pos,
bufferV2NoBuffer, 0, size);
assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
numBytesReadV2NoBuffer);
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);
}
/** /**
* Validates the implementation of InputStream.markSupported. * Validates the implementation of InputStream.markSupported.
* @throws IOException * @throws IOException