diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2d8662c8a7..1d8e7b55ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -204,6 +204,8 @@ Release 0.23.1 - UNRELEASED
     HDFS-2604. Add a log message to show if WebHDFS is enabled and a
     configuration section in the forrest doc.  (szetszwo)
 
+    HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
+
   OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index d34d74d438..6c7f829f56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -57,8 +59,8 @@
  * if security is enabled.
  *
  */
-class BlockReaderLocal extends RemoteBlockReader2 {
-  public static final Log LOG = LogFactory.getLog(DFSClient.class);
+class BlockReaderLocal implements BlockReader {
+  private static final Log LOG = LogFactory.getLog(DFSClient.class);
 
   //Stores the cache and proxy for a local datanode.
   private static class LocalDatanodeInfo {
@@ -117,13 +119,24 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
   private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
 
   private final FileInputStream dataIn; // reader for the data file
-  private FileInputStream checksumIn;   // reader for the checksum file
+  private final FileInputStream checksumIn; // reader for the checksum file
   private int offsetFromChunkBoundary;
 
-  ByteBuffer dataBuff = null;
-  ByteBuffer checksumBuff = null;
+  private byte[] skipBuf = null;
+  private ByteBuffer dataBuff = null;
+  private ByteBuffer checksumBuff = null;
+  private DataChecksum checksum;
+  private final boolean verifyChecksum;
+
+  private static DirectBufferPool bufferPool = new DirectBufferPool();
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
 
   /**
    * The only way this object can be instantiated.
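Note on the class-declaration change above: BlockReaderLocal now implements BlockReader directly instead of inheriting from RemoteBlockReader2. Reconstructed from the methods this patch overrides (the real definition lives in org.apache.hadoop.hdfs.BlockReader and is not part of this diff), the interface has roughly this shape:

    // Approximate shape of the BlockReader interface, inferred from the
    // @Override methods in this patch; consult the actual
    // org.apache.hadoop.hdfs.BlockReader source for the authoritative form.
    import java.io.IOException;
    import java.net.Socket;

    interface BlockReader {
      int read(byte[] buf, int off, int len) throws IOException;
      long skip(long n) throws IOException;
      void close() throws IOException;
      int readAll(byte[] buf, int offset, int len) throws IOException;
      void readFully(byte[] buf, int off, int len) throws IOException;
      Socket takeSocket();          // a local reader has no socket to hand off
      boolean hasSentStatusCode();  // a local reader never sends a status code
    }

The last two methods exist so that remote readers can hand their DataNode connection back after a read; the local reader's implementations below simply return null and false.
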
@@ -256,9 +269,14 @@ private BlockReaderLocal(Configuration conf, String hdfsfile,
       long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
       boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
       FileInputStream checksumIn) throws IOException {
-    super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn
-        .getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset,
-        length, null);
+    this.filename = hdfsfile;
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max(startOffset, 0);
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+
     this.dataIn = dataIn;
     this.checksumIn = checksumIn;
     this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
@@ -322,10 +340,8 @@ public synchronized int read(byte[] buf, int off, int len) throws IOException {
       readIntoBuffer(checksumIn, checksumBuff);
       checksumBuff.flip();
       dataBuff.flip();
-      if (verifyChecksum) {
-        checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
-            this.startOffset);
-      }
+      checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
+          this.startOffset);
     } else {
       dataRead = dataBuff.remaining();
     }
@@ -356,9 +372,24 @@ public synchronized long skip(long n) throws IOException {
     }
     if (!verifyChecksum) {
       return dataIn.skip(n);
-    } else {
-      return super.skip(n);
     }
+    // Skip by reading the data so we stay in sync with checksums.
+    // This could be implemented more efficiently in the future to
+    // skip to the beginning of the appropriate checksum chunk
+    // and then only read to the middle of that chunk.
+    if (skipBuf == null) {
+      skipBuf = new byte[bytesPerChecksum];
+    }
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
   }
 
   @Override
@@ -375,6 +406,27 @@ public synchronized void close() throws IOException {
       bufferPool.returnBuffer(checksumBuff);
       checksumBuff = null;
     }
-    super.close();
+    startOffset = -1;
+    checksum = null;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+
+  @Override
+  public Socket takeSocket() {
+    return null;
+  }
+
+  @Override
+  public boolean hasSentStatusCode() {
+    return false;
   }
 }
\ No newline at end of file
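Both readers now delegate readAll() and readFully() to a new BlockReaderUtil helper whose source is not included in this patch. Judging from the loop bodies deleted from RemoteBlockReader2 below, it presumably hoists them into static methods that work against any BlockReader; a sketch under that assumption:

    // Hypothetical sketch of BlockReaderUtil (not shown in this patch); the
    // method bodies mirror the loops deleted from RemoteBlockReader2 below.
    import java.io.IOException;

    class BlockReaderUtil {
      /** Read as many bytes as possible; may return fewer than len at EOF. */
      static int readAll(BlockReader reader, byte[] buf, int offset, int len)
          throws IOException {
        int n = 0;
        for (;;) {
          int nread = reader.read(buf, offset + n, len - n);
          if (nread <= 0)
            return (n == 0) ? nread : n; // -1/0 on immediate EOF, else bytes read
          n += nread;
          if (n >= len)
            return n;
        }
      }

      /** Read exactly len bytes, or throw if the stream ends first. */
      static void readFully(BlockReader reader, byte[] buf, int off, int len)
          throws IOException {
        int toRead = len;
        while (toRead > 0) {
          int ret = reader.read(buf, off, toRead);
          if (ret < 0) {
            throw new IOException("Premature EOF from inputStream");
          }
          toRead -= ret;
          off += ret;
        }
      }
    }

Moving these loops out of RemoteBlockReader2 removes the last code BlockReaderLocal had been reusing from it, which is what lets the patch drop the inheritance relationship.
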
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 1f5f12bda7..ea24777571 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -85,7 +85,7 @@ public class RemoteBlockReader2 implements BlockReader {
 
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
   private ReadableByteChannel in;
-  protected DataChecksum checksum;
+  private DataChecksum checksum;
 
   private PacketHeader curHeader;
   private ByteBuffer curPacketBuf = null;
@@ -96,25 +96,24 @@ public class RemoteBlockReader2 implements BlockReader {
   private long lastSeqNo = -1;
 
   /** offset in block where reader wants to actually read */
-  protected long startOffset;
-  protected final String filename;
+  private long startOffset;
+  private final String filename;
 
-  protected static DirectBufferPool bufferPool =
-      new DirectBufferPool();
+  private static DirectBufferPool bufferPool = new DirectBufferPool();
   private ByteBuffer headerBuf = ByteBuffer.allocate(
       PacketHeader.PKT_HEADER_LEN);
 
-  protected int bytesPerChecksum;
-  protected int checksumSize;
+  private int bytesPerChecksum;
+  private int checksumSize;
 
   /**
    * The total number of bytes we need to transfer from the DN.
    * This is the amount that the user has requested plus some padding
   * at the beginning so that the read can begin on a chunk boundary.
    */
-  protected long bytesNeededToFinish;
+  private long bytesNeededToFinish;
 
-  protected final boolean verifyChecksum;
+  private final boolean verifyChecksum;
 
   private boolean sentStatusCode = false;
@@ -389,29 +388,12 @@ public static String getFileName(final InetSocketAddress s,
 
   @Override
   public int readAll(byte[] buf, int offset, int len) throws IOException {
-    int n = 0;
-    for (;;) {
-      int nread = read(buf, offset + n, len - n);
-      if (nread <= 0)
-        return (n == 0) ? nread : n;
-      n += nread;
-      if (n >= len)
-        return n;
-    }
+    return BlockReaderUtil.readAll(this, buf, offset, len);
   }
 
   @Override
-  public void readFully(byte[] buf, int off, int len)
-      throws IOException {
-    int toRead = len;
-    while (toRead > 0) {
-      int ret = read(buf, off, toRead);
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream");
-      }
-      toRead -= ret;
-      off += ret;
-    }
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
   }
 
   /**
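A closing note on the checksum-chunk arithmetic both readers rely on: the "padding" mentioned in the bytesNeededToFinish comment above and BlockReaderLocal's offsetFromChunkBoundary field describe the same alignment scheme. Reads begin at the start of the enclosing checksum chunk so that whole chunks can be verified, and the leading bytes the caller never asked for are then discarded. A worked example with hypothetical values (not code from the patch):

    // Illustration only: the chunk-alignment arithmetic implied by
    // "offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset)"
    // in the BlockReaderLocal constructor above. Values are made up.
    class ChunkAlignmentExample {
      public static void main(String[] args) {
        long startOffset = 5000;     // first byte the caller wants
        int bytesPerChecksum = 512;  // checksum chunk size from DataChecksum
        // Align the physical read to the start of the enclosing chunk.
        long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);
        // Bytes read and verified, then discarded, ahead of the caller's data.
        int offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
        System.out.println(firstChunkOffset + " " + offsetFromChunkBoundary); // 4608 392
      }
    }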