HADOOP-7444. Add Checksum API to verify and calculate checksums "in bulk". Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1146111 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-07-13 16:12:42 +00:00
parent 654705b882
commit 22fa0e43e3
3 changed files with 284 additions and 0 deletions

View File

@ -259,6 +259,9 @@ Trunk (unreleased changes)
HADOOP-7457. Remove out-of-date Chinese language documentation.
(Jakob Homan via eli)
HADOOP-7444. Add Checksum API to verify and calculate checksums "in bulk"
(todd)
OPTIMIZATIONS
HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole

View File

@ -21,10 +21,12 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
/**
* This class provides inteface and utilities for processing checksums for
@ -233,6 +235,157 @@ public void update( int b ) {
inSum += 1;
}
/**
* Verify that the given checksums match the given data.
*
* The 'mark' of the ByteBuffer parameters may be modified by this function,.
* but the position is maintained.
*
* @param data the DirectByteBuffer pointing to the data to verify.
* @param checksums the DirectByteBuffer pointing to a series of stored
* checksums
* @param fileName the name of the file being read, for error-reporting
* @param basePos the file position to which the start of 'data' corresponds
* @throws ChecksumException if the checksums do not match
*/
public void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
String fileName, long basePos)
throws ChecksumException {
if (size == 0) return;
if (data.hasArray() && checksums.hasArray()) {
verifyChunkedSums(
data.array(), data.arrayOffset() + data.position(), data.remaining(),
checksums.array(), checksums.arrayOffset() + checksums.position(),
fileName, basePos);
return;
}
int startDataPos = data.position();
data.mark();
checksums.mark();
try {
byte[] buf = new byte[bytesPerChecksum];
byte[] sum = new byte[size];
while (data.remaining() > 0) {
int n = Math.min(data.remaining(), bytesPerChecksum);
checksums.get(sum);
data.get(buf, 0, n);
summer.reset();
summer.update(buf, 0, n);
int calculated = (int)summer.getValue();
int stored = (sum[0] << 24 & 0xff000000) |
(sum[1] << 16 & 0xff0000) |
(sum[2] << 8 & 0xff00) |
sum[3] & 0xff;
if (calculated != stored) {
long errPos = basePos + data.position() - startDataPos - n;
throw new ChecksumException(
"Checksum error: "+ fileName + " at "+ errPos +
" exp: " + stored + " got: " + calculated, errPos);
}
}
} finally {
data.reset();
checksums.reset();
}
}
/**
* Implementation of chunked verification specifically on byte arrays. This
* is to avoid the copy when dealing with ByteBuffers that have array backing.
*/
private void verifyChunkedSums(
byte[] data, int dataOff, int dataLen,
byte[] checksums, int checksumsOff, String fileName,
long basePos) throws ChecksumException {
int remaining = dataLen;
int dataPos = 0;
while (remaining > 0) {
int n = Math.min(remaining, bytesPerChecksum);
summer.reset();
summer.update(data, dataOff + dataPos, n);
dataPos += n;
remaining -= n;
int calculated = (int)summer.getValue();
int stored = (checksums[checksumsOff] << 24 & 0xff000000) |
(checksums[checksumsOff + 1] << 16 & 0xff0000) |
(checksums[checksumsOff + 2] << 8 & 0xff00) |
checksums[checksumsOff + 3] & 0xff;
checksumsOff += 4;
if (calculated != stored) {
long errPos = basePos + dataPos - n;
throw new ChecksumException(
"Checksum error: "+ fileName + " at "+ errPos +
" exp: " + stored + " got: " + calculated, errPos);
}
}
}
/**
* Calculate checksums for the given data.
*
* The 'mark' of the ByteBuffer parameters may be modified by this function,
* but the position is maintained.
*
* @param data the DirectByteBuffer pointing to the data to checksum.
* @param checksums the DirectByteBuffer into which checksums will be
* stored. Enough space must be available in this
* buffer to put the checksums.
*/
public void calculateChunkedSums(ByteBuffer data, ByteBuffer checksums) {
if (size == 0) return;
if (data.hasArray() && checksums.hasArray()) {
calculateChunkedSums(data.array(), data.arrayOffset() + data.position(), data.remaining(),
checksums.array(), checksums.arrayOffset() + checksums.position());
return;
}
data.mark();
checksums.mark();
try {
byte[] buf = new byte[bytesPerChecksum];
while (data.remaining() > 0) {
int n = Math.min(data.remaining(), bytesPerChecksum);
data.get(buf, 0, n);
summer.reset();
summer.update(buf, 0, n);
checksums.putInt((int)summer.getValue());
}
} finally {
data.reset();
checksums.reset();
}
}
/**
* Implementation of chunked calculation specifically on byte arrays. This
* is to avoid the copy when dealing with ByteBuffers that have array backing.
*/
private void calculateChunkedSums(
byte[] data, int dataOffset, int dataLength,
byte[] sums, int sumsOffset) {
int remaining = dataLength;
while (remaining > 0) {
int n = Math.min(remaining, bytesPerChecksum);
summer.reset();
summer.update(data, dataOffset, n);
dataOffset += n;
remaining -= n;
long calculated = summer.getValue();
sums[sumsOffset++] = (byte) (calculated >> 24);
sums[sumsOffset++] = (byte) (calculated >> 16);
sums[sumsOffset++] = (byte) (calculated >> 8);
sums[sumsOffset++] = (byte) (calculated);
}
}
/**
* This just provides a dummy implimentation for Checksum class
* This is used when there is no checksum available or required for

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.fs.ChecksumException;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestDataChecksum {
// Set up buffers that have some header and trailer before the
// actual data or checksums, to make sure the code handles
// buffer.position(), limit, etc correctly.
private static final int SUMS_OFFSET_IN_BUFFER = 3;
private static final int DATA_OFFSET_IN_BUFFER = 3;
private static final int DATA_TRAILER_IN_BUFFER = 3;
private static final int BYTES_PER_CHUNK = 512;
private static final DataChecksum checksum =
DataChecksum.newDataChecksum(
DataChecksum.CHECKSUM_CRC32, BYTES_PER_CHUNK);
@Test
public void testBulkOps() throws Exception {
for (boolean useDirect : new boolean[]{false, true}) {
doBulkTest(1023, useDirect);
doBulkTest(1024, useDirect);
doBulkTest(1025, useDirect);
}
}
private void doBulkTest(int dataLength, boolean useDirect)
throws Exception {
System.err.println("Testing bulk checksums of length " +
dataLength + " with " +
(useDirect ? "direct" : "array-backed") + " buffers");
int numSums = (dataLength - 1)/checksum.getBytesPerChecksum() + 1;
int sumsLength = numSums * checksum.getChecksumSize();
byte data[] = new byte[dataLength +
DATA_OFFSET_IN_BUFFER +
DATA_TRAILER_IN_BUFFER];
new Random().nextBytes(data);
ByteBuffer dataBuf = ByteBuffer.wrap(
data, DATA_OFFSET_IN_BUFFER, dataLength);
byte checksums[] = new byte[SUMS_OFFSET_IN_BUFFER + sumsLength];
ByteBuffer checksumBuf = ByteBuffer.wrap(
checksums, SUMS_OFFSET_IN_BUFFER, sumsLength);
// Swap out for direct buffers if requested.
if (useDirect) {
dataBuf = directify(dataBuf);
checksumBuf = directify(checksumBuf);
}
// calculate real checksum, make sure it passes
checksum.calculateChunkedSums(dataBuf, checksumBuf);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
// Change a byte in the header and in the trailer, make sure
// it doesn't affect checksum result
corruptBufferOffset(checksumBuf, 0);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
corruptBufferOffset(dataBuf, 0);
dataBuf.limit(dataBuf.limit() + 1);
corruptBufferOffset(dataBuf, dataLength + DATA_OFFSET_IN_BUFFER);
dataBuf.limit(dataBuf.limit() - 1);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
// Make sure bad checksums fail - error at beginning of array
corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
try {
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
fail("Did not throw on bad checksums");
} catch (ChecksumException ce) {
assertEquals(0, ce.getPos());
}
// Make sure bad checksums fail - error at end of array
uncorruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER);
corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER + sumsLength - 1);
try {
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
fail("Did not throw on bad checksums");
} catch (ChecksumException ce) {
int expectedPos = checksum.getBytesPerChecksum() * (numSums - 1);
assertEquals(expectedPos, ce.getPos());
}
}
private static void corruptBufferOffset(ByteBuffer buf, int offset) {
buf.put(offset, (byte)(buf.get(offset) + 1));
}
private static void uncorruptBufferOffset(ByteBuffer buf, int offset) {
buf.put(offset, (byte)(buf.get(offset) - 1));
}
private static ByteBuffer directify(ByteBuffer dataBuf) {
ByteBuffer newBuf = ByteBuffer.allocateDirect(dataBuf.capacity());
newBuf.position(dataBuf.position());
newBuf.mark();
newBuf.put(dataBuf);
newBuf.reset();
newBuf.limit(dataBuf.limit());
return newBuf;
}
}