HADOOP-10130. RawLocalFS pread does not track FileSystem Statistics (Binglin Chang via Colin Patrick McCabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547117 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
58f73acdc1
commit
08d6213083
@ -516,6 +516,9 @@ Release 2.2.1 - UNRELEASED
|
|||||||
HADOOP-9114. After defined the dfs.checksum.type as the NULL, write file and hflush will
|
HADOOP-9114. After defined the dfs.checksum.type as the NULL, write file and hflush will
|
||||||
through java.lang.ArrayIndexOutOfBoundsException (Sathish via umamahesh)
|
through java.lang.ArrayIndexOutOfBoundsException (Sathish via umamahesh)
|
||||||
|
|
||||||
|
HADOOP-10130. RawLocalFS::LocalFSFileInputStream.pread does not track
|
||||||
|
FS::Statistics (Binglin Chang via Colin Patrick McCabe)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -83,39 +83,6 @@ public void initialize(URI uri, Configuration conf) throws IOException {
|
|||||||
setConf(conf);
|
setConf(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
class TrackingFileInputStream extends FileInputStream {
|
|
||||||
public TrackingFileInputStream(File f) throws IOException {
|
|
||||||
super(f);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int read() throws IOException {
|
|
||||||
int result = super.read();
|
|
||||||
if (result != -1) {
|
|
||||||
statistics.incrementBytesRead(1);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int read(byte[] data) throws IOException {
|
|
||||||
int result = super.read(data);
|
|
||||||
if (result != -1) {
|
|
||||||
statistics.incrementBytesRead(result);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int read(byte[] data, int offset, int length) throws IOException {
|
|
||||||
int result = super.read(data, offset, length);
|
|
||||||
if (result != -1) {
|
|
||||||
statistics.incrementBytesRead(result);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*******************************************************
|
/*******************************************************
|
||||||
* For open()'s FSInputStream.
|
* For open()'s FSInputStream.
|
||||||
*******************************************************/
|
*******************************************************/
|
||||||
@ -124,7 +91,7 @@ class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor
|
|||||||
private long position;
|
private long position;
|
||||||
|
|
||||||
public LocalFSFileInputStream(Path f) throws IOException {
|
public LocalFSFileInputStream(Path f) throws IOException {
|
||||||
this.fis = new TrackingFileInputStream(pathToFile(f));
|
fis = new FileInputStream(pathToFile(f));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -159,6 +126,7 @@ public int read() throws IOException {
|
|||||||
int value = fis.read();
|
int value = fis.read();
|
||||||
if (value >= 0) {
|
if (value >= 0) {
|
||||||
this.position++;
|
this.position++;
|
||||||
|
statistics.incrementBytesRead(1);
|
||||||
}
|
}
|
||||||
return value;
|
return value;
|
||||||
} catch (IOException e) { // unexpected exception
|
} catch (IOException e) { // unexpected exception
|
||||||
@ -172,6 +140,7 @@ public int read(byte[] b, int off, int len) throws IOException {
|
|||||||
int value = fis.read(b, off, len);
|
int value = fis.read(b, off, len);
|
||||||
if (value > 0) {
|
if (value > 0) {
|
||||||
this.position += value;
|
this.position += value;
|
||||||
|
statistics.incrementBytesRead(value);
|
||||||
}
|
}
|
||||||
return value;
|
return value;
|
||||||
} catch (IOException e) { // unexpected exception
|
} catch (IOException e) { // unexpected exception
|
||||||
@ -184,7 +153,11 @@ public int read(long position, byte[] b, int off, int len)
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
ByteBuffer bb = ByteBuffer.wrap(b, off, len);
|
ByteBuffer bb = ByteBuffer.wrap(b, off, len);
|
||||||
try {
|
try {
|
||||||
return fis.getChannel().read(bb, position);
|
int value = fis.getChannel().read(bb, position);
|
||||||
|
if (value > 0) {
|
||||||
|
statistics.incrementBytesRead(value);
|
||||||
|
}
|
||||||
|
return value;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new FSError(e);
|
throw new FSError(e);
|
||||||
}
|
}
|
||||||
|
@ -91,6 +91,7 @@ public void testStatistics() throws IOException, URISyntaxException {
|
|||||||
FSDataInputStream fstr = fc.open(filePath);
|
FSDataInputStream fstr = fc.open(filePath);
|
||||||
byte[] buf = new byte[blockSize];
|
byte[] buf = new byte[blockSize];
|
||||||
int bytesRead = fstr.read(buf, 0, blockSize);
|
int bytesRead = fstr.read(buf, 0, blockSize);
|
||||||
|
fstr.read(0, buf, 0, blockSize);
|
||||||
Assert.assertEquals(blockSize, bytesRead);
|
Assert.assertEquals(blockSize, bytesRead);
|
||||||
verifyReadBytes(stats);
|
verifyReadBytes(stats);
|
||||||
verifyWrittenBytes(stats);
|
verifyWrittenBytes(stats);
|
||||||
|
@ -47,7 +47,8 @@ public void tearDown() throws Exception {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void verifyReadBytes(Statistics stats) {
|
protected void verifyReadBytes(Statistics stats) {
|
||||||
Assert.assertEquals(blockSize, stats.getBytesRead());
|
// one blockSize for read, one for pread
|
||||||
|
Assert.assertEquals(2*blockSize, stats.getBytesRead());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user