From dd86860633d2ed64705b669a75bf318442ed6225 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 18 Aug 2011 03:02:19 +0000 Subject: [PATCH] HDFS-2260. Refactor BlockReader into an interface and implementation. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1159004 13f79535-47bb-0310-9956-ffa450edef68 --- hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/BlockReader.java | 495 +---------------- .../hadoop/hdfs/BlockReaderFactory.java | 80 +++ .../apache/hadoop/hdfs/DFSInputStream.java | 2 +- .../apache/hadoop/hdfs/RemoteBlockReader.java | 516 ++++++++++++++++++ .../hadoop/hdfs/server/common/JspHelper.java | 5 +- .../hdfs/server/namenode/NamenodeFsck.java | 5 +- .../hadoop/hdfs/BlockReaderTestUtil.java | 2 +- .../hdfs/TestClientBlockVerification.java | 14 +- .../org/apache/hadoop/hdfs/TestConnCache.java | 14 +- .../TestBlockTokenWithDFS.java | 5 +- .../datanode/TestDataNodeVolumeFailure.java | 5 +- 12 files changed, 651 insertions(+), 495 deletions(-) create mode 100644 hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java create mode 100644 hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt index 449b97452b..7ee73cab18 100644 --- a/hdfs/CHANGES.txt +++ b/hdfs/CHANGES.txt @@ -668,6 +668,9 @@ Trunk (unreleased changes) HDFS-2265. Remove unnecessary BlockTokenSecretManager fields/methods from BlockManager. (szetszwo) + HDFS-2260. Refactor BlockReader into an interface and implementation. + (todd) + OPTIMIZATIONS HDFS-1458. Improve checkpoint performance by avoiding unnecessary image diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java index 791f5cd800..3ebbeec68b 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java @@ -17,95 +17,18 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; import java.net.Socket; -import java.nio.ByteBuffer; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FSInputChecker; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.common.HdfsConstants; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; - -/** This is a wrapper around connection to datanode - * and understands checksum, offset etc. - * - * Terminology: - *
<dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).</dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.</dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.</dd>
- * </dl>
- * Please see DataNode for the RPC specification. +/** + * A BlockReader is responsible for reading a single block + * from a single datanode. */ -@InterfaceAudience.Private -public class BlockReader extends FSInputChecker { +public interface BlockReader extends Seekable, PositionedReadable { - Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. - private DataInputStream in; - private DataChecksum checksum; - - /** offset in block of the last chunk received */ - private long lastChunkOffset = -1; - private long lastChunkLen = -1; - private long lastSeqNo = -1; - - /** offset in block where reader wants to actually read */ - private long startOffset; - - /** offset in block of of first chunk - may be less than startOffset - if startOffset is not chunk-aligned */ - private final long firstChunkOffset; - - private int bytesPerChecksum; - private int checksumSize; - - /** - * The total number of bytes we need to transfer from the DN. - * This is the amount that the user has requested plus some padding - * at the beginning so that the read can begin on a chunk boundary. - */ - private final long bytesNeededToFinish; - - private boolean eos = false; - private boolean sentStatusCode = false; - - byte[] skipBuf = null; - ByteBuffer checksumBytes = null; - /** Amount of unread data in the current received packet */ - int dataLeft = 0; - - /* FSInputChecker interface */ - /* same interface as inputStream java.io.InputStream#read() * used by DFSInputStream#read() * This violates one rule when there is a checksum error: @@ -113,415 +36,35 @@ public class BlockReader extends FSInputChecker { * because it first reads the data to user buffer and then checks * the checksum. */ - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - - // This has to be set here, *before* the skip, since we can - // hit EOS during the skip, in the case that our entire read - // is smaller than the checksum chunk. - boolean eosBefore = eos; - - //for the first read, skip the extra bytes at the front. - if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { - // Skip these bytes. But don't call this.skip()! - int toSkip = (int)(startOffset - firstChunkOffset); - if ( skipBuf == null ) { - skipBuf = new byte[bytesPerChecksum]; - } - if ( super.read(skipBuf, 0, toSkip) != toSkip ) { - // should never happen - throw new IOException("Could not skip required number of bytes"); - } - } - - int nRead = super.read(buf, off, len); - - // if eos was set in the previous read, send a status code to the DN - if (eos && !eosBefore && nRead >= 0) { - if (needChecksum()) { - sendReadResult(dnSock, Status.CHECKSUM_OK); - } else { - sendReadResult(dnSock, Status.SUCCESS); - } - } - return nRead; - } - - @Override - public synchronized long skip(long n) throws IOException { - /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. */ - if ( skipBuf == null ) { - skipBuf = new byte[bytesPerChecksum]; - } - - long nSkipped = 0; - while ( nSkipped < n ) { - int toSkip = (int)Math.min(n-nSkipped, skipBuf.length); - int ret = read(skipBuf, 0, toSkip); - if ( ret <= 0 ) { - return nSkipped; - } - nSkipped += ret; - } - return nSkipped; - } - - @Override - public int read() throws IOException { - throw new IOException("read() is not expected to be invoked. 
" + - "Use read(buf, off, len) instead."); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - /* Checksum errors are handled outside the BlockReader. - * DFSInputStream does not always call 'seekToNewSource'. In the - * case of pread(), it just tries a different replica without seeking. - */ - return false; - } - - @Override - public void seek(long pos) throws IOException { - throw new IOException("Seek() is not supported in BlockInputChecker"); - } - - @Override - protected long getChunkPosition(long pos) { - throw new RuntimeException("getChunkPosition() is not supported, " + - "since seek is not required"); - } - - /** - * Makes sure that checksumBytes has enough capacity - * and limit is set to the number of checksum bytes needed - * to be read. - */ - private void adjustChecksumBytes(int dataLen) { - int requiredSize = - ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; - if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { - checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); - } else { - checksumBytes.clear(); - } - checksumBytes.limit(requiredSize); - } - - @Override - protected synchronized int readChunk(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - // Read one chunk. - if (eos) { - // Already hit EOF - return -1; - } - - // Read one DATA_CHUNK. - long chunkOffset = lastChunkOffset; - if ( lastChunkLen > 0 ) { - chunkOffset += lastChunkLen; - } - - // pos is relative to the start of the first chunk of the read. - // chunkOffset is relative to the start of the block. - // This makes sure that the read passed from FSInputChecker is the - // for the same chunk we expect to be reading from the DN. - if ( (pos + firstChunkOffset) != chunkOffset ) { - throw new IOException("Mismatch in pos : " + pos + " + " + - firstChunkOffset + " != " + chunkOffset); - } - - // Read next packet if the previous packet has been read completely. - if (dataLeft <= 0) { - //Read packet headers. 
- PacketHeader header = new PacketHeader(); - header.readFields(in); - - if (LOG.isDebugEnabled()) { - LOG.debug("DFSClient readChunk got header " + header); - } - - // Sanity check the lengths - if (!header.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - header); - } - - lastSeqNo = header.getSeqno(); - dataLeft = header.getDataLen(); - adjustChecksumBytes(header.getDataLen()); - if (header.getDataLen() > 0) { - IOUtils.readFully(in, checksumBytes.array(), 0, - checksumBytes.limit()); - } - } - - // Sanity checks - assert len >= bytesPerChecksum; - assert checksum != null; - assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); - - - int checksumsToRead, bytesToRead; - - if (checksumSize > 0) { - - // How many chunks left in our packet - this is a ceiling - // since we may have a partial chunk at the end of the file - int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; - - // How many chunks we can fit in databuffer - // - note this is a floor since we always read full chunks - int chunksCanFit = Math.min(len / bytesPerChecksum, - checksumBuf.length / checksumSize); - - // How many chunks should we read - checksumsToRead = Math.min(chunksLeft, chunksCanFit); - // How many bytes should we actually read - bytesToRead = Math.min( - checksumsToRead * bytesPerChecksum, // full chunks - dataLeft); // in case we have a partial - } else { - // no checksum - bytesToRead = Math.min(dataLeft, len); - checksumsToRead = 0; - } - - if ( bytesToRead > 0 ) { - // Assert we have enough space - assert bytesToRead <= len; - assert checksumBytes.remaining() >= checksumSize * checksumsToRead; - assert checksumBuf.length >= checksumSize * checksumsToRead; - IOUtils.readFully(in, buf, offset, bytesToRead); - checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); - } - - dataLeft -= bytesToRead; - assert dataLeft >= 0; - - lastChunkOffset = chunkOffset; - lastChunkLen = bytesToRead; - - // If there's no data left in the current packet after satisfying - // this read, and we have satisfied the client read, we expect - // an empty packet header from the DN to signify this. - // Note that pos + bytesToRead may in fact be greater since the - // DN finishes off the entire last chunk. - if (dataLeft == 0 && - pos + bytesToRead >= bytesNeededToFinish) { - - // Read header - PacketHeader hdr = new PacketHeader(); - hdr.readFields(in); - - if (!hdr.isLastPacketInBlock() || - hdr.getDataLen() != 0) { - throw new IOException("Expected empty end-of-read packet! Header: " + - hdr); - } - - eos = true; - } - - if ( bytesToRead == 0 ) { - return -1; - } - - return bytesToRead; - } - - private BlockReader(String file, String bpid, long blockId, - DataInputStream in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { - // Path is used only for printing block and file information in debug - super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, - 1, verifyChecksum, - checksum.getChecksumSize() > 0? checksum : null, - checksum.getBytesPerChecksum(), - checksum.getChecksumSize()); - - this.dnSock = dnSock; - this.in = in; - this.checksum = checksum; - this.startOffset = Math.max( startOffset, 0 ); - - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. 
Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); - - this.firstChunkOffset = firstChunkOffset; - lastChunkOffset = firstChunkOffset; - lastChunkLen = -1; - - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - } - - public static BlockReader newBlockReader(Socket sock, String file, - ExtendedBlock block, Token blockToken, - long startOffset, long len, int bufferSize) throws IOException { - return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize, - true); - } - - /** Java Doc required */ - public static BlockReader newBlockReader( Socket sock, String file, - ExtendedBlock block, - Token blockToken, - long startOffset, long len, - int bufferSize, boolean verifyChecksum) - throws IOException { - return newBlockReader(sock, file, block, blockToken, startOffset, - len, bufferSize, verifyChecksum, ""); - } + int read(byte[] buf, int off, int len) throws IOException; /** - * Create a new BlockReader specifically to satisfy a read. - * This method also sends the OP_READ_BLOCK request. - * - * @param sock An established Socket to the DN. The BlockReader will not close it normally - * @param file File location - * @param block The block object - * @param blockToken The block token for security - * @param startOffset The read offset, relative to block head - * @param len The number of bytes to read - * @param bufferSize The IO buffer size (not the client buffer size) - * @param verifyChecksum Whether to verify checksum - * @param clientName Client name - * @return New BlockReader instance, or null on error. + * Skip the given number of bytes */ - public static BlockReader newBlockReader( Socket sock, String file, - ExtendedBlock block, - Token blockToken, - long startOffset, long len, - int bufferSize, boolean verifyChecksum, - String clientName) - throws IOException { - // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT))); - new Sender(out).readBlock(block, blockToken, clientName, startOffset, len); - - // - // Get bytes in block, set streams - // + long skip(long n) throws IOException; - DataInputStream in = new DataInputStream( - new BufferedInputStream(NetUtils.getInputStream(sock), - bufferSize)); - - BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - vintPrefixed(in)); - if (status.getStatus() != Status.SUCCESS) { - if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException( - "Got access token error for OP_READ_BLOCK, self=" - + sock.getLocalSocketAddress() + ", remote=" - + sock.getRemoteSocketAddress() + ", for file " + file - + ", for pool " + block.getBlockPoolId() + " block " - + block.getBlockId() + "_" + block.getGenerationStamp()); - } else { - throw new IOException("Got error for OP_READ_BLOCK, self=" - + sock.getLocalSocketAddress() + ", remote=" - + sock.getRemoteSocketAddress() + ", for file " + file - + ", for pool " + block.getBlockPoolId() + " block " - + block.getBlockId() + "_" + block.getGenerationStamp()); - } - } - DataChecksum checksum = DataChecksum.newDataChecksum( in ); - //Warning when we get CHECKSUM_NULL? - - // Read the first chunk offset. 
- long firstChunkOffset = in.readLong(); - - if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || - firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) { - throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); - } + /** + * Read a single byte, returning -1 at enf of stream. + */ + int read() throws IOException; - return new BlockReader(file, block.getBlockPoolId(), block.getBlockId(), - in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); - } + void close() throws IOException; - @Override - public synchronized void close() throws IOException { - startOffset = -1; - checksum = null; - if (dnSock != null) { - dnSock.close(); - } - - // in will be closed when its Socket is closed. - } - - /** kind of like readFully(). Only reads as much as possible. + /** + * kind of like readFully(). Only reads as much as possible. * And allows use of protected readFully(). */ - public int readAll(byte[] buf, int offset, int len) throws IOException { - return readFully(this, buf, offset, len); - } + int readAll(byte[] buf, int offset, int len) throws IOException; /** * Take the socket used to talk to the DN. */ - public Socket takeSocket() { - assert hasSentStatusCode() : - "BlockReader shouldn't give back sockets mid-read"; - Socket res = dnSock; - dnSock = null; - return res; - } + Socket takeSocket(); /** * Whether the BlockReader has reached the end of its input stream * and successfully sent a status code back to the datanode. */ - public boolean hasSentStatusCode() { - return sentStatusCode; - } + boolean hasSentStatusCode(); - /** - * When the reader reaches end of the read, it sends a status response - * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN - * closing our connection (which we will re-open), but won't affect - * data correctness. - */ - void sendReadResult(Socket sock, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + sock; - try { - OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT); - - ClientReadStatusProto.newBuilder() - .setStatus(statusCode) - .build() - .writeDelimitedTo(out); - - out.flush(); - sentStatusCode = true; - } catch (IOException e) { - // It's ok not to be able to send this. But something is probably wrong. - LOG.info("Could not send read status (" + statusCode + ") to datanode " + - sock.getInetAddress() + ": " + e.getMessage()); - } - } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } } diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java new file mode 100644 index 0000000000..52c6cc42ce --- /dev/null +++ b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; + + +/** + * Utility class to create BlockReader implementations. + */ +@InterfaceAudience.Private +public class BlockReaderFactory { + public static BlockReader newBlockReader(Socket sock, String file, + ExtendedBlock block, Token blockToken, + long startOffset, long len, int bufferSize) throws IOException { + return newBlockReader(sock, file, block, blockToken, startOffset, + len, bufferSize, true, ""); + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param sock An established Socket to the DN. The BlockReader will not close it normally + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. 
+ */ + public static BlockReader newBlockReader( + Socket sock, String file, + ExtendedBlock block, + Token blockToken, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName) + throws IOException { + return RemoteBlockReader.newBlockReader( + sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } +} diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java index 883776d167..e446122efb 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -776,7 +776,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr, try { // The OP_READ_BLOCK request is sent as we make the BlockReader BlockReader reader = - BlockReader.newBlockReader(sock, file, block, + BlockReaderFactory.newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java new file mode 100644 index 0000000000..51311f5216 --- /dev/null +++ b/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FSInputChecker; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.server.common.HdfsConstants; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; + + +/** This is a wrapper around connection to datanode + * and understands checksum, offset etc. + * + * Terminology: + *
<dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).</dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.</dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.</dd>
+ * </dl>
+ * Please see DataNode for the RPC specification. + */ +@InterfaceAudience.Private +public class RemoteBlockReader extends FSInputChecker implements BlockReader { + + Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. + private DataInputStream in; + private DataChecksum checksum; + + /** offset in block of the last chunk received */ + private long lastChunkOffset = -1; + private long lastChunkLen = -1; + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + + /** offset in block of of first chunk - may be less than startOffset + if startOffset is not chunk-aligned */ + private final long firstChunkOffset; + + private int bytesPerChecksum; + private int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private final long bytesNeededToFinish; + + private boolean eos = false; + private boolean sentStatusCode = false; + + byte[] skipBuf = null; + ByteBuffer checksumBytes = null; + /** Amount of unread data in the current received packet */ + int dataLeft = 0; + + /* FSInputChecker interface */ + + /* same interface as inputStream java.io.InputStream#read() + * used by DFSInputStream#read() + * This violates one rule when there is a checksum error: + * "Read should not modify user buffer before successful read" + * because it first reads the data to user buffer and then checks + * the checksum. + */ + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + // This has to be set here, *before* the skip, since we can + // hit EOS during the skip, in the case that our entire read + // is smaller than the checksum chunk. + boolean eosBefore = eos; + + //for the first read, skip the extra bytes at the front. + if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { + // Skip these bytes. But don't call this.skip()! + int toSkip = (int)(startOffset - firstChunkOffset); + if ( skipBuf == null ) { + skipBuf = new byte[bytesPerChecksum]; + } + if ( super.read(skipBuf, 0, toSkip) != toSkip ) { + // should never happen + throw new IOException("Could not skip required number of bytes"); + } + } + + int nRead = super.read(buf, off, len); + + // if eos was set in the previous read, send a status code to the DN + if (eos && !eosBefore && nRead >= 0) { + if (needChecksum()) { + sendReadResult(dnSock, Status.CHECKSUM_OK); + } else { + sendReadResult(dnSock, Status.SUCCESS); + } + } + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + if ( skipBuf == null ) { + skipBuf = new byte[bytesPerChecksum]; + } + + long nSkipped = 0; + while ( nSkipped < n ) { + int toSkip = (int)Math.min(n-nSkipped, skipBuf.length); + int ret = read(skipBuf, 0, toSkip); + if ( ret <= 0 ) { + return nSkipped; + } + nSkipped += ret; + } + return nSkipped; + } + + @Override + public int read() throws IOException { + throw new IOException("read() is not expected to be invoked. " + + "Use read(buf, off, len) instead."); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + /* Checksum errors are handled outside the BlockReader. + * DFSInputStream does not always call 'seekToNewSource'. 
In the + * case of pread(), it just tries a different replica without seeking. + */ + return false; + } + + @Override + public void seek(long pos) throws IOException { + throw new IOException("Seek() is not supported in BlockInputChecker"); + } + + @Override + protected long getChunkPosition(long pos) { + throw new RuntimeException("getChunkPosition() is not supported, " + + "since seek is not required"); + } + + /** + * Makes sure that checksumBytes has enough capacity + * and limit is set to the number of checksum bytes needed + * to be read. + */ + private void adjustChecksumBytes(int dataLen) { + int requiredSize = + ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; + if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { + checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); + } else { + checksumBytes.clear(); + } + checksumBytes.limit(requiredSize); + } + + @Override + protected synchronized int readChunk(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + // Read one chunk. + if (eos) { + // Already hit EOF + return -1; + } + + // Read one DATA_CHUNK. + long chunkOffset = lastChunkOffset; + if ( lastChunkLen > 0 ) { + chunkOffset += lastChunkLen; + } + + // pos is relative to the start of the first chunk of the read. + // chunkOffset is relative to the start of the block. + // This makes sure that the read passed from FSInputChecker is the + // for the same chunk we expect to be reading from the DN. + if ( (pos + firstChunkOffset) != chunkOffset ) { + throw new IOException("Mismatch in pos : " + pos + " + " + + firstChunkOffset + " != " + chunkOffset); + } + + // Read next packet if the previous packet has been read completely. + if (dataLeft <= 0) { + //Read packet headers. 
+ PacketHeader header = new PacketHeader(); + header.readFields(in); + + if (LOG.isDebugEnabled()) { + LOG.debug("DFSClient readChunk got header " + header); + } + + // Sanity check the lengths + if (!header.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + header); + } + + lastSeqNo = header.getSeqno(); + dataLeft = header.getDataLen(); + adjustChecksumBytes(header.getDataLen()); + if (header.getDataLen() > 0) { + IOUtils.readFully(in, checksumBytes.array(), 0, + checksumBytes.limit()); + } + } + + // Sanity checks + assert len >= bytesPerChecksum; + assert checksum != null; + assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0); + + + int checksumsToRead, bytesToRead; + + if (checksumSize > 0) { + + // How many chunks left in our packet - this is a ceiling + // since we may have a partial chunk at the end of the file + int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; + + // How many chunks we can fit in databuffer + // - note this is a floor since we always read full chunks + int chunksCanFit = Math.min(len / bytesPerChecksum, + checksumBuf.length / checksumSize); + + // How many chunks should we read + checksumsToRead = Math.min(chunksLeft, chunksCanFit); + // How many bytes should we actually read + bytesToRead = Math.min( + checksumsToRead * bytesPerChecksum, // full chunks + dataLeft); // in case we have a partial + } else { + // no checksum + bytesToRead = Math.min(dataLeft, len); + checksumsToRead = 0; + } + + if ( bytesToRead > 0 ) { + // Assert we have enough space + assert bytesToRead <= len; + assert checksumBytes.remaining() >= checksumSize * checksumsToRead; + assert checksumBuf.length >= checksumSize * checksumsToRead; + IOUtils.readFully(in, buf, offset, bytesToRead); + checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead); + } + + dataLeft -= bytesToRead; + assert dataLeft >= 0; + + lastChunkOffset = chunkOffset; + lastChunkLen = bytesToRead; + + // If there's no data left in the current packet after satisfying + // this read, and we have satisfied the client read, we expect + // an empty packet header from the DN to signify this. + // Note that pos + bytesToRead may in fact be greater since the + // DN finishes off the entire last chunk. + if (dataLeft == 0 && + pos + bytesToRead >= bytesNeededToFinish) { + + // Read header + PacketHeader hdr = new PacketHeader(); + hdr.readFields(in); + + if (!hdr.isLastPacketInBlock() || + hdr.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + hdr); + } + + eos = true; + } + + if ( bytesToRead == 0 ) { + return -1; + } + + return bytesToRead; + } + + private RemoteBlockReader(String file, String bpid, long blockId, + DataInputStream in, DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { + // Path is used only for printing block and file information in debug + super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, + 1, verifyChecksum, + checksum.getChecksumSize() > 0? checksum : null, + checksum.getBytesPerChecksum(), + checksum.getChecksumSize()); + + this.dnSock = dnSock; + this.in = in; + this.checksum = checksum; + this.startOffset = Math.max( startOffset, 0 ); + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. 
Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + + this.firstChunkOffset = firstChunkOffset; + lastChunkOffset = firstChunkOffset; + lastChunkLen = -1; + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + } + + public static RemoteBlockReader newBlockReader(Socket sock, String file, + ExtendedBlock block, Token blockToken, + long startOffset, long len, int bufferSize) throws IOException { + return newBlockReader(sock, file, block, blockToken, startOffset, + len, bufferSize, true, ""); + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param sock An established Socket to the DN. The BlockReader will not close it normally + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. + */ + public static RemoteBlockReader newBlockReader( Socket sock, String file, + ExtendedBlock block, + Token blockToken, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName) + throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT))); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len); + + // + // Get bytes in block, set streams + // + + DataInputStream in = new DataInputStream( + new BufferedInputStream(NetUtils.getInputStream(sock), + bufferSize)); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + vintPrefixed(in)); + checkSuccess(status, sock, block, file); + DataChecksum checksum = DataChecksum.newDataChecksum( in ); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. 
+ long firstChunkOffset = in.readLong(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), + in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); + } + + private static void checkSuccess( + BlockOpResponseProto status, Socket sock, + ExtendedBlock block, String file) + throws IOException { + if (status.getStatus() != Status.SUCCESS) { + if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException( + "Got access token error for OP_READ_BLOCK, self=" + + sock.getLocalSocketAddress() + ", remote=" + + sock.getRemoteSocketAddress() + ", for file " + file + + ", for pool " + block.getBlockPoolId() + " block " + + block.getBlockId() + "_" + block.getGenerationStamp()); + } else { + throw new IOException("Got error for OP_READ_BLOCK, self=" + + sock.getLocalSocketAddress() + ", remote=" + + sock.getRemoteSocketAddress() + ", for file " + file + + ", for pool " + block.getBlockPoolId() + " block " + + block.getBlockId() + "_" + block.getGenerationStamp()); + } + } + } + + @Override + public synchronized void close() throws IOException { + startOffset = -1; + checksum = null; + if (dnSock != null) { + dnSock.close(); + } + + // in will be closed when its Socket is closed. + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return readFully(this, buf, offset, len); + } + + @Override + public Socket takeSocket() { + assert hasSentStatusCode() : + "BlockReader shouldn't give back sockets mid-read"; + Socket res = dnSock; + dnSock = null; + return res; + } + + @Override + public boolean hasSentStatusCode() { + return sentStatusCode; + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Socket sock, Status statusCode) { + assert !sentStatusCode : "already sent status code to " + sock; + try { + OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT); + + ClientReadStatusProto.newBuilder() + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); + + out.flush(); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. 
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " + + sock.getInetAddress() + ": " + e.getMessage()); + } + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } +} diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index cf2452992f..81f182d6ca 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -195,8 +196,8 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId, // Use the block name for file name. int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT); - String file = BlockReader.getFileName(addr, poolId, blockId); - BlockReader blockReader = BlockReader.newBlockReader(s, file, + String file = BlockReaderFactory.getFileName(addr, poolId, blockId); + BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, offsetIntoBlock, amtToRead, bufferSize); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index f9ba6d4a41..26376d476f 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@ -504,9 +505,9 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock, s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - String file = BlockReader.getFileName(targetAddr, block.getBlockPoolId(), + String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(), block.getBlockId()); - blockReader = BlockReader.newBlockReader(s, file, block, lblock + blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096)); } catch (IOException ex) { diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index 64e4998c72..25585cecbb 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -143,7 +143,7 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR 
sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); - return BlockReader.newBlockReader( + return BlockReaderFactory.newBlockReader( sock, targetAddr.toString()+ ":" + block.getBlockId(), block, testBlock.getBlockToken(), offset, lenToRead, diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java index 639d7aff6d..8315e1f5d0 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java @@ -20,6 +20,8 @@ import java.util.List; +import org.apache.hadoop.hdfs.RemoteBlockReader; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.fs.Path; @@ -52,7 +54,8 @@ public static void setupCluster() throws Exception { */ @Test public void testBlockVerification() throws Exception { - BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); + RemoteBlockReader reader = (RemoteBlockReader)spy( + util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); @@ -63,7 +66,8 @@ public void testBlockVerification() throws Exception { */ @Test public void testIncompleteRead() throws Exception { - BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); + RemoteBlockReader reader = (RemoteBlockReader)spy( + util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false); // We asked the blockreader for the whole file, and only read @@ -80,7 +84,8 @@ public void testIncompleteRead() throws Exception { @Test public void testCompletePartialRead() throws Exception { // Ask for half the file - BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); + RemoteBlockReader reader = (RemoteBlockReader)spy( + util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); @@ -99,7 +104,8 @@ public void testUnalignedReads() throws Exception { for (int length : lengths) { DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " + " len=" + length); - BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length)); + RemoteBlockReader reader = (RemoteBlockReader)spy( + util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java index 311b724048..a7d11c165a 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java @@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.RemoteBlockReader; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.SocketCache; import org.apache.hadoop.hdfs.protocol.Block; 
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -72,13 +76,13 @@ public class TestConnCache { * It verifies that all invocation to DFSInputStream.getBlockReader() * use the same socket. */ - private class MockGetBlockReader implements Answer { - public BlockReader reader = null; + private class MockGetBlockReader implements Answer { + public RemoteBlockReader reader = null; private Socket sock = null; - public BlockReader answer(InvocationOnMock invocation) throws Throwable { - BlockReader prevReader = reader; - reader = (BlockReader) invocation.callRealMethod(); + public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable { + RemoteBlockReader prevReader = reader; + reader = (RemoteBlockReader) invocation.callRealMethod(); if (sock == null) { sock = reader.dnSock; } else if (prevReader != null && prevReader.hasSentStatusCode()) { diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index c7f0986981..25a486b166 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -139,9 +140,9 @@ private static void tryRead(Configuration conf, LocatedBlock lblock, s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - String file = BlockReader.getFileName(targetAddr, + String file = BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId()); - blockReader = BlockReader.newBlockReader(s, file, block, + blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock.getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096)); diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 9b3789c00e..208df40795 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -267,11 +268,11 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock) s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - String file = BlockReader.getFileName(targetAddr, + String file = BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId()); BlockReader blockReader = - BlockReader.newBlockReader(s, file, block, lblock + BlockReaderFactory.newBlockReader(s, file, block, lblock .getBlockToken(), 0, -1, 4096); // nothing - if it fails - it will throw and exception
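
The call-site hunks above (JspHelper, NamenodeFsck, and the updated tests) all converge on the same pattern after this refactor: open a socket to the datanode, then obtain a BlockReader through BlockReaderFactory rather than through the old static methods on BlockReader. Below is a minimal sketch of that pattern, assuming only the factory and interface signatures shown in this patch; the class name, method name, and parameter values in the sketch are illustrative and are not part of the change.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.security.token.Token;

// Illustrative helper only; not part of HDFS-2260.
public class BlockReaderUsageSketch {
  static byte[] readFromDatanode(InetSocketAddress dnAddr, ExtendedBlock block,
      Token<BlockTokenIdentifier> blockToken, long offsetInBlock, int len)
      throws IOException {
    Socket s = new Socket();
    s.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
    s.setSoTimeout(HdfsConstants.READ_TIMEOUT);

    // Callers now ask the factory for a BlockReader instead of calling
    // BlockReader.newBlockReader() directly; the factory currently hands
    // back a RemoteBlockReader.
    String file = BlockReaderFactory.getFileName(dnAddr, block.getBlockPoolId(),
        block.getBlockId());
    BlockReader reader = BlockReaderFactory.newBlockReader(s, file, block,
        blockToken, offsetInBlock, len, 4096 /* io buffer size */);

    byte[] buf = new byte[len];
    try {
      reader.readAll(buf, 0, len);   // reads as much as possible, like readFully()
    } finally {
      reader.close();                // also closes the datanode socket it holds
    }
    return buf;
  }
}

Code that needs members specific to the implementation (such as dnSock) casts the returned reader to RemoteBlockReader, as the updated TestClientBlockVerification and TestConnCache hunks do.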