HDFS-5762. BlockReaderLocal does not return -1 on EOF when doing zero-length reads (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1558526 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
349f25a132
commit
037a89abc5
@ -674,6 +674,9 @@ Release 2.4.0 - UNRELEASED
|
|||||||
|
|
||||||
HDFS-5220. Expose group resolution time as metric (jxiang via cmccabe)
|
HDFS-5220. Expose group resolution time as metric (jxiang via cmccabe)
|
||||||
|
|
||||||
|
HDFS-5762. BlockReaderLocal doesn't return -1 on EOF when doing zero-length
|
||||||
|
reads (Colin Patrick McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
||||||
|
@ -39,6 +39,8 @@ public interface BlockReader extends ByteBufferReadable {
|
|||||||
* "Read should not modify user buffer before successful read"
|
* "Read should not modify user buffer before successful read"
|
||||||
* because it first reads the data to user buffer and then checks
|
* because it first reads the data to user buffer and then checks
|
||||||
* the checksum.
|
* the checksum.
|
||||||
|
* Note: this must return -1 on EOF, even in the case of a 0-byte read.
|
||||||
|
* See HDFS-5762 for details.
|
||||||
*/
|
*/
|
||||||
int read(byte[] buf, int off, int len) throws IOException;
|
int read(byte[] buf, int off, int len) throws IOException;
|
||||||
|
|
||||||
|
@ -328,10 +328,12 @@ private synchronized void freeChecksumBufIfExists() {
|
|||||||
|
|
||||||
private synchronized int drainDataBuf(ByteBuffer buf)
|
private synchronized int drainDataBuf(ByteBuffer buf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (dataBuf == null) return 0;
|
if (dataBuf == null) return -1;
|
||||||
int oldLimit = dataBuf.limit();
|
int oldLimit = dataBuf.limit();
|
||||||
int nRead = Math.min(dataBuf.remaining(), buf.remaining());
|
int nRead = Math.min(dataBuf.remaining(), buf.remaining());
|
||||||
if (nRead == 0) return 0;
|
if (nRead == 0) {
|
||||||
|
return (dataBuf.remaining() == 0) ? -1 : 0;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
dataBuf.limit(dataBuf.position() + nRead);
|
dataBuf.limit(dataBuf.position() + nRead);
|
||||||
buf.put(dataBuf);
|
buf.put(dataBuf);
|
||||||
@ -444,13 +446,11 @@ private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
|
|||||||
int total = 0;
|
int total = 0;
|
||||||
while (buf.hasRemaining()) {
|
while (buf.hasRemaining()) {
|
||||||
int nRead = dataIn.read(buf, dataPos);
|
int nRead = dataIn.read(buf, dataPos);
|
||||||
if (nRead < 0) {
|
if (nRead <= 0) break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
dataPos += nRead;
|
dataPos += nRead;
|
||||||
total += nRead;
|
total += nRead;
|
||||||
}
|
}
|
||||||
return (total == 0) ? -1 : total;
|
return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -512,15 +512,15 @@ private synchronized boolean fillDataBuf(boolean canSkipChecksum)
|
|||||||
private synchronized int readWithBounceBuffer(ByteBuffer buf,
|
private synchronized int readWithBounceBuffer(ByteBuffer buf,
|
||||||
boolean canSkipChecksum) throws IOException {
|
boolean canSkipChecksum) throws IOException {
|
||||||
int total = 0;
|
int total = 0;
|
||||||
boolean eof = false;
|
int bb = drainDataBuf(buf); // drain bounce buffer if possible
|
||||||
while (true) {
|
if (bb >= 0) {
|
||||||
int bb = drainDataBuf(buf); // drain bounce buffer if possible
|
|
||||||
total += bb;
|
total += bb;
|
||||||
int needed = buf.remaining();
|
if (buf.remaining() == 0) return total;
|
||||||
if (eof || (needed == 0)) {
|
}
|
||||||
break;
|
boolean eof = false;
|
||||||
} else if (buf.isDirect() && (needed >= maxReadaheadLength)
|
do {
|
||||||
&& ((dataPos % bytesPerChecksum) == 0)) {
|
if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
|
||||||
|
&& ((dataPos % bytesPerChecksum) == 0)) {
|
||||||
// Fast lane: try to read directly into user-supplied buffer, bypassing
|
// Fast lane: try to read directly into user-supplied buffer, bypassing
|
||||||
// bounce buffer.
|
// bounce buffer.
|
||||||
int oldLimit = buf.limit();
|
int oldLimit = buf.limit();
|
||||||
@ -540,9 +540,13 @@ private synchronized int readWithBounceBuffer(ByteBuffer buf,
|
|||||||
if (fillDataBuf(canSkipChecksum)) {
|
if (fillDataBuf(canSkipChecksum)) {
|
||||||
eof = true;
|
eof = true;
|
||||||
}
|
}
|
||||||
|
bb = drainDataBuf(buf); // drain bounce buffer if possible
|
||||||
|
if (bb >= 0) {
|
||||||
|
total += bb;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
} while ((!eof) && (buf.remaining() > 0));
|
||||||
return total == 0 ? -1 : total;
|
return (eof && total == 0) ? -1 : total;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -587,8 +591,10 @@ private synchronized int readWithoutBounceBuffer(byte arr[], int off,
|
|||||||
int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
|
int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
|
||||||
if (nRead > 0) {
|
if (nRead > 0) {
|
||||||
dataPos += nRead;
|
dataPos += nRead;
|
||||||
|
} else if ((nRead == 0) && (dataPos == dataIn.size())) {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
return nRead == 0 ? -1 : nRead;
|
return nRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
|
private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
|
||||||
@ -599,9 +605,10 @@ private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
|
|||||||
dataBuf.limit(maxReadaheadLength);
|
dataBuf.limit(maxReadaheadLength);
|
||||||
fillDataBuf(canSkipChecksum);
|
fillDataBuf(canSkipChecksum);
|
||||||
}
|
}
|
||||||
|
if (dataBuf.remaining() == 0) return -1;
|
||||||
int toRead = Math.min(dataBuf.remaining(), len);
|
int toRead = Math.min(dataBuf.remaining(), len);
|
||||||
dataBuf.get(arr, off, toRead);
|
dataBuf.get(arr, off, toRead);
|
||||||
return toRead == 0 ? -1 : toRead;
|
return toRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -604,6 +604,24 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class TestBlockReaderLocalReadZeroBytes
|
||||||
|
extends BlockReaderLocalTest {
|
||||||
|
@Override
|
||||||
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
|
throws IOException {
|
||||||
|
byte emptyArr[] = new byte[0];
|
||||||
|
Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
|
||||||
|
ByteBuffer emptyBuf = ByteBuffer.wrap(emptyArr);
|
||||||
|
Assert.assertEquals(0, reader.read(emptyBuf));
|
||||||
|
reader.skip(1);
|
||||||
|
Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
|
||||||
|
Assert.assertEquals(0, reader.read(emptyBuf));
|
||||||
|
reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1);
|
||||||
|
Assert.assertEquals(-1, reader.read(emptyArr, 0, 0));
|
||||||
|
Assert.assertEquals(-1, reader.read(emptyBuf));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalOnFileWithoutChecksum()
|
public void testBlockReaderLocalOnFileWithoutChecksum()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -632,6 +650,35 @@ public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
|
|||||||
false, 0);
|
false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalReadZeroBytes()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
|
||||||
|
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalReadZeroBytesNoChecksum()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
|
||||||
|
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalReadZeroBytesNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
|
||||||
|
true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
|
||||||
|
false, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void TestStatisticsForShortCircuitLocalRead() throws Exception {
|
public void TestStatisticsForShortCircuitLocalRead() throws Exception {
|
||||||
testStatistics(true);
|
testStatistics(true);
|
||||||
|
Loading…
Reference in New Issue
Block a user