HDFS-2260. Refactor BlockReader into an interface and implementation. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1159004 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-08-18 03:02:19 +00:00
parent cc875f0124
commit dd86860633
12 changed files with 651 additions and 495 deletions

View File

@ -668,6 +668,9 @@ Trunk (unreleased changes)
HDFS-2265. Remove unnecessary BlockTokenSecretManager fields/methods from
BlockManager. (szetszwo)
HDFS-2260. Refactor BlockReader into an interface and implementation.
(todd)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -17,95 +17,18 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
/** This is a wrapper around connection to datanode
* and understands checksum, offset etc.
*
* Terminology:
* <dl>
* <dt>block</dt>
* <dd>The hdfs block, typically large (~64MB).
* </dd>
* <dt>chunk</dt>
* <dd>A block is divided into chunks, each comes with a checksum.
* We want transfers to be chunk-aligned, to be able to
* verify checksums.
* </dd>
* <dt>packet</dt>
* <dd>A grouping of chunks used for transport. It contains a
* header, followed by checksum data, followed by real data.
* </dd>
* </dl>
* Please see DataNode for the RPC specification.
/**
* A BlockReader is responsible for reading a single block
* from a single datanode.
*/
@InterfaceAudience.Private
public class BlockReader extends FSInputChecker {
public interface BlockReader extends Seekable, PositionedReadable {
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private DataInputStream in;
private DataChecksum checksum;
/** offset in block of the last chunk received */
private long lastChunkOffset = -1;
private long lastChunkLen = -1;
private long lastSeqNo = -1;
/** offset in block where reader wants to actually read */
private long startOffset;
/** offset in block of of first chunk - may be less than startOffset
if startOffset is not chunk-aligned */
private final long firstChunkOffset;
private int bytesPerChecksum;
private int checksumSize;
/**
* The total number of bytes we need to transfer from the DN.
* This is the amount that the user has requested plus some padding
* at the beginning so that the read can begin on a chunk boundary.
*/
private final long bytesNeededToFinish;
private boolean eos = false;
private boolean sentStatusCode = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
/** Amount of unread data in the current received packet */
int dataLeft = 0;
/* FSInputChecker interface */
/* same interface as inputStream java.io.InputStream#read()
* used by DFSInputStream#read()
* This violates one rule when there is a checksum error:
@ -113,415 +36,35 @@ public class BlockReader extends FSInputChecker {
* because it first reads the data to user buffer and then checks
* the checksum.
*/
@Override
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
// This has to be set here, *before* the skip, since we can
// hit EOS during the skip, in the case that our entire read
// is smaller than the checksum chunk.
boolean eosBefore = eos;
//for the first read, skip the extra bytes at the front.
if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
// Skip these bytes. But don't call this.skip()!
int toSkip = (int)(startOffset - firstChunkOffset);
if ( skipBuf == null ) {
skipBuf = new byte[bytesPerChecksum];
}
if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
// should never happen
throw new IOException("Could not skip required number of bytes");
}
}
int nRead = super.read(buf, off, len);
// if eos was set in the previous read, send a status code to the DN
if (eos && !eosBefore && nRead >= 0) {
if (needChecksum()) {
sendReadResult(dnSock, Status.CHECKSUM_OK);
} else {
sendReadResult(dnSock, Status.SUCCESS);
}
}
return nRead;
}
@Override
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
* in majority of the cases?. This one throws. */
if ( skipBuf == null ) {
skipBuf = new byte[bytesPerChecksum];
}
long nSkipped = 0;
while ( nSkipped < n ) {
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
int ret = read(skipBuf, 0, toSkip);
if ( ret <= 0 ) {
return nSkipped;
}
nSkipped += ret;
}
return nSkipped;
}
@Override
public int read() throws IOException {
throw new IOException("read() is not expected to be invoked. " +
"Use read(buf, off, len) instead.");
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
/* Checksum errors are handled outside the BlockReader.
* DFSInputStream does not always call 'seekToNewSource'. In the
* case of pread(), it just tries a different replica without seeking.
*/
return false;
}
@Override
public void seek(long pos) throws IOException {
throw new IOException("Seek() is not supported in BlockInputChecker");
}
@Override
protected long getChunkPosition(long pos) {
throw new RuntimeException("getChunkPosition() is not supported, " +
"since seek is not required");
}
/**
* Makes sure that checksumBytes has enough capacity
* and limit is set to the number of checksum bytes needed
* to be read.
*/
private void adjustChecksumBytes(int dataLen) {
int requiredSize =
((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
} else {
checksumBytes.clear();
}
checksumBytes.limit(requiredSize);
}
@Override
protected synchronized int readChunk(long pos, byte[] buf, int offset,
int len, byte[] checksumBuf)
throws IOException {
// Read one chunk.
if (eos) {
// Already hit EOF
return -1;
}
// Read one DATA_CHUNK.
long chunkOffset = lastChunkOffset;
if ( lastChunkLen > 0 ) {
chunkOffset += lastChunkLen;
}
// pos is relative to the start of the first chunk of the read.
// chunkOffset is relative to the start of the block.
// This makes sure that the read passed from FSInputChecker is the
// for the same chunk we expect to be reading from the DN.
if ( (pos + firstChunkOffset) != chunkOffset ) {
throw new IOException("Mismatch in pos : " + pos + " + " +
firstChunkOffset + " != " + chunkOffset);
}
// Read next packet if the previous packet has been read completely.
if (dataLeft <= 0) {
//Read packet headers.
PacketHeader header = new PacketHeader();
header.readFields(in);
if (LOG.isDebugEnabled()) {
LOG.debug("DFSClient readChunk got header " + header);
}
// Sanity check the lengths
if (!header.sanityCheck(lastSeqNo)) {
throw new IOException("BlockReader: error in packet header " +
header);
}
lastSeqNo = header.getSeqno();
dataLeft = header.getDataLen();
adjustChecksumBytes(header.getDataLen());
if (header.getDataLen() > 0) {
IOUtils.readFully(in, checksumBytes.array(), 0,
checksumBytes.limit());
}
}
// Sanity checks
assert len >= bytesPerChecksum;
assert checksum != null;
assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
int checksumsToRead, bytesToRead;
if (checksumSize > 0) {
// How many chunks left in our packet - this is a ceiling
// since we may have a partial chunk at the end of the file
int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
// How many chunks we can fit in databuffer
// - note this is a floor since we always read full chunks
int chunksCanFit = Math.min(len / bytesPerChecksum,
checksumBuf.length / checksumSize);
// How many chunks should we read
checksumsToRead = Math.min(chunksLeft, chunksCanFit);
// How many bytes should we actually read
bytesToRead = Math.min(
checksumsToRead * bytesPerChecksum, // full chunks
dataLeft); // in case we have a partial
} else {
// no checksum
bytesToRead = Math.min(dataLeft, len);
checksumsToRead = 0;
}
if ( bytesToRead > 0 ) {
// Assert we have enough space
assert bytesToRead <= len;
assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
assert checksumBuf.length >= checksumSize * checksumsToRead;
IOUtils.readFully(in, buf, offset, bytesToRead);
checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
}
dataLeft -= bytesToRead;
assert dataLeft >= 0;
lastChunkOffset = chunkOffset;
lastChunkLen = bytesToRead;
// If there's no data left in the current packet after satisfying
// this read, and we have satisfied the client read, we expect
// an empty packet header from the DN to signify this.
// Note that pos + bytesToRead may in fact be greater since the
// DN finishes off the entire last chunk.
if (dataLeft == 0 &&
pos + bytesToRead >= bytesNeededToFinish) {
// Read header
PacketHeader hdr = new PacketHeader();
hdr.readFields(in);
if (!hdr.isLastPacketInBlock() ||
hdr.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
hdr);
}
eos = true;
}
if ( bytesToRead == 0 ) {
return -1;
}
return bytesToRead;
}
private BlockReader(String file, String bpid, long blockId,
DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
// Path is used only for printing block and file information in debug
super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
1, verifyChecksum,
checksum.getChecksumSize() > 0? checksum : null,
checksum.getBytesPerChecksum(),
checksum.getChecksumSize());
this.dnSock = dnSock;
this.in = in;
this.checksum = checksum;
this.startOffset = Math.max( startOffset, 0 );
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
this.firstChunkOffset = firstChunkOffset;
lastChunkOffset = firstChunkOffset;
lastChunkLen = -1;
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
public static BlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
true);
}
/** Java Doc required */
public static BlockReader newBlockReader( Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum)
throws IOException {
return newBlockReader(sock, file, block, blockToken, startOffset,
len, bufferSize, verifyChecksum, "");
}
int read(byte[] buf, int off, int len) throws IOException;
/**
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
*
* @param sock An established Socket to the DN. The BlockReader will not close it normally
* @param file File location
* @param block The block object
* @param blockToken The block token for security
* @param startOffset The read offset, relative to block head
* @param len The number of bytes to read
* @param bufferSize The IO buffer size (not the client buffer size)
* @param verifyChecksum Whether to verify checksum
* @param clientName Client name
* @return New BlockReader instance, or null on error.
* Skip the given number of bytes
*/
public static BlockReader newBlockReader( Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
// Get bytes in block, set streams
//
long skip(long n) throws IOException;
DataInputStream in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
if (status.getStatus() != Status.SUCCESS) {
if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
} else {
throw new IOException("Got error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
}
}
DataChecksum checksum = DataChecksum.newDataChecksum( in );
//Warning when we get CHECKSUM_NULL?
// Read the first chunk offset.
long firstChunkOffset = in.readLong();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
throw new IOException("BlockReader: error in first chunk offset (" +
firstChunkOffset + ") startOffset is " +
startOffset + " for file " + file);
}
/**
* Read a single byte, returning -1 at enf of stream.
*/
int read() throws IOException;
return new BlockReader(file, block.getBlockPoolId(), block.getBlockId(),
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
}
void close() throws IOException;
@Override
public synchronized void close() throws IOException {
startOffset = -1;
checksum = null;
if (dnSock != null) {
dnSock.close();
}
// in will be closed when its Socket is closed.
}
/** kind of like readFully(). Only reads as much as possible.
/**
* kind of like readFully(). Only reads as much as possible.
* And allows use of protected readFully().
*/
public int readAll(byte[] buf, int offset, int len) throws IOException {
return readFully(this, buf, offset, len);
}
int readAll(byte[] buf, int offset, int len) throws IOException;
/**
* Take the socket used to talk to the DN.
*/
public Socket takeSocket() {
assert hasSentStatusCode() :
"BlockReader shouldn't give back sockets mid-read";
Socket res = dnSock;
dnSock = null;
return res;
}
Socket takeSocket();
/**
* Whether the BlockReader has reached the end of its input stream
* and successfully sent a status code back to the datanode.
*/
public boolean hasSentStatusCode() {
return sentStatusCode;
}
boolean hasSentStatusCode();
/**
* When the reader reaches end of the read, it sends a status response
* (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
ClientReadStatusProto.newBuilder()
.setStatus(statusCode)
.build()
.writeDelimitedTo(out);
out.flush();
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
sock.getInetAddress() + ": " + e.getMessage());
}
}
/**
* File name to print when accessing a block directly (from servlets)
* @param s Address of the block location
* @param poolId Block pool ID of the block
* @param blockId Block ID of the block
* @return string that has a file name for debug purposes
*/
public static String getFileName(final InetSocketAddress s,
final String poolId, final long blockId) {
return s.toString() + ":" + poolId + ":" + blockId;
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
/**
* Utility class to create BlockReader implementations.
*/
@InterfaceAudience.Private
public class BlockReaderFactory {
public static BlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, block, blockToken, startOffset,
len, bufferSize, true, "");
}
/**
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
*
* @param sock An established Socket to the DN. The BlockReader will not close it normally
* @param file File location
* @param block The block object
* @param blockToken The block token for security
* @param startOffset The read offset, relative to block head
* @param len The number of bytes to read
* @param bufferSize The IO buffer size (not the client buffer size)
* @param verifyChecksum Whether to verify checksum
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
public static BlockReader newBlockReader(
Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
throws IOException {
return RemoteBlockReader.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
}
/**
* File name to print when accessing a block directly (from servlets)
* @param s Address of the block location
* @param poolId Block pool ID of the block
* @param blockId Block ID of the block
* @return string that has a file name for debug purposes
*/
public static String getFileName(final InetSocketAddress s,
final String poolId, final long blockId) {
return s.toString() + ":" + poolId + ":" + blockId;
}
}

