HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213592 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2011-12-13 08:09:50 +00:00
parent 197634f2f7
commit 5f39d6c239
3 changed files with 80 additions and 44 deletions

View File

@ -204,6 +204,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2604. Add a log message to show if WebHDFS is enabled and a HDFS-2604. Add a log message to show if WebHDFS is enabled and a
configuration section in the forrest doc. (szetszwo) configuration section in the forrest doc. (szetszwo)
HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd) HDFS-2130. Switch default checksum to CRC32C. (todd)

View File

@ -21,6 +21,7 @@
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -37,6 +38,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.FSDataset; import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -57,8 +59,8 @@
* if security is enabled.</li> * if security is enabled.</li>
* </ul> * </ul>
*/ */
class BlockReaderLocal extends RemoteBlockReader2 { class BlockReaderLocal implements BlockReader {
public static final Log LOG = LogFactory.getLog(DFSClient.class); private static final Log LOG = LogFactory.getLog(DFSClient.class);
//Stores the cache and proxy for a local datanode. //Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo { private static class LocalDatanodeInfo {
@ -117,13 +119,24 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>(); private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
private final FileInputStream dataIn; // reader for the data file private final FileInputStream dataIn; // reader for the data file
private FileInputStream checksumIn; // reader for the checksum file private FileInputStream checksumIn; // reader for the checksum file
private int offsetFromChunkBoundary; private int offsetFromChunkBoundary;
ByteBuffer dataBuff = null; private byte[] skipBuf = null;
ByteBuffer checksumBuff = null; private ByteBuffer dataBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
private int bytesPerChecksum;
private int checksumSize;
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
/** /**
* The only way this object can be instantiated. * The only way this object can be instantiated.
@ -256,9 +269,14 @@ private BlockReaderLocal(Configuration conf, String hdfsfile,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
FileInputStream checksumIn) throws IOException { FileInputStream checksumIn) throws IOException {
super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn this.filename = hdfsfile;
.getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset, this.checksum = checksum;
length, null); this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max(startOffset, 0);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
this.dataIn = dataIn; this.dataIn = dataIn;
this.checksumIn = checksumIn; this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
@ -322,10 +340,8 @@ public synchronized int read(byte[] buf, int off, int len) throws IOException {
readIntoBuffer(checksumIn, checksumBuff); readIntoBuffer(checksumIn, checksumBuff);
checksumBuff.flip(); checksumBuff.flip();
dataBuff.flip(); dataBuff.flip();
if (verifyChecksum) {
checksum.verifyChunkedSums(dataBuff, checksumBuff, filename, checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
this.startOffset); this.startOffset);
}
} else { } else {
dataRead = dataBuff.remaining(); dataRead = dataBuff.remaining();
} }
@ -356,9 +372,24 @@ public synchronized long skip(long n) throws IOException {
} }
if (!verifyChecksum) { if (!verifyChecksum) {
return dataIn.skip(n); return dataIn.skip(n);
} else {
return super.skip(n);
} }
// Skip by reading the data so we stay in sync with checksums.
// This could be implemented more efficiently in the future to
// skip to the beginning of the appropriate checksum chunk
// and then only read to the middle of that chunk.
if (skipBuf == null) {
skipBuf = new byte[bytesPerChecksum];
}
long nSkipped = 0;
while ( nSkipped < n ) {
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
int ret = read(skipBuf, 0, toSkip);
if ( ret <= 0 ) {
return nSkipped;
}
nSkipped += ret;
}
return nSkipped;
} }
@Override @Override
@ -375,6 +406,27 @@ public synchronized void close() throws IOException {
bufferPool.returnBuffer(checksumBuff); bufferPool.returnBuffer(checksumBuff);
checksumBuff = null; checksumBuff = null;
} }
super.close(); startOffset = -1;
checksum = null;
}
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
return BlockReaderUtil.readAll(this, buf, offset, len);
}
@Override
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
@Override
public Socket takeSocket() {
return null;
}
@Override
public boolean hasSentStatusCode() {
return false;
} }
} }

View File

@ -85,7 +85,7 @@ public class RemoteBlockReader2 implements BlockReader {
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private ReadableByteChannel in; private ReadableByteChannel in;
protected DataChecksum checksum; private DataChecksum checksum;
private PacketHeader curHeader; private PacketHeader curHeader;
private ByteBuffer curPacketBuf = null; private ByteBuffer curPacketBuf = null;
@ -96,25 +96,24 @@ public class RemoteBlockReader2 implements BlockReader {
private long lastSeqNo = -1; private long lastSeqNo = -1;
/** offset in block where reader wants to actually read */ /** offset in block where reader wants to actually read */
protected long startOffset; private long startOffset;
protected final String filename; private final String filename;
protected static DirectBufferPool bufferPool = private static DirectBufferPool bufferPool = new DirectBufferPool();
new DirectBufferPool();
private ByteBuffer headerBuf = ByteBuffer.allocate( private ByteBuffer headerBuf = ByteBuffer.allocate(
PacketHeader.PKT_HEADER_LEN); PacketHeader.PKT_HEADER_LEN);
protected int bytesPerChecksum; private int bytesPerChecksum;
protected int checksumSize; private int checksumSize;
/** /**
* The total number of bytes we need to transfer from the DN. * The total number of bytes we need to transfer from the DN.
* This is the amount that the user has requested plus some padding * This is the amount that the user has requested plus some padding
* at the beginning so that the read can begin on a chunk boundary. * at the beginning so that the read can begin on a chunk boundary.
*/ */
protected long bytesNeededToFinish; private long bytesNeededToFinish;
protected final boolean verifyChecksum; private final boolean verifyChecksum;
private boolean sentStatusCode = false; private boolean sentStatusCode = false;
@ -389,29 +388,12 @@ public static String getFileName(final InetSocketAddress s,
@Override @Override
public int readAll(byte[] buf, int offset, int len) throws IOException { public int readAll(byte[] buf, int offset, int len) throws IOException {
int n = 0; return BlockReaderUtil.readAll(this, buf, offset, len);
for (;;) {
int nread = read(buf, offset + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
}
} }
@Override @Override
public void readFully(byte[] buf, int off, int len) public void readFully(byte[] buf, int off, int len) throws IOException {
throws IOException { BlockReaderUtil.readFully(this, buf, off, len);
int toRead = len;
while (toRead > 0) {
int ret = read(buf, off, toRead);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream");
}
toRead -= ret;
off += ret;
}
} }
/** /**