diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 27ca20763b..9aba59b6f8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -87,6 +87,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; @@ -512,6 +513,7 @@ public class AzureBlobFileSystemStore implements Closeable { .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .build(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 50380c9bb9..a809bde6c3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -68,6 +68,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, // of valid bytes in buffer) private boolean closed = false; + 
/** Stream statistics. */ + private final AbfsInputStreamStatistics streamStatistics; + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -86,6 +89,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, this.readAheadEnabled = true; this.cachedSasToken = new CachedSASToken( abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); + this.streamStatistics = abfsInputStreamContext.getStreamStatistics(); } public String getPath() { @@ -105,10 +109,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + // check if buffer is null before logging the length + if (b != null) { + LOG.debug("read requested b.length = {} offset = {} len = {}", b.length, + off, len); + } else { + LOG.debug("read requested b = null offset = {} len = {}", off, len); + } + int currentOff = off; int currentLen = len; int lastReadBytes; int totalReadBytes = 0; + if (streamStatistics != null) { + streamStatistics.readOperationStarted(off, len); + } incrementReadOps(); do { lastReadBytes = readOneBlock(b, currentOff, currentLen); @@ -130,6 +145,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, } Preconditions.checkNotNull(b); + LOG.debug("read one block requested b.length = {} off {} len {}", b.length, + off, len); if (len == 0) { return 0; @@ -155,6 +172,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, bCursor = 0; limit = 0; if (buffer == null) { + LOG.debug("created new buffer size {}", bufferSize); buffer = new byte[bufferSize]; } @@ -183,6 +201,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, if (statistics != null) { statistics.incrementBytesRead(bytesToRead); } + if (streamStatistics != null) { + // Bytes read from the local buffer. 
+ streamStatistics.bytesReadFromBuffer(bytesToRead); + streamStatistics.bytesRead(bytesToRead); + } return bytesToRead; } @@ -200,8 +223,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, int numReadAheads = this.readAheadQueueDepth; long nextSize; long nextOffset = position; + LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads); while (numReadAheads > 0 && nextOffset < contentLength) { nextSize = Math.min((long) bufferSize, contentLength - nextOffset); + LOG.debug("issuing read ahead requestedOffset = {} requested size {}", + nextOffset, nextSize); ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); nextOffset = nextOffset + nextSize; numReadAheads--; @@ -211,6 +237,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); if (receivedBytes > 0) { incrementReadOps(); + LOG.debug("Received data from read ahead, not doing remote read"); return receivedBytes; } @@ -218,6 +245,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, receivedBytes = readRemote(position, b, offset, length); return receivedBytes; } else { + LOG.debug("read ahead disabled, reading remote"); return readRemote(position, b, offset, length); } } @@ -247,6 +275,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); op = client.read(path, position, b, offset, length, tolerateOobAppends ? 
"*" : eTag, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); + if (streamStatistics != null) { + streamStatistics.remoteReadOperation(); + } + LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + + "offset = {} length = {}", position, b.length, offset, length); perfInfo.registerResult(op.getResult()).registerSuccess(true); incrementReadOps(); } catch (AzureBlobFileSystemException ex) { @@ -262,6 +295,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } + LOG.debug("HTTP request read bytes = {}", bytesRead); return (int) bytesRead; } @@ -282,6 +316,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, */ @Override public synchronized void seek(long n) throws IOException { + LOG.debug("requested seek to position {}", n); if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } @@ -292,13 +327,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } + if (streamStatistics != null) { + streamStatistics.seek(n, fCursor); + } + if (n>=fCursor-limit && n<=fCursor) { // within buffer bCursor = (int) (n-(fCursor-limit)); + if (streamStatistics != null) { + streamStatistics.seekInBuffer(); + } return; } // next read will read from here fCursor = n; + LOG.debug("set fCursor to {}", fCursor); //invalidate buffer limit = 0; @@ -390,6 +433,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, public synchronized void close() throws IOException { closed = true; buffer = null; // de-reference the buffer so it can be GC'ed sooner + LOG.debug("Closing {}", this); } /** @@ -443,4 +487,28 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, this.cachedSasToken = cachedSasToken; } + /** + * Getter for AbfsInputStreamStatistics. 
+ * + * @return an instance of AbfsInputStreamStatistics. + */ + @VisibleForTesting + public AbfsInputStreamStatistics getStreamStatistics() { + return streamStatistics; + } + + /** + * Get the statistics of the stream. + * @return a string value. + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(super.toString()); + if (streamStatistics != null) { + sb.append("AbfsInputStream@(").append(this.hashCode()).append("){"); + sb.append(streamStatistics.toString()); + sb.append("}"); + } + return sb.toString(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index a847b56eab..f8d3b2a599 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; + private AbfsInputStreamStatistics streamStatistics; + public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -52,6 +54,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext { return this; } + public AbfsInputStreamContext withStreamStatistics( + final AbfsInputStreamStatistics streamStatistics) { + this.streamStatistics = streamStatistics; + return this; + } + public AbfsInputStreamContext build() { // Validation of parameters to be done here. 
return this; @@ -68,4 +76,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { public boolean isTolerateOobAppends() { return tolerateOobAppends; } + + public AbfsInputStreamStatistics getStreamStatistics() { + return streamStatistics; + } }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
new file mode 100644
index 0000000000..2603394c93
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface for statistics for the AbfsInputStream.
+ */
+@InterfaceStability.Unstable
+public interface AbfsInputStreamStatistics {
+  /**
+   * Seek backwards, incrementing the seek and backward seek counters.
+   *
+   * @param negativeOffset how far was the seek?
+   *                       This is expected to be negative.
+   */
+  void seekBackwards(long negativeOffset);
+
+  /**
+   * Record a forward seek, adding a seek operation, a forward
+   * seek operation, and any bytes skipped.
+   *
+   * @param skipped number of bytes skipped by reading from the stream.
+   *                If the seek was implemented by a close + reopen, set this to zero.
+   */
+  void seekForwards(long skipped);
+
+  /**
+   * Record a forward or backward seek, adding a seek operation, a forward or
+   * a backward seek operation, and number of bytes skipped.
+   *
+   * @param seekTo     seek to the position.
+   * @param currentPos current position.
+   */
+  void seek(long seekTo, long currentPos);
+
+  /**
+   * Increment the bytes read counter by the number of bytes;
+   * no-op if the argument is negative.
+   *
+   * @param bytes number of bytes read.
+   */
+  void bytesRead(long bytes);
+
+  /**
+   * Record the total bytes read from buffer.
+   *
+   * @param bytes number of bytes that are read from buffer.
+   */
+  void bytesReadFromBuffer(long bytes);
+
+  /**
+   * Records the total number of seeks done in the buffer.
+   */
+  void seekInBuffer();
+
+  /**
+   * A {@code read(byte[] buf, int off, int len)} operation has started.
+   *
+   * @param pos starting position of the read.
+   * @param len length of bytes to read.
+   */
+  void readOperationStarted(long pos, long len);
+
+  /**
+   * Records a successful remote read operation.
+   */
+  void remoteReadOperation();
+
+  /**
+   * Builds a string holding all the AbfsInputStream statistics.
+   * @return the string with all the statistics.
+   */
+  @Override
+  String toString();
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
new file mode 100644
index 0000000000..fd18910813
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Stats for the AbfsInputStream, kept in plain unsynchronized long fields.
+ */
+public class AbfsInputStreamStatisticsImpl
+    implements AbfsInputStreamStatistics {
+  private long seekOperations;
+  private long forwardSeekOperations;
+  private long backwardSeekOperations;
+  private long bytesRead;
+  private long bytesSkippedOnSeek;
+  private long bytesBackwardsOnSeek;
+  private long seekInBuffer;
+  private long readOperations;
+  private long bytesReadFromBuffer;
+  private long remoteReadOperations;
+
+  /**
+   * Seek backwards, incrementing the seek and backward seek counters.
+   *
+   * @param negativeOffset how far was the seek? This amount is subtracted
+   *                       from the bytes-backwards counter.
+   */
+  @Override
+  public void seekBackwards(long negativeOffset) {
+    seekOperations++;
+    backwardSeekOperations++;
+    bytesBackwardsOnSeek -= negativeOffset;
+  }
+
+  /**
+   * Record a forward seek, adding a seek operation, a forward
+   * seek operation, and any bytes skipped.
+   *
+   * @param skipped number of bytes skipped by reading from the stream.
+   *                If the seek was implemented by a close + reopen, set this to zero.
+   */
+  @Override
+  public void seekForwards(long skipped) {
+    seekOperations++;
+    forwardSeekOperations++;
+    if (skipped > 0) {
+      bytesSkippedOnSeek += skipped;
+    }
+  }
+
+  /**
+   * Record a seek: forward when {@code seekTo >= currentPos}, backward
+   * otherwise. A backward seek hands the positive distance to
+   * {@link #seekBackwards(long)}, which lowers the bytes-backwards counter.
+   *
+   * @param seekTo     seek to the position.
+   * @param currentPos current position.
+   */
+  @Override
+  public void seek(long seekTo, long currentPos) {
+    if (seekTo >= currentPos) {
+      this.seekForwards(seekTo - currentPos);
+    } else {
+      this.seekBackwards(currentPos - seekTo);
+    }
+  }
+
+  /**
+   * Increment the bytes read counter by the number of bytes;
+   * no-op if the argument is negative.
+   *
+   * @param bytes number of bytes read.
+   */
+  @Override
+  public void bytesRead(long bytes) {
+    if (bytes > 0) {
+      bytesRead += bytes;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Total bytes read from the buffer.
+   *
+   * @param bytes number of bytes that are read from buffer.
+   */
+  @Override
+  public void bytesReadFromBuffer(long bytes) {
+    if (bytes > 0) {
+      bytesReadFromBuffer += bytes;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Increment the number of seeks in the buffer.
+   */
+  @Override
+  public void seekInBuffer() {
+    seekInBuffer++;
+  }
+
+  /**
+   * A {@code read(byte[] buf, int off, int len)} operation has started.
+   *
+   * @param pos starting position of the read; not recorded.
+   * @param len length of bytes to read; not recorded.
+   */
+  @Override
+  public void readOperationStarted(long pos, long len) {
+    readOperations++;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Increment the counter when a remote read operation occurs.
+   */
+  @Override
+  public void remoteReadOperation() {
+    remoteReadOperations++;
+  }
+
+  public long getSeekOperations() {
+    return seekOperations;
+  }
+
+  public long getForwardSeekOperations() {
+    return forwardSeekOperations;
+  }
+
+  public long getBackwardSeekOperations() {
+    return backwardSeekOperations;
+  }
+
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  public long getBytesSkippedOnSeek() {
+    return bytesSkippedOnSeek;
+  }
+
+  public long getBytesBackwardsOnSeek() {
+    return bytesBackwardsOnSeek;
+  }
+
+  public long getSeekInBuffer() {
+    return seekInBuffer;
+  }
+
+  public long getReadOperations() {
+    return readOperations;
+  }
+
+  public long getBytesReadFromBuffer() {
+    return bytesReadFromBuffer;
+  }
+
+  public long getRemoteReadOperations() {
+    return remoteReadOperations;
+  }
+
+  /**
+   * String operator describes all the current statistics.
+   * Important: there are no guarantees as to the stability
+   * of this value.
+   *
+   * @return the current values of the stream statistics.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("StreamStatistics{");
+    // The first field takes no separator, so the text never starts "{, ".
+    sb.append("SeekOperations=").append(seekOperations);
+    sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
+    sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
+    sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
+    sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
+    sb.append(", seekInBuffer=").append(seekInBuffer);
+    sb.append(", BytesRead=").append(bytesRead);
+    sb.append(", ReadOperations=").append(readOperations);
+    sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
+    sb.append(", remoteReadOperations=").append(remoteReadOperations);
+    sb.append('}');
+    return sb.toString();
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
new file mode 100644
index 0000000000..7a62ecab7f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Integration tests for the statistics collected by AbfsInputStream:
+ * initial values, seek and read counters, and stream operation when no
+ * statistics instance is supplied.
+ */
+public class ITestAbfsInputStreamStatistics
+    extends AbstractAbfsIntegrationTest {
+  private static final int OPERATIONS = 10;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int ONE_KB = 1024;
+  // 1MB scratch buffer: written to the test file and used as read target.
+  private byte[] defBuffer = new byte[ONE_MB];
+
+  public ITestAbfsInputStreamStatistics() throws Exception {
+  }
+
+  /**
+   * Test to check the initial values of the AbfsInputStream statistics.
+   */
+  @Test
+  public void testInitValues() throws IOException {
+    describe("Testing the initial values of AbfsInputStream Statistics");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path initValuesPath = path(getMethodName());
+    AbfsOutputStream outputStream = null;
+    AbfsInputStream inputStream = null;
+
+    try {
+
+      outputStream = createAbfsOutputStreamWithFlushEnabled(fs, initValuesPath);
+      inputStream = abfss.openFileForRead(initValuesPath, fs.getFsStatistics());
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) inputStream.getStreamStatistics();
+
+      checkInitValue(stats.getSeekOperations(), "seekOps");
+      checkInitValue(stats.getForwardSeekOperations(), "forwardSeekOps");
+      checkInitValue(stats.getBackwardSeekOperations(), "backwardSeekOps");
+      checkInitValue(stats.getBytesRead(), "bytesRead");
+      checkInitValue(stats.getBytesSkippedOnSeek(), "bytesSkippedOnSeek");
+      checkInitValue(stats.getBytesBackwardsOnSeek(), "bytesBackwardsOnSeek");
+      checkInitValue(stats.getSeekInBuffer(), "seekInBuffer");
+      checkInitValue(stats.getReadOperations(), "readOps");
+      checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
+      checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
+    }
+  }
+
+  /**
+   * Test to check statistics from seek operation in AbfsInputStream.
+   */
+  @Test
+  public void testSeekStatistics() throws IOException {
+    describe("Testing the values of statistics from seek operations in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path seekStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath);
+
+      // Write the 1MB default buffer to the file.
+      out.write(defBuffer);
+      out.hflush();
+      in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics());
+
+      /*
+       * Read the 1MB back; this is expected to move fCursor (the current
+       * position of the cursor) to the end of the file.
+       */
+      int result = in.read(defBuffer, 0, ONE_MB);
+      LOG.info("Result of read : {}", result);
+
+      /*
+       * Seeking to the start of the file and then back to the end gives one
+       * backward and one forward seek per iteration, OPERATIONS times.
+       */
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.seek(0);
+        in.seek(ONE_MB);
+      }
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+      LOG.info("STATISTICS: {}", stats.toString());
+
+      /*
+       * seekOps - Since we are doing backward and forward seek OPERATIONS
+       * times, total seeks would be 2 * OPERATIONS.
+       *
+       * backwardSeekOps - Since we are doing a backward seek inside a loop
+       * for OPERATIONS times, total backward seeks would be OPERATIONS.
+       *
+       * forwardSeekOps - Since we are doing a forward seek inside a loop
+       * for OPERATIONS times, total forward seeks would be OPERATIONS.
+       *
+       * bytesBackwardsOnSeek - Since we are doing backward seeks from end of
+       * file in a ONE_MB file each time, this would mean the bytes from
+       * backward seek would be OPERATIONS * ONE_MB. Since this is a backward
+       * seek, this value is expected to be negative.
+       *
+       * bytesSkippedOnSeek - Since we move from start to end in seek, but
+       * our fCursor (position of cursor) always remains at end of file, this
+       * would mean no bytes were skipped on seek, since all forward seeks
+       * are in buffer.
+       *
+       * seekInBuffer - Since all seeks were in buffer, the seekInBuffer
+       * would be equal to 2 * OPERATIONS.
+       */
+      assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
+          stats.getSeekOperations());
+      assertEquals("Mismatch in backwardSeekOps value", OPERATIONS,
+          stats.getBackwardSeekOperations());
+      assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
+          stats.getForwardSeekOperations());
+      assertEquals("Mismatch in bytesBackwardsOnSeek value",
+          -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
+      assertEquals("Mismatch in bytesSkippedOnSeek value",
+          0, stats.getBytesSkippedOnSeek());
+      assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
+          stats.getSeekInBuffer());
+
+      in.close();
+      // Verifying whether stats are readable after stream is closed.
+      LOG.info("STATISTICS after closing: {}", stats.toString());
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
+   * Test to check statistics value from read operation in AbfsInputStream.
+   */
+  @Test
+  public void testReadStatistics() throws IOException {
+    describe("Testing the values of statistics from read operation in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path readStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
+
+      // Write the 1MB buffer to the file.
+      out.write(defBuffer);
+      out.hflush();
+      in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
+
+      // Read a single byte OPERATIONS times.
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.read();
+      }
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+      LOG.info("STATISTICS: {}", stats.toString());
+
+      /*
+       * bytesRead - Since each time a single byte is read, total
+       * bytes read would be equal to OPERATIONS.
+       *
+       * readOps - Since each time read operation is performed OPERATIONS
+       * times, total number of read operations would be equal to OPERATIONS.
+       *
+       * remoteReadOps - Only a single remote read operation is done. Hence,
+       * total remote read ops is 1.
+       */
+      assertEquals("Mismatch in bytesRead value", OPERATIONS,
+          stats.getBytesRead());
+      assertEquals("Mismatch in readOps value", OPERATIONS,
+          stats.getReadOperations());
+      assertEquals("Mismatch in remoteReadOps value", 1,
+          stats.getRemoteReadOperations());
+
+      in.close();
+      // Verifying if stats are still readable after stream is closed.
+      LOG.info("STATISTICS after closing: {}", stats.toString());
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
+   * Testing AbfsInputStream works with null Statistics.
+   */
+  @Test
+  public void testWithNullStreamStatistics() throws IOException {
+    describe("Testing AbfsInputStream operations with statistics as null");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    Path nullStatFilePath = path(getMethodName());
+    byte[] oneKbBuff = new byte[ONE_KB];
+
+    // Creating an AbfsInputStreamContext instance with null StreamStatistics.
+    AbfsInputStreamContext abfsInputStreamContext =
+        new AbfsInputStreamContext(
+            getConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
+            .withReadBufferSize(getConfiguration().getReadBufferSize())
+            .withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
+            .withStreamStatistics(null)
+            .build();
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, nullStatFilePath);
+
+      // Writing a 1KB buffer in the file.
+      out.write(oneKbBuff);
+      out.hflush();
+
+      // AbfsRestOperation Instance required for eTag.
+      AbfsRestOperation abfsRestOperation =
+          fs.getAbfsClient().getPathStatus(nullStatFilePath.toUri().getPath(), false);
+
+      // AbfsInputStream with no StreamStatistics.
+      in = new AbfsInputStream(fs.getAbfsClient(), null,
+          nullStatFilePath.toUri().getPath(), ONE_KB,
+          abfsInputStreamContext,
+          abfsRestOperation.getResult().getResponseHeader("ETag"));
+
+      // Verifying that AbfsInputStream Operations works with null statistics.
+      assertNotEquals("AbfsInputStream read() with null statistics should "
+          + "work", -1, in.read());
+      in.seek(ONE_KB);
+
+      // Verifying toString() with no StreamStatistics.
+      LOG.info("AbfsInputStream: {}", in.toString());
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
+   * Method to assert the initial values of the statistics.
+   *
+   * @param actualValue the actual value of the statistics.
+   * @param statistic   the name of operation or statistic being asserted.
+   */
+  private void checkInitValue(long actualValue, String statistic) {
+    assertEquals("Mismatch in " + statistic + " value", 0, actualValue);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java
new file mode 100644
index 0000000000..22c247f98a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
+
+/**
+ * Tests AbfsInputStreamStatisticsImpl counters directly, without a stream.
+ */
+public class TestAbfsInputStreamStatistics extends AbstractAbfsIntegrationTest {
+
+  private static final int OPERATIONS = 100;
+
+  public TestAbfsInputStreamStatistics() throws Exception {
+  }
+
+  /**
+   * Test to check the bytesReadFromBuffer statistic value from AbfsInputStream.
+   */
+  @Test
+  public void testBytesReadFromBufferStatistic() {
+    describe("Testing bytesReadFromBuffer statistics value in AbfsInputStream");
+
+    AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
+        new AbfsInputStreamStatisticsImpl();
+
+    // Increment the bytesReadFromBuffer value by one byte per iteration.
+    for (int i = 0; i < OPERATIONS; i++) {
+      abfsInputStreamStatistics.bytesReadFromBuffer(1);
+    }
+
+    // One byte per call, OPERATIONS calls: the total must equal OPERATIONS.
+    assertEquals("Mismatch in bytesReadFromBuffer value", OPERATIONS,
+        abfsInputStreamStatistics.getBytesReadFromBuffer());
+
+  }
+}