View File

@ -776,7 +776,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
try {
// The OP_READ_BLOCK request is sent as we make the BlockReader
BlockReader reader =
BlockReader.newBlockReader(sock, file, block,
BlockReaderFactory.newBlockReader(sock, file, block,
blockToken,
startOffset, len,
bufferSize, verifyChecksum,

View File

@ -0,0 +1,516 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
/** This is a wrapper around connection to datanode
* and understands checksum, offset etc.
*
* Terminology:
* <dl>
* <dt>block</dt>
* <dd>The hdfs block, typically large (~64MB).
* </dd>
* <dt>chunk</dt>
* <dd>A block is divided into chunks, each comes with a checksum.
* We want transfers to be chunk-aligned, to be able to
* verify checksums.
* </dd>
* <dt>packet</dt>
* <dd>A grouping of chunks used for transport. It contains a
* header, followed by checksum data, followed by real data.
* </dd>
* </dl>
* Please see DataNode for the RPC specification.
*/
@InterfaceAudience.Private
public class RemoteBlockReader extends FSInputChecker implements BlockReader {
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private DataInputStream in;
private DataChecksum checksum;
/** offset in block of the last chunk received */
private long lastChunkOffset = -1;
private long lastChunkLen = -1;
private long lastSeqNo = -1;
/** offset in block where reader wants to actually read */
private long startOffset;
/** offset in block of of first chunk - may be less than startOffset
if startOffset is not chunk-aligned */
private final long firstChunkOffset;
private int bytesPerChecksum;
private int checksumSize;
/**
* The total number of bytes we need to transfer from the DN.
* This is the amount that the user has requested plus some padding
* at the beginning so that the read can begin on a chunk boundary.
*/
private final long bytesNeededToFinish;
private boolean eos = false;
private boolean sentStatusCode = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
/** Amount of unread data in the current received packet */
int dataLeft = 0;
/* FSInputChecker interface */
/* same interface as inputStream java.io.InputStream#read()
* used by DFSInputStream#read()
* This violates one rule when there is a checksum error:
* "Read should not modify user buffer before successful read"
* because it first reads the data to user buffer and then checks
* the checksum.
*/
@Override
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
// This has to be set here, *before* the skip, since we can
// hit EOS during the skip, in the case that our entire read
// is smaller than the checksum chunk.
boolean eosBefore = eos;
//for the first read, skip the extra bytes at the front.
if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
// Skip these bytes. But don't call this.skip()!
int toSkip = (int)(startOffset - firstChunkOffset);
if ( skipBuf == null ) {
skipBuf = new byte[bytesPerChecksum];
}
if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
// should never happen
throw new IOException("Could not skip required number of bytes");
}
}
int nRead = super.read(buf, off, len);
// if eos was set in the previous read, send a status code to the DN
if (eos && !eosBefore && nRead >= 0) {
if (needChecksum()) {
sendReadResult(dnSock, Status.CHECKSUM_OK);
} else {
sendReadResult(dnSock, Status.SUCCESS);
}
}
return nRead;
}
@Override
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
* in majority of the cases?. This one throws. */
if ( skipBuf == null ) {
skipBuf = new byte[bytesPerChecksum];
}
long nSkipped = 0;
while ( nSkipped < n ) {
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
int ret = read(skipBuf, 0, toSkip);
if ( ret <= 0 ) {
return nSkipped;
}
nSkipped += ret;
}
return nSkipped;
}
@Override
public int read() throws IOException {
throw new IOException("read() is not expected to be invoked. " +
"Use read(buf, off, len) instead.");
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
/* Checksum errors are handled outside the BlockReader.
* DFSInputStream does not always call 'seekToNewSource'. In the
* case of pread(), it just tries a different replica without seeking.
*/
return false;
}
@Override
public void seek(long pos) throws IOException {
throw new IOException("Seek() is not supported in BlockInputChecker");
}
@Override
protected long getChunkPosition(long pos) {
throw new RuntimeException("getChunkPosition() is not supported, " +
"since seek is not required");
}
/**
* Makes sure that checksumBytes has enough capacity
* and limit is set to the number of checksum bytes needed
* to be read.
*/
private void adjustChecksumBytes(int dataLen) {
int requiredSize =
((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
} else {
checksumBytes.clear();
}
checksumBytes.limit(requiredSize);
}
@Override
protected synchronized int readChunk(long pos, byte[] buf, int offset,
int len, byte[] checksumBuf)
throws IOException {
// Read one chunk.
if (eos) {
// Already hit EOF
return -1;
}
// Read one DATA_CHUNK.
long chunkOffset = lastChunkOffset;
if ( lastChunkLen > 0 ) {
chunkOffset += lastChunkLen;
}
// pos is relative to the start of the first chunk of the read.
// chunkOffset is relative to the start of the block.
// This makes sure that the read passed from FSInputChecker is the
// for the same chunk we expect to be reading from the DN.
if ( (pos + firstChunkOffset) != chunkOffset ) {
throw new IOException("Mismatch in pos : " + pos + " + " +
firstChunkOffset + " != " + chunkOffset);
}
// Read next packet if the previous packet has been read completely.
if (dataLeft <= 0) {
//Read packet headers.
PacketHeader header = new PacketHeader();
header.readFields(in);
if (LOG.isDebugEnabled()) {
LOG.debug("DFSClient readChunk got header " + header);
}
// Sanity check the lengths
if (!header.sanityCheck(lastSeqNo)) {
throw new IOException("BlockReader: error in packet header " +
header);
}
lastSeqNo = header.getSeqno();
dataLeft = header.getDataLen();
adjustChecksumBytes(header.getDataLen());
if (header.getDataLen() > 0) {
IOUtils.readFully(in, checksumBytes.array(), 0,
checksumBytes.limit());
}
}
// Sanity checks
assert len >= bytesPerChecksum;
assert checksum != null;
assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
int checksumsToRead, bytesToRead;
if (checksumSize > 0) {
// How many chunks left in our packet - this is a ceiling
// since we may have a partial chunk at the end of the file
int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
// How many chunks we can fit in databuffer
// - note this is a floor since we always read full chunks
int chunksCanFit = Math.min(len / bytesPerChecksum,
checksumBuf.length / checksumSize);
// How many chunks should we read
checksumsToRead = Math.min(chunksLeft, chunksCanFit);
// How many bytes should we actually read
bytesToRead = Math.min(
checksumsToRead * bytesPerChecksum, // full chunks
dataLeft); // in case we have a partial
} else {
// no checksum
bytesToRead = Math.min(dataLeft, len);
checksumsToRead = 0;
}
if ( bytesToRead > 0 ) {
// Assert we have enough space
assert bytesToRead <= len;
assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
assert checksumBuf.length >= checksumSize * checksumsToRead;
IOUtils.readFully(in, buf, offset, bytesToRead);
checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
}
dataLeft -= bytesToRead;
assert dataLeft >= 0;
lastChunkOffset = chunkOffset;
lastChunkLen = bytesToRead;
// If there's no data left in the current packet after satisfying
// this read, and we have satisfied the client read, we expect
// an empty packet header from the DN to signify this.
// Note that pos + bytesToRead may in fact be greater since the
// DN finishes off the entire last chunk.
if (dataLeft == 0 &&
pos + bytesToRead >= bytesNeededToFinish) {
// Read header
PacketHeader hdr = new PacketHeader();
hdr.readFields(in);
if (!hdr.isLastPacketInBlock() ||
hdr.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
hdr);
}
eos = true;
}
if ( bytesToRead == 0 ) {
return -1;
}
return bytesToRead;
}
private RemoteBlockReader(String file, String bpid, long blockId,
DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
// Path is used only for printing block and file information in debug
super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
1, verifyChecksum,
checksum.getChecksumSize() > 0? checksum : null,
checksum.getBytesPerChecksum(),
checksum.getChecksumSize());
this.dnSock = dnSock;
this.in = in;
this.checksum = checksum;
this.startOffset = Math.max( startOffset, 0 );
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
this.firstChunkOffset = firstChunkOffset;
lastChunkOffset = firstChunkOffset;
lastChunkLen = -1;
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
public static RemoteBlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, block, blockToken, startOffset,
len, bufferSize, true, "");
}
/**
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
*
* @param sock An established Socket to the DN. The BlockReader will not close it normally
* @param file File location
* @param block The block object
* @param blockToken The block token for security
* @param startOffset The read offset, relative to block head
* @param len The number of bytes to read
* @param bufferSize The IO buffer size (not the client buffer size)
* @param verifyChecksum Whether to verify checksum
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
public static RemoteBlockReader newBlockReader( Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
// Get bytes in block, set streams
//
DataInputStream in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
checkSuccess(status, sock, block, file);
DataChecksum checksum = DataChecksum.newDataChecksum( in );
//Warning when we get CHECKSUM_NULL?
// Read the first chunk offset.
long firstChunkOffset = in.readLong();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
throw new IOException("BlockReader: error in first chunk offset (" +
firstChunkOffset + ") startOffset is " +
startOffset + " for file " + file);
}
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
}
private static void checkSuccess(
BlockOpResponseProto status, Socket sock,
ExtendedBlock block, String file)
throws IOException {
if (status.getStatus() != Status.SUCCESS) {
if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
} else {
throw new IOException("Got error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
}
}
}
@Override
public synchronized void close() throws IOException {
startOffset = -1;
checksum = null;
if (dnSock != null) {
dnSock.close();
}
// in will be closed when its Socket is closed.
}
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
return readFully(this, buf, offset, len);
}
@Override
public Socket takeSocket() {
assert hasSentStatusCode() :
"BlockReader shouldn't give back sockets mid-read";
Socket res = dnSock;
dnSock = null;
return res;
}
@Override
public boolean hasSentStatusCode() {
return sentStatusCode;
}
/**
* When the reader reaches end of the read, it sends a status response
* (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
ClientReadStatusProto.newBuilder()
.setStatus(statusCode)
.build()
.writeDelimitedTo(out);
out.flush();
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
sock.getInetAddress() + ": " + e.getMessage());
}
}
/**
* File name to print when accessing a block directly (from servlets)
* @param s Address of the block location
* @param poolId Block pool ID of the block
* @param blockId Block ID of the block
* @return string that has a file name for debug purposes
*/
public static String getFileName(final InetSocketAddress s,
final String poolId, final long blockId) {
return s.toString() + ":" + poolId + ":" + blockId;
}
}

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -195,8 +196,8 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
// Use the block name for file name.
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
String file = BlockReader.getFileName(addr, poolId, blockId);
BlockReader blockReader = BlockReader.newBlockReader(s, file,
String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, bufferSize);

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@ -504,9 +505,9 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock,
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
String file = BlockReader.getFileName(targetAddr, block.getBlockPoolId(),
String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
block.getBlockId());
blockReader = BlockReader.newBlockReader(s, file, block, lblock
blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
} catch (IOException ex) {

View File

@ -143,7 +143,7 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR
sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
return BlockReader.newBlockReader(
return BlockReaderFactory.newBlockReader(
sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
testBlock.getBlockToken(),
offset, lenToRead,

View File

@ -20,6 +20,8 @@
import java.util.List;
import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.fs.Path;
@ -52,7 +54,8 @@ public static void setupCluster() throws Exception {
*/
@Test
public void testBlockVerification() throws Exception {
BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
RemoteBlockReader reader = (RemoteBlockReader)spy(
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
@ -63,7 +66,8 @@ public void testBlockVerification() throws Exception {
*/
@Test
public void testIncompleteRead() throws Exception {
BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
RemoteBlockReader reader = (RemoteBlockReader)spy(
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
// We asked the blockreader for the whole file, and only read
@ -80,7 +84,8 @@ public void testIncompleteRead() throws Exception {
@Test
public void testCompletePartialRead() throws Exception {
// Ask for half the file
BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
RemoteBlockReader reader = (RemoteBlockReader)spy(
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@ -99,7 +104,8 @@ public void testUnalignedReads() throws Exception {
for (int length : lengths) {
DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
" len=" + length);
BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
RemoteBlockReader reader = (RemoteBlockReader)spy(
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();

View File

@ -28,6 +28,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.RemoteBlockReader;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.SocketCache;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -72,13 +76,13 @@ public class TestConnCache {
* It verifies that all invocation to DFSInputStream.getBlockReader()
* use the same socket.
*/
private class MockGetBlockReader implements Answer<BlockReader> {
public BlockReader reader = null;
private class MockGetBlockReader implements Answer<RemoteBlockReader> {
public RemoteBlockReader reader = null;
private Socket sock = null;
public BlockReader answer(InvocationOnMock invocation) throws Throwable {
BlockReader prevReader = reader;
reader = (BlockReader) invocation.callRealMethod();
public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
RemoteBlockReader prevReader = reader;
reader = (RemoteBlockReader) invocation.callRealMethod();
if (sock == null) {
sock = reader.dnSock;
} else if (prevReader != null && prevReader.hasSentStatusCode()) {

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -139,9 +140,9 @@ private static void tryRead(Configuration conf, LocatedBlock lblock,
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
String file = BlockReader.getFileName(targetAddr,
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId());
blockReader = BlockReader.newBlockReader(s, file, block,
blockReader = BlockReaderFactory.newBlockReader(s, file, block,
lblock.getBlockToken(), 0, -1,
conf.getInt("io.file.buffer.size", 4096));

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -267,11 +268,11 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
String file = BlockReader.getFileName(targetAddr,
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
BlockReader blockReader =
BlockReader.newBlockReader(s, file, block, lblock
BlockReaderFactory.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, 4096);
// nothing - if it fails - it will throw and exception