From 3052ad1f0069af5caee621374b29d17d7f12ab51 Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Mon, 14 Jan 2013 20:47:08 +0000
Subject: [PATCH] HDFS-3429. DataNode reads checksums even if client does not
 need them. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1433117 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  2 +
 .../apache/hadoop/hdfs/RemoteBlockReader.java      |  3 +-
 .../hadoop/hdfs/RemoteBlockReader2.java            |  3 +-
 .../datatransfer/DataTransferProtocol.java         |  5 +-
 .../hdfs/protocol/datatransfer/Receiver.java       |  3 +-
 .../hdfs/protocol/datatransfer/Sender.java         |  8 ++-
 .../datanode/BlockPoolSliceScanner.java            |  4 +-
 .../hdfs/server/datanode/BlockSender.java          | 61 ++++++++++++-------
 .../hadoop/hdfs/server/datanode/DataNode.java      |  2 +-
 .../hdfs/server/datanode/DataXceiver.java          |  7 ++-
 .../src/main/proto/datatransfer.proto              |  1 +
 .../hadoop/hdfs/TestDataTransferProtocol.java      | 12 ++--
 .../apache/hadoop/hdfs/TestParallelRead.java       | 10 +++
 .../hadoop/hdfs/TestParallelReadUtil.java          |  4 +-
 .../org/apache/hadoop/hdfs/TestPread.java          | 18 +++++-
 15 files changed, 101 insertions(+), 42 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 69773f9ae5..c23876e9c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -482,6 +482,8 @@ Release 2.0.3-alpha - Unreleased

   OPTIMIZATIONS

+    HDFS-3429. DataNode reads checksums even if client does not need them (todd)
+
   BUG FIXES

     HDFS-3919. MiniDFSCluster:waitClusterUp can hang forever.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index dc449ee2f2..f7ac589921 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -380,7 +380,8 @@ public static RemoteBlockReader newBlockReader( Socket sock, String file,
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
           NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum);

     //
     // Get bytes in block, set streams
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 3450cd1524..58bb37a724 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -392,7 +392,8 @@ public static BlockReader newBlockReader(Socket sock, String file,
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
           ioStreams.out));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum);

     //
     // Get bytes in block
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 98094472a7..7f4463789b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -55,12 +55,15 @@ public interface DataTransferProtocol {
    * @param clientName client's name.
    * @param blockOffset offset of the block.
    * @param length maximum number of bytes for this read.
+   * @param sendChecksum if false, the DN should skip reading and sending
+   *        checksums
    */
   public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
-      final long length) throws IOException;
+      final long length,
+      final boolean sendChecksum) throws IOException;

   /**
    * Write a block to a datanode pipeline.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index b1edc20e3a..a156dfa538 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -88,7 +88,8 @@ private void opReadBlock() throws IOException {
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
         proto.getOffset(),
-        proto.getLen());
+        proto.getLen(),
+        proto.getSendChecksums());
   }

   /** Receive OP_WRITE_BLOCK */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 8184c500f8..fb8bee5388 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -62,6 +62,10 @@ private static void op(final DataOutput out, final Op op

   private static void send(final DataOutputStream out, final Op opcode,
       final Message proto) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
+          + ": " + proto);
+    }
     op(out, opcode);
     proto.writeDelimitedTo(out);
     out.flush();
@@ -72,12 +76,14 @@ public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
-      final long length) throws IOException {
+      final long length,
+      final boolean sendChecksum) throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName,
           blockToken))
       .setOffset(blockOffset)
       .setLen(length)
