diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index 449b97452b..7ee73cab18 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -668,6 +668,9 @@ Trunk (unreleased changes)
HDFS-2265. Remove unnecessary BlockTokenSecretManager fields/methods from
BlockManager. (szetszwo)
+ HDFS-2260. Refactor BlockReader into an interface and implementation.
+ (todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
index 791f5cd800..3ebbeec68b 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -17,95 +17,18 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
import java.net.Socket;
-import java.nio.ByteBuffer;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
-
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- *
- * - block
- * - The hdfs block, typically large (~64MB).
- *
- * - chunk
- * - A block is divided into chunks, each comes with a checksum.
- * We want transfers to be chunk-aligned, to be able to
- * verify checksums.
- *
- * - packet
- * - A grouping of chunks used for transport. It contains a
- * header, followed by checksum data, followed by real data.
- *
- *
- * Please see DataNode for the RPC specification.
+/**
+ * A BlockReader is responsible for reading a single block
+ * from a single datanode.
*/
-@InterfaceAudience.Private
-public class BlockReader extends FSInputChecker {
+public interface BlockReader extends Seekable, PositionedReadable {
- Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
- private DataInputStream in;
- private DataChecksum checksum;
-
- /** offset in block of the last chunk received */
- private long lastChunkOffset = -1;
- private long lastChunkLen = -1;
- private long lastSeqNo = -1;
-
- /** offset in block where reader wants to actually read */
- private long startOffset;
-
- /** offset in block of of first chunk - may be less than startOffset
- if startOffset is not chunk-aligned */
- private final long firstChunkOffset;
-
- private int bytesPerChecksum;
- private int checksumSize;
-
- /**
- * The total number of bytes we need to transfer from the DN.
- * This is the amount that the user has requested plus some padding
- * at the beginning so that the read can begin on a chunk boundary.
- */
- private final long bytesNeededToFinish;
-
- private boolean eos = false;
- private boolean sentStatusCode = false;
-
- byte[] skipBuf = null;
- ByteBuffer checksumBytes = null;
- /** Amount of unread data in the current received packet */
- int dataLeft = 0;
-
- /* FSInputChecker interface */
-
/* same interface as inputStream java.io.InputStream#read()
* used by DFSInputStream#read()
* This violates one rule when there is a checksum error:
@@ -113,415 +36,35 @@ public class BlockReader extends FSInputChecker {
* because it first reads the data to user buffer and then checks
* the checksum.
*/
- @Override
- public synchronized int read(byte[] buf, int off, int len)
- throws IOException {
-
- // This has to be set here, *before* the skip, since we can
- // hit EOS during the skip, in the case that our entire read
- // is smaller than the checksum chunk.
- boolean eosBefore = eos;
-
- //for the first read, skip the extra bytes at the front.
- if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
- // Skip these bytes. But don't call this.skip()!
- int toSkip = (int)(startOffset - firstChunkOffset);
- if ( skipBuf == null ) {
- skipBuf = new byte[bytesPerChecksum];
- }
- if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
- // should never happen
- throw new IOException("Could not skip required number of bytes");
- }
- }
-
- int nRead = super.read(buf, off, len);
-
- // if eos was set in the previous read, send a status code to the DN
- if (eos && !eosBefore && nRead >= 0) {
- if (needChecksum()) {
- sendReadResult(dnSock, Status.CHECKSUM_OK);
- } else {
- sendReadResult(dnSock, Status.SUCCESS);
- }
- }
- return nRead;
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- /* How can we make sure we don't throw a ChecksumException, at least
- * in majority of the cases?. This one throws. */
- if ( skipBuf == null ) {
- skipBuf = new byte[bytesPerChecksum];
- }
-
- long nSkipped = 0;
- while ( nSkipped < n ) {
- int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
- int ret = read(skipBuf, 0, toSkip);
- if ( ret <= 0 ) {
- return nSkipped;
- }
- nSkipped += ret;
- }
- return nSkipped;
- }
-
- @Override
- public int read() throws IOException {
- throw new IOException("read() is not expected to be invoked. " +
- "Use read(buf, off, len) instead.");
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- /* Checksum errors are handled outside the BlockReader.
- * DFSInputStream does not always call 'seekToNewSource'. In the
- * case of pread(), it just tries a different replica without seeking.
- */
- return false;
- }
-
- @Override
- public void seek(long pos) throws IOException {
- throw new IOException("Seek() is not supported in BlockInputChecker");
- }
-
- @Override
- protected long getChunkPosition(long pos) {
- throw new RuntimeException("getChunkPosition() is not supported, " +
- "since seek is not required");
- }
-
- /**
- * Makes sure that checksumBytes has enough capacity
- * and limit is set to the number of checksum bytes needed
- * to be read.
- */
- private void adjustChecksumBytes(int dataLen) {
- int requiredSize =
- ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
- if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
- checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
- } else {
- checksumBytes.clear();
- }
- checksumBytes.limit(requiredSize);
- }
-
- @Override
- protected synchronized int readChunk(long pos, byte[] buf, int offset,
- int len, byte[] checksumBuf)
- throws IOException {
- // Read one chunk.
- if (eos) {
- // Already hit EOF
- return -1;
- }
-
- // Read one DATA_CHUNK.
- long chunkOffset = lastChunkOffset;
- if ( lastChunkLen > 0 ) {
- chunkOffset += lastChunkLen;
- }
-
- // pos is relative to the start of the first chunk of the read.
- // chunkOffset is relative to the start of the block.
- // This makes sure that the read passed from FSInputChecker is the
- // for the same chunk we expect to be reading from the DN.
- if ( (pos + firstChunkOffset) != chunkOffset ) {
- throw new IOException("Mismatch in pos : " + pos + " + " +
- firstChunkOffset + " != " + chunkOffset);
- }
-
- // Read next packet if the previous packet has been read completely.
- if (dataLeft <= 0) {
- //Read packet headers.
- PacketHeader header = new PacketHeader();
- header.readFields(in);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient readChunk got header " + header);
- }
-
- // Sanity check the lengths
- if (!header.sanityCheck(lastSeqNo)) {
- throw new IOException("BlockReader: error in packet header " +
- header);
- }
-
- lastSeqNo = header.getSeqno();
- dataLeft = header.getDataLen();
- adjustChecksumBytes(header.getDataLen());
- if (header.getDataLen() > 0) {
- IOUtils.readFully(in, checksumBytes.array(), 0,
- checksumBytes.limit());
- }
- }
-
- // Sanity checks
- assert len >= bytesPerChecksum;
- assert checksum != null;
- assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
- int checksumsToRead, bytesToRead;
-
- if (checksumSize > 0) {
-
- // How many chunks left in our packet - this is a ceiling
- // since we may have a partial chunk at the end of the file
- int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
- // How many chunks we can fit in databuffer
- // - note this is a floor since we always read full chunks
- int chunksCanFit = Math.min(len / bytesPerChecksum,
- checksumBuf.length / checksumSize);
-
- // How many chunks should we read
- checksumsToRead = Math.min(chunksLeft, chunksCanFit);
- // How many bytes should we actually read
- bytesToRead = Math.min(
- checksumsToRead * bytesPerChecksum, // full chunks
- dataLeft); // in case we have a partial
- } else {
- // no checksum
- bytesToRead = Math.min(dataLeft, len);
- checksumsToRead = 0;
- }
-
- if ( bytesToRead > 0 ) {
- // Assert we have enough space
- assert bytesToRead <= len;
- assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
- assert checksumBuf.length >= checksumSize * checksumsToRead;
- IOUtils.readFully(in, buf, offset, bytesToRead);
- checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
- }
-
- dataLeft -= bytesToRead;
- assert dataLeft >= 0;
-
- lastChunkOffset = chunkOffset;
- lastChunkLen = bytesToRead;
-
- // If there's no data left in the current packet after satisfying
- // this read, and we have satisfied the client read, we expect
- // an empty packet header from the DN to signify this.
- // Note that pos + bytesToRead may in fact be greater since the
- // DN finishes off the entire last chunk.
- if (dataLeft == 0 &&
- pos + bytesToRead >= bytesNeededToFinish) {
-
- // Read header
- PacketHeader hdr = new PacketHeader();
- hdr.readFields(in);
-
- if (!hdr.isLastPacketInBlock() ||
- hdr.getDataLen() != 0) {
- throw new IOException("Expected empty end-of-read packet! Header: " +
- hdr);
- }
-
- eos = true;
- }
-
- if ( bytesToRead == 0 ) {
- return -1;
- }
-
- return bytesToRead;
- }
-
- private BlockReader(String file, String bpid, long blockId,
- DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
- // Path is used only for printing block and file information in debug
- super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
- 1, verifyChecksum,
- checksum.getChecksumSize() > 0? checksum : null,
- checksum.getBytesPerChecksum(),
- checksum.getChecksumSize());
-
- this.dnSock = dnSock;
- this.in = in;
- this.checksum = checksum;
- this.startOffset = Math.max( startOffset, 0 );
-
- // The total number of bytes that we need to transfer from the DN is
- // the amount that the user wants (bytesToRead), plus the padding at
- // the beginning in order to chunk-align. Note that the DN may elect
- // to send more than this amount if the read starts/ends mid-chunk.
- this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
- this.firstChunkOffset = firstChunkOffset;
- lastChunkOffset = firstChunkOffset;
- lastChunkLen = -1;
-
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
- checksumSize = this.checksum.getChecksumSize();
- }
-
- public static BlockReader newBlockReader(Socket sock, String file,
- ExtendedBlock block, Token blockToken,
- long startOffset, long len, int bufferSize) throws IOException {
- return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
- true);
- }
-
- /** Java Doc required */
- public static BlockReader newBlockReader( Socket sock, String file,
- ExtendedBlock block,
- Token blockToken,
- long startOffset, long len,
- int bufferSize, boolean verifyChecksum)
- throws IOException {
- return newBlockReader(sock, file, block, blockToken, startOffset,
- len, bufferSize, verifyChecksum, "");
- }
+ int read(byte[] buf, int off, int len) throws IOException;
/**
- * Create a new BlockReader specifically to satisfy a read.
- * This method also sends the OP_READ_BLOCK request.
- *
- * @param sock An established Socket to the DN. The BlockReader will not close it normally
- * @param file File location
- * @param block The block object
- * @param blockToken The block token for security
- * @param startOffset The read offset, relative to block head
- * @param len The number of bytes to read
- * @param bufferSize The IO buffer size (not the client buffer size)
- * @param verifyChecksum Whether to verify checksum
- * @param clientName Client name
- * @return New BlockReader instance, or null on error.
+ * Skip the given number of bytes
*/
- public static BlockReader newBlockReader( Socket sock, String file,
- ExtendedBlock block,
- Token blockToken,
- long startOffset, long len,
- int bufferSize, boolean verifyChecksum,
- String clientName)
- throws IOException {
- // in and out will be closed when sock is closed (by the caller)
- final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
- new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
-
- //
- // Get bytes in block, set streams
- //
+ long skip(long n) throws IOException;
- DataInputStream in = new DataInputStream(
- new BufferedInputStream(NetUtils.getInputStream(sock),
- bufferSize));
-
- BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
- vintPrefixed(in));
- if (status.getStatus() != Status.SUCCESS) {
- if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
- throw new InvalidBlockTokenException(
- "Got access token error for OP_READ_BLOCK, self="
- + sock.getLocalSocketAddress() + ", remote="
- + sock.getRemoteSocketAddress() + ", for file " + file
- + ", for pool " + block.getBlockPoolId() + " block "
- + block.getBlockId() + "_" + block.getGenerationStamp());
- } else {
- throw new IOException("Got error for OP_READ_BLOCK, self="
- + sock.getLocalSocketAddress() + ", remote="
- + sock.getRemoteSocketAddress() + ", for file " + file
- + ", for pool " + block.getBlockPoolId() + " block "
- + block.getBlockId() + "_" + block.getGenerationStamp());
- }
- }
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
- //Warning when we get CHECKSUM_NULL?
-
- // Read the first chunk offset.
- long firstChunkOffset = in.readLong();
-
- if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
- firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
- throw new IOException("BlockReader: error in first chunk offset (" +
- firstChunkOffset + ") startOffset is " +
- startOffset + " for file " + file);
- }
+ /**
+ * Read a single byte, returning -1 at enf of stream.
+ */
+ int read() throws IOException;
- return new BlockReader(file, block.getBlockPoolId(), block.getBlockId(),
- in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
- }
+ void close() throws IOException;
- @Override
- public synchronized void close() throws IOException {
- startOffset = -1;
- checksum = null;
- if (dnSock != null) {
- dnSock.close();
- }
-
- // in will be closed when its Socket is closed.
- }
-
- /** kind of like readFully(). Only reads as much as possible.
+ /**
+ * kind of like readFully(). Only reads as much as possible.
* And allows use of protected readFully().
*/
- public int readAll(byte[] buf, int offset, int len) throws IOException {
- return readFully(this, buf, offset, len);
- }
+ int readAll(byte[] buf, int offset, int len) throws IOException;
/**
* Take the socket used to talk to the DN.
*/
- public Socket takeSocket() {
- assert hasSentStatusCode() :
- "BlockReader shouldn't give back sockets mid-read";
- Socket res = dnSock;
- dnSock = null;
- return res;
- }
+ Socket takeSocket();
/**
* Whether the BlockReader has reached the end of its input stream
* and successfully sent a status code back to the datanode.
*/
- public boolean hasSentStatusCode() {
- return sentStatusCode;
- }
+ boolean hasSentStatusCode();
- /**
- * When the reader reaches end of the read, it sends a status response
- * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
- * closing our connection (which we will re-open), but won't affect
- * data correctness.
- */
- void sendReadResult(Socket sock, Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + sock;
- try {
- OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-
- ClientReadStatusProto.newBuilder()
- .setStatus(statusCode)
- .build()
- .writeDelimitedTo(out);
-
- out.flush();
- sentStatusCode = true;
- } catch (IOException e) {
- // It's ok not to be able to send this. But something is probably wrong.
- LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- sock.getInetAddress() + ": " + e.getMessage());
- }
- }
-
- /**
- * File name to print when accessing a block directly (from servlets)
- * @param s Address of the block location
- * @param poolId Block pool ID of the block
- * @param blockId Block ID of the block
- * @return string that has a file name for debug purposes
- */
- public static String getFileName(final InetSocketAddress s,
- final String poolId, final long blockId) {
- return s.toString() + ":" + poolId + ":" + blockId;
- }
}
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
new file mode 100644
index 0000000000..52c6cc42ce
--- /dev/null
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+
+/**
+ * Utility class to create BlockReader implementations.
+ */
+@InterfaceAudience.Private
+public class BlockReaderFactory {
+ public static BlockReader newBlockReader(Socket sock, String file,
+ ExtendedBlock block, Token blockToken,
+ long startOffset, long len, int bufferSize) throws IOException {
+ return newBlockReader(sock, file, block, blockToken, startOffset,
+ len, bufferSize, true, "");
+ }
+
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param sock An established Socket to the DN. The BlockReader will not close it normally
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance, or null on error.
+ */
+ public static BlockReader newBlockReader(
+ Socket sock, String file,
+ ExtendedBlock block,
+ Token blockToken,
+ long startOffset, long len,
+ int bufferSize, boolean verifyChecksum,
+ String clientName)
+ throws IOException {
+ return RemoteBlockReader.newBlockReader(
+ sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+ }
+
+ /**
+ * File name to print when accessing a block directly (from servlets)
+ * @param s Address of the block location
+ * @param poolId Block pool ID of the block
+ * @param blockId Block ID of the block
+ * @return string that has a file name for debug purposes
+ */
+ public static String getFileName(final InetSocketAddress s,
+ final String poolId, final long blockId) {
+ return s.toString() + ":" + poolId + ":" + blockId;
+ }
+}
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 883776d167..e446122efb 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -776,7 +776,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
try {
// The OP_READ_BLOCK request is sent as we make the BlockReader
BlockReader reader =
- BlockReader.newBlockReader(sock, file, block,
+ BlockReaderFactory.newBlockReader(sock, file, block,
blockToken,
startOffset, len,
bufferSize, verifyChecksum,
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
new file mode 100644
index 0000000000..51311f5216
--- /dev/null
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+
+/** This is a wrapper around connection to datanode
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ *
+ * - block
+ * - The hdfs block, typically large (~64MB).
+ *
+ * - chunk
+ * - A block is divided into chunks, each comes with a checksum.
+ * We want transfers to be chunk-aligned, to be able to
+ * verify checksums.
+ *
+ * - packet
+ * - A grouping of chunks used for transport. It contains a
+ * header, followed by checksum data, followed by real data.
+ *
+ *
+ * Please see DataNode for the RPC specification.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+
+ Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+ private DataInputStream in;
+ private DataChecksum checksum;
+
+ /** offset in block of the last chunk received */
+ private long lastChunkOffset = -1;
+ private long lastChunkLen = -1;
+ private long lastSeqNo = -1;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+
+ /** offset in block of of first chunk - may be less than startOffset
+ if startOffset is not chunk-aligned */
+ private final long firstChunkOffset;
+
+ private int bytesPerChecksum;
+ private int checksumSize;
+
+ /**
+ * The total number of bytes we need to transfer from the DN.
+ * This is the amount that the user has requested plus some padding
+ * at the beginning so that the read can begin on a chunk boundary.
+ */
+ private final long bytesNeededToFinish;
+
+ private boolean eos = false;
+ private boolean sentStatusCode = false;
+
+ byte[] skipBuf = null;
+ ByteBuffer checksumBytes = null;
+ /** Amount of unread data in the current received packet */
+ int dataLeft = 0;
+
+ /* FSInputChecker interface */
+
+ /* same interface as inputStream java.io.InputStream#read()
+ * used by DFSInputStream#read()
+ * This violates one rule when there is a checksum error:
+ * "Read should not modify user buffer before successful read"
+ * because it first reads the data to user buffer and then checks
+ * the checksum.
+ */
+ @Override
+ public synchronized int read(byte[] buf, int off, int len)
+ throws IOException {
+
+ // This has to be set here, *before* the skip, since we can
+ // hit EOS during the skip, in the case that our entire read
+ // is smaller than the checksum chunk.
+ boolean eosBefore = eos;
+
+ //for the first read, skip the extra bytes at the front.
+ if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+ // Skip these bytes. But don't call this.skip()!
+ int toSkip = (int)(startOffset - firstChunkOffset);
+ if ( skipBuf == null ) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+ if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+ // should never happen
+ throw new IOException("Could not skip required number of bytes");
+ }
+ }
+
+ int nRead = super.read(buf, off, len);
+
+ // if eos was set in the previous read, send a status code to the DN
+ if (eos && !eosBefore && nRead >= 0) {
+ if (needChecksum()) {
+ sendReadResult(dnSock, Status.CHECKSUM_OK);
+ } else {
+ sendReadResult(dnSock, Status.SUCCESS);
+ }
+ }
+ return nRead;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ /* How can we make sure we don't throw a ChecksumException, at least
+ * in majority of the cases?. This one throws. */
+ if ( skipBuf == null ) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+
+ long nSkipped = 0;
+ while ( nSkipped < n ) {
+ int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+ int ret = read(skipBuf, 0, toSkip);
+ if ( ret <= 0 ) {
+ return nSkipped;
+ }
+ nSkipped += ret;
+ }
+ return nSkipped;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new IOException("read() is not expected to be invoked. " +
+ "Use read(buf, off, len) instead.");
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ /* Checksum errors are handled outside the BlockReader.
+ * DFSInputStream does not always call 'seekToNewSource'. In the
+ * case of pread(), it just tries a different replica without seeking.
+ */
+ return false;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throw new IOException("Seek() is not supported in BlockInputChecker");
+ }
+
+ @Override
+ protected long getChunkPosition(long pos) {
+ throw new RuntimeException("getChunkPosition() is not supported, " +
+ "since seek is not required");
+ }
+
+ /**
+ * Makes sure that checksumBytes has enough capacity
+ * and limit is set to the number of checksum bytes needed
+ * to be read.
+ */
+ private void adjustChecksumBytes(int dataLen) {
+ int requiredSize =
+ ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+ if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+ checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
+ } else {
+ checksumBytes.clear();
+ }
+ checksumBytes.limit(requiredSize);
+ }
+
+ @Override
+ protected synchronized int readChunk(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf)
+ throws IOException {
+ // Read one chunk.
+ if (eos) {
+ // Already hit EOF
+ return -1;
+ }
+
+ // Read one DATA_CHUNK.
+ long chunkOffset = lastChunkOffset;
+ if ( lastChunkLen > 0 ) {
+ chunkOffset += lastChunkLen;
+ }
+
+ // pos is relative to the start of the first chunk of the read.
+ // chunkOffset is relative to the start of the block.
+ // This makes sure that the read passed from FSInputChecker is the
+ // for the same chunk we expect to be reading from the DN.
+ if ( (pos + firstChunkOffset) != chunkOffset ) {
+ throw new IOException("Mismatch in pos : " + pos + " + " +
+ firstChunkOffset + " != " + chunkOffset);
+ }
+
+ // Read next packet if the previous packet has been read completely.
+ if (dataLeft <= 0) {
+ //Read packet headers.
+ PacketHeader header = new PacketHeader();
+ header.readFields(in);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DFSClient readChunk got header " + header);
+ }
+
+ // Sanity check the lengths
+ if (!header.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ header);
+ }
+
+ lastSeqNo = header.getSeqno();
+ dataLeft = header.getDataLen();
+ adjustChecksumBytes(header.getDataLen());
+ if (header.getDataLen() > 0) {
+ IOUtils.readFully(in, checksumBytes.array(), 0,
+ checksumBytes.limit());
+ }
+ }
+
+ // Sanity checks
+ assert len >= bytesPerChecksum;
+ assert checksum != null;
+ assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+ int checksumsToRead, bytesToRead;
+
+ if (checksumSize > 0) {
+
+ // How many chunks left in our packet - this is a ceiling
+ // since we may have a partial chunk at the end of the file
+ int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+ // How many chunks we can fit in databuffer
+ // - note this is a floor since we always read full chunks
+ int chunksCanFit = Math.min(len / bytesPerChecksum,
+ checksumBuf.length / checksumSize);
+
+ // How many chunks should we read
+ checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+ // How many bytes should we actually read
+ bytesToRead = Math.min(
+ checksumsToRead * bytesPerChecksum, // full chunks
+ dataLeft); // in case we have a partial
+ } else {
+ // no checksum
+ bytesToRead = Math.min(dataLeft, len);
+ checksumsToRead = 0;
+ }
+
+ if ( bytesToRead > 0 ) {
+ // Assert we have enough space
+ assert bytesToRead <= len;
+ assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+ assert checksumBuf.length >= checksumSize * checksumsToRead;
+ IOUtils.readFully(in, buf, offset, bytesToRead);
+ checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+ }
+
+ dataLeft -= bytesToRead;
+ assert dataLeft >= 0;
+
+ lastChunkOffset = chunkOffset;
+ lastChunkLen = bytesToRead;
+
+ // If there's no data left in the current packet after satisfying
+ // this read, and we have satisfied the client read, we expect
+ // an empty packet header from the DN to signify this.
+ // Note that pos + bytesToRead may in fact be greater since the
+ // DN finishes off the entire last chunk.
+ if (dataLeft == 0 &&
+ pos + bytesToRead >= bytesNeededToFinish) {
+
+ // Read header
+ PacketHeader hdr = new PacketHeader();
+ hdr.readFields(in);
+
+ if (!hdr.isLastPacketInBlock() ||
+ hdr.getDataLen() != 0) {
+ throw new IOException("Expected empty end-of-read packet! Header: " +
+ hdr);
+ }
+
+ eos = true;
+ }
+
+ if ( bytesToRead == 0 ) {
+ return -1;
+ }
+
+ return bytesToRead;
+ }
+
+ private RemoteBlockReader(String file, String bpid, long blockId,
+ DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+ long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+ // Path is used only for printing block and file information in debug
+ super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
+ 1, verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+
+ this.dnSock = dnSock;
+ this.in = in;
+ this.checksum = checksum;
+ this.startOffset = Math.max( startOffset, 0 );
+
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+ this.firstChunkOffset = firstChunkOffset;
+ lastChunkOffset = firstChunkOffset;
+ lastChunkLen = -1;
+
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+ }
+
+ public static RemoteBlockReader newBlockReader(Socket sock, String file,
+ ExtendedBlock block, Token blockToken,
+ long startOffset, long len, int bufferSize) throws IOException {
+ return newBlockReader(sock, file, block, blockToken, startOffset,
+ len, bufferSize, true, "");
+ }
+
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param sock An established Socket to the DN. The BlockReader will not close it normally
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance, or null on error.
+ */
+ public static RemoteBlockReader newBlockReader( Socket sock, String file,
+ ExtendedBlock block,
+ Token blockToken,
+ long startOffset, long len,
+ int bufferSize, boolean verifyChecksum,
+ String clientName)
+ throws IOException {
+ // in and out will be closed when sock is closed (by the caller)
+ final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+ NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
+ new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
+
+ //
+ // Get bytes in block, set streams
+ //
+
+ DataInputStream in = new DataInputStream(
+ new BufferedInputStream(NetUtils.getInputStream(sock),
+ bufferSize));
+
+ BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+ vintPrefixed(in));
+ checkSuccess(status, sock, block, file);
+ DataChecksum checksum = DataChecksum.newDataChecksum( in );
+ //Warning when we get CHECKSUM_NULL?
+
+ // Read the first chunk offset.
+ long firstChunkOffset = in.readLong();
+
+ if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+ firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+ throw new IOException("BlockReader: error in first chunk offset (" +
+ firstChunkOffset + ") startOffset is " +
+ startOffset + " for file " + file);
+ }
+
+ return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+ in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+ }
+
+ private static void checkSuccess(
+ BlockOpResponseProto status, Socket sock,
+ ExtendedBlock block, String file)
+ throws IOException {
+ if (status.getStatus() != Status.SUCCESS) {
+ if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException(
+ "Got access token error for OP_READ_BLOCK, self="
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ + ", for pool " + block.getBlockPoolId() + " block "
+ + block.getBlockId() + "_" + block.getGenerationStamp());
+ } else {
+ throw new IOException("Got error for OP_READ_BLOCK, self="
+ + sock.getLocalSocketAddress() + ", remote="
+ + sock.getRemoteSocketAddress() + ", for file " + file
+ + ", for pool " + block.getBlockPoolId() + " block "
+ + block.getBlockId() + "_" + block.getGenerationStamp());
+ }
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ startOffset = -1;
+ checksum = null;
+ if (dnSock != null) {
+ dnSock.close();
+ }
+
+ // in will be closed when its Socket is closed.
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return readFully(this, buf, offset, len);
+ }
+
+ @Override
+ public Socket takeSocket() {
+ assert hasSentStatusCode() :
+ "BlockReader shouldn't give back sockets mid-read";
+ Socket res = dnSock;
+ dnSock = null;
+ return res;
+ }
+
+ @Override
+ public boolean hasSentStatusCode() {
+ return sentStatusCode;
+ }
+
+ /**
+ * When the reader reaches end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Socket sock, Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + sock;
+ try {
+ OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
+
+ ClientReadStatusProto.newBuilder()
+ .setStatus(statusCode)
+ .build()
+ .writeDelimitedTo(out);
+
+ out.flush();
+ sentStatusCode = true;
+ } catch (IOException e) {
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ sock.getInetAddress() + ": " + e.getMessage());
+ }
+ }
+
+ /**
+ * File name to print when accessing a block directly (from servlets)
+ * @param s Address of the block location
+ * @param poolId Block pool ID of the block
+ * @param blockId Block ID of the block
+ * @return string that has a file name for debug purposes
+ */
+ public static String getFileName(final InetSocketAddress s,
+ final String poolId, final long blockId) {
+ return s.toString() + ":" + poolId + ":" + blockId;
+ }
+}
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
index cf2452992f..81f182d6ca 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -195,8 +196,8 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
// Use the block name for file name.
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
- String file = BlockReader.getFileName(addr, poolId, blockId);
- BlockReader blockReader = BlockReader.newBlockReader(s, file,
+ String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
+ BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, bufferSize);
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index f9ba6d4a41..26376d476f 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -504,9 +505,9 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock,
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr, block.getBlockPoolId(),
+ String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
block.getBlockId());
- blockReader = BlockReader.newBlockReader(s, file, block, lblock
+ blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
} catch (IOException ex) {
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
index 64e4998c72..25585cecbb 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
@@ -143,7 +143,7 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR
sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- return BlockReader.newBlockReader(
+ return BlockReaderFactory.newBlockReader(
sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
testBlock.getBlockToken(),
offset, lenToRead,
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
index 639d7aff6d..8315e1f5d0 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
@@ -20,6 +20,8 @@
import java.util.List;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.fs.Path;
@@ -52,7 +54,8 @@ public static void setupCluster() throws Exception {
*/
@Test
public void testBlockVerification() throws Exception {
- BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+ RemoteBlockReader reader = (RemoteBlockReader)spy(
+ util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
@@ -63,7 +66,8 @@ public void testBlockVerification() throws Exception {
*/
@Test
public void testIncompleteRead() throws Exception {
- BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+ RemoteBlockReader reader = (RemoteBlockReader)spy(
+ util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
// We asked the blockreader for the whole file, and only read
@@ -80,7 +84,8 @@ public void testIncompleteRead() throws Exception {
@Test
public void testCompletePartialRead() throws Exception {
// Ask for half the file
- BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
+ RemoteBlockReader reader = (RemoteBlockReader)spy(
+ util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@@ -99,7 +104,8 @@ public void testUnalignedReads() throws Exception {
for (int length : lengths) {
DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
" len=" + length);
- BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+ RemoteBlockReader reader = (RemoteBlockReader)spy(
+ util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
index 311b724048..a7d11c165a 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
@@ -28,6 +28,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.SocketCache;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -72,13 +76,13 @@ public class TestConnCache {
* It verifies that all invocation to DFSInputStream.getBlockReader()
* use the same socket.
*/
- private class MockGetBlockReader implements Answer {
- public BlockReader reader = null;
+ private class MockGetBlockReader implements Answer {
+ public RemoteBlockReader reader = null;
private Socket sock = null;
- public BlockReader answer(InvocationOnMock invocation) throws Throwable {
- BlockReader prevReader = reader;
- reader = (BlockReader) invocation.callRealMethod();
+ public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
+ RemoteBlockReader prevReader = reader;
+ reader = (RemoteBlockReader) invocation.callRealMethod();
if (sock == null) {
sock = reader.dnSock;
} else if (prevReader != null && prevReader.hasSentStatusCode()) {
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index c7f0986981..25a486b166 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -139,9 +140,9 @@ private static void tryRead(Configuration conf, LocatedBlock lblock,
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr,
+ String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId());
- blockReader = BlockReader.newBlockReader(s, file, block,
+ blockReader = BlockReaderFactory.newBlockReader(s, file, block,
lblock.getBlockToken(), 0, -1,
conf.getInt("io.file.buffer.size", 4096));
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 9b3789c00e..208df40795 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -267,11 +268,11 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr,
+ String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
BlockReader blockReader =
- BlockReader.newBlockReader(s, file, block, lblock
+ BlockReaderFactory.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, 4096);
// nothing - if it fails - it will throw and exception