HDFS-3721. hsync support broke wire compatibility. Contributed by Todd Lipcon and Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1371495 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-08-09 21:31:12 +00:00
parent 82910ecaa3
commit 9ea7c06468
7 changed files with 557 additions and 447 deletions

View File

@ -579,6 +579,8 @@ Branch-2 ( Unreleased changes )
HDFS-3710. libhdfs misuses O_RDONLY/WRONLY/RDWR. (Andy Isaacson via atm) HDFS-3710. libhdfs misuses O_RDONLY/WRONLY/RDWR. (Andy Isaacson via atm)
HDFS-3721. hsync support broke wire compatibility. (todd and atm)
BREAKDOWN OF HDFS-3042 SUBTASKS BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd) HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -30,7 +30,6 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
@ -126,7 +125,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private long lastQueuedSeqno = -1; private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1; private long lastAckedSeqno = -1;
private long bytesCurBlock = 0; // bytes writen in current block private long bytesCurBlock = 0; // bytes writen in current block
private int packetSize = 0; // write packet size, including the header. private int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0; private int chunksPerPacket = 0;
private volatile IOException lastException = null; private volatile IOException lastException = null;
private long artificialSlowdown = 0; private long artificialSlowdown = 0;
@ -147,28 +146,31 @@ private class Packet {
int numChunks; // number of chunks currently in packet int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet int maxChunks; // max chunks in packet
/** buffer for accumulating packet checksum and data */
ByteBuffer buffer; // wraps buf, only one of these two may be non-null
byte[] buf; byte[] buf;
/** /**
* buf is pointed into like follows: * buf is pointed into like follows:
* (C is checksum data, D is payload data) * (C is checksum data, D is payload data)
* *
* [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___] * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
* ^ ^ ^ ^ * ^ ^ ^ ^
* | checksumPos dataStart dataPos * | checksumPos dataStart dataPos
* checksumStart * checksumStart
*
* Right before sending, we move the checksum data to immediately precede
* the actual data, and then insert the header into the buffer immediately
* preceding the checksum data, so we make sure to keep enough space in
* front of the checksum data to support the largest conceivable header.
*/ */
int checksumStart; int checksumStart;
int checksumPos;
int dataStart; int dataStart;
int dataPos; int dataPos;
int checksumPos;
private static final long HEART_BEAT_SEQNO = -1L; private static final long HEART_BEAT_SEQNO = -1L;
/** /**
* create a heartbeat packet * Create a heartbeat packet.
*/ */
Packet() { Packet() {
this.lastPacketInBlock = false; this.lastPacketInBlock = false;
@ -176,17 +178,19 @@ private class Packet {
this.offsetInBlock = 0; this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO; this.seqno = HEART_BEAT_SEQNO;
buffer = null; buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER;
buf = new byte[packetSize];
checksumStart = dataStart = packetSize; checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
dataPos = dataStart;
maxChunks = 0; maxChunks = 0;
} }
// create a new packet /**
* Create a new packet.
*
* @param pktSize maximum size of the packet, including checksum data and actual data.
* @param chunksPerPkt maximum number of chunks per packet.
* @param offsetInBlock offset in bytes into the HDFS block.
*/
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) { Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false; this.lastPacketInBlock = false;
this.numChunks = 0; this.numChunks = 0;
@ -194,25 +198,24 @@ private class Packet {
this.seqno = currentSeqno; this.seqno = currentSeqno;
currentSeqno++; currentSeqno++;
buffer = null; buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
buf = new byte[pktSize];
checksumStart = PacketHeader.PKT_HEADER_LEN; checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart; checksumPos = checksumStart;
dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize(); dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
dataPos = dataStart; dataPos = dataStart;
maxChunks = chunksPerPkt; maxChunks = chunksPerPkt;
} }
void writeData(byte[] inarray, int off, int len) { void writeData(byte[] inarray, int off, int len) {
if ( dataPos + len > buf.length) { if (dataPos + len > buf.length) {
throw new BufferOverflowException(); throw new BufferOverflowException();
} }
System.arraycopy(inarray, off, buf, dataPos, len); System.arraycopy(inarray, off, buf, dataPos, len);
dataPos += len; dataPos += len;
} }
void writeChecksum(byte[] inarray, int off, int len) { void writeChecksum(byte[] inarray, int off, int len) {
if (checksumPos + len > dataStart) { if (checksumPos + len > dataStart) {
throw new BufferOverflowException(); throw new BufferOverflowException();
} }
@ -221,45 +224,38 @@ void writeChecksum(byte[] inarray, int off, int len) {
} }
/** /**
* Returns ByteBuffer that contains one full packet, including header. * Write the full packet, including the header, to the given output stream.
*/ */
ByteBuffer getBuffer() { void writeTo(DataOutputStream stm) throws IOException {
/* Once this is called, no more data can be added to the packet. final int dataLen = dataPos - dataStart;
* setting 'buf' to null ensures that. final int checksumLen = checksumPos - checksumStart;
* This is called only when the packet is ready to be sent. final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
*/
if (buffer != null) {
return buffer;
}
//prepare the header and close any gap between checksum and data.
int dataLen = dataPos - dataStart;
int checksumLen = checksumPos - checksumStart;
if (checksumPos != dataStart) {
/* move the checksum to cover the gap.
* This can happen for the last packet.
*/
System.arraycopy(buf, checksumStart, buf,
dataStart - checksumLen , checksumLen);
}
int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
//normally dataStart == checksumPos, i.e., offset is zero.
buffer = ByteBuffer.wrap(
buf, dataStart - checksumPos,
PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER);
buf = null;
buffer.mark();
PacketHeader header = new PacketHeader( PacketHeader header = new PacketHeader(
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
header.putInBuffer(buffer);
buffer.reset(); if (checksumPos != dataStart) {
return buffer; // Move the checksum to cover the gap. This can happen for the last
// packet or during an hflush/hsync call.
System.arraycopy(buf, checksumStart, buf,
dataStart - checksumLen , checksumLen);
checksumPos = dataStart;
checksumStart = checksumPos - checksumLen;
}
final int headerStart = checksumStart - header.getSerializedSize();
assert checksumStart + 1 >= header.getSerializedSize();
assert checksumPos == dataStart;
assert headerStart >= 0;
assert headerStart + header.getSerializedSize() == checksumStart;
// Copy the header data into the buffer immediately preceding the checksum
// data.
System.arraycopy(header.getBytes(), 0, buf, headerStart,
header.getSerializedSize());
// Write the now contiguous full packet to the output stream.
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
} }
// get the packet's last byte's offset in the block // get the packet's last byte's offset in the block
@ -502,8 +498,6 @@ public void run() {
} }
// send the packet // send the packet
ByteBuffer buf = one.getBuffer();
synchronized (dataQueue) { synchronized (dataQueue) {
// move packet from dataQueue to ackQueue // move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) { if (!one.isHeartbeatPacket()) {
@ -519,8 +513,8 @@ public void run() {
} }
// write out data to remote datanode // write out data to remote datanode
try { try {
blockStream.write(buf.array(), buf.position(), buf.remaining()); one.writeTo(blockStream);
blockStream.flush(); blockStream.flush();
} catch (IOException e) { } catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to // HDFS-3398 treat primary DN is down since client is unable to
@ -1358,9 +1352,8 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
private void computePacketChunkSize(int psize, int csize) { private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize(); int chunkSize = csize + checksum.getChecksumSize();
int n = PacketHeader.PKT_HEADER_LEN; chunksPerPacket = Math.max(psize/chunkSize, 1);
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1); packetSize = chunkSize*chunksPerPacket;
packetSize = n + chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("computePacketChunkSize: src=" + src + DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
", chunkSize=" + chunkSize + ", chunkSize=" + chunkSize +
@ -1474,8 +1467,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che
// indicate the end of block and reset bytesCurBlock. // indicate the end of block and reset bytesCurBlock.
// //
if (bytesCurBlock == blockSize) { if (bytesCurBlock == blockSize) {
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, currentPacket = new Packet(0, 0, bytesCurBlock);
bytesCurBlock);
currentPacket.lastPacketInBlock = true; currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock; currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket(); waitAndQueueCurrentPacket();
@ -1745,8 +1737,7 @@ public synchronized void close() throws IOException {
if (bytesCurBlock != 0) { if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block // send an empty packet to mark the end of the block
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, currentPacket = new Packet(0, 0, bytesCurBlock);
bytesCurBlock);
currentPacket.lastPacketInBlock = true; currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock; currentPacket.syncBlock = shouldSyncBlock;
} }
@ -1799,8 +1790,7 @@ public void setArtificialSlowdown(long period) {
@VisibleForTesting @VisibleForTesting
public synchronized void setChunksPerPacket(int value) { public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value); chunksPerPacket = Math.min(chunksPerPacket, value);
packetSize = PacketHeader.PKT_HEADER_LEN + packetSize = (checksum.getBytesPerChecksum() +
(checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket; checksum.getChecksumSize()) * chunksPerPacket;
} }

View File

@ -33,12 +33,12 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
@ -48,14 +48,11 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper; import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
/** /**
* This is a wrapper around connection to datanode * This is a wrapper around connection to datanode
* and understands checksum, offset etc. * and understands checksum, offset etc.
@ -93,11 +90,9 @@ public class RemoteBlockReader2 implements BlockReader {
private final ReadableByteChannel in; private final ReadableByteChannel in;
private DataChecksum checksum; private DataChecksum checksum;
private PacketHeader curHeader; private PacketReceiver packetReceiver = new PacketReceiver(true);
private ByteBuffer curPacketBuf = null;
private ByteBuffer curDataSlice = null; private ByteBuffer curDataSlice = null;
/** offset in block of the last chunk received */ /** offset in block of the last chunk received */
private long lastSeqNo = -1; private long lastSeqNo = -1;
@ -105,10 +100,6 @@ public class RemoteBlockReader2 implements BlockReader {
private long startOffset; private long startOffset;
private final String filename; private final String filename;
private static DirectBufferPool bufferPool = new DirectBufferPool();
private final ByteBuffer headerBuf = ByteBuffer.allocate(
PacketHeader.PKT_HEADER_LEN);
private final int bytesPerChecksum; private final int bytesPerChecksum;
private final int checksumSize; private final int checksumSize;
@ -132,7 +123,7 @@ public class RemoteBlockReader2 implements BlockReader {
public synchronized int read(byte[] buf, int off, int len) public synchronized int read(byte[] buf, int off, int len)
throws IOException { throws IOException {
if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket(); readNextPacket();
} }
if (curDataSlice.remaining() == 0) { if (curDataSlice.remaining() == 0) {
@ -149,7 +140,7 @@ public synchronized int read(byte[] buf, int off, int len)
@Override @Override
public int read(ByteBuffer buf) throws IOException { public int read(ByteBuffer buf) throws IOException {
if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket(); readNextPacket();
} }
if (curDataSlice.remaining() == 0) { if (curDataSlice.remaining() == 0) {
@ -167,11 +158,13 @@ public int read(ByteBuffer buf) throws IOException {
} }
private void readNextPacket() throws IOException { private void readNextPacket() throws IOException {
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
//Read packet headers. //Read packet headers.
readPacketHeader(); packetReceiver.receiveNextPacket(in);
PacketHeader curHeader = packetReceiver.getHeader();
curDataSlice = packetReceiver.getDataSlice();
assert curDataSlice.capacity() == curHeader.getDataLen();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("DFSClient readNextPacket got header " + curHeader); LOG.trace("DFSClient readNextPacket got header " + curHeader);
} }
@ -185,17 +178,20 @@ private void readNextPacket() throws IOException {
if (curHeader.getDataLen() > 0) { if (curHeader.getDataLen() > 0) {
int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
int checksumsLen = chunks * checksumSize; int checksumsLen = chunks * checksumSize;
int bufsize = checksumsLen + curHeader.getDataLen();
assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
"checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
" checksumsLen=" + checksumsLen;
resetPacketBuffer(checksumsLen, curHeader.getDataLen());
lastSeqNo = curHeader.getSeqno(); lastSeqNo = curHeader.getSeqno();
if (bufsize > 0) { if (verifyChecksum && curDataSlice.remaining() > 0) {
readChannelFully(in, curPacketBuf); // N.B.: the checksum error offset reported here is actually
curPacketBuf.flip(); // relative to the start of the block, not the start of the file.
if (verifyChecksum) { // This is slightly misleading, but preserves the behavior from
verifyPacketChecksums(); // the older BlockReader.
} checksum.verifyChunkedSums(curDataSlice,
packetReceiver.getChecksumSlice(),
filename, curHeader.getOffsetInBlock());
} }
bytesNeededToFinish -= curHeader.getDataLen(); bytesNeededToFinish -= curHeader.getDataLen();
} }
@ -218,40 +214,7 @@ private void readNextPacket() throws IOException {
} }
} }
} }
private void verifyPacketChecksums() throws ChecksumException {
// N.B.: the checksum error offset reported here is actually
// relative to the start of the block, not the start of the file.
// This is slightly misleading, but preserves the behavior from
// the older BlockReader.
checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
filename, curHeader.getOffsetInBlock());
}
private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
throws IOException {
while (buf.remaining() > 0) {
int n = ch.read(buf);
if (n < 0) {
throw new IOException("Premature EOF reading from " + ch);
}
}
}
private void resetPacketBuffer(int checksumsLen, int dataLen) {
int packetLen = checksumsLen + dataLen;
if (curPacketBuf == null ||
curPacketBuf.capacity() < packetLen) {
returnPacketBufToPool();
curPacketBuf = bufferPool.getBuffer(packetLen);
}
curPacketBuf.position(checksumsLen);
curDataSlice = curPacketBuf.slice();
curDataSlice.limit(dataLen);
curPacketBuf.clear();
curPacketBuf.limit(checksumsLen + dataLen);
}
@Override @Override
public synchronized long skip(long n) throws IOException { public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least /* How can we make sure we don't throw a ChecksumException, at least
@ -272,23 +235,14 @@ public synchronized long skip(long n) throws IOException {
return nSkipped; return nSkipped;
} }
private void readPacketHeader() throws IOException {
headerBuf.clear();
readChannelFully(in, headerBuf);
headerBuf.flip();
if (curHeader == null) curHeader = new PacketHeader();
curHeader.readFields(headerBuf);
}
private void readTrailingEmptyPacket() throws IOException { private void readTrailingEmptyPacket() throws IOException {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Reading empty packet at end of read"); LOG.trace("Reading empty packet at end of read");
} }
headerBuf.clear();
readChannelFully(in, headerBuf); packetReceiver.receiveNextPacket(in);
headerBuf.flip();
PacketHeader trailer = new PacketHeader(); PacketHeader trailer = packetReceiver.getHeader();
trailer.readFields(headerBuf);
if (!trailer.isLastPacketInBlock() || if (!trailer.isLastPacketInBlock() ||
trailer.getDataLen() != 0) { trailer.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " + throw new IOException("Expected empty end-of-read packet! Header: " +
@ -321,7 +275,7 @@ protected RemoteBlockReader2(String file, String bpid, long blockId,
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
returnPacketBufToPool(); packetReceiver.close();
startOffset = -1; startOffset = -1;
checksum = null; checksum = null;
@ -332,24 +286,6 @@ public synchronized void close() throws IOException {
// in will be closed when its Socket is closed. // in will be closed when its Socket is closed.
} }
@Override
protected void finalize() throws Throwable {
try {
// just in case it didn't get closed, we
// may as well still try to return the buffer
returnPacketBufToPool();
} finally {
super.finalize();
}
}
private void returnPacketBufToPool() {
if (curPacketBuf != null) {
bufferPool.returnBuffer(curPacketBuf);
curPacketBuf = null;
}
}
/** /**
* Take the socket used to talk to the DN. * Take the socket used to talk to the DN.
*/ */

View File

@ -27,14 +27,31 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
import org.apache.hadoop.hdfs.util.ByteBufferOutputStream; import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Shorts;
import com.google.common.primitives.Ints;
import com.google.protobuf.InvalidProtocolBufferException;
/** /**
* Header data for each packet that goes through the read/write pipelines. * Header data for each packet that goes through the read/write pipelines.
* Includes all of the information about the packet, excluding checksums and
* actual data.
*
* This data includes:
* - the offset in bytes into the HDFS block of the data in this packet
* - the sequence number of this packet in the pipeline
* - whether or not this is the last packet in the pipeline
* - the length of the data in this packet
* - whether or not this packet should be synced by the DNs.
*
* When serialized, this header is written out as a protocol buffer, preceded
* by a 4-byte integer representing the full packet length, and a 2-byte short
* representing the header length.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class PacketHeader { public class PacketHeader {
/** Header size for a packet */ private static final int MAX_PROTO_SIZE =
private static final int PROTO_SIZE =
PacketHeaderProto.newBuilder() PacketHeaderProto.newBuilder()
.setOffsetInBlock(0) .setOffsetInBlock(0)
.setSeqno(0) .setSeqno(0)
@ -42,8 +59,10 @@ public class PacketHeader {
.setDataLen(0) .setDataLen(0)
.setSyncBlock(false) .setSyncBlock(false)
.build().getSerializedSize(); .build().getSerializedSize();
public static final int PKT_HEADER_LEN = public static final int PKT_LENGTHS_LEN =
6 + PROTO_SIZE; Ints.BYTES + Shorts.BYTES;
public static final int PKT_MAX_HEADER_LEN =
PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
private int packetLen; private int packetLen;
private PacketHeaderProto proto; private PacketHeaderProto proto;
@ -54,13 +73,25 @@ public PacketHeader() {
public PacketHeader(int packetLen, long offsetInBlock, long seqno, public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen, boolean syncBlock) { boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen; this.packetLen = packetLen;
proto = PacketHeaderProto.newBuilder() Preconditions.checkArgument(packetLen >= Ints.BYTES,
"packet len %s should always be at least 4 bytes",
packetLen);
PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock) .setOffsetInBlock(offsetInBlock)
.setSeqno(seqno) .setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock) .setLastPacketInBlock(lastPacketInBlock)
.setDataLen(dataLen) .setDataLen(dataLen);
.setSyncBlock(syncBlock)
.build(); if (syncBlock) {
// Only set syncBlock if it is specified.
// This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
// because it changes the length of the packet header, and BlockReceiver
// in that version did not support variable-length headers.
builder.setSyncBlock(syncBlock);
}
proto = builder.build();
} }
public int getDataLen() { public int getDataLen() {
@ -90,10 +121,16 @@ public boolean getSyncBlock() {
@Override @Override
public String toString() { public String toString() {
return "PacketHeader with packetLen=" + packetLen + return "PacketHeader with packetLen=" + packetLen +
"Header data: " + " header data: " +
proto.toString(); proto.toString();
} }
public void setFieldsFromData(
int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
this.packetLen = packetLen;
proto = PacketHeaderProto.parseFrom(headerData);
}
public void readFields(ByteBuffer buf) throws IOException { public void readFields(ByteBuffer buf) throws IOException {
packetLen = buf.getInt(); packetLen = buf.getInt();
short protoLen = buf.getShort(); short protoLen = buf.getShort();
@ -110,14 +147,21 @@ public void readFields(DataInputStream in) throws IOException {
proto = PacketHeaderProto.parseFrom(data); proto = PacketHeaderProto.parseFrom(data);
} }
/**
* @return the number of bytes necessary to write out this header,
* including the length-prefixing of the payload and header
*/
public int getSerializedSize() {
return PKT_LENGTHS_LEN + proto.getSerializedSize();
}
/** /**
* Write the header into the buffer. * Write the header into the buffer.
* This requires that PKT_HEADER_LEN bytes are available. * This requires that PKT_HEADER_LEN bytes are available.
*/ */
public void putInBuffer(final ByteBuffer buf) { public void putInBuffer(final ByteBuffer buf) {
assert proto.getSerializedSize() == PROTO_SIZE assert proto.getSerializedSize() <= MAX_PROTO_SIZE
: "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize(); : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
try { try {
buf.putInt(packetLen); buf.putInt(packetLen);
buf.putShort((short) proto.getSerializedSize()); buf.putShort((short) proto.getSerializedSize());
@ -128,12 +172,18 @@ public void putInBuffer(final ByteBuffer buf) {
} }
public void write(DataOutputStream out) throws IOException { public void write(DataOutputStream out) throws IOException {
assert proto.getSerializedSize() == PROTO_SIZE assert proto.getSerializedSize() <= MAX_PROTO_SIZE
: "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize(); : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
out.writeInt(packetLen); out.writeInt(packetLen);
out.writeShort(proto.getSerializedSize()); out.writeShort(proto.getSerializedSize());
proto.writeTo(out); proto.writeTo(out);
} }
public byte[] getBytes() {
ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
putInBuffer(buf);
return buf.array();
}
/** /**
* Perform a sanity check on the packet, returning true if it is sane. * Perform a sanity check on the packet, returning true if it is sane.

View File

@ -0,0 +1,292 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
/**
* Class to handle reading packets one-at-a-time from the wire.
* These packets are used both for reading and writing data to/from
* DataNodes.
*/
@InterfaceAudience.Private
public class PacketReceiver implements Closeable {
/**
* The max size of any single packet (16 MB). This prevents OOMEs when
* invalid data is sent: doRead rejects any packet whose payload length
* plus header length exceeds this value before allocating a buffer for it.
*/
private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
/** Logger for this class; doRead emits trace-level output through it. */
static Log LOG = LogFactory.getLog(PacketReceiver.class);
/**
* Buffer pool shared by every PacketReceiver instance.
* NOTE(review): presumably the source of curPacketBuf when
* useDirectBuffers is set; the allocation code is outside this view.
*/
private static final DirectBufferPool bufferPool = new DirectBufferPool();
/**
* Value passed to the constructor. mirrorPacketTo refuses to run when this
* is true.
*/
private final boolean useDirectBuffers;
/**
* Internal buffer for reading the length prefixes at the start of
* the packet: PacketHeader.PKT_LENGTHS_LEN bytes, from which doRead takes
* a 32-bit payload length followed by a 16-bit header length.
*/
private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
PacketHeader.PKT_LENGTHS_LEN);
/**
* The entirety of the most recently read packet, excepting the
* length prefixes. Null until the first packet has been read.
*/
private ByteBuffer curPacketBuf = null;
/**
* A slice of {@link #curPacketBuf} which contains just the checksums.
*/
private ByteBuffer curChecksumSlice = null;
/**
* A slice of {@link #curPacketBuf} which contains just the data.
*/
private ByteBuffer curDataSlice = null;
/**
* The packet header of the most recently read packet. doRead creates this
* object once and then overwrites its fields for every subsequent packet.
*/
private PacketHeader curHeader;
/**
* Creates a receiver that has not yet read any packet.
*
* @param useDirectBuffers stored as-is in the useDirectBuffers field.
*        NOTE(review): by its name this selects direct rather than heap
*        packet buffers, but the allocation code is outside this view.
*        {@link #mirrorPacketTo(DataOutputStream)} only works when this is
*        false.
*/
public PacketReceiver(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
}
/**
* @return the header of the most recently received packet, or null if no
*         packet has been received yet. The same PacketHeader object is
*         refilled on every receive, so callers must not hold on to it
*         across calls if they need the old values.
*/
public PacketHeader getHeader() {
return curHeader;
}
/**
* @return the slice of the current packet buffer that holds just the block
*         data, or null if no packet has been received yet.
*/
public ByteBuffer getDataSlice() {
return curDataSlice;
}
/**
* @return the slice of the current packet buffer that holds just the
*         checksums, or null if no packet has been received yet.
*/
public ByteBuffer getChecksumSlice() {
return curChecksumSlice;
}
/**
* Reads all of the data for the next packet into the appropriate buffers.
*
* The data slice and checksum slice members will be set to point to the
* user data and corresponding checksums. The header will be parsed and
* set.
*
* @param in channel to read the packet from
* @throws IOException if reading fails or the packet's length fields are
*         invalid
*/
public void receiveNextPacket(ReadableByteChannel in) throws IOException {
// Channel variant: the InputStream argument of doRead is left null.
doRead(in, null);
}
/**
* Stream variant of {@link #receiveNextPacket(ReadableByteChannel)}: reads
* the next packet from an InputStream instead of a channel.
*
* @param in stream to read the packet from
* @throws IOException if reading fails or the packet's length fields are
*         invalid
* @see #receiveNextPacket(ReadableByteChannel)
*/
public void receiveNextPacket(InputStream in) throws IOException {
// Stream variant: the channel argument of doRead is left null.
doRead(null, in);
}
/**
 * Reads one complete packet (length prefixes, protobuf header, checksums
 * and data) and updates the current header and slices to describe it.
 *
 * The two public receiveNextPacket overloads each pass null for the source
 * they do not use, so exactly one of {@code ch} and {@code in} is expected
 * to be non-null.
 *
 * @param ch channel to read from, or null when reading from {@code in}
 * @param in stream to read from, or null when reading from {@code ch}
 * @throws IOException if the payload length is smaller than its own 4-byte
 *         encoding, the header length is negative, the total size exceeds
 *         MAX_PACKET_SIZE, the header's data length exceeds the bytes
 *         received, the header bytes cannot be parsed, or the underlying
 *         read fails
 * @throws IllegalStateException if the previously received packet was
 *         marked as the last packet in the block
 */
private void doRead(ReadableByteChannel ch, InputStream in)
    throws IOException {
  // Each packet looks like:
  //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
  //   32-bit  16-bit   <protobuf>  <variable length>
  //
  // PLEN:      Payload length
  //            = length(PLEN) + length(CHECKSUMS) + length(DATA)
  //            This length includes its own encoded length in
  //            the sum for historical reasons.
  //
  // HLEN:      Header length
  //            = length(HEADER)
  //
  // HEADER:    the actual packet header fields, encoded in protobuf
  // CHECKSUMS: the crcs for the data chunk. May be missing if
  //            checksums were not requested
  // DATA       the actual block data
  Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());

  // Read the fixed-size PLEN + HLEN prefix.
  lengthPrefixBuf.clear();
  doReadFully(ch, in, lengthPrefixBuf);
  lengthPrefixBuf.flip();
  int payloadLen = lengthPrefixBuf.getInt();

  if (payloadLen < Ints.BYTES) {
    // The "payload length" includes its own length. Therefore it
    // should never be less than 4 bytes
    throw new IOException("Invalid payload length " +
        payloadLen);
  }
  int dataPlusChecksumLen = payloadLen - Ints.BYTES;
  // getShort() sign-extends, so a corrupt prefix can yield a negative value.
  int headerLen = lengthPrefixBuf.getShort();
  if (headerLen < 0) {
    throw new IOException("Invalid header length " + headerLen);
  }

  if (LOG.isTraceEnabled()) {
    LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
        " headerLen = " + headerLen);
  }

  // Sanity check the buffer size so we don't allocate too much memory
  // and OOME. The "< 0" arm catches int overflow of the sum.
  int totalLen = payloadLen + headerLen;
  if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
    throw new IOException("Incorrect value for packet payload size: " +
        payloadLen);
  }

  // Make sure we have space for the whole packet, and
  // read it.
  reallocPacketBuf(dataPlusChecksumLen + headerLen);
  curPacketBuf.clear();
  curPacketBuf.limit(dataPlusChecksumLen + headerLen);
  doReadFully(ch, in, curPacketBuf);
  curPacketBuf.flip();

  // Extract the header from the front of the buffer.
  byte[] headerBuf = new byte[headerLen];
  curPacketBuf.get(headerBuf);
  if (curHeader == null) {
    curHeader = new PacketHeader();
  }
  // Record the on-wire PLEN value (which counts its own 4 bytes), matching
  // what PacketHeader.readFields stores and what the PacketHeader
  // constructor requires (packetLen >= 4). Passing dataPlusChecksumLen here
  // would make received headers report a packetLen that is 4 bytes short.
  curHeader.setFieldsFromData(payloadLen, headerBuf);

  // Compute the sub-slices of the packet
  int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
  if (checksumLen < 0) {
    throw new IOException("Invalid packet: data length in packet header " +
        "exceeds data length received. dataPlusChecksumLen=" +
        dataPlusChecksumLen + " header: " + curHeader);
  }

  reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
}
/**
 * Forwards the most recently received packet to {@code mirrorOut}: the
 * length prefixes are written first, followed by the remaining contents
 * of the packet buffer. Requires a receiver that uses non-direct buffers,
 * since the bytes are taken from the buffers' backing arrays.
 *
 * @param mirrorOut the stream to copy the packet to
 * @throws IOException if writing to {@code mirrorOut} fails
 */
public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
  Preconditions.checkState(!useDirectBuffers,
      "Currently only supported for non-direct buffers");
  assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;

  // Length prefixes: the whole backing region of the prefix buffer.
  final byte[] prefixBytes = lengthPrefixBuf.array();
  final int prefixStart = lengthPrefixBuf.arrayOffset();
  final int prefixCount = lengthPrefixBuf.capacity();
  mirrorOut.write(prefixBytes, prefixStart, prefixCount);

  // Header, checksums and data.
  final byte[] packetBytes = curPacketBuf.array();
  final int packetStart = curPacketBuf.arrayOffset();
  final int packetCount = curPacketBuf.remaining();
  mirrorOut.write(packetBytes, packetStart, packetCount);
}
/**
 * Fills {@code buf} from its current position up to its limit, reading
 * from {@code ch} when it is non-null and from {@code in} otherwise.
 *
 * @param ch channel source, or null to use the stream
 * @param in stream source; only used when {@code ch} is null, and only
 *        valid for non-direct buffers
 * @param buf destination buffer; its position ends up at its limit
 * @throws IOException if the underlying read fails
 */
private static void doReadFully(ReadableByteChannel ch, InputStream in,
    ByteBuffer buf) throws IOException {
  if (ch != null) {
    readChannelFully(ch, buf);
    return;
  }

  // Stream path: read directly into the backing array.
  Preconditions.checkState(!buf.isDirect(),
      "Must not use direct buffers with InputStream API");
  final int start = buf.position();
  final int count = buf.remaining();
  IOUtils.readFully(in, buf.array(), buf.arrayOffset() + start, count);
  // The array read bypassed the buffer, so advance its position by hand.
  buf.position(start + count);
}
/**
 * Re-creates the checksum and data slices over {@link #curPacketBuf} and
 * then resets the packet buffer to span the whole packet from position 0.
 *
 * @param headerLen number of header bytes at the front of the buffer
 * @param checksumsLen number of checksum bytes following the header
 * @param dataLen number of data bytes following the checksums
 */
private void reslicePacket(
    int headerLen, int checksumsLen, int dataLen) {
  // Sanity checks; these only run when assertions are enabled.
  assert dataLen >= 0 : "invalid datalen: " + dataLen;
  assert curPacketBuf.position() == headerLen;
  assert checksumsLen + dataLen == curPacketBuf.remaining() :
    "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
    " rem=" + curPacketBuf.remaining();

  final int checksumStart = headerLen;
  final int dataStart = checksumStart + checksumsLen;
  final int packetEnd = dataStart + dataLen;

  // Checksums: [checksumStart, dataStart)
  curPacketBuf.position(checksumStart);
  curPacketBuf.limit(dataStart);
  curChecksumSlice = curPacketBuf.slice();

  // Data: [dataStart, packetEnd)
  curPacketBuf.position(dataStart);
  curPacketBuf.limit(packetEnd);
  curDataSlice = curPacketBuf.slice();

  // Leave the packet buffer covering header + checksums + data.
  curPacketBuf.position(0);
  curPacketBuf.limit(packetEnd);
}
/**
 * Keeps reading from {@code ch} until {@code buf} has no space remaining.
 *
 * @param ch the channel to read from
 * @param buf the buffer to fill up to its limit
 * @throws IOException if the channel reports end-of-stream before the
 *         buffer is full, or if the read itself fails
 */
private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
    throws IOException {
  while (buf.hasRemaining()) {
    final int bytesRead = ch.read(buf);
    if (bytesRead < 0) {
      throw new IOException("Premature EOF reading from " + ch);
    }
  }
}
/**
 * Ensures {@link #curPacketBuf} can hold at least {@code atLeastCapacity}
 * bytes. An existing buffer that is already large enough is kept;
 * otherwise the old one is released and a new one is obtained.
 *
 * @param atLeastCapacity minimum capacity required, in bytes
 */
private void reallocPacketBuf(int atLeastCapacity) {
  final boolean bigEnough =
      curPacketBuf != null && curPacketBuf.capacity() >= atLeastCapacity;
  if (bigEnough) {
    return;
  }

  // This packet is longer than the previous one (or there is no buffer
  // yet): give back the old buffer and get a larger one.
  returnPacketBufToPool();
  curPacketBuf = useDirectBuffers
      ? bufferPool.getBuffer(atLeastCapacity)
      : ByteBuffer.allocate(atLeastCapacity);
}
/**
 * Hands the current packet buffer back to {@code bufferPool} and clears
 * the reference, but only if it is a direct buffer. Heap buffers and a
 * missing buffer are left untouched.
 */
private void returnPacketBufToPool() {
  final ByteBuffer current = curPacketBuf;
  if (current == null || !current.isDirect()) {
    return;
  }
  bufferPool.returnBuffer(current);
  curPacketBuf = null;
}
/**
* Releases the packet buffer: a direct buffer is returned to the pool,
* while a heap buffer (or no buffer at all) is left as is.
*/
@Override // Closeable
public void close() {
returnPacketBufToPool();
}
/**
* Safety net for receivers that were never closed: attempts to return
* the packet buffer to the pool before delegating to the superclass
* finalizer.
*/
@Override
protected void finalize() throws Throwable {
try {
// just in case it didn't get closed, we
// may as well still try to return the buffer
returnPacketBufToPool();
} finally {
// always run the superclass finalizer, even if the return above throws
super.finalize();
}
}
}

View File

@ -23,7 +23,6 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
@ -34,12 +33,14 @@
import java.util.zip.Checksum; import java.util.zip.Checksum;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@ -77,9 +78,10 @@ class BlockReceiver implements Closeable {
private DataOutputStream checksumOut = null; // to crc file at local disk private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum; private int bytesPerChecksum;
private int checksumSize; private int checksumSize;
private ByteBuffer buf; // contains one full packet.
private int bufRead; //amount of valid data in the buf private PacketReceiver packetReceiver =
private int maxPacketReadLen; new PacketReceiver(false);
protected final String inAddr; protected final String inAddr;
protected final String myAddr; protected final String myAddr;
private String mirrorAddr; private String mirrorAddr;
@ -248,6 +250,10 @@ class BlockReceiver implements Closeable {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (packetReceiver != null) {
packetReceiver.close();
}
IOException ioe = null; IOException ioe = null;
if (syncOnClose && (out != null || checksumOut != null)) { if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount(); datanode.metrics.incrFsyncCount();
@ -365,33 +371,24 @@ private void handleMirrorOutError(IOException ioe) throws IOException {
/** /**
* Verify multiple CRC chunks. * Verify multiple CRC chunks.
*/ */
private void verifyChunks( byte[] dataBuf, int dataOff, int len, private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
byte[] checksumBuf, int checksumOff ) throws IOException {
throws IOException { try {
while (len > 0) { clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
int chunkLen = Math.min(len, bytesPerChecksum); } catch (ChecksumException ce) {
LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
clientChecksum.update(dataBuf, dataOff, chunkLen); if (srcDataNode != null) {
try {
if (!clientChecksum.compare(checksumBuf, checksumOff)) { LOG.info("report corrupt block " + block + " from datanode " +
if (srcDataNode != null) { srcDataNode + " to namenode");
try { datanode.reportRemoteBadBlock(srcDataNode, block);
LOG.info("report corrupt block " + block + " from datanode " + } catch (IOException e) {
srcDataNode + " to namenode"); LOG.warn("Failed to report bad block " + block +
datanode.reportRemoteBadBlock(srcDataNode, block); " from datanode " + srcDataNode + " to namenode");
} catch (IOException e) {
LOG.warn("Failed to report bad block " + block +
" from datanode " + srcDataNode + " to namenode");
}
} }
throw new IOException("Unexpected checksum mismatch " +
"while writing " + block + " from " + inAddr);
} }
throw new IOException("Unexpected checksum mismatch " +
clientChecksum.reset(); "while writing " + block + " from " + inAddr);
dataOff += chunkLen;
checksumOff += checksumSize;
len -= chunkLen;
} }
} }
@ -403,163 +400,24 @@ private void verifyChunks( byte[] dataBuf, int dataOff, int len,
* This does not verify the original checksums, under the assumption * This does not verify the original checksums, under the assumption
* that they have already been validated. * that they have already been validated.
*/ */
private void translateChunks( byte[] dataBuf, int dataOff, int len, private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
byte[] checksumBuf, int checksumOff ) { diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
if (len == 0) return;
int numChunks = (len - 1)/bytesPerChecksum + 1;
diskChecksum.calculateChunkedSums(
ByteBuffer.wrap(dataBuf, dataOff, len),
ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
} }
/**
* Makes sure buf.position() is zero without modifying buf.remaining().
* It moves the data if position needs to be changed.
*/
private void shiftBufData() {
if (bufRead != buf.limit()) {
throw new IllegalStateException("bufRead should be same as " +
"buf.limit()");
}
//shift the remaining data on buf to the front
if (buf.position() > 0) {
int dataLeft = buf.remaining();
if (dataLeft > 0) {
byte[] b = buf.array();
System.arraycopy(b, buf.position(), b, 0, dataLeft);
}
buf.position(0);
bufRead = dataLeft;
buf.limit(bufRead);
}
}
/**
* reads upto toRead byte to buf at buf.limit() and increments the limit.
* throws an IOException if read does not succeed.
*/
private int readToBuf(int toRead) throws IOException {
if (toRead < 0) {
toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
- buf.limit();
}
int nRead = in.read(buf.array(), buf.limit(), toRead);
if (nRead < 0) {
throw new EOFException("while trying to read " + toRead + " bytes");
}
bufRead = buf.limit() + nRead;
buf.limit(bufRead);
return nRead;
}
/**
* Reads (at least) one packet and returns the packet length.
* buf.position() points to the start of the packet and
* buf.limit() point to the end of the packet. There could
* be more data from next packet in buf.<br><br>
*
* It tries to read a full packet with single read call.
* Consecutive packets are usually of the same length.
*/
private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read
* full packet with single read and to accept arbitrary size
* for next packet at the same time.
*/
if (buf == null) {
/* initialize buffer to the best guess size:
* 'chunksPerPacket' calculation here should match the same
* calculation in DFSClient to make the guess accurate.
*/
int chunkSize = bytesPerChecksum + checksumSize;
int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
+ chunkSize - 1)/chunkSize;
buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
Math.max(chunksPerPacket, 1) * chunkSize);
buf.limit(0);
}
// See if there is data left in the buffer :
if (bufRead > buf.limit()) {
buf.limit(bufRead);
}
while (buf.remaining() < HdfsConstants.BYTES_IN_INTEGER) {
if (buf.position() > 0) {
shiftBufData();
}
readToBuf(-1);
}
/* We mostly have the full packet or at least enough for an int
*/
buf.mark();
int payloadLen = buf.getInt();
buf.reset();
// check corrupt values for pktLen, 100MB upper limit should be ok?
if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
throw new IOException("Incorrect value for packet payload : " +
payloadLen);
}
// Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
// we read above.
int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
- HdfsConstants.BYTES_IN_INTEGER;
if (buf.remaining() < pktSize) {
//we need to read more data
int toRead = pktSize - buf.remaining();
// first make sure buf has enough space.
int spaceLeft = buf.capacity() - buf.limit();
if (toRead > spaceLeft && buf.position() > 0) {
shiftBufData();
spaceLeft = buf.capacity() - buf.limit();
}
if (toRead > spaceLeft) {
byte oldBuf[] = buf.array();
int toCopy = buf.limit();
buf = ByteBuffer.allocate(toCopy + toRead);
System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
buf.limit(toCopy);
}
//now read:
while (toRead > 0) {
toRead -= readToBuf(toRead);
}
}
if (buf.remaining() > pktSize) {
buf.limit(buf.position() + pktSize);
}
if (pktSize > maxPacketReadLen) {
maxPacketReadLen = pktSize;
}
}
/** /**
* Receives and processes a packet. It can contain many chunks. * Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has. * returns the number of data bytes that the packet has.
*/ */
private int receivePacket() throws IOException { private int receivePacket() throws IOException {
// read the next packet // read the next packet
readNextPacket(); packetReceiver.receiveNextPacket(in);
buf.mark(); PacketHeader header = packetReceiver.getHeader();
PacketHeader header = new PacketHeader(); if (LOG.isDebugEnabled()){
header.readFields(buf); LOG.debug("Receiving one packet for block " + block +
int endOfHeader = buf.position(); ": " + header);
buf.reset(); }
// Sanity check the header // Sanity check the header
if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) { if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
@ -574,38 +432,12 @@ private int receivePacket() throws IOException {
header.getDataLen()); header.getDataLen());
} }
return receivePacket( long offsetInBlock = header.getOffsetInBlock();
header.getOffsetInBlock(), long seqno = header.getSeqno();
header.getSeqno(), boolean lastPacketInBlock = header.isLastPacketInBlock();
header.isLastPacketInBlock(), int len = header.getDataLen();
header.getDataLen(), boolean syncBlock = header.getSyncBlock();
header.getSyncBlock(),
endOfHeader);
}
/**
* Write the received packet to disk (data only)
*/
private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
int numBytesToDisk) throws IOException {
out.write(pktBuf, startByteToDisk, numBytesToDisk);
}
/**
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
*/
private int receivePacket(long offsetInBlock, long seqno,
boolean lastPacketInBlock, int len, boolean syncBlock,
int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
" of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" syncBlock " + syncBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
// make sure the block gets sync'ed upon close // make sure the block gets sync'ed upon close
this.syncOnClose |= syncBlock && lastPacketInBlock; this.syncOnClose |= syncBlock && lastPacketInBlock;
@ -625,14 +457,15 @@ private int receivePacket(long offsetInBlock, long seqno,
//First write the packet to the mirror: //First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) { if (mirrorOut != null && !mirrorError) {
try { try {
mirrorOut.write(buf.array(), buf.position(), buf.remaining()); packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush(); mirrorOut.flush();
} catch (IOException e) { } catch (IOException e) {
handleMirrorOutError(e); handleMirrorOutError(e);
} }
} }
buf.position(endOfHeader); ByteBuffer dataBuf = packetReceiver.getDataSlice();
ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
if (lastPacketInBlock || len == 0) { if (lastPacketInBlock || len == 0) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -646,18 +479,11 @@ private int receivePacket(long offsetInBlock, long seqno,
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize; checksumSize;
if ( buf.remaining() != (checksumLen + len)) { if ( checksumBuf.capacity() != checksumLen) {
throw new IOException("Data remaining in packet does not match" + throw new IOException("Length of checksums in packet " +
"sum of checksumLen and dataLen " + checksumBuf.capacity() + " does not match calculated checksum " +
" size remaining: " + buf.remaining() + "length " + checksumLen);
" data len: " + len + }
" checksum Len: " + checksumLen);
}
int checksumOff = buf.position();
int dataOff = checksumOff + checksumLen;
byte pktBuf[] = buf.array();
buf.position(buf.limit()); // move to the end of the data.
/* skip verifying checksum iff this is not the last one in the /* skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified * pipeline and clientName is non-null. i.e. Checksum is verified
@ -667,11 +493,11 @@ private int receivePacket(long offsetInBlock, long seqno,
* checksum. * checksum.
*/ */
if (mirrorOut == null || isDatanode || needsChecksumTranslation) { if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff); verifyChunks(dataBuf, checksumBuf);
if (needsChecksumTranslation) { if (needsChecksumTranslation) {
// overwrite the checksums in the packet buffer with the // overwrite the checksums in the packet buffer with the
// appropriate polynomial for the disk storage. // appropriate polynomial for the disk storage.
translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff); translateChunks(dataBuf, checksumBuf);
} }
} }
@ -700,9 +526,13 @@ private int receivePacket(long offsetInBlock, long seqno,
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum); computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
} }
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock); int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position();
int numBytesToDisk = (int)(offsetInBlock-onDiskLen); int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
// Write data to disk.
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only // If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk. // chunk in the packet. Calculate new crc for this chunk.
@ -714,7 +544,7 @@ private int receivePacket(long offsetInBlock, long seqno,
" len = " + len + " len = " + len +
" bytesPerChecksum " + bytesPerChecksum); " bytesPerChecksum " + bytesPerChecksum);
} }
partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk); partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastChunkChecksum = Arrays.copyOfRange( lastChunkChecksum = Arrays.copyOfRange(
buf, buf.length - checksumSize, buf.length buf, buf.length - checksumSize, buf.length
@ -726,11 +556,12 @@ private int receivePacket(long offsetInBlock, long seqno,
partialCrc = null; partialCrc = null;
} else { } else {
lastChunkChecksum = Arrays.copyOfRange( lastChunkChecksum = Arrays.copyOfRange(
pktBuf, checksumBuf.array(),
checksumOff + checksumLen - checksumSize, checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
checksumOff + checksumLen checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
); checksumOut.write(checksumBuf.array(),
checksumOut.write(pktBuf, checksumOff, checksumLen); checksumBuf.arrayOffset() + checksumBuf.position(),
checksumLen);
} }
/// flush entire packet, sync unless close() will sync /// flush entire packet, sync unless close() will sync
flushOrSync(syncBlock && !lastPacketInBlock); flushOrSync(syncBlock && !lastPacketInBlock);

View File

@ -62,40 +62,29 @@
* </pre> * </pre>
* An empty packet is sent to mark the end of block and read completion. * An empty packet is sent to mark the end of block and read completion.
* *
* PACKET Contains a packet header, checksum and data. Amount of data * PACKET Contains a packet header, checksum and data. Amount of data
* carried is set by BUFFER_SIZE. * carried is set by BUFFER_SIZE.
* <pre> * <pre>
* +-----------------------------------------------------+ * +-----------------------------------------------------+
* | 4 byte packet length (excluding packet header) | * | Variable length header. See {@link PacketHeader} |
* +-----------------------------------------------------+ * +-----------------------------------------------------+
* | 8 byte offset in the block | 8 byte sequence number | * | x byte checksum data. x is defined below |
* +-----------------------------------------------------+ * +-----------------------------------------------------+
* | 1 byte isLastPacketInBlock | * | actual data ...... |
* +-----------------------------------------------------+ * +-----------------------------------------------------+
* | 4 byte Length of actual data | *
* +-----------------------------------------------------+ * Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
* | x byte checksum data. x is defined below | * A checksum is calculated for each chunk.
* +-----------------------------------------------------+ *
* | actual data ...... | * x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
* +-----------------------------------------------------+ * CHECKSUM_SIZE
* *
* Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM. * CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
* A checksum is calculated for each chunk. * </pre>
*
* x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
* CHECKSUM_SIZE
*
* CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
* </pre>
* *
* The client reads data until it receives a packet with * The client reads data until it receives a packet with
* "LastPacketInBlock" set to true or with a zero length. If there is * "LastPacketInBlock" set to true or with a zero length. If there is
* no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: * no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK.
* <pre>
* +------------------------------+
* | 2 byte OP_STATUS_CHECKSUM_OK |
* +------------------------------+
* </pre>
*/ */
class BlockSender implements java.io.Closeable { class BlockSender implements java.io.Closeable {
static final Log LOG = DataNode.LOG; static final Log LOG = DataNode.LOG;
@ -448,8 +437,22 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
int packetLen = dataLen + checksumDataLen + 4; int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0; boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
writePacketHeader(pkt, dataLen, packetLen); // The packet buffer is organized as follows:
// _______HHHHCCCCD?D?D?D?
// ^ ^
// | \ checksumOff
// \ headerOff
// _ padding, since the header is variable-length
// H = header and length prefixes
// C = checksums
// D? = data, if transferTo is false.
int headerLen = writePacketHeader(pkt, dataLen, packetLen);
// Per above, the header doesn't start at the beginning of the
// buffer
int headerOff = pkt.position() - headerLen;
int checksumOff = pkt.position(); int checksumOff = pkt.position();
byte[] buf = pkt.array(); byte[] buf = pkt.array();
@ -479,7 +482,8 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
try { try {
if (transferTo) { if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out; SocketOutputStream sockOut = (SocketOutputStream)out;
sockOut.write(buf, 0, dataOff); // First write checksum // First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream // no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
@ -492,7 +496,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
blockInPosition += dataLen; blockInPosition += dataLen;
} else { } else {
// normal transfer // normal transfer
out.write(buf, 0, dataOff + dataLen); out.write(buf, headerOff, dataOff + dataLen - headerOff);
} }
} catch (IOException e) { } catch (IOException e) {
if (e instanceof SocketTimeoutException) { if (e instanceof SocketTimeoutException) {
@ -625,7 +629,7 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try { try {
int maxChunksPerPacket; int maxChunksPerPacket;
int pktSize = PacketHeader.PKT_HEADER_LEN; int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream && baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream; && blockIn instanceof FileInputStream;
@ -636,15 +640,15 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// Smaller packet size to only hold checksum when doing transferTo // Smaller packet size to only hold checksum when doing transferTo
pktSize += checksumSize * maxChunksPerPacket; pktBufSize += checksumSize * maxChunksPerPacket;
} else { } else {
maxChunksPerPacket = Math.max(1, maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE)); numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data // Packet size includes both checksum and data
pktSize += (chunkSize + checksumSize) * maxChunksPerPacket; pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
} }
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
while (endOffset > offset) { while (endOffset > offset) {
manageOsCache(); manageOsCache();
@ -714,14 +718,19 @@ private boolean isLongRead() {
} }
/** /**
* Write packet header into {@code pkt} * Write packet header into {@code pkt},
* return the length of the header written.
*/ */
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) { private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear(); pkt.clear();
// both syncBlock and syncPacket are false // both syncBlock and syncPacket are false
PacketHeader header = new PacketHeader(packetLen, offset, seqno, PacketHeader header = new PacketHeader(packetLen, offset, seqno,
(dataLen == 0), dataLen, false); (dataLen == 0), dataLen, false);
int size = header.getSerializedSize();
pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);
header.putInBuffer(pkt); header.putInBuffer(pkt);
return size;
} }
boolean didSendEntireByteRange() { boolean didSendEntireByteRange() {