HDFS-4660. Block corruption can happen during pipeline recovery. Contributed by Kihwal Lee.

This commit is contained in:
Kihwal Lee 2015-06-16 15:39:46 -05:00
parent b578807b99
commit c74517c46b
2 changed files with 97 additions and 33 deletions

View File

@ -1026,6 +1026,8 @@ Release 2.7.1 - UNRELEASED
HDFS-8597. Fix TestFSImage#testZeroBlockSize on Windows. (Xiaoyu Yao) HDFS-8597. Fix TestFSImage#testZeroBlockSize on Windows. (Xiaoyu Yao)
HDFS-4660. Block corruption can happen during pipeline recovery (kihwal)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -588,29 +588,59 @@ private int receivePacket() throws IOException {
try { try {
long onDiskLen = replicaInfo.getBytesOnDisk(); long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) { if (onDiskLen<offsetInBlock) {
//finally write to the disk : // Normally the beginning of an incoming packet is aligned with the
// existing data on disk. If the beginning packet data offset is not
// checksum chunk aligned, the end of packet will not go beyond the
// next chunk boundary.
// When a failure-recovery is involved, the client state and the
// the datanode state may not exactly agree. I.e. the client may
// resend part of data that is already on disk. Correct number of
// bytes should be skipped when writing the data and checksum
// buffers out to disk.
long partialChunkSizeOnDisk = onDiskLen % bytesPerChecksum;
boolean alignedOnDisk = partialChunkSizeOnDisk == 0;
boolean alignedInPacket = firstByteInBlock % bytesPerChecksum == 0;
if (onDiskLen % bytesPerChecksum != 0) { // Since data is always appended, not overwritten, partial CRC
// prepare to overwrite last checksum // recalculation is necessary if the on-disk data is not chunk-
adjustCrcFilePosition(); // aligned, regardless of whether the beginning of the data in
// the packet is chunk-aligned.
boolean doPartialCrc = !alignedOnDisk && !shouldNotWriteChecksum;
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. If the starting offset is not chunk
// aligned, the packet should terminate at or before the next
// chunk boundary.
if (!alignedInPacket && len > bytesPerChecksum) {
throw new IOException("Unexpected packet data length for "
+ block + " from " + inAddr + ": a partial chunk must be "
+ " sent in an individual packet (data length = " + len
+ " > bytesPerChecksum = " + bytesPerChecksum + ")");
} }
// If this is a partial chunk, then read in pre-existing checksum // If the last portion of the block file is not a full chunk,
// then read in pre-existing partial data chunk and recalculate
// the checksum so that the checksum calculation can continue
// from the right state.
Checksum partialCrc = null; Checksum partialCrc = null;
if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) { if (doPartialCrc) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("receivePacket for " + block LOG.debug("receivePacket for " + block
+ ": bytesPerChecksum=" + bytesPerChecksum + ": previous write did not end at the chunk boundary."
+ " does not divide firstByteInBlock=" + firstByteInBlock); + " onDiskLen=" + onDiskLen);
} }
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() + long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize; onDiskLen / bytesPerChecksum * checksumSize;
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum); partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
} }
// The data buffer position where write will begin. If the packet
// data and on-disk data have no overlap, this will not be at the
// beginning of the buffer.
int startByteToDisk = (int)(onDiskLen-firstByteInBlock) int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position(); + dataBuf.arrayOffset() + dataBuf.position();
// Actual number of data bytes to write.
int numBytesToDisk = (int)(offsetInBlock-onDiskLen); int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk. // Write data to disk.
@ -625,31 +655,63 @@ private int receivePacket() throws IOException {
final byte[] lastCrc; final byte[] lastCrc;
if (shouldNotWriteChecksum) { if (shouldNotWriteChecksum) {
lastCrc = null; lastCrc = null;
} else if (partialCrc != null) { } else {
// If this is a partial chunk, then verify that this is the only int skip = 0;
// chunk in the packet. Calculate new crc for this chunk. byte[] crcBytes = null;
if (len > bytesPerChecksum) {
throw new IOException("Unexpected packet data length for " // First, overwrite the partial crc at the end, if necessary.
+ block + " from " + inAddr + ": a partial chunk must be " if (doPartialCrc) { // not chunk-aligned on disk
+ " sent in an individual packet (data length = " + len // Calculate new crc for this chunk.
+ " > bytesPerChecksum = " + bytesPerChecksum + ")"); int bytesToReadForRecalc =
(int)(bytesPerChecksum - partialChunkSizeOnDisk);
if (numBytesToDisk < bytesToReadForRecalc) {
bytesToReadForRecalc = numBytesToDisk;
} }
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); partialCrc.update(dataBuf.array(), startByteToDisk,
lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length); bytesToReadForRecalc);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc,
checksumSize);
crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length);
// prepare to overwrite last checksum
adjustCrcFilePosition();
checksumOut.write(buf); checksumOut.write(buf);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Writing out partial crc for data len " + len); LOG.debug("Writing out partial crc for data len " + len +
", skip=" + skip);
} }
partialCrc = null; skip++; // For the partial chunk that was just read.
} else { }
// write checksum
// Determine how many checksums need to be skipped up to the last
// boundary. The checksum after the boundary was already counted
// above. Only count the number of checksums skipped up to the
// boundary here.
long lastChunkBoundary = onDiskLen - (onDiskLen%bytesPerChecksum);
long skippedDataBytes = lastChunkBoundary - firstByteInBlock;
if (skippedDataBytes > 0) {
skip += (int)(skippedDataBytes / bytesPerChecksum) +
((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1);
}
skip *= checksumSize; // Convert to number of bytes
// write the rest of checksum
final int offset = checksumBuf.arrayOffset() + final int offset = checksumBuf.arrayOffset() +
checksumBuf.position(); checksumBuf.position() + skip;
final int end = offset + checksumLen; final int end = offset + checksumLen - skip;
lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize, // If offset > end, there is no more checksum to write.
end); // I.e. a partial chunk checksum rewrite happened and there is no
checksumOut.write(checksumBuf.array(), offset, checksumLen); // more to write after that.
if (offset > end) {
assert crcBytes != null;
lastCrc = crcBytes;
} else {
final int remainingBytes = checksumLen - skip;
lastCrc = copyLastChunkChecksum(checksumBuf.array(),
checksumSize, end);
checksumOut.write(checksumBuf.array(), offset, remainingBytes);
}
} }
/// flush entire packet, sync if requested /// flush entire packet, sync if requested