diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 2354819b76..9908ba7426 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -548,7 +548,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, progress, partSize, blockFactory, - instrumentation.newOutputStreamStatistics(), + instrumentation.newOutputStreamStatistics(statistics), new WriteOperationHelper(key) ), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 963c53facd..fb8c85239d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.fs.FileSystem.Statistics; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -639,9 +640,8 @@ public String toString() { * Create a stream output statistics instance. * @return the new instance */ - - OutputStreamStatistics newOutputStreamStatistics() { - return new OutputStreamStatistics(); + OutputStreamStatistics newOutputStreamStatistics(Statistics statistics) { + return new OutputStreamStatistics(statistics); } /** @@ -677,6 +677,12 @@ public final class OutputStreamStatistics implements Closeable { private final AtomicLong queueDuration = new AtomicLong(0); private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0); + private Statistics statistics; + + public OutputStreamStatistics(Statistics statistics){ + this.statistics = statistics; + } + /** * Block is queued for upload. */ @@ -717,6 +723,7 @@ void blockUploadFailed(long duration, int blockSize) { /** Intermediate report of bytes uploaded. */ void bytesTransferred(long byteCount) { bytesUploaded.addAndGet(byteCount); + statistics.incrementBytesWritten(byteCount); bytesPendingUpload.addAndGet(-byteCount); incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount); }