HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass into user buffers. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@896243 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2010-01-05 22:14:17 +00:00
parent efcad06506
commit ae91b5d0f4
5 changed files with 229 additions and 67 deletions

View File

@ -83,6 +83,9 @@ Trunk (unreleased changes)
HADOOP-6443. Serialization classes accept invalid metadata.
(Aaron Kimball via tomwhite)
HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass
into user buffers. (Todd Lipcon via tomwhite)
OPTIMIZATIONS
BUG FIXES

View File

@ -205,24 +205,41 @@ public boolean seekToNewSource(long targetPos) throws IOException {
@Override
protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
boolean eof = false;
if(needChecksum()) {
try {
long checksumPos = getChecksumFilePos(pos);
if(checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
sums.readFully(checksum);
} catch (EOFException e) {
eof = true;
if (needChecksum()) {
assert checksum != null; // we have a checksum buffer
assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
assert len >= bytesPerSum; // we must read at least one chunk
final int checksumsToRead = Math.min(
len/bytesPerSum, // number of checksums based on len to read
checksum.length / CHECKSUM_SIZE); // size of checksum buffer
long checksumPos = getChecksumFilePos(pos);
if(checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
throw new ChecksumException(
"Checksum file not a length multiple of checksum size " +
"in " + file + " at " + pos + " checksumpos: " + checksumPos +
" sumLenread: " + sumLenRead,
pos);
}
if (sumLenRead <= 0) { // we're at the end of the file
eof = true;
} else {
// Adjust amount of data to read based on how many checksum chunks we read
len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
len = bytesPerSum;
}
if(pos != datas.getPos()) {
datas.seek(pos);
}
int nread = readFully(datas, buf, offset, len);
if( eof && nread > 0) {
if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
return nread;

View File

@ -200,21 +200,35 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
boolean eof = false;
if (needChecksum()) {
try {
final long checksumPos = getChecksumFilePos(pos);
if (checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
sums.readFully(checksum);
} catch (EOFException e) {
eof = true;
assert checksum != null; // we have a checksum buffer
assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
assert len >= bytesPerSum; // we must read at least one chunk
final int checksumsToRead = Math.min(
len/bytesPerSum, // number of checksums based on len to read
checksum.length / CHECKSUM_SIZE); // size of checksum buffer
long checksumPos = getChecksumFilePos(pos);
if(checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
throw new EOFException("Checksum file not a length multiple of checksum size " +
"in " + file + " at " + pos + " checksumpos: " + checksumPos +
" sumLenread: " + sumLenRead );
}
if (sumLenRead <= 0) { // we're at the end of the file
eof = true;
} else {
// Adjust amount of data to read based on how many checksum chunks we read
len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
len = bytesPerSum;
}
if (pos != datas.getPos()) {
datas.seek(pos);
}
final int nread = readFully(datas, buf, offset, len);
int nread = readFully(datas, buf, offset, len);
if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}

View File

@ -24,6 +24,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
/**
* This is a generic input stream for verifying checksums for
@ -38,16 +40,26 @@ abstract public class FSInputChecker extends FSInputStream {
protected Path file;
private Checksum sum;
private boolean verifyChecksum = true;
private byte[] buf;
private int maxChunkSize; // data bytes for checksum (eg 512)
private byte[] buf; // buffer for non-chunk-aligned reading
private byte[] checksum;
private int pos;
private int count;
private IntBuffer checksumInts; // wrapper on checksum buffer
private int pos; // the position of the reader inside buf
private int count; // the number of bytes currently in buf
private int numOfRetries;
// cached file position
// this should always be a multiple of maxChunkSize
private long chunkPos = 0;
// Number of checksum chunks that can be read at once into a user
// buffer. Chosen by benchmarks - higher values do not reduce
// CPU usage. The size of the data reads made to the underlying stream
// will be CHUNKS_PER_READ * maxChunkSize.
private static final int CHUNKS_PER_READ = 32;
protected static final int CHECKSUM_SIZE = 4; // 32-bit checksum
/** Constructor
*
* @param file The name of the file to be read
@ -72,14 +84,34 @@ protected FSInputChecker( Path file, int numOfRetries,
set(verifyChecksum, sum, chunkSize, checksumSize);
}
/** Reads in next checksum chunk data into <code>buf</code> at <code>offset</code>
/**
* Reads in checksum chunks into <code>buf</code> at <code>offset</code>
* and checksum into <code>checksum</code>.
* Since checksums can be disabled, there are two cases implementors need
* to worry about:
*
* (a) needChecksum() will return false:
* - len can be any positive value
* - checksum will be null
* Implementors should simply pass through to the underlying data stream.
* or
* (b) needChecksum() will return true:
* - len >= maxChunkSize
* - checksum.length is a multiple of CHECKSUM_SIZE
* Implementors should read an integer number of data chunks into
* buf. The amount read should be bounded by len or by
* checksum.length / CHECKSUM_SIZE * maxChunkSize. Note that len may
* be a value that is not a multiple of maxChunkSize, in which case
* the implementation may return less than len.
*
* The method is used for implementing read, therefore, it should be optimized
* for sequential reading
* for sequential reading.
*
* @param pos chunkPos
* @param buf desitination buffer
* @param offset offset in buf at which to store data
* @param len maximun number of bytes to read
* @param len maximum number of bytes to read
* @param checksum the data buffer into which to write checksums
* @return number of bytes read
*/
abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
@ -173,7 +205,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
private void fill( ) throws IOException {
assert(pos>=count);
// fill internal buffer
count = readChecksumChunk(buf, 0, buf.length);
count = readChecksumChunk(buf, 0, maxChunkSize);
if (count < 0) count = 0;
}
@ -185,13 +217,13 @@ private int read1(byte b[], int off, int len)
throws IOException {
int avail = count-pos;
if( avail <= 0 ) {
if(len>=buf.length) {
if(len >= maxChunkSize) {
// read a chunk to user buffer directly; avoid one copy
int nread = readChecksumChunk(b, off, len);
return nread;
} else {
// read a chunk into the local buffer
fill();
fill();
if( count <= 0 ) {
return -1;
} else {
@ -207,10 +239,10 @@ private int read1(byte b[], int off, int len)
return cnt;
}
/* Read up one checksum chunk to array <i>b</i> at pos <i>off</i>
* It requires a checksum chunk boundary
/* Read up one or more checksum chunk to array <i>b</i> at pos <i>off</i>
* It requires at least one checksum chunk boundary
* in between <cur_pos, cur_pos+len>
* and it stops reading at the boundary or at the end of the stream;
* and it stops reading at the last boundary or at the end of the stream;
* Otherwise an IllegalArgumentException is thrown.
* This makes sure that all data read are checksum verified.
*
@ -223,7 +255,7 @@ private int read1(byte b[], int off, int len)
* the stream has been reached.
* @throws IOException if an I/O error occurs.
*/
private int readChecksumChunk(byte b[], int off, int len)
private int readChecksumChunk(byte b[], final int off, final int len)
throws IOException {
// invalidate buffer
count = pos = 0;
@ -236,10 +268,9 @@ private int readChecksumChunk(byte b[], int off, int len)
try {
read = readChunk(chunkPos, b, off, len, checksum);
if( read > 0 ) {
if( read > 0) {
if( needChecksum() ) {
sum.update(b, off, read);
verifySum(chunkPos);
verifySums(b, off, read);
}
chunkPos += read;
}
@ -267,25 +298,37 @@ private int readChecksumChunk(byte b[], int off, int len)
return read;
}
/* verify checksum for the chunk.
* @throws ChecksumException if there is a mismatch
*/
private void verifySum(long errPos) throws ChecksumException {
long crc = getChecksum();
long sumValue = sum.getValue();
sum.reset();
if (crc != sumValue) {
throw new ChecksumException(
"Checksum error: "+file+" at "+errPos, errPos);
private void verifySums(final byte b[], final int off, int read)
throws ChecksumException
{
int leftToVerify = read;
int verifyOff = 0;
checksumInts.rewind();
checksumInts.limit((read - 1)/maxChunkSize + 1);
while (leftToVerify > 0) {
sum.update(b, off + verifyOff, Math.min(leftToVerify, maxChunkSize));
int expected = checksumInts.get();
int calculated = (int)sum.getValue();
sum.reset();
if (expected != calculated) {
long errPos = chunkPos + verifyOff;
throw new ChecksumException(
"Checksum error: "+file+" at "+ errPos +
" exp: " + expected + " got: " + calculated, errPos);
}
leftToVerify -= maxChunkSize;
verifyOff += maxChunkSize;
}
}
/* calculate checksum value */
private long getChecksum() {
return checksum2long(checksum);
}
/** Convert a checksum byte array to a long */
/**
* Convert a checksum byte array to a long
* This is deprecated since 0.22 since it is no longer in use
* by this class.
*/
@Deprecated
static public long checksum2long(byte[] checksum) {
long crc = 0L;
for(int i=0; i<checksum.length; i++) {
@ -399,11 +442,19 @@ protected static int readFully(InputStream stm,
* @param checksumSize checksum size
*/
final protected synchronized void set(boolean verifyChecksum,
Checksum sum, int maxChunkSize, int checksumSize ) {
Checksum sum, int maxChunkSize, int checksumSize) {
// The code makes assumptions that checksums are always 32-bit.
assert !verifyChecksum || sum == null || checksumSize == CHECKSUM_SIZE;
this.maxChunkSize = maxChunkSize;
this.verifyChecksum = verifyChecksum;
this.sum = sum;
this.buf = new byte[maxChunkSize];
this.checksum = new byte[checksumSize];
// The size of the checksum array here determines how much we can
// read in a single call to readChunk
this.checksum = new byte[CHUNKS_PER_READ * checksumSize];
this.checksumInts = ByteBuffer.wrap(checksum).asIntBuffer();
this.count = 0;
this.pos = 0;
}

View File

@ -26,6 +26,9 @@
import junit.framework.TestCase;
public class TestChecksumFileSystem extends TestCase {
static final String TEST_ROOT_DIR
= System.getProperty("test.build.data","build/test/data/work-dir/localfs");
public void testgetChecksumLength() throws Exception {
assertEquals(8, ChecksumFileSystem.getChecksumLength(0L, 512));
assertEquals(12, ChecksumFileSystem.getChecksumLength(1L, 512));
@ -39,9 +42,6 @@ public void testgetChecksumLength() throws Exception {
}
public void testVerifyChecksum() throws Exception {
String TEST_ROOT_DIR
= System.getProperty("test.build.data","build/test/data/work-dir/localfs");
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testPath");
@ -54,9 +54,15 @@ public void testVerifyChecksum() throws Exception {
fout.write("testing you".getBytes());
fout.close();
// Exercise some boundary cases - a divisor of the chunk size
// the chunk size, 2x chunk size, and +/-1 around these.
TestLocalFileSystem.readFile(localFs, testPath, 128);
TestLocalFileSystem.readFile(localFs, testPath, 511);
TestLocalFileSystem.readFile(localFs, testPath, 512);
TestLocalFileSystem.readFile(localFs, testPath, 513);
TestLocalFileSystem.readFile(localFs, testPath, 1023);
TestLocalFileSystem.readFile(localFs, testPath, 1024);
TestLocalFileSystem.readFile(localFs, testPath, 1025);
localFs.delete(localFs.getChecksumFile(testPath), true);
assertTrue("checksum deleted", !localFs.exists(localFs.getChecksumFile(testPath)));
@ -75,9 +81,80 @@ public void testVerifyChecksum() throws Exception {
assertTrue("error reading", errorRead);
//now setting verify false, the read should succeed
localFs.setVerifyChecksum(false);
String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
assertTrue("read", "testing".equals(str));
try {
localFs.setVerifyChecksum(false);
String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
assertTrue("read", "testing".equals(str));
} finally {
// reset for other tests
localFs.setVerifyChecksum(true);
}
}
public void testMultiChunkFile() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testMultiChunk");
FSDataOutputStream fout = localFs.create(testPath);
for (int i = 0; i < 1000; i++) {
fout.write(("testing" + i).getBytes());
}
fout.close();
// Exercise some boundary cases - a divisor of the chunk size
// the chunk size, 2x chunk size, and +/-1 around these.
TestLocalFileSystem.readFile(localFs, testPath, 128);
TestLocalFileSystem.readFile(localFs, testPath, 511);
TestLocalFileSystem.readFile(localFs, testPath, 512);
TestLocalFileSystem.readFile(localFs, testPath, 513);
TestLocalFileSystem.readFile(localFs, testPath, 1023);
TestLocalFileSystem.readFile(localFs, testPath, 1024);
TestLocalFileSystem.readFile(localFs, testPath, 1025);
}
/**
* Test to ensure that if the checksum file is truncated, a
* ChecksumException is thrown
*/
public void testTruncatedChecksum() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testtruncatedcrc");
FSDataOutputStream fout = localFs.create(testPath);
fout.write("testing truncation".getBytes());
fout.close();
// Read in the checksum
Path checksumFile = localFs.getChecksumFile(testPath);
FileSystem rawFs = localFs.getRawFileSystem();
FSDataInputStream checksumStream = rawFs.open(checksumFile);
byte buf[] = new byte[8192];
int read = checksumStream.read(buf, 0, buf.length);
checksumStream.close();
// Now rewrite the checksum file with the last byte missing
FSDataOutputStream replaceStream = rawFs.create(checksumFile);
replaceStream.write(buf, 0, read - 1);
replaceStream.close();
// Now reading the file should fail with a ChecksumException
try {
TestLocalFileSystem.readFile(localFs, testPath, 1024);
fail("Did not throw a ChecksumException when reading truncated " +
"crc file");
} catch(ChecksumException ie) {
}
// telling it not to verify checksums, should avoid issue.
try {
localFs.setVerifyChecksum(false);
String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
assertTrue("read", "testing truncation".equals(str));
} finally {
// reset for other tests
localFs.setVerifyChecksum(true);
}
}
}