HDFS-7855. Separate class Packet from DFSOutputStream. Contributed by Li Bo.
This commit is contained in:
parent
138c9cadee
commit
952640fa4c
@ -715,6 +715,8 @@ Release 2.7.0 - UNRELEASED
|
||||
HADOOP-11648. Set DomainSocketWatcher thread name explicitly.
|
||||
(Liang Xie via ozawa)
|
||||
|
||||
HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo bia jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||
|
238
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Normal file → Executable file
238
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Normal file → Executable file
@ -30,7 +30,6 @@ import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -79,7 +78,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||
@ -160,9 +158,9 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
private final int bytesPerChecksum;
|
||||
|
||||
// both dataQueue and ackQueue are protected by dataQueue lock
|
||||
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
|
||||
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
|
||||
private Packet currentPacket = null;
|
||||
private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
|
||||
private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
|
||||
private DFSPacket currentPacket = null;
|
||||
private DataStreamer streamer;
|
||||
private long currentSeqno = 0;
|
||||
private long lastQueuedSeqno = -1;
|
||||
@ -187,8 +185,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
BlockStoragePolicySuite.createDefaultSuite();
|
||||
|
||||
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
||||
private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
|
||||
long seqno) throws InterruptedIOException {
|
||||
private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
|
||||
long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
|
||||
final byte[] buf;
|
||||
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
|
||||
|
||||
@ -201,171 +199,20 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
throw iioe;
|
||||
}
|
||||
|
||||
return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize());
|
||||
return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
|
||||
getChecksumSize(), lastPacketInBlock);
|
||||
}
|
||||
|
||||
/**
|
||||
* For heartbeat packets, create buffer directly by new byte[]
|
||||
* since heartbeats should not be blocked.
|
||||
*/
|
||||
private Packet createHeartbeatPacket() throws InterruptedIOException {
|
||||
private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
|
||||
final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
|
||||
return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize());
|
||||
return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
|
||||
getChecksumSize(), false);
|
||||
}
|
||||
|
||||
private static class Packet {
|
||||
private static final long HEART_BEAT_SEQNO = -1L;
|
||||
final long seqno; // sequencenumber of buffer in block
|
||||
final long offsetInBlock; // offset in block
|
||||
boolean syncBlock; // this packet forces the current block to disk
|
||||
int numChunks; // number of chunks currently in packet
|
||||
final int maxChunks; // max chunks in packet
|
||||
private byte[] buf;
|
||||
private boolean lastPacketInBlock; // is this the last packet in block?
|
||||
|
||||
/**
|
||||
* buf is pointed into like follows:
|
||||
* (C is checksum data, D is payload data)
|
||||
*
|
||||
* [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
|
||||
* ^ ^ ^ ^
|
||||
* | checksumPos dataStart dataPos
|
||||
* checksumStart
|
||||
*
|
||||
* Right before sending, we move the checksum data to immediately precede
|
||||
* the actual data, and then insert the header into the buffer immediately
|
||||
* preceding the checksum data, so we make sure to keep enough space in
|
||||
* front of the checksum data to support the largest conceivable header.
|
||||
*/
|
||||
int checksumStart;
|
||||
int checksumPos;
|
||||
final int dataStart;
|
||||
int dataPos;
|
||||
|
||||
/**
|
||||
* Create a new packet.
|
||||
*
|
||||
* @param chunksPerPkt maximum number of chunks per packet.
|
||||
* @param offsetInBlock offset in bytes into the HDFS block.
|
||||
*/
|
||||
private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
|
||||
int checksumSize) {
|
||||
this.lastPacketInBlock = false;
|
||||
this.numChunks = 0;
|
||||
this.offsetInBlock = offsetInBlock;
|
||||
this.seqno = seqno;
|
||||
|
||||
this.buf = buf;
|
||||
|
||||
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
|
||||
checksumPos = checksumStart;
|
||||
dataStart = checksumStart + (chunksPerPkt * checksumSize);
|
||||
dataPos = dataStart;
|
||||
maxChunks = chunksPerPkt;
|
||||
}
|
||||
|
||||
synchronized void writeData(byte[] inarray, int off, int len)
|
||||
throws ClosedChannelException {
|
||||
checkBuffer();
|
||||
if (dataPos + len > buf.length) {
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
System.arraycopy(inarray, off, buf, dataPos, len);
|
||||
dataPos += len;
|
||||
}
|
||||
|
||||
synchronized void writeChecksum(byte[] inarray, int off, int len)
|
||||
throws ClosedChannelException {
|
||||
checkBuffer();
|
||||
if (len == 0) {
|
||||
return;
|
||||
}
|
||||
if (checksumPos + len > dataStart) {
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
System.arraycopy(inarray, off, buf, checksumPos, len);
|
||||
checksumPos += len;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the full packet, including the header, to the given output stream.
|
||||
*/
|
||||
synchronized void writeTo(DataOutputStream stm) throws IOException {
|
||||
checkBuffer();
|
||||
|
||||
final int dataLen = dataPos - dataStart;
|
||||
final int checksumLen = checksumPos - checksumStart;
|
||||
final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
|
||||
|
||||
PacketHeader header = new PacketHeader(
|
||||
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
|
||||
|
||||
if (checksumPos != dataStart) {
|
||||
// Move the checksum to cover the gap. This can happen for the last
|
||||
// packet or during an hflush/hsync call.
|
||||
System.arraycopy(buf, checksumStart, buf,
|
||||
dataStart - checksumLen , checksumLen);
|
||||
checksumPos = dataStart;
|
||||
checksumStart = checksumPos - checksumLen;
|
||||
}
|
||||
|
||||
final int headerStart = checksumStart - header.getSerializedSize();
|
||||
assert checksumStart + 1 >= header.getSerializedSize();
|
||||
assert checksumPos == dataStart;
|
||||
assert headerStart >= 0;
|
||||
assert headerStart + header.getSerializedSize() == checksumStart;
|
||||
|
||||
// Copy the header data into the buffer immediately preceding the checksum
|
||||
// data.
|
||||
System.arraycopy(header.getBytes(), 0, buf, headerStart,
|
||||
header.getSerializedSize());
|
||||
|
||||
// corrupt the data for testing.
|
||||
if (DFSClientFaultInjector.get().corruptPacket()) {
|
||||
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
||||
}
|
||||
|
||||
// Write the now contiguous full packet to the output stream.
|
||||
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
|
||||
|
||||
// undo corruption.
|
||||
if (DFSClientFaultInjector.get().uncorruptPacket()) {
|
||||
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void checkBuffer() throws ClosedChannelException {
|
||||
if (buf == null) {
|
||||
throw new ClosedChannelException();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void releaseBuffer(ByteArrayManager bam) {
|
||||
bam.release(buf);
|
||||
buf = null;
|
||||
}
|
||||
|
||||
// get the packet's last byte's offset in the block
|
||||
synchronized long getLastByteOffsetBlock() {
|
||||
return offsetInBlock + dataPos - dataStart;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this packet is a heart beat packet
|
||||
* @return true if the sequence number is HEART_BEAT_SEQNO
|
||||
*/
|
||||
private boolean isHeartbeatPacket() {
|
||||
return seqno == HEART_BEAT_SEQNO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "packet seqno: " + this.seqno +
|
||||
" offsetInBlock: " + this.offsetInBlock +
|
||||
" lastPacketInBlock: " + this.lastPacketInBlock +
|
||||
" lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// The DataStreamer class is responsible for sending data packets to the
|
||||
@ -556,7 +403,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
}
|
||||
}
|
||||
|
||||
Packet one;
|
||||
DFSPacket one;
|
||||
try {
|
||||
// process datanode IO errors if any
|
||||
boolean doSleep = false;
|
||||
@ -620,7 +467,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
" Aborting file " + src);
|
||||
}
|
||||
|
||||
if (one.lastPacketInBlock) {
|
||||
if (one.isLastPacketInBlock()) {
|
||||
// wait for all data packets have been successfully acked
|
||||
synchronized (dataQueue) {
|
||||
while (!streamerClosed && !hasError &&
|
||||
@ -681,7 +528,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
}
|
||||
|
||||
// Is this block full?
|
||||
if (one.lastPacketInBlock) {
|
||||
if (one.isLastPacketInBlock()) {
|
||||
// wait for the close packet has been acked
|
||||
synchronized (dataQueue) {
|
||||
while (!streamerClosed && !hasError &&
|
||||
@ -883,7 +730,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
ack.readFields(blockReplyStream);
|
||||
long duration = Time.monotonicNow() - begin;
|
||||
if (duration > dfsclientSlowLogThresholdMs
|
||||
&& ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
|
||||
&& ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
|
||||
DFSClient.LOG
|
||||
.warn("Slow ReadProcessor read fields took " + duration
|
||||
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
|
||||
@ -920,21 +767,21 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
|
||||
assert seqno != PipelineAck.UNKOWN_SEQNO :
|
||||
"Ack for unknown seqno should be a failed ack: " + ack;
|
||||
if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
|
||||
if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
|
||||
continue;
|
||||
}
|
||||
|
||||
// a success ack for a data packet
|
||||
Packet one;
|
||||
DFSPacket one;
|
||||
synchronized (dataQueue) {
|
||||
one = ackQueue.getFirst();
|
||||
}
|
||||
if (one.seqno != seqno) {
|
||||
if (one.getSeqno() != seqno) {
|
||||
throw new IOException("ResponseProcessor: Expecting seqno " +
|
||||
" for block " + block +
|
||||
one.seqno + " but received " + seqno);
|
||||
one.getSeqno() + " but received " + seqno);
|
||||
}
|
||||
isLastPacketInBlock = one.lastPacketInBlock;
|
||||
isLastPacketInBlock = one.isLastPacketInBlock();
|
||||
|
||||
// Fail the packet write for testing in order to force a
|
||||
// pipeline recovery.
|
||||
@ -1032,10 +879,10 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
|
||||
// a client waiting on close() will be aware that the flush finished.
|
||||
synchronized (dataQueue) {
|
||||
Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
|
||||
assert endOfBlockPacket.lastPacketInBlock;
|
||||
assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
|
||||
lastAckedSeqno = endOfBlockPacket.seqno;
|
||||
DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
|
||||
assert endOfBlockPacket.isLastPacketInBlock();
|
||||
assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
|
||||
lastAckedSeqno = endOfBlockPacket.getSeqno();
|
||||
dataQueue.notifyAll();
|
||||
}
|
||||
endBlock();
|
||||
@ -1862,9 +1709,9 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
synchronized (dataQueue) {
|
||||
if (currentPacket == null) return;
|
||||
dataQueue.addLast(currentPacket);
|
||||
lastQueuedSeqno = currentPacket.seqno;
|
||||
lastQueuedSeqno = currentPacket.getSeqno();
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
|
||||
DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
|
||||
}
|
||||
currentPacket = null;
|
||||
dataQueue.notifyAll();
|
||||
@ -1916,10 +1763,10 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
|
||||
if (currentPacket == null) {
|
||||
currentPacket = createPacket(packetSize, chunksPerPacket,
|
||||
bytesCurBlock, currentSeqno++);
|
||||
bytesCurBlock, currentSeqno++, false);
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
|
||||
currentPacket.seqno +
|
||||
currentPacket.getSeqno() +
|
||||
", src=" + src +
|
||||
", packetSize=" + packetSize +
|
||||
", chunksPerPacket=" + chunksPerPacket +
|
||||
@ -1929,16 +1776,16 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
|
||||
currentPacket.writeChecksum(checksum, ckoff, cklen);
|
||||
currentPacket.writeData(b, offset, len);
|
||||
currentPacket.numChunks++;
|
||||
currentPacket.incNumChunks();
|
||||
bytesCurBlock += len;
|
||||
|
||||
// If packet is full, enqueue it for transmission
|
||||
//
|
||||
if (currentPacket.numChunks == currentPacket.maxChunks ||
|
||||
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
|
||||
bytesCurBlock == blockSize) {
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
|
||||
currentPacket.seqno +
|
||||
currentPacket.getSeqno() +
|
||||
", src=" + src +
|
||||
", bytesCurBlock=" + bytesCurBlock +
|
||||
", blockSize=" + blockSize +
|
||||
@ -1963,9 +1810,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
// indicate the end of block and reset bytesCurBlock.
|
||||
//
|
||||
if (bytesCurBlock == blockSize) {
|
||||
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
|
||||
currentPacket.lastPacketInBlock = true;
|
||||
currentPacket.syncBlock = shouldSyncBlock;
|
||||
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
|
||||
currentPacket.setSyncBlock(shouldSyncBlock);
|
||||
waitAndQueueCurrentPacket();
|
||||
bytesCurBlock = 0;
|
||||
lastFlushOffset = 0;
|
||||
@ -2053,7 +1899,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
// but sync was requested.
|
||||
// Send an empty packet if we do not end the block right now
|
||||
currentPacket = createPacket(packetSize, chunksPerPacket,
|
||||
bytesCurBlock, currentSeqno++);
|
||||
bytesCurBlock, currentSeqno++, false);
|
||||
}
|
||||
} else {
|
||||
if (isSync && bytesCurBlock > 0 && !endBlock) {
|
||||
@ -2062,7 +1908,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
// and sync was requested.
|
||||
// So send an empty sync packet if we do not end the block right now
|
||||
currentPacket = createPacket(packetSize, chunksPerPacket,
|
||||
bytesCurBlock, currentSeqno++);
|
||||
bytesCurBlock, currentSeqno++, false);
|
||||
} else if (currentPacket != null) {
|
||||
// just discard the current packet since it is already been sent.
|
||||
currentPacket.releaseBuffer(byteArrayManager);
|
||||
@ -2070,15 +1916,14 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
}
|
||||
}
|
||||
if (currentPacket != null) {
|
||||
currentPacket.syncBlock = isSync;
|
||||
currentPacket.setSyncBlock(isSync);
|
||||
waitAndQueueCurrentPacket();
|
||||
}
|
||||
if (endBlock && bytesCurBlock > 0) {
|
||||
// Need to end the current block, thus send an empty packet to
|
||||
// indicate this is the end of the block and reset bytesCurBlock
|
||||
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
|
||||
currentPacket.lastPacketInBlock = true;
|
||||
currentPacket.syncBlock = shouldSyncBlock || isSync;
|
||||
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
|
||||
currentPacket.setSyncBlock(shouldSyncBlock || isSync);
|
||||
waitAndQueueCurrentPacket();
|
||||
bytesCurBlock = 0;
|
||||
lastFlushOffset = 0;
|
||||
@ -2249,8 +2094,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
}
|
||||
}
|
||||
|
||||
private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
|
||||
for(Packet p : packets) {
|
||||
private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
|
||||
for (DFSPacket p : packets) {
|
||||
p.releaseBuffer(bam);
|
||||
}
|
||||
packets.clear();
|
||||
@ -2297,9 +2142,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||
|
||||
if (bytesCurBlock != 0) {
|
||||
// send an empty packet to mark the end of the block
|
||||
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
|
||||
currentPacket.lastPacketInBlock = true;
|
||||
currentPacket.syncBlock = shouldSyncBlock;
|
||||
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
|
||||
currentPacket.setSyncBlock(shouldSyncBlock);
|
||||
}
|
||||
|
||||
flushInternal(); // flush all data to Datanodes
|
||||
|
@ -0,0 +1,270 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||
|
||||
/****************************************************************
|
||||
* DFSPacket is used by DataStreamer and DFSOutputStream.
|
||||
* DFSOutputStream generates packets and then ask DatStreamer
|
||||
* to send them to datanodes.
|
||||
****************************************************************/
|
||||
|
||||
class DFSPacket {
|
||||
public static final long HEART_BEAT_SEQNO = -1L;
|
||||
private final long seqno; // sequence number of buffer in block
|
||||
private final long offsetInBlock; // offset in block
|
||||
private boolean syncBlock; // this packet forces the current block to disk
|
||||
private int numChunks; // number of chunks currently in packet
|
||||
private final int maxChunks; // max chunks in packet
|
||||
private byte[] buf;
|
||||
private final boolean lastPacketInBlock; // is this the last packet in block?
|
||||
|
||||
/**
|
||||
* buf is pointed into like follows:
|
||||
* (C is checksum data, D is payload data)
|
||||
*
|
||||
* [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
|
||||
* ^ ^ ^ ^
|
||||
* | checksumPos dataStart dataPos
|
||||
* checksumStart
|
||||
*
|
||||
* Right before sending, we move the checksum data to immediately precede
|
||||
* the actual data, and then insert the header into the buffer immediately
|
||||
* preceding the checksum data, so we make sure to keep enough space in
|
||||
* front of the checksum data to support the largest conceivable header.
|
||||
*/
|
||||
private int checksumStart;
|
||||
private int checksumPos;
|
||||
private final int dataStart;
|
||||
private int dataPos;
|
||||
|
||||
/**
|
||||
* Create a new packet.
|
||||
*
|
||||
* @param buf the buffer storing data and checksums
|
||||
* @param chunksPerPkt maximum number of chunks per packet.
|
||||
* @param offsetInBlock offset in bytes into the HDFS block.
|
||||
* @param seqno the sequence number of this packet
|
||||
* @param checksumSize the size of checksum
|
||||
* @param lastPacketInBlock if this is the last packet
|
||||
*/
|
||||
DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
|
||||
int checksumSize, boolean lastPacketInBlock) {
|
||||
this.lastPacketInBlock = lastPacketInBlock;
|
||||
this.numChunks = 0;
|
||||
this.offsetInBlock = offsetInBlock;
|
||||
this.seqno = seqno;
|
||||
|
||||
this.buf = buf;
|
||||
|
||||
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
|
||||
checksumPos = checksumStart;
|
||||
dataStart = checksumStart + (chunksPerPkt * checksumSize);
|
||||
dataPos = dataStart;
|
||||
maxChunks = chunksPerPkt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to this packet.
|
||||
*
|
||||
* @param inarray input array of data
|
||||
* @param off the offset of data to write
|
||||
* @param len the length of data to write
|
||||
* @throws ClosedChannelException
|
||||
*/
|
||||
synchronized void writeData(byte[] inarray, int off, int len)
|
||||
throws ClosedChannelException {
|
||||
checkBuffer();
|
||||
if (dataPos + len > buf.length) {
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
System.arraycopy(inarray, off, buf, dataPos, len);
|
||||
dataPos += len;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write checksums to this packet
|
||||
*
|
||||
* @param inarray input array of checksums
|
||||
* @param off the offset of checksums to write
|
||||
* @param len the length of checksums to write
|
||||
* @throws ClosedChannelException
|
||||
*/
|
||||
synchronized void writeChecksum(byte[] inarray, int off, int len)
|
||||
throws ClosedChannelException {
|
||||
checkBuffer();
|
||||
if (len == 0) {
|
||||
return;
|
||||
}
|
||||
if (checksumPos + len > dataStart) {
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
System.arraycopy(inarray, off, buf, checksumPos, len);
|
||||
checksumPos += len;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the full packet, including the header, to the given output stream.
|
||||
*
|
||||
* @param stm
|
||||
* @throws IOException
|
||||
*/
|
||||
synchronized void writeTo(DataOutputStream stm) throws IOException {
|
||||
checkBuffer();
|
||||
|
||||
final int dataLen = dataPos - dataStart;
|
||||
final int checksumLen = checksumPos - checksumStart;
|
||||
final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
|
||||
|
||||
PacketHeader header = new PacketHeader(
|
||||
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
|
||||
|
||||
if (checksumPos != dataStart) {
|
||||
// Move the checksum to cover the gap. This can happen for the last
|
||||
// packet or during an hflush/hsync call.
|
||||
System.arraycopy(buf, checksumStart, buf,
|
||||
dataStart - checksumLen , checksumLen);
|
||||
checksumPos = dataStart;
|
||||
checksumStart = checksumPos - checksumLen;
|
||||
}
|
||||
|
||||
final int headerStart = checksumStart - header.getSerializedSize();
|
||||
assert checksumStart + 1 >= header.getSerializedSize();
|
||||
assert headerStart >= 0;
|
||||
assert headerStart + header.getSerializedSize() == checksumStart;
|
||||
|
||||
// Copy the header data into the buffer immediately preceding the checksum
|
||||
// data.
|
||||
System.arraycopy(header.getBytes(), 0, buf, headerStart,
|
||||
header.getSerializedSize());
|
||||
|
||||
// corrupt the data for testing.
|
||||
if (DFSClientFaultInjector.get().corruptPacket()) {
|
||||
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
||||
}
|
||||
|
||||
// Write the now contiguous full packet to the output stream.
|
||||
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
|
||||
|
||||
// undo corruption.
|
||||
if (DFSClientFaultInjector.get().uncorruptPacket()) {
|
||||
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void checkBuffer() throws ClosedChannelException {
|
||||
if (buf == null) {
|
||||
throw new ClosedChannelException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the buffer in this packet to ByteArrayManager.
|
||||
*
|
||||
* @param bam
|
||||
*/
|
||||
synchronized void releaseBuffer(ByteArrayManager bam) {
|
||||
bam.release(buf);
|
||||
buf = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* get the packet's last byte's offset in the block
|
||||
*
|
||||
* @return the packet's last byte's offset in the block
|
||||
*/
|
||||
synchronized long getLastByteOffsetBlock() {
|
||||
return offsetInBlock + dataPos - dataStart;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this packet is a heart beat packet
|
||||
*
|
||||
* @return true if the sequence number is HEART_BEAT_SEQNO
|
||||
*/
|
||||
boolean isHeartbeatPacket() {
|
||||
return seqno == HEART_BEAT_SEQNO;
|
||||
}
|
||||
|
||||
/**
|
||||
* check if this packet is the last packet in block
|
||||
*
|
||||
* @return true if the packet is the last packet
|
||||
*/
|
||||
boolean isLastPacketInBlock(){
|
||||
return lastPacketInBlock;
|
||||
}
|
||||
|
||||
/**
|
||||
* get sequence number of this packet
|
||||
*
|
||||
* @return the sequence number of this packet
|
||||
*/
|
||||
long getSeqno(){
|
||||
return seqno;
|
||||
}
|
||||
|
||||
/**
|
||||
* get the number of chunks this packet contains
|
||||
*
|
||||
* @return the number of chunks in this packet
|
||||
*/
|
||||
synchronized int getNumChunks(){
|
||||
return numChunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* increase the number of chunks by one
|
||||
*/
|
||||
synchronized void incNumChunks(){
|
||||
numChunks++;
|
||||
}
|
||||
|
||||
/**
|
||||
* get the maximum number of packets
|
||||
*
|
||||
* @return the maximum number of packets
|
||||
*/
|
||||
int getMaxChunks(){
|
||||
return maxChunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* set if to sync block
|
||||
*
|
||||
* @param syncBlock if to sync block
|
||||
*/
|
||||
synchronized void setSyncBlock(boolean syncBlock){
|
||||
this.syncBlock = syncBlock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "packet seqno: " + this.seqno +
|
||||
" offsetInBlock: " + this.offsetInBlock +
|
||||
" lastPacketInBlock: " + this.lastPacketInBlock +
|
||||
" lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
|
||||
}
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.util.Random;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestDFSPacket {
|
||||
private static final int chunkSize = 512;
|
||||
private static final int checksumSize = 4;
|
||||
private static final int maxChunksPerPacket = 4;
|
||||
|
||||
@Test
|
||||
public void testPacket() throws Exception {
|
||||
Random r = new Random(12345L);
|
||||
byte[] data = new byte[chunkSize];
|
||||
r.nextBytes(data);
|
||||
byte[] checksum = new byte[checksumSize];
|
||||
r.nextBytes(checksum);
|
||||
|
||||
DataOutputBuffer os = new DataOutputBuffer(data.length * 2);
|
||||
|
||||
byte[] packetBuf = new byte[data.length * 2];
|
||||
DFSPacket p = new DFSPacket(packetBuf, maxChunksPerPacket,
|
||||
0, 0, checksumSize, false);
|
||||
p.setSyncBlock(true);
|
||||
p.writeData(data, 0, data.length);
|
||||
p.writeChecksum(checksum, 0, checksum.length);
|
||||
p.writeTo(os);
|
||||
|
||||
//we have set syncBlock to true, so the header has the maximum length
|
||||
int headerLen = PacketHeader.PKT_MAX_HEADER_LEN;
|
||||
byte[] readBuf = os.getData();
|
||||
|
||||
assertArrayRegionsEqual(readBuf, headerLen, checksum, 0, checksum.length);
|
||||
assertArrayRegionsEqual(readBuf, headerLen + checksum.length, data, 0, data.length);
|
||||
|
||||
}
|
||||
|
||||
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
|
||||
int off2, int len) {
|
||||
for (int i = 0; i < len; i++) {
|
||||
if (buf1[off1 + i] != buf2[off2 + i]) {
|
||||
Assert.fail("arrays differ at byte " + i + ". " +
|
||||
"The first array has " + (int) buf1[off1 + i] +
|
||||
", but the second array has " + (int) buf2[off2 + i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user