HDFS-6865. Byte array native checksumming on client side. Contributed by James Thomas.

Todd Lipcon 2014-08-28 16:44:09 -07:00
parent 48aa3b7274
commit ab638e77b8
11 changed files with 108 additions and 72 deletions

View File

@@ -381,7 +381,8 @@ public ChecksumFSOutputSummer(ChecksumFileSystem fs,
                           long blockSize,
                           Progressable progress)
       throws IOException {
-      super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
+      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+          fs.getBytesPerSum()));
       int bytesPerSum = fs.getBytesPerSum();
       this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
                                          replication, blockSize, progress);
@@ -405,10 +406,11 @@ public void close() throws IOException {
     }
 
     @Override
-    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
+        int ckoff, int cklen)
     throws IOException {
       datas.write(b, offset, len);
-      sums.write(checksum);
+      sums.write(checksum, ckoff, cklen);
     }
 
     @Override

View File

@@ -337,7 +337,8 @@ public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file,
         final short replication, final long blockSize,
         final Progressable progress, final ChecksumOpt checksumOpt,
         final boolean createParent) throws IOException {
-      super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
+      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+          fs.getBytesPerSum()));
 
       // checksumOpt is passed down to the raw fs. Unless it implements
       // checksum impelemts internally, checksumOpt will be ignored.
@@ -370,10 +371,11 @@ public void close() throws IOException {
     }
 
     @Override
-    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
+        int ckoff, int cklen)
     throws IOException {
       datas.write(b, offset, len);
-      sums.write(checksum);
+      sums.write(checksum, ckoff, cklen);
    }
 
    @Override

View File

@@ -18,13 +18,14 @@
 package org.apache.hadoop.fs;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.DataChecksum;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.zip.Checksum;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
 /**
  * This is a generic output stream for generating checksums for
  * data before it is written to the underlying stream
  */
@@ -33,7 +34,7 @@
 @InterfaceStability.Unstable
 abstract public class FSOutputSummer extends OutputStream {
   // data checksum
-  private Checksum sum;
+  private final DataChecksum sum;
   // internal buffer for storing data before it is checksumed
   private byte buf[];
   // internal buffer for storing checksum
@@ -41,18 +42,24 @@ abstract public class FSOutputSummer extends OutputStream {
   // The number of valid bytes in the buffer.
   private int count;
 
-  protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
+  // We want this value to be a multiple of 3 because the native code checksums
+  // 3 chunks simultaneously. The chosen value of 9 strikes a balance between
+  // limiting the number of JNI calls and flushing to the underlying stream
+  // relatively frequently.
+  private static final int BUFFER_NUM_CHUNKS = 9;
+
+  protected FSOutputSummer(DataChecksum sum) {
     this.sum = sum;
-    this.buf = new byte[maxChunkSize];
-    this.checksum = new byte[checksumSize];
+    this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
+    this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
     this.count = 0;
   }
 
   /* write the data chunk in <code>b</code> staring at <code>offset</code> with
-   * a length of <code>len</code>, and its checksum
+   * a length of <code>len > 0</code>, and its checksum
    */
-  protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
-  throws IOException;
+  protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
+      byte[] checksum, int checksumOffset, int checksumLen) throws IOException;
 
   /**
   * Check if the implementing OutputStream is closed and should no longer
@@ -66,7 +73,6 @@ protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksu
   /** Write one byte */
   @Override
   public synchronized void write(int b) throws IOException {
-    sum.update(b);
     buf[count++] = (byte)b;
     if(count == buf.length) {
       flushBuffer();
@@ -111,18 +117,17 @@ public synchronized void write(byte b[], int off, int len)
    */
   private int write1(byte b[], int off, int len) throws IOException {
     if(count==0 && len>=buf.length) {
-      // local buffer is empty and user data has one chunk
-      // checksum and output data
+      // local buffer is empty and user buffer size >= local buffer size, so
+      // simply checksum the user buffer and send it directly to the underlying
+      // stream
       final int length = buf.length;
-      sum.update(b, off, length);
-      writeChecksumChunk(b, off, length, false);
+      writeChecksumChunks(b, off, length);
       return length;
     }
 
     // copy user data to local buffer
     int bytesToCopy = buf.length-count;
     bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
-    sum.update(b, off, bytesToCopy);
     System.arraycopy(b, off, buf, count, bytesToCopy);
     count += bytesToCopy;
     if (count == buf.length) {
@@ -136,22 +141,45 @@ private int write1(byte b[], int off, int len) throws IOException {
    * the underlying output stream.
    */
   protected synchronized void flushBuffer() throws IOException {
-    flushBuffer(false);
+    flushBuffer(false, true);
   }
 
-  /* Forces any buffered output bytes to be checksumed and written out to
-   * the underlying output stream. If keep is true, then the state of
-   * this object remains intact.
+  /* Forces buffered output bytes to be checksummed and written out to
+   * the underlying output stream. If there is a trailing partial chunk in the
+   * buffer,
+   * 1) flushPartial tells us whether to flush that chunk
+   * 2) if flushPartial is true, keep tells us whether to keep that chunk in the
+   * buffer (if flushPartial is false, it is always kept in the buffer)
+   *
+   * Returns the number of bytes that were flushed but are still left in the
+   * buffer (can only be non-zero if keep is true).
    */
-  protected synchronized void flushBuffer(boolean keep) throws IOException {
-    if (count != 0) {
-      int chunkLen = count;
+  protected synchronized int flushBuffer(boolean keep,
+      boolean flushPartial) throws IOException {
+    int bufLen = count;
+    int partialLen = bufLen % sum.getBytesPerChecksum();
+    int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
+    if (lenToFlush != 0) {
+      writeChecksumChunks(buf, 0, lenToFlush);
+      if (!flushPartial || keep) {
+        count = partialLen;
+        System.arraycopy(buf, bufLen - count, buf, 0, count);
+      } else {
       count = 0;
-      writeChecksumChunk(buf, 0, chunkLen, keep);
-      if (keep) {
-        count = chunkLen;
       }
     }
+
+    // total bytes left minus unflushed bytes left
+    return count - (bufLen - lenToFlush);
+  }
+
+  /**
+   * Checksums all complete data chunks and flushes them to the underlying
+   * stream. If there is a trailing partial chunk, it is not flushed and is
+   * maintained in the buffer.
+   */
+  public void flush() throws IOException {
+    flushBuffer(false, false);
   }
 
   /**
@@ -161,18 +189,18 @@ protected synchronized int getBufferedDataSize() {
     return count;
   }
 
-  /** Generate checksum for the data chunk and output data chunk & checksum
-   * to the underlying output stream. If keep is true then keep the
-   * current checksum intact, do not reset it.
+  /** Generate checksums for the given data chunks and output chunks & checksums
+   * to the underlying output stream.
    */
-  private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
+  private void writeChecksumChunks(byte b[], int off, int len)
   throws IOException {
-    int tempChecksum = (int)sum.getValue();
-    if (!keep) {
-      sum.reset();
+    sum.calculateChunkedSums(b, off, len, checksum, 0);
+    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+      int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
+      writeChunk(b, off + i, chunkLen, checksum, ckOffset,
+          sum.getChecksumSize());
     }
-    int2byte(tempChecksum, checksum);
-    writeChunk(b, off, len, checksum);
   }
 
   /**
@@ -196,9 +224,14 @@ static byte[] int2byte(int integer, byte[] bytes) {
   /**
    * Resets existing buffer with a new one of the specified size.
    */
-  protected synchronized void resetChecksumChunk(int size) {
-    sum.reset();
+  protected synchronized void setChecksumBufSize(int size) {
     this.buf = new byte[size];
+    this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
+        sum.getChecksumSize()];
     this.count = 0;
   }
+
+  protected synchronized void resetChecksumBufSize() {
+    setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
+  }
 }
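
Note for implementers: with this change a subclass no longer feeds a java.util.zip.Checksum one chunk at a time. It hands FSOutputSummer a DataChecksum, and its writeChunk() now receives an offset/length pair into a shared checksum array that may hold sums for several chunks computed in one call. Below is a minimal sketch of a conforming subclass; the class name and the two destination streams are illustrative and not part of this patch.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.util.DataChecksum;

class SimpleSummer extends FSOutputSummer {
  private final OutputStream data;   // destination for raw bytes
  private final OutputStream sums;   // destination for checksums
  private boolean closed = false;

  SimpleSummer(OutputStream data, OutputStream sums) {
    // with CRC32 over 512-byte chunks, the new constructor sizes buf to
    // 9 * 512 = 4608 data bytes and checksum to 9 * 4 = 36 bytes
    // (BUFFER_NUM_CHUNKS chunks checksummed per native call)
    super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
    this.data = data;
    this.sums = sums;
  }

  @Override
  protected void writeChunk(byte[] b, int bOffset, int bLen,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    data.write(b, bOffset, bLen);
    // only the [ckoff, ckoff + cklen) slice belongs to this chunk
    sums.write(checksum, ckoff, cklen);
  }

  @Override
  protected void checkClosed() throws IOException {
    if (closed) {
      throw new IOException("stream closed");
    }
  }
}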

View File

@@ -339,6 +339,7 @@ private void verifyChunkedSums(
       byte[] data, int dataOff, int dataLen,
       byte[] checksums, int checksumsOff, String fileName,
       long basePos) throws ChecksumException {
+    if (type.size == 0) return;
 
     if (NativeCrc32.isAvailable()) {
       NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
@@ -421,6 +422,7 @@ public void calculateChunkedSums(ByteBuffer data, ByteBuffer checksums) {
   public void calculateChunkedSums(
       byte[] data, int dataOffset, int dataLength,
       byte[] sums, int sumsOffset) {
+    if (type.size == 0) return;
 
     if (NativeCrc32.isAvailable()) {
       NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id,
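
The two new guards make the NULL checksum type (type.size == 0) a no-op on the byte-array verify/calculate paths before they reach the native code. A small sketch, not part of the patch, of the bulk call that FSOutputSummer#writeChecksumChunks now issues once per buffered run:

import org.apache.hadoop.util.DataChecksum;

public class ChunkedSumsSketch {
  public static void main(String[] args) {
    DataChecksum sum = DataChecksum.newDataChecksum(
        DataChecksum.Type.CRC32, 512);
    byte[] data = new byte[3 * 512];                   // three full chunks
    byte[] sums = new byte[3 * sum.getChecksumSize()]; // 3 * 4 = 12 bytes

    // One call (native when available) checksums all three chunks; the
    // 4-byte CRCs land at offsets 0, 4 and 8 of sums. With Type.NULL the
    // new guard returns immediately and sums is left untouched.
    sum.calculateChunkedSums(data, 0, data.length, sums, 0);
  }
}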

View File

@@ -42,7 +42,7 @@ public static boolean isAvailable() {
    * modified.
    *
    * @param bytesPerSum the chunk size (eg 512 bytes)
-   * @param checksumType the DataChecksum type constant
+   * @param checksumType the DataChecksum type constant (NULL is not supported)
    * @param sums the DirectByteBuffer pointing at the beginning of the
    *             stored checksums
    * @param data the DirectByteBuffer pointing at the beginning of the

View File

@@ -434,6 +434,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6773. MiniDFSCluster should skip edit log fsync by default (Stephen
     Chu via Colin Patrick McCabe)
 
+    HDFS-6865. Byte array native checksumming on client side
+    (James Thomas via todd)
+
   BUG FIXES
 
     HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for

View File

@@ -398,7 +398,7 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
         // one chunk that fills up the partial chunk.
         //
         computePacketChunkSize(0, freeInCksum);
-        resetChecksumChunk(freeInCksum);
+        setChecksumBufSize(freeInCksum);
         appendChunk = true;
       } else {
         // if the remaining space in the block is smaller than
@@ -1563,7 +1563,7 @@ public synchronized DatanodeInfo[] getPipeline() {
 
   private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
-    super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
+    super(checksum);
     this.dfsClient = dfsClient;
     this.src = src;
     this.fileId = stat.getFileId();
@@ -1717,22 +1717,21 @@ private void waitAndQueueCurrentPacket() throws IOException {
 
   // @see FSOutputSummer#writeChunk()
   @Override
-  protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
-                                                        throws IOException {
+  protected synchronized void writeChunk(byte[] b, int offset, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
     dfsClient.checkOpen();
     checkClosed();
 
-    int cklen = checksum.length;
     int bytesPerChecksum = this.checksum.getBytesPerChecksum();
     if (len > bytesPerChecksum) {
       throw new IOException("writeChunk() buffer size is " + len +
                             " is larger than supported bytesPerChecksum " +
                             bytesPerChecksum);
     }
-    if (checksum.length != this.checksum.getChecksumSize()) {
+    if (cklen != this.checksum.getChecksumSize()) {
       throw new IOException("writeChunk() checksum size is supposed to be " +
                             this.checksum.getChecksumSize() +
-                            " but found to be " + checksum.length);
+                            " but found to be " + cklen);
     }
 
     if (currentPacket == null) {
@@ -1748,7 +1747,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che
       }
     }
 
-    currentPacket.writeChecksum(checksum, 0, cklen);
+    currentPacket.writeChecksum(checksum, ckoff, cklen);
     currentPacket.writeData(b, offset, len);
     currentPacket.numChunks++;
     bytesCurBlock += len;
@@ -1772,7 +1771,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che
       // crc chunks from now on.
       if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
         appendChunk = false;
-        resetChecksumChunk(bytesPerChecksum);
+        resetChecksumBufSize();
       }
 
       if (!appendChunk) {
@@ -1853,20 +1852,13 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
       long lastBlockLength = -1L;
       boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
       synchronized (this) {
-        /* Record current blockOffset. This might be changed inside
-         * flushBuffer() where a partial checksum chunk might be flushed.
-         * After the flush, reset the bytesCurBlock back to its previous value,
-         * any partial checksum chunk will be sent now and in next packet.
-         */
-        long saveOffset = bytesCurBlock;
-        Packet oldCurrentPacket = currentPacket;
-
         // flush checksum buffer, but keep checksum buffer intact
-        flushBuffer(true);
+        int numKept = flushBuffer(true, true);
+        // bytesCurBlock potentially incremented if there was buffered data
 
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug(
-            "DFSClient flush() : saveOffset " + saveOffset +
+            "DFSClient flush() :" +
             " bytesCurBlock " + bytesCurBlock +
             " lastFlushOffset " + lastFlushOffset);
         }
@@ -1883,14 +1875,6 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
               bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
         }
       } else {
-        // We already flushed up to this offset.
-        // This means that we haven't written anything since the last flush
-        // (or the beginning of the file). Hence, we should not have any
-        // packet queued prior to this call, since the last flush set
-        // currentPacket = null.
-        assert oldCurrentPacket == null :
-          "Empty flush should not occur with a currentPacket";
-
         if (isSync && bytesCurBlock > 0) {
           // Nothing to send right now,
           // and the block was partially written,
@@ -1910,7 +1894,7 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
       //
       // Restore state of stream. Record the last flush offset
       // of the last full chunk that was flushed.
      //
-      bytesCurBlock = saveOffset;
+      bytesCurBlock -= numKept;
       toWaitFor = lastQueuedSeqno;
     } // end synchronized
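
A worked example of the new accounting (the numbers are hypothetical): with bytesPerChecksum = 512 and 1000 bytes buffered, flushBuffer(true, true) checksums and sends all 1000 bytes, keeps the trailing 1000 % 512 = 488-byte partial chunk in the buffer, and returns numKept = 488. Those 488 bytes were added to bytesCurBlock when they were sent, but they will be resent with the next packet, so bytesCurBlock -= numKept rewinds exactly that amount, replacing the old saveOffset save/restore of the whole offset.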

View File

@@ -261,7 +261,9 @@ public void testComplexFlush() throws IOException {
       start += 29;
     }
     stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start);
-
+    // need to make sure we completely write out all full blocks before
+    // the checkFile() call (see FSOutputSummer#flush)
+    stm.flush();
     // verify that full blocks are sane
     checkFile(fs, file1, 1);
     stm.close();
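
This and the remaining test fixes all follow from the larger client-side buffer: a write() that used to push each 512-byte chunk straight through writeChunk() can now sit entirely in the nine-chunk buffer, so tests must call flush() to push the complete chunks out before inspecting blocks on the NameNode or DataNodes.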

View File

@@ -394,6 +394,8 @@ public void testBlockTokenInLastLocatedBlock() throws IOException,
     Path filePath = new Path(fileName);
     FSDataOutputStream out = fs.create(filePath, (short) 1);
     out.write(new byte[1000]);
+    // ensure that the first block is written out (see FSOutputSummer#flush)
+    out.flush();
     LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
         fileName, 0, 1000);
     while (locatedBlocks.getLastLocatedBlock() == null) {

View File

@@ -70,6 +70,9 @@ void writeFile(Path file, FSDataOutputStream stm, int size)
     long blocksBefore = stm.getPos() / BLOCK_SIZE;
 
     TestFileCreation.writeFile(stm, BLOCK_SIZE);
+    // need to make sure the full block is completely flushed to the DataNodes
+    // (see FSOutputSummer#flush)
+    stm.flush();
     int blocksAfter = 0;
     // wait until the block is allocated by DataStreamer
     BlockLocation[] locatedBlocks;

View File

@@ -141,6 +141,9 @@ private FSDataOutputStream writeIncompleteFile(FileSystem fileSys, Path name,
     Random rand = new Random(seed);
     rand.nextBytes(buffer);
     stm.write(buffer);
+    // need to make sure that we actually write out both file blocks
+    // (see FSOutputSummer#flush)
+    stm.flush();
     // Do not close stream, return it
     // so that it is not garbage collected
     return stm;