HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of subclassing. Contributed by Li Bo
This commit is contained in:
parent
867d5d2675
commit
9ed43f2189
@ -376,6 +376,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-7978. Add LOG.isDebugEnabled() guard for some LOG.debug(..).
|
HDFS-7978. Add LOG.isDebugEnabled() guard for some LOG.debug(..).
|
||||||
(Walter Su via wang)
|
(Walter Su via wang)
|
||||||
|
|
||||||
|
HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of
|
||||||
|
subclassing. (Li Bo via szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.net.Socket;
|
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
@ -95,29 +94,29 @@ public class DFSOutputStream extends FSOutputSummer
|
|||||||
static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
|
static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
|
||||||
CryptoProtocolVersion.supported();
|
CryptoProtocolVersion.supported();
|
||||||
|
|
||||||
private final DFSClient dfsClient;
|
protected final DFSClient dfsClient;
|
||||||
private final ByteArrayManager byteArrayManager;
|
protected final ByteArrayManager byteArrayManager;
|
||||||
// closed is accessed by different threads under different locks.
|
// closed is accessed by different threads under different locks.
|
||||||
private volatile boolean closed = false;
|
protected volatile boolean closed = false;
|
||||||
|
|
||||||
private final String src;
|
protected final String src;
|
||||||
private final long fileId;
|
protected final long fileId;
|
||||||
private final long blockSize;
|
protected final long blockSize;
|
||||||
private final int bytesPerChecksum;
|
protected final int bytesPerChecksum;
|
||||||
|
|
||||||
private DFSPacket currentPacket = null;
|
protected DFSPacket currentPacket = null;
|
||||||
private DataStreamer streamer;
|
protected DataStreamer streamer;
|
||||||
private int packetSize = 0; // write packet size, not including the header.
|
protected int packetSize = 0; // write packet size, not including the header.
|
||||||
private int chunksPerPacket = 0;
|
protected int chunksPerPacket = 0;
|
||||||
private long lastFlushOffset = 0; // offset when flush was invoked
|
protected long lastFlushOffset = 0; // offset when flush was invoked
|
||||||
private long initialFileSize = 0; // at time of file open
|
private long initialFileSize = 0; // at time of file open
|
||||||
private final short blockReplication; // replication factor of file
|
private final short blockReplication; // replication factor of file
|
||||||
private boolean shouldSyncBlock = false; // force blocks to disk upon close
|
protected boolean shouldSyncBlock = false; // force blocks to disk upon close
|
||||||
private final AtomicReference<CachingStrategy> cachingStrategy;
|
protected final AtomicReference<CachingStrategy> cachingStrategy;
|
||||||
private FileEncryptionInfo fileEncryptionInfo;
|
private FileEncryptionInfo fileEncryptionInfo;
|
||||||
|
|
||||||
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
||||||
private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
|
protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
|
||||||
long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
|
long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
|
||||||
final byte[] buf;
|
final byte[] buf;
|
||||||
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
|
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
|
||||||
@ -206,7 +205,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Construct a new output stream for creating a file. */
|
/** Construct a new output stream for creating a file. */
|
||||||
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
||||||
EnumSet<CreateFlag> flag, Progressable progress,
|
EnumSet<CreateFlag> flag, Progressable progress,
|
||||||
DataChecksum checksum, String[] favoredNodes) throws IOException {
|
DataChecksum checksum, String[] favoredNodes) throws IOException {
|
||||||
this(dfsClient, src, progress, stat, checksum);
|
this(dfsClient, src, progress, stat, checksum);
|
||||||
@ -359,7 +358,7 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void computePacketChunkSize(int psize, int csize) {
|
protected void computePacketChunkSize(int psize, int csize) {
|
||||||
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
|
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
|
||||||
final int chunkSize = csize + getChecksumSize();
|
final int chunkSize = csize + getChecksumSize();
|
||||||
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
|
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
|
||||||
@ -426,33 +425,46 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
|
|||||||
streamer.waitAndQueuePacket(currentPacket);
|
streamer.waitAndQueuePacket(currentPacket);
|
||||||
currentPacket = null;
|
currentPacket = null;
|
||||||
|
|
||||||
// If the reopened file did not end at chunk boundary and the above
|
adjustChunkBoundary();
|
||||||
// write filled up its partial chunk. Tell the summer to generate full
|
|
||||||
// crc chunks from now on.
|
|
||||||
if (streamer.getAppendChunk() &&
|
|
||||||
streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
|
|
||||||
streamer.setAppendChunk(false);
|
|
||||||
resetChecksumBufSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!streamer.getAppendChunk()) {
|
endBlock();
|
||||||
int psize = Math.min((int)(blockSize-streamer.getBytesCurBlock()),
|
}
|
||||||
dfsClient.getConf().writePacketSize);
|
}
|
||||||
computePacketChunkSize(psize, bytesPerChecksum);
|
|
||||||
}
|
/**
|
||||||
//
|
* If the reopened file did not end at chunk boundary and the above
|
||||||
// if encountering a block boundary, send an empty packet to
|
* write filled up its partial chunk. Tell the summer to generate full
|
||||||
// indicate the end of block and reset bytesCurBlock.
|
* crc chunks from now on.
|
||||||
//
|
*/
|
||||||
if (streamer.getBytesCurBlock() == blockSize) {
|
protected void adjustChunkBoundary() {
|
||||||
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
|
if (streamer.getAppendChunk() &&
|
||||||
streamer.getAndIncCurrentSeqno(), true);
|
streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
|
||||||
currentPacket.setSyncBlock(shouldSyncBlock);
|
streamer.setAppendChunk(false);
|
||||||
streamer.waitAndQueuePacket(currentPacket);
|
resetChecksumBufSize();
|
||||||
currentPacket = null;
|
}
|
||||||
streamer.setBytesCurBlock(0);
|
|
||||||
lastFlushOffset = 0;
|
if (!streamer.getAppendChunk()) {
|
||||||
}
|
int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
|
||||||
|
dfsClient.getConf().writePacketSize);
|
||||||
|
computePacketChunkSize(psize, bytesPerChecksum);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* if encountering a block boundary, send an empty packet to
|
||||||
|
* indicate the end of block and reset bytesCurBlock.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
protected void endBlock() throws IOException {
|
||||||
|
if (streamer.getBytesCurBlock() == blockSize) {
|
||||||
|
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
|
||||||
|
streamer.getAndIncCurrentSeqno(), true);
|
||||||
|
currentPacket.setSyncBlock(shouldSyncBlock);
|
||||||
|
streamer.waitAndQueuePacket(currentPacket);
|
||||||
|
currentPacket = null;
|
||||||
|
streamer.setBytesCurBlock(0);
|
||||||
|
lastFlushOffset = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -676,7 +688,7 @@ public synchronized int getCurrentBlockReplication() throws IOException {
|
|||||||
* Waits till all existing data is flushed and confirmations
|
* Waits till all existing data is flushed and confirmations
|
||||||
* received from datanodes.
|
* received from datanodes.
|
||||||
*/
|
*/
|
||||||
private void flushInternal() throws IOException {
|
protected void flushInternal() throws IOException {
|
||||||
long toWaitFor;
|
long toWaitFor;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
dfsClient.checkOpen();
|
dfsClient.checkOpen();
|
||||||
@ -692,7 +704,7 @@ private void flushInternal() throws IOException {
|
|||||||
streamer.waitForAckedSeqno(toWaitFor);
|
streamer.waitForAckedSeqno(toWaitFor);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void start() {
|
protected synchronized void start() {
|
||||||
streamer.start();
|
streamer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -721,7 +733,7 @@ void setClosed() {
|
|||||||
|
|
||||||
// shutdown datastreamer and responseprocessor threads.
|
// shutdown datastreamer and responseprocessor threads.
|
||||||
// interrupt datastreamer if force is true
|
// interrupt datastreamer if force is true
|
||||||
private void closeThreads(boolean force) throws IOException {
|
protected void closeThreads(boolean force) throws IOException {
|
||||||
try {
|
try {
|
||||||
streamer.close(force);
|
streamer.close(force);
|
||||||
streamer.join();
|
streamer.join();
|
||||||
@ -749,7 +761,7 @@ public synchronized void close() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void closeImpl() throws IOException {
|
protected synchronized void closeImpl() throws IOException {
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
IOException e = streamer.getLastException().getAndSet(null);
|
IOException e = streamer.getLastException().getAndSet(null);
|
||||||
if (e == null)
|
if (e == null)
|
||||||
@ -792,7 +804,7 @@ private synchronized void closeImpl() throws IOException {
|
|||||||
|
|
||||||
// should be called holding (this) lock since setTestFilename() may
|
// should be called holding (this) lock since setTestFilename() may
|
||||||
// be called during unit tests
|
// be called during unit tests
|
||||||
private void completeFile(ExtendedBlock last) throws IOException {
|
protected void completeFile(ExtendedBlock last) throws IOException {
|
||||||
long localstart = Time.monotonicNow();
|
long localstart = Time.monotonicNow();
|
||||||
long sleeptime = dfsClient.getConf().
|
long sleeptime = dfsClient.getConf().
|
||||||
blockWriteLocateFollowingInitialDelayMs;
|
blockWriteLocateFollowingInitialDelayMs;
|
||||||
|
@ -1519,7 +1519,7 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
|
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
|
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
|
||||||
long sleeptime = dfsClient.getConf().
|
long sleeptime = dfsClient.getConf().
|
||||||
@ -1728,15 +1728,6 @@ AtomicReference<IOException> getLastException(){
|
|||||||
return lastException;
|
return lastException;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* get the socket connecting to the first datanode in pipeline
|
|
||||||
*
|
|
||||||
* @return socket connecting to the first datanode in pipeline
|
|
||||||
*/
|
|
||||||
Socket getSocket() {
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* set socket to null
|
* set socket to null
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user