From b8448dea82c72ff6c1558b9ebf3f24cd1c6e728b Mon Sep 17 00:00:00 2001 From: Jitendra Nath Pandey Date: Thu, 16 Feb 2012 18:58:19 +0000 Subject: [PATCH] HDFS-2655. BlockReaderLocal#skip performs unnecessary IO. Contributed by Brandon Li. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1245118 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../apache/hadoop/hdfs/BlockReaderLocal.java | 68 +++++++++++++++---- .../hdfs/TestShortCircuitLocalRead.java | 46 +++++++++++++ 3 files changed, 104 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6ce1558d04..66efd7e9b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -132,6 +132,9 @@ Trunk (unreleased changes) HDFS-2878. Fix TestBlockRecovery and move it back into main test directory. (todd) + HDFS-2655. BlockReaderLocal#skip performs unnecessary IO. (Brandon Li + via jitendra) + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the namenode state. (Tomasz Nykiel via hairong) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 3090419fb0..cc61697ede 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -369,26 +369,68 @@ public synchronized long skip(long n) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("skip " + n); } + if (n <= 0) { + return 0; + } if (!verifyChecksum) { return dataIn.skip(n); } - // Skip by reading the data so we stay in sync with checksums. - // This could be implemented more efficiently in the future to - // skip to the beginning of the appropriate checksum chunk - // and then only read to the middle of that chunk. + + // caller made sure newPosition is not beyond EOF. + int remaining = dataBuff.remaining(); + int position = dataBuff.position(); + int newPosition = position + (int)n; + + // if the new offset is already read into dataBuff, just reposition + if (n <= remaining) { + assert offsetFromChunkBoundary == 0; + dataBuff.position(newPosition); + return n; + } + + // for small gap, read through to keep the data/checksum in sync + if (n - remaining <= bytesPerChecksum) { + dataBuff.position(position + remaining); + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + int ret = read(skipBuf, 0, (int)(n - remaining)); + return ret; + } + + // optimize for big gap: discard the current buffer, skip to + // the beginning of the appropriate checksum chunk and then + // read to the middle of that chunk to be in sync with checksums. + this.offsetFromChunkBoundary = newPosition % bytesPerChecksum; + long toskip = n - remaining - this.offsetFromChunkBoundary; + + dataBuff.clear(); + checksumBuff.clear(); + + long dataSkipped = dataIn.skip(toskip); + if (dataSkipped != toskip) { + throw new IOException("skip error in data input stream"); + } + long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize; + if (checkSumOffset > 0) { + long skipped = checksumIn.skip(checkSumOffset); + if (skipped != checkSumOffset) { + throw new IOException("skip error in checksum input stream"); + } + } + + // read into the middle of the chunk if (skipBuf == null) { skipBuf = new byte[bytesPerChecksum]; } - long nSkipped = 0; - while ( nSkipped < n ) { - int toSkip = (int)Math.min(n-nSkipped, skipBuf.length); - int ret = read(skipBuf, 0, toSkip); - if ( ret <= 0 ) { - return nSkipped; - } - nSkipped += ret; + assert skipBuf.length == bytesPerChecksum; + assert this.offsetFromChunkBoundary < bytesPerChecksum; + int ret = read(skipBuf, 0, this.offsetFromChunkBoundary); + if (ret == -1) { // EOS + return toskip; + } else { + return (toskip + ret); } - return nSkipped; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java index d14b05d076..34ed50a9cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java @@ -238,7 +238,53 @@ public ClientDatanodeProtocol run() throws Exception { cluster.shutdown(); } } + + @Test + public void testSkipWithVerifyChecksum() throws IOException { + int size = blockSize; + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); + conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, + UserGroupInformation.getCurrentUser().getShortUserName()); + if (simulatedStorage) { + conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); + } + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .format(true).build(); + FileSystem fs = cluster.getFileSystem(); + try { + // check that / exists + Path path = new Path("/"); + assertTrue("/ should be a directory", fs.getFileStatus(path) + .isDirectory() == true); + + byte[] fileData = AppendTestUtil.randomBytes(seed, size*3); + // create a new file in home directory. Do not close it. + Path file1 = new Path("filelocal.dat"); + FSDataOutputStream stm = createFile(fs, file1, 1); + // write to file + stm.write(fileData); + stm.close(); + + // now test the skip function + FSDataInputStream instm = fs.open(file1); + byte[] actual = new byte[fileData.length]; + // read something from the block first, otherwise BlockReaderLocal.skip() + // will not be invoked + int nread = instm.read(actual, 0, 3); + long skipped = 2*size+3; + instm.seek(skipped); + nread = instm.read(actual, (int)(skipped + nread), 3); + instm.close(); + + } finally { + fs.close(); + cluster.shutdown(); + } + } + /** * Test to run benchmarks between shortcircuit read vs regular read with * specified number of threads simultaneously reading.