HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)
Contributed by Mehakmeet Singh
This commit is contained in:
parent
d5b4766158
commit
48a7c5b6ba
@ -238,6 +238,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||
if (receivedBytes > 0) {
|
||||
incrementReadOps();
|
||||
LOG.debug("Received data from read ahead, not doing remote read");
|
||||
if (streamStatistics != null) {
|
||||
streamStatistics.readAheadBytesRead(receivedBytes);
|
||||
}
|
||||
return receivedBytes;
|
||||
}
|
||||
|
||||
@ -292,6 +295,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||
throw new IOException(ex);
|
||||
}
|
||||
long bytesRead = op.getResult().getBytesReceived();
|
||||
if (streamStatistics != null) {
|
||||
streamStatistics.remoteBytesRead(bytesRead);
|
||||
}
|
||||
if (bytesRead > Integer.MAX_VALUE) {
|
||||
throw new IOException("Unexpected Content-Length");
|
||||
}
|
||||
|
@ -84,6 +84,18 @@ public interface AbfsInputStreamStatistics {
|
||||
*/
|
||||
void remoteReadOperation();
|
||||
|
||||
/**
|
||||
* Records the bytes read from readAhead buffer.
|
||||
* @param bytes the bytes to be incremented.
|
||||
*/
|
||||
void readAheadBytesRead(long bytes);
|
||||
|
||||
/**
|
||||
* Records bytes read remotely after nothing from readAheadBuffer was read.
|
||||
* @param bytes the bytes to be incremented.
|
||||
*/
|
||||
void remoteBytesRead(long bytes);
|
||||
|
||||
/**
|
||||
* Makes the string of all the AbfsInputStream statistics.
|
||||
* @return the string with all the statistics.
|
||||
|
@ -33,6 +33,8 @@ public class AbfsInputStreamStatisticsImpl
|
||||
private long readOperations;
|
||||
private long bytesReadFromBuffer;
|
||||
private long remoteReadOperations;
|
||||
private long readAheadBytesRead;
|
||||
private long remoteBytesRead;
|
||||
|
||||
/**
|
||||
* Seek backwards, incrementing the seek and backward seek counters.
|
||||
@ -128,6 +130,30 @@ public class AbfsInputStreamStatisticsImpl
|
||||
readOperations++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Total bytes read from readAhead buffer during a read operation.
|
||||
*
|
||||
* @param bytes the bytes to be incremented.
|
||||
*/
|
||||
@Override
|
||||
public void readAheadBytesRead(long bytes) {
|
||||
if (bytes > 0) {
|
||||
readAheadBytesRead += bytes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Total bytes read remotely after nothing was read from readAhead buffer.
|
||||
*
|
||||
* @param bytes the bytes to be incremented.
|
||||
*/
|
||||
@Override
|
||||
public void remoteBytesRead(long bytes) {
|
||||
if (bytes > 0) {
|
||||
remoteBytesRead += bytes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
@ -178,6 +204,14 @@ public class AbfsInputStreamStatisticsImpl
|
||||
return remoteReadOperations;
|
||||
}
|
||||
|
||||
public long getReadAheadBytesRead() {
|
||||
return readAheadBytesRead;
|
||||
}
|
||||
|
||||
public long getRemoteBytesRead() {
|
||||
return remoteBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* String operator describes all the current statistics.
|
||||
* <b>Important: there are no guarantees as to the stability
|
||||
@ -199,6 +233,8 @@ public class AbfsInputStreamStatisticsImpl
|
||||
sb.append(", ReadOperations=").append(readOperations);
|
||||
sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
|
||||
sb.append(", remoteReadOperations=").append(remoteReadOperations);
|
||||
sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
|
||||
sb.append(", remoteBytesRead=").append(remoteBytesRead);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -39,6 +40,10 @@ public class ITestAbfsInputStreamStatistics
|
||||
LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
|
||||
private static final int ONE_MB = 1024 * 1024;
|
||||
private static final int ONE_KB = 1024;
|
||||
private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
|
||||
private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
|
||||
private static final int THREAD_SLEEP_10_SECONDS = 10;
|
||||
private static final int TIMEOUT_30_SECONDS = 30000;
|
||||
private byte[] defBuffer = new byte[ONE_MB];
|
||||
|
||||
public ITestAbfsInputStreamStatistics() throws Exception {
|
||||
@ -75,6 +80,8 @@ public class ITestAbfsInputStreamStatistics
|
||||
checkInitValue(stats.getReadOperations(), "readOps");
|
||||
checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
|
||||
checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
|
||||
checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead");
|
||||
checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead");
|
||||
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
|
||||
@ -285,6 +292,94 @@ public class ITestAbfsInputStreamStatistics
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing readAhead counters in AbfsInputStream with 30 seconds timeout.
|
||||
*/
|
||||
@Test(timeout = TIMEOUT_30_SECONDS)
|
||||
public void testReadAheadCounters() throws IOException, InterruptedException {
|
||||
describe("Test to check correct values for readAhead counters in "
|
||||
+ "AbfsInputStream");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
|
||||
Path readAheadCountersPath = path(getMethodName());
|
||||
|
||||
/*
|
||||
* Setting the block size for readAhead as 4KB.
|
||||
*/
|
||||
abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE);
|
||||
|
||||
AbfsOutputStream out = null;
|
||||
AbfsInputStream in = null;
|
||||
|
||||
try {
|
||||
|
||||
/*
|
||||
* Creating a file of 1MB size.
|
||||
*/
|
||||
out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath);
|
||||
out.write(defBuffer);
|
||||
out.close();
|
||||
|
||||
in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics());
|
||||
|
||||
/*
|
||||
* Reading 1KB after each i * KB positions. Hence the reads are from 0
|
||||
* to 1KB, 1KB to 2KB, and so on.. for 5 operations.
|
||||
*/
|
||||
for (int i = 0; i < 5; i++) {
|
||||
in.seek(ONE_KB * i);
|
||||
in.read(defBuffer, ONE_KB * i, ONE_KB);
|
||||
}
|
||||
AbfsInputStreamStatisticsImpl stats =
|
||||
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
|
||||
|
||||
/*
|
||||
* Since, readAhead is done in background threads. Sometimes, the
|
||||
* threads aren't finished in the background and could result in
|
||||
* inaccurate results. So, we wait till we have the accurate values
|
||||
* with a limit of 30 seconds as that's when the test times out.
|
||||
*
|
||||
*/
|
||||
while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
|
||||
|| stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
|
||||
Thread.sleep(THREAD_SLEEP_10_SECONDS);
|
||||
}
|
||||
|
||||
/*
|
||||
* Verifying the counter values of readAheadBytesRead and remoteBytesRead.
|
||||
*
|
||||
* readAheadBytesRead : Since, we read 1KBs 5 times, that means we go
|
||||
* from 0 to 5KB in the file. The bufferSize is set to 4KB, and since
|
||||
* we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB
|
||||
* buffer. Our read is till 5KB, hence readAhead would ideally read 2
|
||||
* blocks of 4KB which is equal to 8KB. But, sometimes to get more than
|
||||
* one block from readAhead buffer we might have to wait for background
|
||||
* threads to fill the buffer and hence we might do remote read which
|
||||
* would be faster. Therefore, readAheadBytesRead would be equal to or
|
||||
* greater than 4KB.
|
||||
*
|
||||
* remoteBytesRead : Since, the bufferSize is set to 4KB and the number
|
||||
* of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4
|
||||
* KB buffer on the first read, which is equal to 32KB. But, if we are not
|
||||
* able to read some bytes that were in the buffer after doing
|
||||
* readAhead, we might use remote read again. Thus, the bytes read
|
||||
* remotely could also be greater than 32Kb.
|
||||
*
|
||||
*/
|
||||
Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
|
||||
"Mismatch in readAheadBytesRead counter value")
|
||||
.isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
|
||||
|
||||
Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
|
||||
"Mismatch in remoteBytesRead counter value")
|
||||
.isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
|
||||
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to assert the initial values of the statistics.
|
||||
*
|
||||
|
Loading…
x
Reference in New Issue
Block a user