+      .setSendChecksums(sendChecksum)
       .build();

     send(out, Op.READ_BLOCK, proto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index d3d2f915ca..8a117546ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -388,8 +388,8 @@ void verifyBlock(ExtendedBlock block) {
       try {
         adjustThrottler();

-        blockSender = new BlockSender(block, 0, -1, false, true, datanode,
-            null);
+        blockSender = new BlockSender(block, 0, -1, false, true, true,
+            datanode, null);

         DataOutputStream out =
                 new DataOutputStream(new IOUtils.NullOutputStream());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index bbcb2dd2e1..fdade84f0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -45,6 +45,8 @@
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;

+import com.google.common.base.Preconditions;
+
 /**
  * Reads a block from the disk and sends it to a recipient.
  *
@@ -158,12 +160,14 @@ class BlockSender implements java.io.Closeable {
    * @param length length of data to read
    * @param corruptChecksumOk
    * @param verifyChecksum verify checksum while reading the data
+   * @param sendChecksum send checksum to client.
    * @param datanode datanode from which the block is being read
    * @param clientTraceFmt format string used to print client trace logs
    * @throws IOException
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean verifyChecksum,
+              boolean sendChecksum,
               DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
@@ -175,6 +179,13 @@ class BlockSender implements java.io.Closeable {
       this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
       this.datanode = datanode;

+      if (verifyChecksum) {
+        // To simplify implementation, callers may not specify verification
+        // without sending.
+        Preconditions.checkArgument(sendChecksum,
+            "If verifying checksum, currently must also send it.");
+      }
+
       final Replica replica;
       final long replicaVisibleLength;
       synchronized(datanode.data) {
@@ -213,29 +224,37 @@ class BlockSender implements java.io.Closeable {
        * False,  True: will verify checksum
        * False, False: throws IOException file not found
        */
-      DataChecksum csum;
-      final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
-      if (!corruptChecksumOk || metaIn != null) {
-        if (metaIn == null) {
-          //need checksum but meta-data not found
-          throw new FileNotFoundException("Meta-data not found for " + block);
-        }
-
-        checksumIn = new DataInputStream(
-            new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+      DataChecksum csum = null;
+      if (verifyChecksum || sendChecksum) {
+        final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
+        if (!corruptChecksumOk || metaIn != null) {
+          if (metaIn == null) {
+            //need checksum but meta-data not found
+            throw new FileNotFoundException("Meta-data not found for " + block);
+          }

-        // read and handle the common header here. For now just a version
-        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-        short version = header.getVersion();
-        if (version != BlockMetadataHeader.VERSION) {
-          LOG.warn("Wrong version (" + version + ") for metadata file for "
-              + block + " ignoring ...");
+          checksumIn = new DataInputStream(
+              new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+
+          // read and handle the common header here. For now just a version
+          BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+          short version = header.getVersion();
+          if (version != BlockMetadataHeader.VERSION) {
+            LOG.warn("Wrong version (" + version + ") for metadata file for "
+                + block + " ignoring ...");
+          }
+          csum = header.getChecksum();
+        } else {
+          LOG.warn("Could not find metadata file for " + block);
         }
-        csum = header.getChecksum();
-      } else {
-        LOG.warn("Could not find metadata file for " + block);
-        // This only decides the buffer size. Use BUFFER_SIZE?
-        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 16 * 1024);
+      }
+      if (csum == null) {
+        // The number of bytes per checksum here determines the alignment
+        // of reads: we always start reading at a checksum chunk boundary,
+        // even if the checksum type is NULL. So, choosing too big of a value
+        // would risk sending too much unnecessary data. 512 (1 disk sector)
+        // is likely to result in minimal extra IO.
+        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
       }

       /*
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c1845fd152..375c6954a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1441,7 +1441,7 @@ public void run() {
             HdfsConstants.SMALL_BUFFER_SIZE));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(),
-            false, false, DataNode.this, null);
+            false, false, true, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);

         //
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 255fd35ff3..1d4c1c3fc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -241,7 +241,8 @@ public void readBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
-      final long length) throws IOException {
+      final long length,
+      final boolean sendChecksum) throws IOException {
     previousOpClientName = clientName;

     OutputStream baseStream = getOutputStream();
@@ -266,7 +267,7 @@ public void readBlock(final ExtendedBlock block,
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, false, datanode, clientTraceFmt);
+            true, false, sendChecksum, datanode, clientTraceFmt);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e;
         LOG.info(msg);
@@ -654,7 +655,7 @@ public void copyBlock(final ExtendedBlock block,

     try {
       // check if the block exists or not
-      blockSender = new BlockSender(block, 0, -1, false, false, datanode,
+      blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
           null);

       // set up response stream
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 8ce5fd7566..d97bd7daee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -52,6 +52,7 @@ message OpReadBlockProto {
   required ClientOperationHeaderProto header = 1;
   required uint64 offset = 2;
   required uint64 len = 3;
+  optional bool sendChecksums = 4 [default = true];
 }


diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 77ea9c5907..d699f750fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -444,21 +444,21 @@ public void testDataTransferProtocol() throws IOException {
     recvBuf.reset();
     blk.setBlockId(blkid-1);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen);
+        0L, fileLen, true);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false);

     // negative block start offset -1L
     sendBuf.reset();
     blk.setBlockId(blkid);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        -1L, fileLen);
+        -1L, fileLen, true);
     sendRecvData("Negative start-offset for read for block " +
                  firstBlock.getBlockId(), false);

     // bad block start offset
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        fileLen, fileLen);
+        fileLen, fileLen, true);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);

@@ -475,7 +475,7 @@ public void testDataTransferProtocol() throws IOException {

     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, -1L-random.nextInt(oneMil));
+        0L, -1L-random.nextInt(oneMil), true);
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);

@@ -488,14 +488,14 @@ public void testDataTransferProtocol() throws IOException {
         recvOut);
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen+1);
+        0L, fileLen+1, true);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);

     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen);
+        0L, fileLen, true);
     readFile(fileSys, file, fileLen);
   } finally {
     cluster.shutdown();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
index b432052035..fa384cde7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
@@ -19,6 +19,9 @@

 import java.io.IOException;

+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -56,4 +59,11 @@ public void testParallelReadByteBuffer() throws IOException {
   public void testParallelReadMixed() throws IOException {
     runTestWorkload(new MixedWorkloadHelper());
   }
+
+  @Test
+  public void testParallelNoChecksums() throws IOException {
+    verifyChecksums = false;
+    runTestWorkload(new MixedWorkloadHelper());
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
index 1c59eca871..51c3200d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
@@ -46,6 +46,7 @@ public class TestParallelReadUtil {
   static final int FILE_SIZE_K = 256;
   static Random rand = null;
   static final int DEFAULT_REPLICATION_FACTOR = 2;
+  protected boolean verifyChecksums = true;

   static {
     // The client-trace log ends up causing a lot of blocking threads
@@ -317,7 +318,8 @@ boolean runParallelRead(int nFiles, int nWorkerEach, ReadWorkerHelper helper) th
       testInfo.filepath = new Path("/TestParallelRead.dat." + i);
       testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
-      testInfo.dis = dfsClient.open(testInfo.filepath.toString());
+      testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
+          dfsClient.dfsClientConf.ioBufferSize, verifyChecksums);

       for (int j = 0; j < nWorkerEach; ++j) {
         workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 1e0681f471..9afa493391 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -24,11 +24,14 @@
 import java.io.IOException;
 import java.util.Random;

+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.log4j.Level;
 import org.junit.Test;

 /**
@@ -194,11 +197,19 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    */
   @Test
   public void testPreadDFS() throws IOException {
-    dfsPreadTest(false); //normal pread
-    dfsPreadTest(true); //trigger read code path without transferTo.
+    dfsPreadTest(false, true); //normal pread
+    dfsPreadTest(true, true); //trigger read code path without transferTo.
   }

-  private void dfsPreadTest(boolean disableTransferTo) throws IOException {
+  @Test
+  public void testPreadDFSNoChecksum() throws IOException {
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
+    dfsPreadTest(false, false);
+    dfsPreadTest(true, false);
+  }
+
+  private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+      throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
@@ -210,6 +221,7 @@ private void dfsPreadTest(boolean disableTransferTo) throws IOException {
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     FileSystem fileSys = cluster.getFileSystem();
+    fileSys.setVerifyChecksum(verifyChecksum);
     try {
       Path file1 = new Path("preadtest.dat");
       writeFile(fileSys, file1);