From 0a6ddfa145b788c834098b9169ea880eec2b5b82 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Tue, 12 Jan 2021 21:18:09 +0530 Subject: [PATCH] HADOOP-17272. ABFS Streams to support IOStatistics API (#2604) Contributed by Mehakmeet Singh. --- .../fs/statistics/StreamStatisticNames.java | 72 ++++++++ .../fs/azurebfs/services/AbfsInputStream.java | 23 ++- .../services/AbfsInputStreamStatistics.java | 15 +- .../AbfsInputStreamStatisticsImpl.java | 162 +++++++++++------- .../azurebfs/services/AbfsOutputStream.java | 123 ++++++++----- .../services/AbfsOutputStreamStatistics.java | 17 +- .../AbfsOutputStreamStatisticsImpl.java | 130 ++++++++------ .../ITestAbfsInputStreamStatistics.java | 43 ++++- .../ITestAbfsOutputStreamStatistics.java | 31 ++++ .../TestAbfsOutputStreamStatistics.java | 27 +-- 10 files changed, 443 insertions(+), 200 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 02072d464d..bbb8517118 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -286,6 +286,78 @@ public final class StreamStatisticNames { public static final String STREAM_WRITE_TOTAL_DATA = "stream_write_total_data"; + /** + * Number of bytes to upload from an OutputStream. + */ + public static final String BYTES_TO_UPLOAD + = "bytes_upload"; + + /** + * Number of bytes uploaded successfully to the object store. + */ + public static final String BYTES_UPLOAD_SUCCESSFUL + = "bytes_upload_successfully"; + + /** + * Number of bytes failed to upload to the object store. + */ + public static final String BYTES_UPLOAD_FAILED + = "bytes_upload_failed"; + + /** + * Total time spent on waiting for a task to complete. + */ + public static final String TIME_SPENT_ON_TASK_WAIT + = "time_spent_task_wait"; + + /** + * Number of task queue shrunk operations. + */ + public static final String QUEUE_SHRUNK_OPS + = "queue_shrunk_ops"; + + /** + * Number of times current buffer is written to the service. + */ + public static final String WRITE_CURRENT_BUFFER_OPERATIONS + = "write_current_buffer_ops"; + + /** + * Total time spent on completing a PUT request. + */ + public static final String TIME_SPENT_ON_PUT_REQUEST + = "time_spent_on_put_request"; + + /** + * Number of seeks in buffer. + */ + public static final String SEEK_IN_BUFFER + = "seek_in_buffer"; + + /** + * Number of bytes read from the buffer. + */ + public static final String BYTES_READ_BUFFER + = "bytes_read_buffer"; + + /** + * Total number of remote read operations performed. + */ + public static final String REMOTE_READ_OP + = "remote_read_op"; + + /** + * Total number of bytes read from readAhead. + */ + public static final String READ_AHEAD_BYTES_READ + = "read_ahead_bytes_read"; + + /** + * Total number of bytes read from remote operations. + */ + public static final String REMOTE_BYTES_READ + = "remote_bytes_read"; + private StreamStatisticNames() { } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 1d109f493c..c1de031215 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -37,6 +37,11 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import static java.lang.Math.max; import static java.lang.Math.min; @@ -48,7 +53,7 @@ * The AbfsInputStream for AbfsClient. */ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, - StreamCapabilities { + StreamCapabilities, IOStatisticsSource { private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); // Footer size is set to qualify for both ORC and parquet files public static final int FOOTER_SIZE = 16 * ONE_KB; @@ -92,6 +97,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private long bytesFromRemoteRead; // bytes read remotely; for testing private final AbfsInputStreamContext context; + private IOStatistics ioStatistics; public AbfsInputStream( final AbfsClient client, @@ -120,6 +126,9 @@ public AbfsInputStream( // Propagate the config values to ReadBufferManager so that the first instance // to initialize can set the readAheadBlockSize ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize); + if (streamStatistics != null) { + ioStatistics = streamStatistics.getIOStatistics(); + } } public String getPath() { @@ -152,7 +161,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw int lastReadBytes; int totalReadBytes = 0; if (streamStatistics != null) { - streamStatistics.readOperationStarted(off, len); + streamStatistics.readOperationStarted(); } incrementReadOps(); do { @@ -431,7 +440,10 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); - op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get()); + op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + () -> client.read(path, position, b, offset, length, + tolerateOobAppends ? "*" : eTag, cachedSasToken.get())); cachedSasToken.update(op.getSasToken()); if (streamStatistics != null) { streamStatistics.remoteReadOperation(); @@ -694,6 +706,11 @@ public boolean shouldAlwaysReadBufferSize() { return alwaysReadBufferSize; } + @Override + public IOStatistics getIOStatistics() { + return ioStatistics; + } + /** * Get the statistics of the stream. * @return a string value. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java index c910a1f75e..00663467fe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java @@ -19,12 +19,14 @@ package org.apache.hadoop.fs.azurebfs.services; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; /** * Interface for statistics for the AbfsInputStream. */ @InterfaceStability.Unstable -public interface AbfsInputStreamStatistics { +public interface AbfsInputStreamStatistics extends IOStatisticsSource { /** * Seek backwards, incrementing the seek and backward seek counters. * @@ -73,11 +75,8 @@ public interface AbfsInputStreamStatistics { /** * A {@code read(byte[] buf, int off, int len)} operation has started. - * - * @param pos starting position of the read. - * @param len length of bytes to read. */ - void readOperationStarted(long pos, long len); + void readOperationStarted(); /** * Records a successful remote read operation. @@ -96,6 +95,12 @@ public interface AbfsInputStreamStatistics { */ void remoteBytesRead(long bytes); + /** + * Get the IOStatisticsStore instance from AbfsInputStreamStatistics. + * @return instance of IOStatisticsStore which extends IOStatistics. + */ + IOStatistics getIOStatistics(); + /** * Makes the string of all the AbfsInputStream statistics. * @return the string with all the statistics. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java index 12cc407dcb..bd09762976 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java @@ -18,23 +18,50 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; + +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; + /** * Stats for the AbfsInputStream. */ public class AbfsInputStreamStatisticsImpl implements AbfsInputStreamStatistics { - private long seekOperations; - private long forwardSeekOperations; - private long backwardSeekOperations; - private long bytesRead; - private long bytesSkippedOnSeek; - private long bytesBackwardsOnSeek; - private long seekInBuffer; - private long readOperations; - private long bytesReadFromBuffer; - private long remoteReadOperations; - private long readAheadBytesRead; - private long remoteBytesRead; + + private final IOStatisticsStore ioStatisticsStore = iostatisticsStore() + .withCounters( + StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS, + StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS, + StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS, + StreamStatisticNames.STREAM_READ_BYTES, + StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, + StreamStatisticNames.STREAM_READ_OPERATIONS, + StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS, + StreamStatisticNames.SEEK_IN_BUFFER, + StreamStatisticNames.BYTES_READ_BUFFER, + StreamStatisticNames.REMOTE_READ_OP, + StreamStatisticNames.READ_AHEAD_BYTES_READ, + StreamStatisticNames.REMOTE_BYTES_READ + ) + .withDurationTracking(ACTION_HTTP_GET_REQUEST) + .build(); + + /* Reference to the atomic counter for frequently updated counters to avoid + * cost of the map lookup on every increment. + */ + private final AtomicLong bytesRead = + ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_BYTES); + private final AtomicLong readOps = + ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS); + private final AtomicLong seekOps = + ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS); /** * Seek backwards, incrementing the seek and backward seek counters. @@ -44,9 +71,9 @@ public class AbfsInputStreamStatisticsImpl */ @Override public void seekBackwards(long negativeOffset) { - seekOperations++; - backwardSeekOperations++; - bytesBackwardsOnSeek -= negativeOffset; + seekOps.incrementAndGet(); + ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS); + ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS, negativeOffset); } /** @@ -58,11 +85,9 @@ public void seekBackwards(long negativeOffset) { */ @Override public void seekForwards(long skipped) { - seekOperations++; - forwardSeekOperations++; - if (skipped > 0) { - bytesSkippedOnSeek += skipped; - } + seekOps.incrementAndGet(); + ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS); + ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, skipped); } /** @@ -90,9 +115,7 @@ public void seek(long seekTo, long currentPos) { */ @Override public void bytesRead(long bytes) { - if (bytes > 0) { - bytesRead += bytes; - } + bytesRead.addAndGet(bytes); } /** @@ -104,9 +127,7 @@ public void bytesRead(long bytes) { */ @Override public void bytesReadFromBuffer(long bytes) { - if (bytes > 0) { - bytesReadFromBuffer += bytes; - } + ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_READ_BUFFER, bytes); } /** @@ -116,18 +137,15 @@ public void bytesReadFromBuffer(long bytes) { */ @Override public void seekInBuffer() { - seekInBuffer++; + ioStatisticsStore.incrementCounter(StreamStatisticNames.SEEK_IN_BUFFER); } /** * A {@code read(byte[] buf, int off, int len)} operation has started. - * - * @param pos starting position of the read. - * @param len length of bytes to read. */ @Override - public void readOperationStarted(long pos, long len) { - readOperations++; + public void readOperationStarted() { + readOps.incrementAndGet(); } /** @@ -137,9 +155,7 @@ public void readOperationStarted(long pos, long len) { */ @Override public void readAheadBytesRead(long bytes) { - if (bytes > 0) { - readAheadBytesRead += bytes; - } + ioStatisticsStore.incrementCounter(StreamStatisticNames.READ_AHEAD_BYTES_READ, bytes); } /** @@ -149,9 +165,7 @@ public void readAheadBytesRead(long bytes) { */ @Override public void remoteBytesRead(long bytes) { - if (bytes > 0) { - remoteBytesRead += bytes; - } + ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_BYTES_READ, bytes); } /** @@ -161,55 +175,88 @@ public void remoteBytesRead(long bytes) { */ @Override public void remoteReadOperation() { - remoteReadOperations++; + ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_READ_OP); } + /** + * Getter for IOStatistics instance used. + * @return IOStatisticsStore instance which extends IOStatistics. + */ + @Override + public IOStatistics getIOStatistics() { + return ioStatisticsStore; + } + + @VisibleForTesting public long getSeekOperations() { - return seekOperations; + return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS); } + @VisibleForTesting public long getForwardSeekOperations() { - return forwardSeekOperations; + return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS); } + @VisibleForTesting public long getBackwardSeekOperations() { - return backwardSeekOperations; + return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS); } + @VisibleForTesting public long getBytesRead() { - return bytesRead; + return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_BYTES); } + @VisibleForTesting public long getBytesSkippedOnSeek() { - return bytesSkippedOnSeek; + return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED); } + @VisibleForTesting public long getBytesBackwardsOnSeek() { - return bytesBackwardsOnSeek; + return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS); } + @VisibleForTesting public long getSeekInBuffer() { - return seekInBuffer; + return ioStatisticsStore.counters().get(StreamStatisticNames.SEEK_IN_BUFFER); + } + @VisibleForTesting public long getReadOperations() { - return readOperations; + return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_OPERATIONS); } + @VisibleForTesting public long getBytesReadFromBuffer() { - return bytesReadFromBuffer; + return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_READ_BUFFER); } + @VisibleForTesting public long getRemoteReadOperations() { - return remoteReadOperations; + return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_READ_OP); } + @VisibleForTesting public long getReadAheadBytesRead() { - return readAheadBytesRead; + return ioStatisticsStore.counters().get(StreamStatisticNames.READ_AHEAD_BYTES_READ); } + @VisibleForTesting public long getRemoteBytesRead() { - return remoteBytesRead; + return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_BYTES_READ); + } + + /** + * Getter for the mean value of the time taken to complete a HTTP GET + * request by AbfsInputStream. + * @return mean value. + */ + @VisibleForTesting + public double getActionHttpGetRequest() { + return ioStatisticsStore.meanStatistics(). + get(ACTION_HTTP_GET_REQUEST + SUFFIX_MEAN).mean(); } /** @@ -223,18 +270,7 @@ public long getRemoteBytesRead() { public String toString() { final StringBuilder sb = new StringBuilder( "StreamStatistics{"); - sb.append(", SeekOperations=").append(seekOperations); - sb.append(", ForwardSeekOperations=").append(forwardSeekOperations); - sb.append(", BackwardSeekOperations=").append(backwardSeekOperations); - sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek); - sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek); - sb.append(", seekInBuffer=").append(seekInBuffer); - sb.append(", BytesRead=").append(bytesRead); - sb.append(", ReadOperations=").append(readOperations); - sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer); - sb.append(", remoteReadOperations=").append(remoteReadOperations); - sb.append(", readAheadBytesRead=").append(readAheadBytesRead); - sb.append(", remoteBytesRead=").append(remoteBytesRead); + sb.append(ioStatisticsStore.toString()); sb.append('}'); return sb.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 402fdda7b2..53bdfe94cf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -29,7 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -43,6 +42,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FSExceptionMessages; @@ -57,7 +62,8 @@ /** * The BlobFsOutputStream for Rest AbfsClient. */ -public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities { +public class AbfsOutputStream extends OutputStream implements Syncable, + StreamCapabilities, IOStatisticsSource { private final AbfsClient client; private final String path; @@ -97,6 +103,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final Statistics statistics; private final AbfsOutputStreamStatistics outputStreamStatistics; + private IOStatistics ioStatistics; private static final Logger LOG = LoggerFactory.getLogger(AbfsOutputStream.class); @@ -144,6 +151,9 @@ public AbfsOutputStream( this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.cachedSasToken = new CachedSASToken( abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); + if (outputStreamStatistics != null) { + this.ioStatistics = outputStreamStatistics.getIOStatistics(); + } } /** @@ -354,11 +364,12 @@ private void writeAppendBlobCurrentBufferToService() throws IOException { if (bufferIndex == 0) { return; } - outputStreamStatistics.writeCurrentBuffer(); - final byte[] bytes = buffer; final int bytesLength = bufferIndex; - outputStreamStatistics.bytesToUpload(bytesLength); + if (outputStreamStatistics != null) { + outputStreamStatistics.writeCurrentBuffer(); + outputStreamStatistics.bytesToUpload(bytesLength); + } buffer = byteBufferPool.getBuffer(false, bufferSize).array(); bufferIndex = 0; final long offset = position; @@ -370,7 +381,9 @@ private void writeAppendBlobCurrentBufferToService() throws IOException { bytesLength, APPEND_MODE, true); AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); - outputStreamStatistics.uploadSuccessful(bytesLength); + if (outputStreamStatistics != null) { + outputStreamStatistics.uploadSuccessful(bytesLength); + } perfInfo.registerResult(op.getResult()); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); perfInfo.registerSuccess(true); @@ -402,55 +415,63 @@ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean i if (bufferIndex == 0) { return; } - outputStreamStatistics.writeCurrentBuffer(); numOfAppendsToServerSinceLastFlush++; final byte[] bytes = buffer; final int bytesLength = bufferIndex; - outputStreamStatistics.bytesToUpload(bytesLength); + if (outputStreamStatistics != null) { + outputStreamStatistics.writeCurrentBuffer(); + outputStreamStatistics.bytesToUpload(bytesLength); + } buffer = byteBufferPool.getBuffer(false, bufferSize).array(); bufferIndex = 0; final long offset = position; position += bytesLength; if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) { - long start = System.currentTimeMillis(); - waitForTaskToComplete(); - outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); - } - - final Future job = completionService.submit(new Callable() { - @Override - public Void call() throws Exception { - AbfsPerfTracker tracker = client.getAbfsPerfTracker(); - try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, - "writeCurrentBufferToService", "append")) { - AppendRequestParameters.Mode - mode = APPEND_MODE; - if (isFlush & isClose) { - mode = FLUSH_CLOSE_MODE; - } else if (isFlush) { - mode = FLUSH_MODE; - } - - AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false); - AbfsRestOperation op = client.append(path, bytes, reqParams, - cachedSasToken.get()); - - cachedSasToken.update(op.getSasToken()); - perfInfo.registerResult(op.getResult()); - byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); - perfInfo.registerSuccess(true); - return null; + //Tracking time spent on waiting for task to complete. + if (outputStreamStatistics != null) { + try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) { + waitForTaskToComplete(); } + } else { + waitForTaskToComplete(); } - }); + } + final Future job = + completionService.submit(IOStatisticsBinding + .trackDurationOfCallable((IOStatisticsStore) ioStatistics, + StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST, + () -> { + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "writeCurrentBufferToService", "append")) { + AppendRequestParameters.Mode + mode = APPEND_MODE; + if (isFlush & isClose) { + mode = FLUSH_CLOSE_MODE; + } else if (isFlush) { + mode = FLUSH_MODE; + } + AppendRequestParameters reqParams = new AppendRequestParameters( + offset, 0, bytesLength, mode, false); + AbfsRestOperation op = client.append(path, bytes, reqParams, + cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); + perfInfo.registerResult(op.getResult()); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); + perfInfo.registerSuccess(true); + return null; + } + }) + ); - if (job.isCancelled()) { - outputStreamStatistics.uploadFailed(bytesLength); - } else { - outputStreamStatistics.uploadSuccessful(bytesLength); + if (outputStreamStatistics != null) { + if (job.isCancelled()) { + outputStreamStatistics.uploadFailed(bytesLength); + } else { + outputStreamStatistics.uploadSuccessful(bytesLength); + } } writeOperations.add(new WriteOperation(job, offset, bytesLength)); @@ -527,7 +548,9 @@ private synchronized void shrinkWriteOperationQueue() throws IOException { lastTotalAppendOffset += writeOperations.peek().length; writeOperations.remove(); // Incrementing statistics to indicate queue has been shrunk. - outputStreamStatistics.queueShrunk(); + if (outputStreamStatistics != null) { + outputStreamStatistics.queueShrunk(); + } } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { @@ -615,6 +638,11 @@ Boolean isAppendBlobStream() { return isAppendBlob; } + @Override + public IOStatistics getIOStatistics() { + return ioStatistics; + } + /** * Appending AbfsOutputStream statistics to base toString(). * @@ -623,9 +651,12 @@ Boolean isAppendBlobStream() { @Override public String toString() { final StringBuilder sb = new StringBuilder(super.toString()); - sb.append("AbfsOuputStream@").append(this.hashCode()).append("){"); - sb.append(outputStreamStatistics.toString()); - sb.append("}"); + if (outputStreamStatistics != null) { + sb.append("AbfsOutputStream@").append(this.hashCode()); + sb.append("){"); + sb.append(outputStreamStatistics.toString()); + sb.append("}"); + } return sb.toString(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java index c9fe0dd455..c57d5d9bca 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java @@ -19,12 +19,15 @@ package org.apache.hadoop.fs.azurebfs.services; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; /** * Interface for {@link AbfsOutputStream} statistics. */ @InterfaceStability.Unstable -public interface AbfsOutputStreamStatistics { +public interface AbfsOutputStreamStatistics extends IOStatisticsSource { /** * Number of bytes to be uploaded. @@ -49,11 +52,9 @@ public interface AbfsOutputStreamStatistics { /** * Time spent in waiting for tasks to be completed in the blocking queue. - * - * @param start millisecond at which the wait for task to be complete begins. - * @param end millisecond at which the wait is completed for the task. + * @return instance of the DurationTracker that tracks the time for waiting. */ - void timeSpentTaskWait(long start, long end); + DurationTracker timeSpentTaskWait(); /** * Number of times task queue is shrunk. @@ -65,6 +66,12 @@ public interface AbfsOutputStreamStatistics { */ void writeCurrentBuffer(); + /** + * Get the IOStatisticsStore instance from AbfsOutputStreamStatistics. + * @return instance of IOStatisticsStore which extends IOStatistics. + */ + IOStatistics getIOStatistics(); + /** * Method to form a string of all AbfsOutputStream statistics and their * values. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java index cd5a29e217..b07cf28a71 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java @@ -18,32 +18,47 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; + +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; + /** * OutputStream statistics implementation for Abfs. */ public class AbfsOutputStreamStatisticsImpl implements AbfsOutputStreamStatistics { - private long bytesToUpload; - private long bytesUploadSuccessful; - private long bytesUploadFailed; - /** - * Counter to get the total time spent while waiting for tasks to complete - * in the blocking queue inside the thread executor. + + private final IOStatisticsStore ioStatisticsStore = iostatisticsStore() + .withCounters( + StreamStatisticNames.BYTES_TO_UPLOAD, + StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL, + StreamStatisticNames.BYTES_UPLOAD_FAILED, + StreamStatisticNames.QUEUE_SHRUNK_OPS, + StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS + ) + .withDurationTracking( + StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST, + StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT + ) + .build(); + + /* Reference to the atomic counter for frequently updated counters to avoid + * cost of the map lookup on every increment. */ - private long timeSpentOnTaskWait; - /** - * Counter to get the total number of queue shrink operations done {@code - * AbfsOutputStream#shrinkWriteOperationQueue()} by AbfsOutputStream to - * remove the write operations which were successfully done by - * AbfsOutputStream from the task queue. - */ - private long queueShrunkOps; - /** - * Counter to get the total number of times the current buffer is written - * to the service {@code AbfsOutputStream#writeCurrentBufferToService()} via - * AbfsClient and appended to the data store by AbfsRestOperation. - */ - private long writeCurrentBufferOperations; + private final AtomicLong bytesUpload = + ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_TO_UPLOAD); + private final AtomicLong bytesUploadedSuccessfully = + ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL); + private final AtomicLong writeCurrentBufferOps = + ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS); /** * Records the need to upload bytes and increments the total bytes that @@ -53,9 +68,7 @@ public class AbfsOutputStreamStatisticsImpl */ @Override public void bytesToUpload(long bytes) { - if (bytes > 0) { - bytesToUpload += bytes; - } + bytesUpload.addAndGet(bytes); } /** @@ -66,9 +79,7 @@ public void bytesToUpload(long bytes) { */ @Override public void uploadSuccessful(long bytes) { - if (bytes > 0) { - bytesUploadSuccessful += bytes; - } + bytesUploadedSuccessfully.addAndGet(bytes); } /** @@ -78,9 +89,7 @@ public void uploadSuccessful(long bytes) { */ @Override public void uploadFailed(long bytes) { - if (bytes > 0) { - bytesUploadFailed += bytes; - } + ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_UPLOAD_FAILED, bytes); } /** @@ -96,14 +105,10 @@ public void uploadFailed(long bytes) { * This time spent while waiting for the task to be completed is being * recorded in this counter. * - * @param startTime time(in milliseconds) before the wait for task to be - * completed is begin. - * @param endTime time(in milliseconds) after the wait for the task to be - * completed is done. */ @Override - public void timeSpentTaskWait(long startTime, long endTime) { - timeSpentOnTaskWait += endTime - startTime; + public DurationTracker timeSpentTaskWait() { + return ioStatisticsStore.trackDuration(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT); } /** @@ -114,7 +119,7 @@ public void timeSpentTaskWait(long startTime, long endTime) { */ @Override public void queueShrunk() { - queueShrunkOps++; + ioStatisticsStore.incrementCounter(StreamStatisticNames.QUEUE_SHRUNK_OPS); } /** @@ -125,31 +130,59 @@ public void queueShrunk() { */ @Override public void writeCurrentBuffer() { - writeCurrentBufferOperations++; + writeCurrentBufferOps.incrementAndGet(); } + /** + * {@inheritDoc} + * + * A getter for IOStatisticsStore instance which extends IOStatistics. + * + * @return IOStatisticsStore instance. + */ + @Override + public IOStatistics getIOStatistics() { + return ioStatisticsStore; + } + + @VisibleForTesting public long getBytesToUpload() { - return bytesToUpload; + return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_TO_UPLOAD); } + @VisibleForTesting public long getBytesUploadSuccessful() { - return bytesUploadSuccessful; + return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL); } + @VisibleForTesting public long getBytesUploadFailed() { - return bytesUploadFailed; + return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_FAILED); } + @VisibleForTesting public long getTimeSpentOnTaskWait() { - return timeSpentOnTaskWait; + return ioStatisticsStore.counters().get(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT); } + @VisibleForTesting public long getQueueShrunkOps() { - return queueShrunkOps; + return ioStatisticsStore.counters().get(StreamStatisticNames.QUEUE_SHRUNK_OPS); } + @VisibleForTesting public long getWriteCurrentBufferOperations() { - return writeCurrentBufferOperations; + return ioStatisticsStore.counters().get(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS); + } + + /** + * Getter for mean value of time taken to complete a PUT request by + * AbfsOutputStream. + * @return mean value. + */ + @VisibleForTesting + public double getTimeSpentOnPutRequest() { + return ioStatisticsStore.meanStatistics().get(StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST + StoreStatisticNames.SUFFIX_MEAN).mean(); } /** @@ -160,16 +193,7 @@ public long getWriteCurrentBufferOperations() { @Override public String toString() { final StringBuilder outputStreamStats = new StringBuilder( "OutputStream Statistics{"); - outputStreamStats.append(", bytes_upload=").append(bytesToUpload); - outputStreamStats.append(", bytes_upload_successfully=") - .append(bytesUploadSuccessful); - outputStreamStats.append(", bytes_upload_failed=") - .append(bytesUploadFailed); - outputStreamStats.append(", time_spent_task_wait=") - .append(timeSpentOnTaskWait); - outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps); - outputStreamStats.append(", write_current_buffer_ops=") - .append(writeCurrentBufferOperations); + outputStreamStats.append(ioStatisticsStore.toString()); outputStreamStats.append("}"); return outputStreamStats.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 52dfdf2a61..a33a76ecef 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -139,10 +139,9 @@ public void testSeekStatistics() throws IOException { * forwardSeekOps - Since we are doing a forward seek inside a loop * for OPERATION times, total forward seeks would be OPERATIONS. * - * bytesBackwardsOnSeek - Since we are doing backward seeks from end of - * file in a ONE_MB file each time, this would mean the bytes from - * backward seek would be OPERATIONS * ONE_MB. Since this is backward - * seek this value is expected be to be negative. + * negativeBytesBackwardsOnSeek - Since we are doing backward seeks from + * end of file in a ONE_MB file each time, this would mean the bytes from + * backward seek would be OPERATIONS * ONE_MB. * * bytesSkippedOnSeek - Since, we move from start to end in seek, but * our fCursor(position of cursor) always remain at end of file, this @@ -160,7 +159,7 @@ public void testSeekStatistics() throws IOException { assertEquals("Mismatch in forwardSeekOps value", OPERATIONS, stats.getForwardSeekOperations()); assertEquals("Mismatch in bytesBackwardsOnSeek value", - -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek()); + OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in bytesSkippedOnSeek value", 0, stats.getBytesSkippedOnSeek()); assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS, @@ -366,6 +365,40 @@ public void testReadAheadCounters() throws IOException { } } + /** + * Testing time taken by AbfsInputStream to complete a GET request. + */ + @Test + public void testActionHttpGetRequest() throws IOException { + describe("Test to check the correct value of Time taken by http get " + + "request in AbfsInputStream"); + AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore abfss = fs.getAbfsStore(); + Path actionHttpGetRequestPath = path(getMethodName()); + AbfsInputStream abfsInputStream = null; + AbfsOutputStream abfsOutputStream = null; + try { + abfsOutputStream = createAbfsOutputStreamWithFlushEnabled(fs, + actionHttpGetRequestPath); + abfsOutputStream.write('a'); + abfsOutputStream.hflush(); + + abfsInputStream = + abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics()); + abfsInputStream.read(); + AbfsInputStreamStatisticsImpl abfsInputStreamStatistics = + (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); + + LOG.info("AbfsInputStreamStats info: {}", abfsInputStreamStatistics.toString()); + Assertions.assertThat( + abfsInputStreamStatistics.getActionHttpGetRequest()) + .describedAs("Mismatch in time taken by a GET request") + .isGreaterThan(0.0); + } finally { + IOUtils.cleanupWithLogger(LOG, abfsInputStream, abfsOutputStream); + } + } + /** * Method to assert the initial values of the statistics. * diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java index c8640dded3..392e80a0a7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java @@ -20,7 +20,10 @@ import java.io.IOException; +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; @@ -31,7 +34,10 @@ */ public class ITestAbfsOutputStreamStatistics extends AbstractAbfsIntegrationTest { + private static final int OPERATIONS = 10; + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsOutputStreamStatistics.class); public ITestAbfsOutputStreamStatistics() throws Exception { } @@ -219,6 +225,31 @@ public void testAbfsOutputStreamWriteBuffer() throws IOException { } } + /** + * Test to check correct value of time spent on a PUT request in + * AbfsOutputStream. + */ + @Test + public void testAbfsOutputStreamDurationTrackerPutRequest() throws IOException { + describe("Testing to check if DurationTracker for PUT request is working " + + "correctly."); + AzureBlobFileSystem fs = getFileSystem(); + Path pathForPutRequest = path(getMethodName()); + + try(AbfsOutputStream outputStream = + createAbfsOutputStreamWithFlushEnabled(fs, pathForPutRequest)) { + outputStream.write('a'); + outputStream.hflush(); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outputStream); + LOG.info("AbfsOutputStreamStats info: {}", abfsOutputStreamStatistics.toString()); + Assertions.assertThat(abfsOutputStreamStatistics.getTimeSpentOnPutRequest()) + .describedAs("Mismatch in timeSpentOnPutRequest DurationTracker") + .isGreaterThan(0.0); + } + } + /** * Method to get the AbfsOutputStream statistics. * diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java index 58f0023371..5f9404302b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java @@ -94,17 +94,11 @@ public void testAbfsOutputStreamTimeSpentOnWaitTask() { assertEquals("Mismatch in time spent on waiting for tasks to complete", 0, abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); - int smallRandomStartTime = - new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE); - int smallRandomEndTime = - new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE) - + smallRandomStartTime; - int smallDiff = smallRandomEndTime - smallRandomStartTime; abfsOutputStreamStatistics - .timeSpentTaskWait(smallRandomStartTime, smallRandomEndTime); - //Test for small random value of timeSpentWaitTask. + .timeSpentTaskWait(); + //Test for one op call value of timeSpentWaitTask. assertEquals("Mismatch in time spent on waiting for tasks to complete", - smallDiff, abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + 1, abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); //Reset statistics for the next test. abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); @@ -113,23 +107,16 @@ public void testAbfsOutputStreamTimeSpentOnWaitTask() { * Entering multiple values for timeSpentTaskWait() to check the * summation is happening correctly. Also calculating the expected result. */ - int expectedRandomDiff = 0; for (int i = 0; i < OPERATIONS; i++) { - int largeRandomStartTime = - new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); - int largeRandomEndTime = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE) - + largeRandomStartTime; - abfsOutputStreamStatistics - .timeSpentTaskWait(largeRandomStartTime, largeRandomEndTime); - expectedRandomDiff += largeRandomEndTime - largeRandomStartTime; + abfsOutputStreamStatistics.timeSpentTaskWait(); } /* - * Test to check correct value of timeSpentTaskWait after multiple - * random values are passed in it. + * Test to check correct value of timeSpentTaskWait after OPERATIONS + * number of op calls. */ assertEquals("Mismatch in time spent on waiting for tasks to complete", - expectedRandomDiff, + OPERATIONS, abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); }