HADOOP-16242. ABFS: add bufferpool to AbfsOutputStream.
Contributed by Da Zhou.
This commit is contained in:
parent
b434f558ca
commit
1cef194a28
@ -23,6 +23,7 @@
|
|||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
@ -37,6 +38,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||||
|
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||||
import org.apache.hadoop.fs.StreamCapabilities;
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.fs.Syncable;
|
import org.apache.hadoop.fs.Syncable;
|
||||||
@ -64,6 +66,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|||||||
private final ThreadPoolExecutor threadExecutor;
|
private final ThreadPoolExecutor threadExecutor;
|
||||||
private final ExecutorCompletionService<Void> completionService;
|
private final ExecutorCompletionService<Void> completionService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue storing buffers with the size of the Azure block ready for
|
||||||
|
* reuse. The pool allows reusing the blocks instead of allocating new
|
||||||
|
* blocks. After the data is sent to the service, the buffer is returned
|
||||||
|
* back to the queue
|
||||||
|
*/
|
||||||
|
private final ElasticByteBufferPool byteBufferPool
|
||||||
|
= new ElasticByteBufferPool();
|
||||||
|
|
||||||
public AbfsOutputStream(
|
public AbfsOutputStream(
|
||||||
final AbfsClient client,
|
final AbfsClient client,
|
||||||
final String path,
|
final String path,
|
||||||
@ -78,7 +89,7 @@ public AbfsOutputStream(
|
|||||||
this.lastError = null;
|
this.lastError = null;
|
||||||
this.lastFlushOffset = 0;
|
this.lastFlushOffset = 0;
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
this.buffer = new byte[bufferSize];
|
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
||||||
this.bufferIndex = 0;
|
this.bufferIndex = 0;
|
||||||
this.writeOperations = new ConcurrentLinkedDeque<>();
|
this.writeOperations = new ConcurrentLinkedDeque<>();
|
||||||
|
|
||||||
@ -263,8 +274,7 @@ private synchronized void writeCurrentBufferToService() throws IOException {
|
|||||||
|
|
||||||
final byte[] bytes = buffer;
|
final byte[] bytes = buffer;
|
||||||
final int bytesLength = bufferIndex;
|
final int bytesLength = bufferIndex;
|
||||||
|
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
||||||
buffer = new byte[bufferSize];
|
|
||||||
bufferIndex = 0;
|
bufferIndex = 0;
|
||||||
final long offset = position;
|
final long offset = position;
|
||||||
position += bytesLength;
|
position += bytesLength;
|
||||||
@ -278,6 +288,7 @@ private synchronized void writeCurrentBufferToService() throws IOException {
|
|||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
client.append(path, offset, bytes, 0,
|
client.append(path, offset, bytes, 0,
|
||||||
bytesLength);
|
bytesLength);
|
||||||
|
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user