From 9ea7c06468d236452f03c38a31d1a45f7f09dc50 Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Thu, 9 Aug 2012 21:31:12 +0000 Subject: [PATCH] HDFS-3721. hsync support broke wire compatibility. Contributed by Todd Lipcon and Aaron T. Myers. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1371495 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../apache/hadoop/hdfs/DFSOutputStream.java | 130 ++++---- .../hadoop/hdfs/RemoteBlockReader2.java | 118 ++----- .../protocol/datatransfer/PacketHeader.java | 76 ++++- .../protocol/datatransfer/PacketReceiver.java | 292 +++++++++++++++++ .../hdfs/server/datanode/BlockReceiver.java | 295 ++++-------------- .../hdfs/server/datanode/BlockSender.java | 91 +++--- 7 files changed, 557 insertions(+), 447 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e05f8aabb3..4e630fd23f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -579,6 +579,8 @@ Branch-2 ( Unreleased changes ) HDFS-3710. libhdfs misuses O_RDONLY/WRONLY/RDWR. (Andy Isaacson via atm) + HDFS-3721. hsync support broke wire compatibility. (todd and atm) + BREAKDOWN OF HDFS-3042 SUBTASKS HDFS-2185. HDFS portion of ZK-based FailoverController (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index dd98b65691..50785098bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -30,7 +30,6 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@ -126,7 +125,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { private long lastQueuedSeqno = -1; private long lastAckedSeqno = -1; private long bytesCurBlock = 0; // bytes writen in current block - private int packetSize = 0; // write packet size, including the header. + private int packetSize = 0; // write packet size, not including the header. private int chunksPerPacket = 0; private volatile IOException lastException = null; private long artificialSlowdown = 0; @@ -147,28 +146,31 @@ private class Packet { int numChunks; // number of chunks currently in packet int maxChunks; // max chunks in packet - /** buffer for accumulating packet checksum and data */ - ByteBuffer buffer; // wraps buf, only one of these two may be non-null byte[] buf; /** * buf is pointed into like follows: * (C is checksum data, D is payload data) * - * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___] - * ^ ^ ^ ^ - * | checksumPos dataStart dataPos - * checksumStart + * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___] + * ^ ^ ^ ^ + * | checksumPos dataStart dataPos + * checksumStart + * + * Right before sending, we move the checksum data to immediately precede + * the actual data, and then insert the header into the buffer immediately + * preceding the checksum data, so we make sure to keep enough space in + * front of the checksum data to support the largest conceivable header. */ int checksumStart; + int checksumPos; int dataStart; int dataPos; - int checksumPos; private static final long HEART_BEAT_SEQNO = -1L; /** - * create a heartbeat packet + * Create a heartbeat packet. */ Packet() { this.lastPacketInBlock = false; @@ -176,17 +178,19 @@ private class Packet { this.offsetInBlock = 0; this.seqno = HEART_BEAT_SEQNO; - buffer = null; - int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER; - buf = new byte[packetSize]; + buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; - checksumStart = dataStart = packetSize; - checksumPos = checksumStart; - dataPos = dataStart; + checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN; maxChunks = 0; } - // create a new packet + /** + * Create a new packet. + * + * @param pktSize maximum size of the packet, including checksum data and actual data. + * @param chunksPerPkt maximum number of chunks per packet. + * @param offsetInBlock offset in bytes into the HDFS block. + */ Packet(int pktSize, int chunksPerPkt, long offsetInBlock) { this.lastPacketInBlock = false; this.numChunks = 0; @@ -194,25 +198,24 @@ private class Packet { this.seqno = currentSeqno; currentSeqno++; - buffer = null; - buf = new byte[pktSize]; + buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize]; - checksumStart = PacketHeader.PKT_HEADER_LEN; + checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; checksumPos = checksumStart; - dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize(); + dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize()); dataPos = dataStart; maxChunks = chunksPerPkt; } void writeData(byte[] inarray, int off, int len) { - if ( dataPos + len > buf.length) { + if (dataPos + len > buf.length) { throw new BufferOverflowException(); } System.arraycopy(inarray, off, buf, dataPos, len); dataPos += len; } - void writeChecksum(byte[] inarray, int off, int len) { + void writeChecksum(byte[] inarray, int off, int len) { if (checksumPos + len > dataStart) { throw new BufferOverflowException(); } @@ -221,45 +224,38 @@ void writeChecksum(byte[] inarray, int off, int len) { } /** - * Returns ByteBuffer that contains one full packet, including header. + * Write the full packet, including the header, to the given output stream. */ - ByteBuffer getBuffer() { - /* Once this is called, no more data can be added to the packet. - * setting 'buf' to null ensures that. - * This is called only when the packet is ready to be sent. - */ - if (buffer != null) { - return buffer; - } - - //prepare the header and close any gap between checksum and data. - - int dataLen = dataPos - dataStart; - int checksumLen = checksumPos - checksumStart; - - if (checksumPos != dataStart) { - /* move the checksum to cover the gap. - * This can happen for the last packet. - */ - System.arraycopy(buf, checksumStart, buf, - dataStart - checksumLen , checksumLen); - } - - int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; - - //normally dataStart == checksumPos, i.e., offset is zero. - buffer = ByteBuffer.wrap( - buf, dataStart - checksumPos, - PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER); - buf = null; - buffer.mark(); + void writeTo(DataOutputStream stm) throws IOException { + final int dataLen = dataPos - dataStart; + final int checksumLen = checksumPos - checksumStart; + final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; PacketHeader header = new PacketHeader( pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); - header.putInBuffer(buffer); - buffer.reset(); - return buffer; + if (checksumPos != dataStart) { + // Move the checksum to cover the gap. This can happen for the last + // packet or during an hflush/hsync call. + System.arraycopy(buf, checksumStart, buf, + dataStart - checksumLen , checksumLen); + checksumPos = dataStart; + checksumStart = checksumPos - checksumLen; + } + + final int headerStart = checksumStart - header.getSerializedSize(); + assert checksumStart + 1 >= header.getSerializedSize(); + assert checksumPos == dataStart; + assert headerStart >= 0; + assert headerStart + header.getSerializedSize() == checksumStart; + + // Copy the header data into the buffer immediately preceding the checksum + // data. + System.arraycopy(header.getBytes(), 0, buf, headerStart, + header.getSerializedSize()); + + // Write the now contiguous full packet to the output stream. + stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); } // get the packet's last byte's offset in the block @@ -502,8 +498,6 @@ public void run() { } // send the packet - ByteBuffer buf = one.getBuffer(); - synchronized (dataQueue) { // move packet from dataQueue to ackQueue if (!one.isHeartbeatPacket()) { @@ -519,8 +513,8 @@ public void run() { } // write out data to remote datanode - try { - blockStream.write(buf.array(), buf.position(), buf.remaining()); + try { + one.writeTo(blockStream); blockStream.flush(); } catch (IOException e) { // HDFS-3398 treat primary DN is down since client is unable to @@ -1358,9 +1352,8 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, private void computePacketChunkSize(int psize, int csize) { int chunkSize = csize + checksum.getChecksumSize(); - int n = PacketHeader.PKT_HEADER_LEN; - chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1); - packetSize = n + chunkSize*chunksPerPacket; + chunksPerPacket = Math.max(psize/chunkSize, 1); + packetSize = chunkSize*chunksPerPacket; if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("computePacketChunkSize: src=" + src + ", chunkSize=" + chunkSize + @@ -1474,8 +1467,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che // indicate the end of block and reset bytesCurBlock. // if (bytesCurBlock == blockSize) { - currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, - bytesCurBlock); + currentPacket = new Packet(0, 0, bytesCurBlock); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; waitAndQueueCurrentPacket(); @@ -1745,8 +1737,7 @@ public synchronized void close() throws IOException { if (bytesCurBlock != 0) { // send an empty packet to mark the end of the block - currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, - bytesCurBlock); + currentPacket = new Packet(0, 0, bytesCurBlock); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; } @@ -1799,8 +1790,7 @@ public void setArtificialSlowdown(long period) { @VisibleForTesting public synchronized void setChunksPerPacket(int value) { chunksPerPacket = Math.min(chunksPerPacket, value); - packetSize = PacketHeader.PKT_HEADER_LEN + - (checksum.getBytesPerChecksum() + + packetSize = (checksum.getBytesPerChecksum() + checksum.getChecksumSize()) * chunksPerPacket; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index e1e9ca5e81..39a9b3086a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -33,12 +33,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; @@ -48,14 +48,11 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.util.DirectBufferPool; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.SocketInputWrapper; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import com.google.common.base.Preconditions; - /** * This is a wrapper around connection to datanode * and understands checksum, offset etc. @@ -93,11 +90,9 @@ public class RemoteBlockReader2 implements BlockReader { private final ReadableByteChannel in; private DataChecksum checksum; - private PacketHeader curHeader; - private ByteBuffer curPacketBuf = null; + private PacketReceiver packetReceiver = new PacketReceiver(true); private ByteBuffer curDataSlice = null; - /** offset in block of the last chunk received */ private long lastSeqNo = -1; @@ -105,10 +100,6 @@ public class RemoteBlockReader2 implements BlockReader { private long startOffset; private final String filename; - private static DirectBufferPool bufferPool = new DirectBufferPool(); - private final ByteBuffer headerBuf = ByteBuffer.allocate( - PacketHeader.PKT_HEADER_LEN); - private final int bytesPerChecksum; private final int checksumSize; @@ -132,7 +123,7 @@ public class RemoteBlockReader2 implements BlockReader { public synchronized int read(byte[] buf, int off, int len) throws IOException { - if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { readNextPacket(); } if (curDataSlice.remaining() == 0) { @@ -149,7 +140,7 @@ public synchronized int read(byte[] buf, int off, int len) @Override public int read(ByteBuffer buf) throws IOException { - if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { readNextPacket(); } if (curDataSlice.remaining() == 0) { @@ -167,11 +158,13 @@ public int read(ByteBuffer buf) throws IOException { } private void readNextPacket() throws IOException { - Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); - //Read packet headers. - readPacketHeader(); + packetReceiver.receiveNextPacket(in); + PacketHeader curHeader = packetReceiver.getHeader(); + curDataSlice = packetReceiver.getDataSlice(); + assert curDataSlice.capacity() == curHeader.getDataLen(); + if (LOG.isTraceEnabled()) { LOG.trace("DFSClient readNextPacket got header " + curHeader); } @@ -185,17 +178,20 @@ private void readNextPacket() throws IOException { if (curHeader.getDataLen() > 0) { int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; int checksumsLen = chunks * checksumSize; - int bufsize = checksumsLen + curHeader.getDataLen(); + + assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : + "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; - resetPacketBuffer(checksumsLen, curHeader.getDataLen()); - lastSeqNo = curHeader.getSeqno(); - if (bufsize > 0) { - readChannelFully(in, curPacketBuf); - curPacketBuf.flip(); - if (verifyChecksum) { - verifyPacketChecksums(); - } + if (verifyChecksum && curDataSlice.remaining() > 0) { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, + packetReceiver.getChecksumSlice(), + filename, curHeader.getOffsetInBlock()); } bytesNeededToFinish -= curHeader.getDataLen(); } @@ -218,40 +214,7 @@ private void readNextPacket() throws IOException { } } } - - private void verifyPacketChecksums() throws ChecksumException { - // N.B.: the checksum error offset reported here is actually - // relative to the start of the block, not the start of the file. - // This is slightly misleading, but preserves the behavior from - // the older BlockReader. - checksum.verifyChunkedSums(curDataSlice, curPacketBuf, - filename, curHeader.getOffsetInBlock()); - } - - private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) - throws IOException { - while (buf.remaining() > 0) { - int n = ch.read(buf); - if (n < 0) { - throw new IOException("Premature EOF reading from " + ch); - } - } - } - - private void resetPacketBuffer(int checksumsLen, int dataLen) { - int packetLen = checksumsLen + dataLen; - if (curPacketBuf == null || - curPacketBuf.capacity() < packetLen) { - returnPacketBufToPool(); - curPacketBuf = bufferPool.getBuffer(packetLen); - } - curPacketBuf.position(checksumsLen); - curDataSlice = curPacketBuf.slice(); - curDataSlice.limit(dataLen); - curPacketBuf.clear(); - curPacketBuf.limit(checksumsLen + dataLen); - } - + @Override public synchronized long skip(long n) throws IOException { /* How can we make sure we don't throw a ChecksumException, at least @@ -272,23 +235,14 @@ public synchronized long skip(long n) throws IOException { return nSkipped; } - private void readPacketHeader() throws IOException { - headerBuf.clear(); - readChannelFully(in, headerBuf); - headerBuf.flip(); - if (curHeader == null) curHeader = new PacketHeader(); - curHeader.readFields(headerBuf); - } - private void readTrailingEmptyPacket() throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Reading empty packet at end of read"); } - headerBuf.clear(); - readChannelFully(in, headerBuf); - headerBuf.flip(); - PacketHeader trailer = new PacketHeader(); - trailer.readFields(headerBuf); + + packetReceiver.receiveNextPacket(in); + + PacketHeader trailer = packetReceiver.getHeader(); if (!trailer.isLastPacketInBlock() || trailer.getDataLen() != 0) { throw new IOException("Expected empty end-of-read packet! Header: " + @@ -321,7 +275,7 @@ protected RemoteBlockReader2(String file, String bpid, long blockId, @Override public synchronized void close() throws IOException { - returnPacketBufToPool(); + packetReceiver.close(); startOffset = -1; checksum = null; @@ -332,24 +286,6 @@ public synchronized void close() throws IOException { // in will be closed when its Socket is closed. } - @Override - protected void finalize() throws Throwable { - try { - // just in case it didn't get closed, we - // may as well still try to return the buffer - returnPacketBufToPool(); - } finally { - super.finalize(); - } - } - - private void returnPacketBufToPool() { - if (curPacketBuf != null) { - bufferPool.returnBuffer(curPacketBuf); - curPacketBuf = null; - } - } - /** * Take the socket used to talk to the DN. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java index 083e2b0b91..c9966a71a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java @@ -27,14 +27,31 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto; import org.apache.hadoop.hdfs.util.ByteBufferOutputStream; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Shorts; +import com.google.common.primitives.Ints; +import com.google.protobuf.InvalidProtocolBufferException; + /** * Header data for each packet that goes through the read/write pipelines. + * Includes all of the information about the packet, excluding checksums and + * actual data. + * + * This data includes: + * - the offset in bytes into the HDFS block of the data in this packet + * - the sequence number of this packet in the pipeline + * - whether or not this is the last packet in the pipeline + * - the length of the data in this packet + * - whether or not this packet should be synced by the DNs. + * + * When serialized, this header is written out as a protocol buffer, preceded + * by a 4-byte integer representing the full packet length, and a 2-byte short + * representing the header length. */ @InterfaceAudience.Private @InterfaceStability.Evolving public class PacketHeader { - /** Header size for a packet */ - private static final int PROTO_SIZE = + private static final int MAX_PROTO_SIZE = PacketHeaderProto.newBuilder() .setOffsetInBlock(0) .setSeqno(0) @@ -42,8 +59,10 @@ public class PacketHeader { .setDataLen(0) .setSyncBlock(false) .build().getSerializedSize(); - public static final int PKT_HEADER_LEN = - 6 + PROTO_SIZE; + public static final int PKT_LENGTHS_LEN = + Ints.BYTES + Shorts.BYTES; + public static final int PKT_MAX_HEADER_LEN = + PKT_LENGTHS_LEN + MAX_PROTO_SIZE; private int packetLen; private PacketHeaderProto proto; @@ -54,13 +73,25 @@ public PacketHeader() { public PacketHeader(int packetLen, long offsetInBlock, long seqno, boolean lastPacketInBlock, int dataLen, boolean syncBlock) { this.packetLen = packetLen; - proto = PacketHeaderProto.newBuilder() + Preconditions.checkArgument(packetLen >= Ints.BYTES, + "packet len %s should always be at least 4 bytes", + packetLen); + + PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder() .setOffsetInBlock(offsetInBlock) .setSeqno(seqno) .setLastPacketInBlock(lastPacketInBlock) - .setDataLen(dataLen) - .setSyncBlock(syncBlock) - .build(); + .setDataLen(dataLen); + + if (syncBlock) { + // Only set syncBlock if it is specified. + // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721 + // because it changes the length of the packet header, and BlockReceiver + // in that version did not support variable-length headers. + builder.setSyncBlock(syncBlock); + } + + proto = builder.build(); } public int getDataLen() { @@ -90,10 +121,16 @@ public boolean getSyncBlock() { @Override public String toString() { return "PacketHeader with packetLen=" + packetLen + - "Header data: " + + " header data: " + proto.toString(); } + public void setFieldsFromData( + int packetLen, byte[] headerData) throws InvalidProtocolBufferException { + this.packetLen = packetLen; + proto = PacketHeaderProto.parseFrom(headerData); + } + public void readFields(ByteBuffer buf) throws IOException { packetLen = buf.getInt(); short protoLen = buf.getShort(); @@ -110,14 +147,21 @@ public void readFields(DataInputStream in) throws IOException { proto = PacketHeaderProto.parseFrom(data); } + /** + * @return the number of bytes necessary to write out this header, + * including the length-prefixing of the payload and header + */ + public int getSerializedSize() { + return PKT_LENGTHS_LEN + proto.getSerializedSize(); + } /** * Write the header into the buffer. * This requires that PKT_HEADER_LEN bytes are available. */ public void putInBuffer(final ByteBuffer buf) { - assert proto.getSerializedSize() == PROTO_SIZE - : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize(); + assert proto.getSerializedSize() <= MAX_PROTO_SIZE + : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); try { buf.putInt(packetLen); buf.putShort((short) proto.getSerializedSize()); @@ -128,12 +172,18 @@ public void putInBuffer(final ByteBuffer buf) { } public void write(DataOutputStream out) throws IOException { - assert proto.getSerializedSize() == PROTO_SIZE - : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize(); + assert proto.getSerializedSize() <= MAX_PROTO_SIZE + : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); out.writeInt(packetLen); out.writeShort(proto.getSerializedSize()); proto.writeTo(out); } + + public byte[] getBytes() { + ByteBuffer buf = ByteBuffer.allocate(getSerializedSize()); + putInBuffer(buf); + return buf.array(); + } /** * Perform a sanity check on the packet, returning true if it is sane. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java new file mode 100644 index 0000000000..6f84fe0202 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.Closeable; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.util.DirectBufferPool; +import org.apache.hadoop.io.IOUtils; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; + +/** + * Class to handle reading packets one-at-a-time from the wire. + * These packets are used both for reading and writing data to/from + * DataNodes. + */ +@InterfaceAudience.Private +public class PacketReceiver implements Closeable { + + /** + * The max size of any single packet. This prevents OOMEs when + * invalid data is sent. + */ + private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; + + static Log LOG = LogFactory.getLog(PacketReceiver.class); + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + private final boolean useDirectBuffers; + + /** + * Internal buffer for reading the length prefixes at the start of + * the packet. + */ + private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate( + PacketHeader.PKT_LENGTHS_LEN); + + /** + * The entirety of the most recently read packet, excepting the + * length prefixes. + */ + private ByteBuffer curPacketBuf = null; + + /** + * A slice of {@link #curPacketBuf} which contains just the checksums. + */ + private ByteBuffer curChecksumSlice = null; + + /** + * A slice of {@link #curPacketBuf} which contains just the data. + */ + private ByteBuffer curDataSlice = null; + + /** + * The packet header of the most recently read packet. + */ + private PacketHeader curHeader; + + public PacketReceiver(boolean useDirectBuffers) { + this.useDirectBuffers = useDirectBuffers; + } + + public PacketHeader getHeader() { + return curHeader; + } + + public ByteBuffer getDataSlice() { + return curDataSlice; + } + + public ByteBuffer getChecksumSlice() { + return curChecksumSlice; + } + + /** + * Reads all of the data for the next packet into the appropriate buffers. + * + * The data slice and checksum slice members will be set to point to the + * user data and corresponding checksums. The header will be parsed and + * set. + */ + public void receiveNextPacket(ReadableByteChannel in) throws IOException { + doRead(in, null); + } + + /** + * @see #receiveNextPacket(ReadableByteChannel) + */ + public void receiveNextPacket(InputStream in) throws IOException { + doRead(null, in); + } + + private void doRead(ReadableByteChannel ch, InputStream in) + throws IOException { + // Each packet looks like: + // PLEN HLEN HEADER CHECKSUMS DATA + // 32-bit 16-bit + // + // PLEN: Payload length + // = length(PLEN) + length(CHECKSUMS) + length(DATA) + // This length includes its own encoded length in + // the sum for historical reasons. + // + // HLEN: Header length + // = length(HEADER) + // + // HEADER: the actual packet header fields, encoded in protobuf + // CHECKSUMS: the crcs for the data chunk. May be missing if + // checksums were not requested + // DATA the actual block data + Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); + + lengthPrefixBuf.clear(); + doReadFully(ch, in, lengthPrefixBuf); + lengthPrefixBuf.flip(); + int payloadLen = lengthPrefixBuf.getInt(); + + if (payloadLen < Ints.BYTES) { + // The "payload length" includes its own length. Therefore it + // should never be less than 4 bytes + throw new IOException("Invalid payload length " + + payloadLen); + } + int dataPlusChecksumLen = payloadLen - Ints.BYTES; + int headerLen = lengthPrefixBuf.getShort(); + if (headerLen < 0) { + throw new IOException("Invalid header length " + headerLen); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen + + " headerLen = " + headerLen); + } + + // Sanity check the buffer size so we don't allocate too much memory + // and OOME. + int totalLen = payloadLen + headerLen; + if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) { + throw new IOException("Incorrect value for packet payload size: " + + payloadLen); + } + + // Make sure we have space for the whole packet, and + // read it. + reallocPacketBuf(dataPlusChecksumLen + headerLen); + curPacketBuf.clear(); + curPacketBuf.limit(dataPlusChecksumLen + headerLen); + doReadFully(ch, in, curPacketBuf); + curPacketBuf.flip(); + + // Extract the header from the front of the buffer. + byte[] headerBuf = new byte[headerLen]; + curPacketBuf.get(headerBuf); + if (curHeader == null) { + curHeader = new PacketHeader(); + } + curHeader.setFieldsFromData(dataPlusChecksumLen, headerBuf); + + // Compute the sub-slices of the packet + int checksumLen = dataPlusChecksumLen - curHeader.getDataLen(); + if (checksumLen < 0) { + throw new IOException("Invalid packet: data length in packet header " + + "exceeds data length received. dataPlusChecksumLen=" + + dataPlusChecksumLen + " header: " + curHeader); + } + + reslicePacket(headerLen, checksumLen, curHeader.getDataLen()); + } + + /** + * Rewrite the last-read packet on the wire to the given output stream. + */ + public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException { + Preconditions.checkState(!useDirectBuffers, + "Currently only supported for non-direct buffers"); + assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN; + mirrorOut.write(lengthPrefixBuf.array(), + lengthPrefixBuf.arrayOffset(), + lengthPrefixBuf.capacity()); + mirrorOut.write(curPacketBuf.array(), + curPacketBuf.arrayOffset(), + curPacketBuf.remaining()); + } + + + private static void doReadFully(ReadableByteChannel ch, InputStream in, + ByteBuffer buf) throws IOException { + if (ch != null) { + readChannelFully(ch, buf); + } else { + Preconditions.checkState(!buf.isDirect(), + "Must not use direct buffers with InputStream API"); + IOUtils.readFully(in, buf.array(), + buf.arrayOffset() + buf.position(), + buf.remaining()); + buf.position(buf.position() + buf.remaining()); + } + } + + private void reslicePacket( + int headerLen, int checksumsLen, int dataLen) { + assert dataLen >= 0 : "invalid datalen: " + dataLen; + + assert curPacketBuf.position() == headerLen; + assert checksumsLen + dataLen == curPacketBuf.remaining() : + "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen + + " rem=" + curPacketBuf.remaining(); + + curPacketBuf.position(headerLen); + curPacketBuf.limit(headerLen + checksumsLen); + curChecksumSlice = curPacketBuf.slice(); + + curPacketBuf.position(headerLen + checksumsLen); + curPacketBuf.limit(headerLen + checksumsLen + dataLen); + curDataSlice = curPacketBuf.slice(); + + curPacketBuf.position(0); + curPacketBuf.limit(headerLen + checksumsLen + dataLen); + } + + + private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) + throws IOException { + while (buf.remaining() > 0) { + int n = ch.read(buf); + if (n < 0) { + throw new IOException("Premature EOF reading from " + ch); + } + } + } + + private void reallocPacketBuf(int atLeastCapacity) { + // Realloc the buffer if this packet is longer than the previous + // one. + if (curPacketBuf == null || + curPacketBuf.capacity() < atLeastCapacity) { + returnPacketBufToPool(); + if (useDirectBuffers) { + curPacketBuf = bufferPool.getBuffer(atLeastCapacity); + } else { + curPacketBuf = ByteBuffer.allocate(atLeastCapacity); + } + } + } + + private void returnPacketBufToPool() { + if (curPacketBuf != null && curPacketBuf.isDirect()) { + bufferPool.returnBuffer(curPacketBuf); + curPacketBuf = null; + } + } + + @Override // Closeable + public void close() { + returnPacketBufToPool(); + } + + @Override + protected void finalize() throws Throwable { + try { + // just in case it didn't get closed, we + // may as well still try to return the buffer + returnPacketBufToPool(); + } finally { + super.finalize(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 1b49824f82..fa9a014f49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -23,7 +23,6 @@ import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.EOFException; import java.io.FileDescriptor; import java.io.FileOutputStream; import java.io.IOException; @@ -34,12 +33,14 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; @@ -77,9 +78,10 @@ class BlockReceiver implements Closeable { private DataOutputStream checksumOut = null; // to crc file at local disk private int bytesPerChecksum; private int checksumSize; - private ByteBuffer buf; // contains one full packet. - private int bufRead; //amount of valid data in the buf - private int maxPacketReadLen; + + private PacketReceiver packetReceiver = + new PacketReceiver(false); + protected final String inAddr; protected final String myAddr; private String mirrorAddr; @@ -248,6 +250,10 @@ class BlockReceiver implements Closeable { */ @Override public void close() throws IOException { + if (packetReceiver != null) { + packetReceiver.close(); + } + IOException ioe = null; if (syncOnClose && (out != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); @@ -365,33 +371,24 @@ private void handleMirrorOutError(IOException ioe) throws IOException { /** * Verify multiple CRC chunks. */ - private void verifyChunks( byte[] dataBuf, int dataOff, int len, - byte[] checksumBuf, int checksumOff ) - throws IOException { - while (len > 0) { - int chunkLen = Math.min(len, bytesPerChecksum); - - clientChecksum.update(dataBuf, dataOff, chunkLen); - - if (!clientChecksum.compare(checksumBuf, checksumOff)) { - if (srcDataNode != null) { - try { - LOG.info("report corrupt block " + block + " from datanode " + - srcDataNode + " to namenode"); - datanode.reportRemoteBadBlock(srcDataNode, block); - } catch (IOException e) { - LOG.warn("Failed to report bad block " + block + - " from datanode " + srcDataNode + " to namenode"); - } + private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) + throws IOException { + try { + clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0); + } catch (ChecksumException ce) { + LOG.warn("Checksum error in block " + block + " from " + inAddr, ce); + if (srcDataNode != null) { + try { + LOG.info("report corrupt block " + block + " from datanode " + + srcDataNode + " to namenode"); + datanode.reportRemoteBadBlock(srcDataNode, block); + } catch (IOException e) { + LOG.warn("Failed to report bad block " + block + + " from datanode " + srcDataNode + " to namenode"); } - throw new IOException("Unexpected checksum mismatch " + - "while writing " + block + " from " + inAddr); } - - clientChecksum.reset(); - dataOff += chunkLen; - checksumOff += checksumSize; - len -= chunkLen; + throw new IOException("Unexpected checksum mismatch " + + "while writing " + block + " from " + inAddr); } } @@ -403,163 +400,24 @@ private void verifyChunks( byte[] dataBuf, int dataOff, int len, * This does not verify the original checksums, under the assumption * that they have already been validated. */ - private void translateChunks( byte[] dataBuf, int dataOff, int len, - byte[] checksumBuf, int checksumOff ) { - if (len == 0) return; - - int numChunks = (len - 1)/bytesPerChecksum + 1; - - diskChecksum.calculateChunkedSums( - ByteBuffer.wrap(dataBuf, dataOff, len), - ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize)); + private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) { + diskChecksum.calculateChunkedSums(dataBuf, checksumBuf); } - /** - * Makes sure buf.position() is zero without modifying buf.remaining(). - * It moves the data if position needs to be changed. - */ - private void shiftBufData() { - if (bufRead != buf.limit()) { - throw new IllegalStateException("bufRead should be same as " + - "buf.limit()"); - } - - //shift the remaining data on buf to the front - if (buf.position() > 0) { - int dataLeft = buf.remaining(); - if (dataLeft > 0) { - byte[] b = buf.array(); - System.arraycopy(b, buf.position(), b, 0, dataLeft); - } - buf.position(0); - bufRead = dataLeft; - buf.limit(bufRead); - } - } - - /** - * reads upto toRead byte to buf at buf.limit() and increments the limit. - * throws an IOException if read does not succeed. - */ - private int readToBuf(int toRead) throws IOException { - if (toRead < 0) { - toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity()) - - buf.limit(); - } - - int nRead = in.read(buf.array(), buf.limit(), toRead); - - if (nRead < 0) { - throw new EOFException("while trying to read " + toRead + " bytes"); - } - bufRead = buf.limit() + nRead; - buf.limit(bufRead); - return nRead; - } - - - /** - * Reads (at least) one packet and returns the packet length. - * buf.position() points to the start of the packet and - * buf.limit() point to the end of the packet. There could - * be more data from next packet in buf.

- * - * It tries to read a full packet with single read call. - * Consecutive packets are usually of the same length. - */ - private void readNextPacket() throws IOException { - /* This dances around buf a little bit, mainly to read - * full packet with single read and to accept arbitrary size - * for next packet at the same time. - */ - if (buf == null) { - /* initialize buffer to the best guess size: - * 'chunksPerPacket' calculation here should match the same - * calculation in DFSClient to make the guess accurate. - */ - int chunkSize = bytesPerChecksum + checksumSize; - int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN - + chunkSize - 1)/chunkSize; - buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN + - Math.max(chunksPerPacket, 1) * chunkSize); - buf.limit(0); - } - - // See if there is data left in the buffer : - if (bufRead > buf.limit()) { - buf.limit(bufRead); - } - - while (buf.remaining() < HdfsConstants.BYTES_IN_INTEGER) { - if (buf.position() > 0) { - shiftBufData(); - } - readToBuf(-1); - } - - /* We mostly have the full packet or at least enough for an int - */ - buf.mark(); - int payloadLen = buf.getInt(); - buf.reset(); - - // check corrupt values for pktLen, 100MB upper limit should be ok? - if (payloadLen < 0 || payloadLen > (100*1024*1024)) { - throw new IOException("Incorrect value for packet payload : " + - payloadLen); - } - - // Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that - // we read above. - int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - - HdfsConstants.BYTES_IN_INTEGER; - - if (buf.remaining() < pktSize) { - //we need to read more data - int toRead = pktSize - buf.remaining(); - - // first make sure buf has enough space. - int spaceLeft = buf.capacity() - buf.limit(); - if (toRead > spaceLeft && buf.position() > 0) { - shiftBufData(); - spaceLeft = buf.capacity() - buf.limit(); - } - if (toRead > spaceLeft) { - byte oldBuf[] = buf.array(); - int toCopy = buf.limit(); - buf = ByteBuffer.allocate(toCopy + toRead); - System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy); - buf.limit(toCopy); - } - - //now read: - while (toRead > 0) { - toRead -= readToBuf(toRead); - } - } - - if (buf.remaining() > pktSize) { - buf.limit(buf.position() + pktSize); - } - - if (pktSize > maxPacketReadLen) { - maxPacketReadLen = pktSize; - } - } - + /** * Receives and processes a packet. It can contain many chunks. * returns the number of data bytes that the packet has. */ private int receivePacket() throws IOException { // read the next packet - readNextPacket(); + packetReceiver.receiveNextPacket(in); - buf.mark(); - PacketHeader header = new PacketHeader(); - header.readFields(buf); - int endOfHeader = buf.position(); - buf.reset(); + PacketHeader header = packetReceiver.getHeader(); + if (LOG.isDebugEnabled()){ + LOG.debug("Receiving one packet for block " + block + + ": " + header); + } // Sanity check the header if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) { @@ -574,38 +432,12 @@ private int receivePacket() throws IOException { header.getDataLen()); } - return receivePacket( - header.getOffsetInBlock(), - header.getSeqno(), - header.isLastPacketInBlock(), - header.getDataLen(), - header.getSyncBlock(), - endOfHeader); - } + long offsetInBlock = header.getOffsetInBlock(); + long seqno = header.getSeqno(); + boolean lastPacketInBlock = header.isLastPacketInBlock(); + int len = header.getDataLen(); + boolean syncBlock = header.getSyncBlock(); - /** - * Write the received packet to disk (data only) - */ - private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, - int numBytesToDisk) throws IOException { - out.write(pktBuf, startByteToDisk, numBytesToDisk); - } - - /** - * Receives and processes a packet. It can contain many chunks. - * returns the number of data bytes that the packet has. - */ - private int receivePacket(long offsetInBlock, long seqno, - boolean lastPacketInBlock, int len, boolean syncBlock, - int endOfHeader) throws IOException { - if (LOG.isDebugEnabled()){ - LOG.debug("Receiving one packet for block " + block + - " of length " + len + - " seqno " + seqno + - " offsetInBlock " + offsetInBlock + - " syncBlock " + syncBlock + - " lastPacketInBlock " + lastPacketInBlock); - } // make sure the block gets sync'ed upon close this.syncOnClose |= syncBlock && lastPacketInBlock; @@ -625,14 +457,15 @@ private int receivePacket(long offsetInBlock, long seqno, //First write the packet to the mirror: if (mirrorOut != null && !mirrorError) { try { - mirrorOut.write(buf.array(), buf.position(), buf.remaining()); + packetReceiver.mirrorPacketTo(mirrorOut); mirrorOut.flush(); } catch (IOException e) { handleMirrorOutError(e); } } - buf.position(endOfHeader); + ByteBuffer dataBuf = packetReceiver.getDataSlice(); + ByteBuffer checksumBuf = packetReceiver.getChecksumSlice(); if (lastPacketInBlock || len == 0) { if(LOG.isDebugEnabled()) { @@ -646,18 +479,11 @@ private int receivePacket(long offsetInBlock, long seqno, int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* checksumSize; - if ( buf.remaining() != (checksumLen + len)) { - throw new IOException("Data remaining in packet does not match" + - "sum of checksumLen and dataLen " + - " size remaining: " + buf.remaining() + - " data len: " + len + - " checksum Len: " + checksumLen); - } - int checksumOff = buf.position(); - int dataOff = checksumOff + checksumLen; - byte pktBuf[] = buf.array(); - - buf.position(buf.limit()); // move to the end of the data. + if ( checksumBuf.capacity() != checksumLen) { + throw new IOException("Length of checksums in packet " + + checksumBuf.capacity() + " does not match calculated checksum " + + "length " + checksumLen); + } /* skip verifying checksum iff this is not the last one in the * pipeline and clientName is non-null. i.e. Checksum is verified @@ -667,11 +493,11 @@ private int receivePacket(long offsetInBlock, long seqno, * checksum. */ if (mirrorOut == null || isDatanode || needsChecksumTranslation) { - verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff); + verifyChunks(dataBuf, checksumBuf); if (needsChecksumTranslation) { // overwrite the checksums in the packet buffer with the // appropriate polynomial for the disk storage. - translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff); + translateChunks(dataBuf, checksumBuf); } } @@ -700,9 +526,13 @@ private int receivePacket(long offsetInBlock, long seqno, computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum); } - int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock); + int startByteToDisk = (int)(onDiskLen-firstByteInBlock) + + dataBuf.arrayOffset() + dataBuf.position(); + int numBytesToDisk = (int)(offsetInBlock-onDiskLen); - writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk); + + // Write data to disk. + out.write(dataBuf.array(), startByteToDisk, numBytesToDisk); // If this is a partial chunk, then verify that this is the only // chunk in the packet. Calculate new crc for this chunk. @@ -714,7 +544,7 @@ private int receivePacket(long offsetInBlock, long seqno, " len = " + len + " bytesPerChecksum " + bytesPerChecksum); } - partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk); + partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk); byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); lastChunkChecksum = Arrays.copyOfRange( buf, buf.length - checksumSize, buf.length @@ -726,11 +556,12 @@ private int receivePacket(long offsetInBlock, long seqno, partialCrc = null; } else { lastChunkChecksum = Arrays.copyOfRange( - pktBuf, - checksumOff + checksumLen - checksumSize, - checksumOff + checksumLen - ); - checksumOut.write(pktBuf, checksumOff, checksumLen); + checksumBuf.array(), + checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize, + checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen); + checksumOut.write(checksumBuf.array(), + checksumBuf.arrayOffset() + checksumBuf.position(), + checksumLen); } /// flush entire packet, sync unless close() will sync flushOrSync(syncBlock && !lastPacketInBlock); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index d8cba72003..6d38febab9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -62,40 +62,29 @@ * * An empty packet is sent to mark the end of block and read completion. * - * PACKET Contains a packet header, checksum and data. Amount of data - * carried is set by BUFFER_SIZE. - *
- *    +-----------------------------------------------------+
- *    | 4 byte packet length (excluding packet header)      |
- *    +-----------------------------------------------------+
- *    | 8 byte offset in the block | 8 byte sequence number |
- *    +-----------------------------------------------------+
- *    | 1 byte isLastPacketInBlock                          |
- *    +-----------------------------------------------------+
- *    | 4 byte Length of actual data                        |
- *    +-----------------------------------------------------+
- *    | x byte checksum data. x is defined below            |
- *    +-----------------------------------------------------+
- *    | actual data ......                                  |
- *    +-----------------------------------------------------+
- *    
- *    Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
- *    A checksum is calculated for each chunk.
- *    
- *    x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
- *        CHECKSUM_SIZE
- *        
- *    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
- *    
+ * PACKET Contains a packet header, checksum and data. Amount of data + * carried is set by BUFFER_SIZE. + *
+ *   +-----------------------------------------------------+
+ *   | Variable length header. See {@link PacketHeader}    |
+ *   +-----------------------------------------------------+
+ *   | x byte checksum data. x is defined below            |
+ *   +-----------------------------------------------------+
+ *   | actual data ......                                  |
+ *   +-----------------------------------------------------+
+ * 
+ *   Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ *   A checksum is calculated for each chunk.
+ *  
+ *   x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ *       CHECKSUM_SIZE
+ *  
+ *   CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
+ *  
* * The client reads data until it receives a packet with * "LastPacketInBlock" set to true or with a zero length. If there is - * no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: - *
- *    +------------------------------+
- *    | 2 byte OP_STATUS_CHECKSUM_OK |
- *    +------------------------------+
- *  
+ * no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK. */ class BlockSender implements java.io.Closeable { static final Log LOG = DataNode.LOG; @@ -448,8 +437,22 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, int packetLen = dataLen + checksumDataLen + 4; boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0; - writePacketHeader(pkt, dataLen, packetLen); - + // The packet buffer is organized as follows: + // _______HHHHCCCCD?D?D?D? + // ^ ^ + // | \ checksumOff + // \ headerOff + // _ padding, since the header is variable-length + // H = header and length prefixes + // C = checksums + // D? = data, if transferTo is false. + + int headerLen = writePacketHeader(pkt, dataLen, packetLen); + + // Per above, the header doesn't start at the beginning of the + // buffer + int headerOff = pkt.position() - headerLen; + int checksumOff = pkt.position(); byte[] buf = pkt.array(); @@ -479,7 +482,8 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, try { if (transferTo) { SocketOutputStream sockOut = (SocketOutputStream)out; - sockOut.write(buf, 0, dataOff); // First write checksum + // First write header and checksums + sockOut.write(buf, headerOff, dataOff - headerOff); // no need to flush since we know out is not a buffered stream FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); @@ -492,7 +496,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, blockInPosition += dataLen; } else { // normal transfer - out.write(buf, 0, dataOff + dataLen); + out.write(buf, headerOff, dataOff + dataLen - headerOff); } } catch (IOException e) { if (e instanceof SocketTimeoutException) { @@ -625,7 +629,7 @@ long sendBlock(DataOutputStream out, OutputStream baseStream, final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; try { int maxChunksPerPacket; - int pktSize = PacketHeader.PKT_HEADER_LEN; + int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; boolean transferTo = transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream && blockIn instanceof FileInputStream; @@ -636,15 +640,15 @@ long sendBlock(DataOutputStream out, OutputStream baseStream, maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); // Smaller packet size to only hold checksum when doing transferTo - pktSize += checksumSize * maxChunksPerPacket; + pktBufSize += checksumSize * maxChunksPerPacket; } else { maxChunksPerPacket = Math.max(1, numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE)); // Packet size includes both checksum and data - pktSize += (chunkSize + checksumSize) * maxChunksPerPacket; + pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket; } - ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); + ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize); while (endOffset > offset) { manageOsCache(); @@ -714,14 +718,19 @@ private boolean isLongRead() { } /** - * Write packet header into {@code pkt} + * Write packet header into {@code pkt}, + * return the length of the header written. */ - private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) { + private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) { pkt.clear(); // both syncBlock and syncPacket are false PacketHeader header = new PacketHeader(packetLen, offset, seqno, (dataLen == 0), dataLen, false); + + int size = header.getSerializedSize(); + pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size); header.putInBuffer(pkt); + return size; } boolean didSendEntireByteRange() {