HADOOP-16040. ABFS: Bug fix for tolerateOobAppends configuration.

Contributed by Da Zhou.
This commit is contained in:
Da Zhou 2019-01-10 11:58:39 +00:00 committed by Steve Loughran
parent 2091d1a4af
commit e8d1900369
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
3 changed files with 39 additions and 4 deletions

View File

@ -374,7 +374,8 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
// Add statistics for InputStream // Add statistics for InputStream
return new AbfsInputStream(client, statistics, return new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag); abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
abfsConfiguration.getTolerateOobAppends(), eTag);
} }
public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws

View File

@ -61,6 +61,7 @@ public AbfsInputStream(
final long contentLength, final long contentLength,
final int bufferSize, final int bufferSize,
final int readAheadQueueDepth, final int readAheadQueueDepth,
final boolean tolerateOobAppends,
final String eTag) { final String eTag) {
this.client = client; this.client = client;
this.statistics = statistics; this.statistics = statistics;
@ -68,8 +69,8 @@ public AbfsInputStream(
this.contentLength = contentLength; this.contentLength = contentLength;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.tolerateOobAppends = tolerateOobAppends;
this.eTag = eTag; this.eTag = eTag;
this.tolerateOobAppends = false;
this.readAheadEnabled = true; this.readAheadEnabled = true;
} }

View File

@ -25,12 +25,14 @@
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/** /**
@ -66,7 +68,9 @@ public void testReadWriteBytesToFile() throws Exception {
} }
@Test (expected = IOException.class) @Test (expected = IOException.class)
public void testOOBWrites() throws Exception { public void testOOBWritesAndReadFail() throws Exception {
Configuration conf = this.getRawConfiguration();
conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, false);
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();
int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize(); int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
@ -83,7 +87,6 @@ public void testOOBWrites() throws Exception {
try (FSDataInputStream readStream = fs.open(testFilePath)) { try (FSDataInputStream readStream = fs.open(testFilePath)) {
assertEquals(readBufferSize, assertEquals(readBufferSize,
readStream.read(bytesToRead, 0, readBufferSize)); readStream.read(bytesToRead, 0, readBufferSize));
try (FSDataOutputStream writeStream = fs.create(testFilePath)) { try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
writeStream.write(b); writeStream.write(b);
writeStream.flush(); writeStream.flush();
@ -94,6 +97,36 @@ public void testOOBWrites() throws Exception {
} }
} }
@Test
// Verifies that with AZURE_TOLERATE_CONCURRENT_APPEND enabled, a read that
// spans a concurrent out-of-band overwrite of the same file succeeds
// (companion to testOOBWritesAndReadFail, which expects IOException when
// the flag is off). Exercises the tolerateOobAppends flag plumbed through
// AbfsInputStream by this commit.
public void testOOBWritesAndReadSucceed() throws Exception {
// Enable tolerance of concurrent (out-of-band) appends before creating the FS,
// so the AbfsInputStream is constructed with tolerateOobAppends = true.
Configuration conf = this.getRawConfiguration();
conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, true);
final AzureBlobFileSystem fs = getFileSystem(conf);
int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
byte[] bytesToRead = new byte[readBufferSize];
// Write twice the read-buffer size so a second full-buffer read is possible
// after the concurrent overwrite below.
final byte[] b = new byte[2 * readBufferSize];
new Random().nextBytes(b);
// Use the test method name as the file path to keep test files distinct.
final Path testFilePath = new Path(methodName.getMethodName());
try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
writeStream.write(b);
writeStream.flush();
}
try (FSDataInputStream readStream = fs.open(testFilePath)) {
// Read
assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize));
// Concurrent write
try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
writeStream.write(b);
writeStream.flush();
}
// With tolerateOobAppends=true this second read must NOT throw, even though
// the file (and its eTag) changed underneath the open input stream.
assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize));
}
}
@Test @Test
public void testWriteWithBufferOffset() throws Exception { public void testWriteWithBufferOffset() throws Exception {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();