diff --git a/CHANGES.txt b/CHANGES.txt
index a90b2e6b0c..386159ab9d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -83,6 +83,9 @@ Trunk (unreleased changes)
HADOOP-6443. Serialization classes accept invalid metadata.
(Aaron Kimball via tomwhite)
+ HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass
+ into user buffers. (Todd Lipcon via tomwhite)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 513270620c..ac69a2ea7b 100644
--- a/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -205,24 +205,41 @@ public boolean seekToNewSource(long targetPos) throws IOException {
@Override
protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
+
boolean eof = false;
- if(needChecksum()) {
- try {
- long checksumPos = getChecksumFilePos(pos);
- if(checksumPos != sums.getPos()) {
- sums.seek(checksumPos);
- }
- sums.readFully(checksum);
- } catch (EOFException e) {
- eof = true;
+ if (needChecksum()) {
+ assert checksum != null; // we have a checksum buffer
+ assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
+ assert len >= bytesPerSum; // we must read at least one chunk
+
+ final int checksumsToRead = Math.min(
+ len/bytesPerSum, // number of checksums based on len to read
+ checksum.length / CHECKSUM_SIZE); // size of checksum buffer
+ long checksumPos = getChecksumFilePos(pos);
+ if(checksumPos != sums.getPos()) {
+ sums.seek(checksumPos);
+ }
+
+ int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
+ if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
+ throw new ChecksumException(
+ "Checksum file not a length multiple of checksum size " +
+ "in " + file + " at " + pos + " checksumpos: " + checksumPos +
+ " sumLenread: " + sumLenRead,
+ pos);
+ }
+ if (sumLenRead <= 0) { // we're at the end of the file
+ eof = true;
+ } else {
+ // Adjust amount of data to read based on how many checksum chunks we read
+ len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
- len = bytesPerSum;
}
if(pos != datas.getPos()) {
datas.seek(pos);
}
int nread = readFully(datas, buf, offset, len);
- if( eof && nread > 0) {
+ if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
return nread;
diff --git a/src/java/org/apache/hadoop/fs/ChecksumFs.java b/src/java/org/apache/hadoop/fs/ChecksumFs.java
index 60b0cde254..343b1bc69e 100644
--- a/src/java/org/apache/hadoop/fs/ChecksumFs.java
+++ b/src/java/org/apache/hadoop/fs/ChecksumFs.java
@@ -200,21 +200,35 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
boolean eof = false;
if (needChecksum()) {
- try {
- final long checksumPos = getChecksumFilePos(pos);
- if (checksumPos != sums.getPos()) {
- sums.seek(checksumPos);
- }
- sums.readFully(checksum);
- } catch (EOFException e) {
- eof = true;
+ assert checksum != null; // we have a checksum buffer
+ assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
+ assert len >= bytesPerSum; // we must read at least one chunk
+
+ final int checksumsToRead = Math.min(
+ len/bytesPerSum, // number of checksums based on len to read
+ checksum.length / CHECKSUM_SIZE); // size of checksum buffer
+ long checksumPos = getChecksumFilePos(pos);
+ if(checksumPos != sums.getPos()) {
+ sums.seek(checksumPos);
+ }
+
+ int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
+ if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
+ throw new EOFException("Checksum file not a length multiple of checksum size " +
+ "in " + file + " at " + pos + " checksumpos: " + checksumPos +
+ " sumLenread: " + sumLenRead );
+ }
+ if (sumLenRead <= 0) { // we're at the end of the file
+ eof = true;
+ } else {
+ // Adjust amount of data to read based on how many checksum chunks we read
+ len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
- len = bytesPerSum;
}
if (pos != datas.getPos()) {
datas.seek(pos);
}
- final int nread = readFully(datas, buf, offset, len);
+ int nread = readFully(datas, buf, offset, len);
if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
diff --git a/src/java/org/apache/hadoop/fs/FSInputChecker.java b/src/java/org/apache/hadoop/fs/FSInputChecker.java
index 6de45d2c0d..cfe3a202ec 100644
--- a/src/java/org/apache/hadoop/fs/FSInputChecker.java
+++ b/src/java/org/apache/hadoop/fs/FSInputChecker.java
@@ -24,6 +24,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
/**
* This is a generic input stream for verifying checksums for
@@ -38,16 +40,26 @@ abstract public class FSInputChecker extends FSInputStream {
protected Path file;
private Checksum sum;
private boolean verifyChecksum = true;
- private byte[] buf;
+ private int maxChunkSize; // data bytes for checksum (eg 512)
+ private byte[] buf; // buffer for non-chunk-aligned reading
private byte[] checksum;
- private int pos;
- private int count;
+ private IntBuffer checksumInts; // wrapper on checksum buffer
+ private int pos; // the position of the reader inside buf
+ private int count; // the number of bytes currently in buf
private int numOfRetries;
// cached file position
+ // this should always be a multiple of maxChunkSize
private long chunkPos = 0;
-
+
+ // Number of checksum chunks that can be read at once into a user
+ // buffer. Chosen by benchmarks - higher values do not reduce
+ // CPU usage. The size of the data reads made to the underlying stream
+ // will be CHUNKS_PER_READ * maxChunkSize.
+ private static final int CHUNKS_PER_READ = 32;
+ protected static final int CHECKSUM_SIZE = 4; // 32-bit checksum
+
/** Constructor
*
* @param file The name of the file to be read
@@ -72,14 +84,34 @@ protected FSInputChecker( Path file, int numOfRetries,
set(verifyChecksum, sum, chunkSize, checksumSize);
}
- /** Reads in next checksum chunk data into <code>buf</code> at <code>offset</code>
+ /**
+ * Reads in checksum chunks into <code>buf</code> at <code>offset</code>
* and checksum into <code>checksum</code>.
+ * Since checksums can be disabled, there are two cases implementors need
+ * to worry about:
+ *
+ * (a) needChecksum() will return false:
+ * - len can be any positive value
+ * - checksum will be null
+ * Implementors should simply pass through to the underlying data stream.
+ * or
+ * (b) needChecksum() will return true:
+ * - len >= maxChunkSize
+ * - checksum.length is a multiple of CHECKSUM_SIZE
+ * Implementors should read an integer number of data chunks into
+ * buf. The amount read should be bounded by len or by
+ * checksum.length / CHECKSUM_SIZE * maxChunkSize. Note that len may
+ * be a value that is not a multiple of maxChunkSize, in which case
+ * the implementation may return less than len.
+ *
* The method is used for implementing read, therefore, it should be optimized
- * for sequential reading
+ * for sequential reading.
+ *
* @param pos chunkPos
* @param buf desitination buffer
* @param offset offset in buf at which to store data
- * @param len maximun number of bytes to read
+ * @param len maximum number of bytes to read
+ * @param checksum the data buffer into which to write checksums
* @return number of bytes read
*/
abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
@@ -96,7 +128,7 @@ abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
protected synchronized boolean needChecksum() {
return verifyChecksum && sum != null;
}
-
+
/**
* Read one checksum-verified byte
*
@@ -173,7 +205,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
private void fill( ) throws IOException {
assert(pos>=count);
// fill internal buffer
- count = readChecksumChunk(buf, 0, buf.length);
+ count = readChecksumChunk(buf, 0, maxChunkSize);
if (count < 0) count = 0;
}
@@ -185,13 +217,13 @@ private int read1(byte b[], int off, int len)
throws IOException {
int avail = count-pos;
if( avail <= 0 ) {
- if(len>=buf.length) {
+ if(len >= maxChunkSize) {
// read a chunk to user buffer directly; avoid one copy
int nread = readChecksumChunk(b, off, len);
return nread;
} else {
// read a chunk into the local buffer
- fill();
+ fill();
if( count <= 0 ) {
return -1;
} else {
@@ -207,10 +239,10 @@ private int read1(byte b[], int off, int len)
return cnt;
}
- /* Read up one checksum chunk to array b at pos off
- * It requires a checksum chunk boundary
+ /* Read one or more checksum chunks into array b at pos off
+ * It requires at least one checksum chunk boundary
* in between
- * and it stops reading at the boundary or at the end of the stream;
+ * and it stops reading at the last boundary or at the end of the stream;
* Otherwise an IllegalArgumentException is thrown.
* This makes sure that all data read are checksum verified.
*
@@ -223,7 +255,7 @@ private int read1(byte b[], int off, int len)
* the stream has been reached.
* @throws IOException if an I/O error occurs.
*/
- private int readChecksumChunk(byte b[], int off, int len)
+ private int readChecksumChunk(byte b[], final int off, final int len)
throws IOException {
// invalidate buffer
count = pos = 0;
@@ -236,13 +268,12 @@ private int readChecksumChunk(byte b[], int off, int len)
try {
read = readChunk(chunkPos, b, off, len, checksum);
- if( read > 0 ) {
+ if( read > 0) {
if( needChecksum() ) {
- sum.update(b, off, read);
- verifySum(chunkPos);
+ verifySums(b, off, read);
}
chunkPos += read;
- }
+ }
retry = false;
} catch (ChecksumException ce) {
LOG.info("Found checksum error: b[" + off + ", " + (off+read) + "]="
@@ -266,26 +297,38 @@ private int readChecksumChunk(byte b[], int off, int len)
} while (retry);
return read;
}
-
- /* verify checksum for the chunk.
- * @throws ChecksumException if there is a mismatch
- */
- private void verifySum(long errPos) throws ChecksumException {
- long crc = getChecksum();
- long sumValue = sum.getValue();
- sum.reset();
- if (crc != sumValue) {
- throw new ChecksumException(
- "Checksum error: "+file+" at "+errPos, errPos);
+
+ private void verifySums(final byte b[], final int off, int read)
+ throws ChecksumException
+ {
+ int leftToVerify = read;
+ int verifyOff = 0;
+ checksumInts.rewind();
+ checksumInts.limit((read - 1)/maxChunkSize + 1);
+
+ while (leftToVerify > 0) {
+ sum.update(b, off + verifyOff, Math.min(leftToVerify, maxChunkSize));
+ int expected = checksumInts.get();
+ int calculated = (int)sum.getValue();
+ sum.reset();
+
+ if (expected != calculated) {
+ long errPos = chunkPos + verifyOff;
+ throw new ChecksumException(
+ "Checksum error: "+file+" at "+ errPos +
+ " exp: " + expected + " got: " + calculated, errPos);
+ }
+ leftToVerify -= maxChunkSize;
+ verifyOff += maxChunkSize;
}
}
-
- /* calculate checksum value */
- private long getChecksum() {
- return checksum2long(checksum);
- }
- /** Convert a checksum byte array to a long */
+ /**
+ * Convert a checksum byte array to a long.
+ * Deprecated since 0.22 because it is no longer used
+ * by this class.
+ */
+ @Deprecated
static public long checksum2long(byte[] checksum) {
long crc = 0L;
for(int i=0; i