diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
index 889ccc1486..9b66c950bc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java
@@ -214,7 +214,30 @@ private void fill(  ) throws IOException {
     count = readChecksumChunk(buf, 0, maxChunkSize);
     if (count < 0) count = 0;
   }
-
+
+  /**
+   * Like read(byte[], int, int), but does not provide a dest buffer,
+   * so the read data is discarded.
+   * @param len maximum number of bytes to read.
+   * @return the number of bytes read.
+   * @throws IOException if an I/O error occurs.
+   */
+  final protected synchronized int readAndDiscard(int len) throws IOException {
+    int total = 0;
+    while (total < len) {
+      if (pos >= count) {
+        count = readChecksumChunk(buf, 0, maxChunkSize);
+        if (count <= 0) {
+          break;
+        }
+      }
+      int rd = Math.min(count - pos, len - total);
+      pos += rd;
+      total += rd;
+    }
+    return total;
+  }
+
   /*
    * Read characters into a portion of an array, reading from the underlying
    * stream at most once if necessary.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fbeb45dee1..c538b78df9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -483,6 +483,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8283. DataStreamer cleanup and some minor improvement.  (szetszwo via
     jing9)
 
+    HDFS-5574. Remove buffer copy in BlockReader.skip.
+    (Binglin Chang via aajisaka)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index ce96ac9fa3..d70f41904b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -97,7 +97,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   private boolean eos = false;
   private boolean sentStatusCode = false;
 
-  byte[] skipBuf = null;
   ByteBuffer checksumBytes = null;
   /** Amount of unread data in the current received packet */
   int dataLeft = 0;
@@ -126,10 +125,7 @@ public synchronized int read(byte[] buf, int off, int len)
     if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
       // Skip these bytes. But don't call this.skip()!
       int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( skipBuf == null ) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+      if ( super.readAndDiscard(toSkip) != toSkip ) {
         // should never happen
         throw new IOException("Could not skip required number of bytes");
       }
@@ -152,15 +148,11 @@ public synchronized int read(byte[] buf, int off, int len)
   public synchronized long skip(long n) throws IOException {
     /* How can we make sure we don't throw a ChecksumException, at least
      * in majority of the cases?. This one throws. */
-    if ( skipBuf == null ) {
-      skipBuf = new byte[bytesPerChecksum];
-    }
-
     long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
+    while (nSkipped < n) {
+      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
+      int ret = readAndDiscard(toSkip);
+      if (ret <= 0) {
         return nSkipped;
       }
       nSkipped += ret;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 9245a84c69..c368d6515f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -122,12 +122,7 @@ public class RemoteBlockReader2 implements BlockReader {
   private final boolean verifyChecksum;
 
   private boolean sentStatusCode = false;
-
-  byte[] skipBuf = null;
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-
+
   @VisibleForTesting
   public Peer getPeer() {
     return peer;
@@ -172,7 +167,7 @@ public synchronized int read(byte[] buf, int off, int len)
 
 
   @Override
-  public int read(ByteBuffer buf) throws IOException {
+  public synchronized int read(ByteBuffer buf) throws IOException {
     if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       TraceScope scope = Trace.startSpan(
           "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
@@ -257,21 +252,23 @@ private void readNextPacket() throws IOException {
   @Override
   public synchronized long skip(long n) throws IOException {
     /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */
-    if ( skipBuf == null ) {
-      skipBuf = new byte[bytesPerChecksum];
-    }
-
-    long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
-        return nSkipped;
+     * in majority of the cases?. This one throws. */
+    long skipped = 0;
+    while (skipped < n) {
+      long needToSkip = n - skipped;
+      if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+        readNextPacket();
       }
-      nSkipped += ret;
+      if (curDataSlice.remaining() == 0) {
+        // we're at EOF now
+        break;
+      }
+
+      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+      curDataSlice.position(curDataSlice.position() + skip);
+      skipped += skip;
     }
-    return nSkipped;
+    return skipped;
   }
 
   private void readTrailingEmptyPacket() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
new file mode 100644
index 0000000000..3d916a79fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+abstract public class TestBlockReaderBase {
+  private BlockReaderTestUtil util;
+  private byte[] blockData;
+  private BlockReader reader;
+
+  /**
+   * if override this, make sure return array length is less than
+   * block size.
+   */
+  byte [] getBlockData() {
+    int length = 1 << 22;
+    byte[] data = new byte[length];
+    for (int i = 0; i < length; i++) {
+      data[i] = (byte) (i % 133);
+    }
+    return data;
+  }
+
+  private BlockReader getBlockReader(LocatedBlock block) throws Exception {
+    return util.getBlockReader(block, 0, blockData.length);
+  }
+
+  abstract HdfsConfiguration createConf();
+
+  @Before
+  public void setup() throws Exception {
+    util = new BlockReaderTestUtil(1, createConf());
+    blockData = getBlockData();
+    DistributedFileSystem fs = util.getCluster().getFileSystem();
+    Path testfile = new Path("/testfile");
+    FSDataOutputStream fout = fs.create(testfile);
+    fout.write(blockData);
+    fout.close();
+    LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
+    reader = getBlockReader(blk);
+  }
+
+  @After
+  public void shutdown() throws Exception {
+    util.shutdown();
+  }
+
+  @Test(timeout=60000)
+  public void testSkip() throws IOException {
+    Random random = new Random();
+    byte [] buf = new byte[1];
+    for (int pos = 0; pos < blockData.length;) {
+      long skip = random.nextInt(100) + 1;
+      long skipped = reader.skip(skip);
+      if (pos + skip >= blockData.length) {
+        assertEquals(blockData.length, pos + skipped);
+        break;
+      } else {
+        assertEquals(skip, skipped);
+        pos += skipped;
+        assertEquals(1, reader.read(buf, 0, 1));
+
+        assertEquals(blockData[pos], buf[0]);
+        pos += 1;
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
new file mode 100644
index 0000000000..b9ec2cea3b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestDFSInputStream {
+  private void testSkipInner(MiniDFSCluster cluster) throws IOException {
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSClient client = fs.dfs;
+    Path file = new Path("/testfile");
+    int fileLength = 1 << 22;
+    byte[] fileContent = new byte[fileLength];
+    for (int i = 0; i < fileLength; i++) {
+      fileContent[i] = (byte) (i % 133);
+    }
+    FSDataOutputStream fout = fs.create(file);
+    fout.write(fileContent);
+    fout.close();
+    Random random = new Random();
+    for (int i = 3; i < 18; i++) {
+      DFSInputStream fin = client.open("/testfile");
+      for (long pos = 0; pos < fileLength;) {
+        long skip = random.nextInt(1 << i) + 1;
+        long skipped = fin.skip(skip);
+        if (pos + skip >= fileLength) {
+          assertEquals(fileLength, pos + skipped);
+          break;
+        } else {
+          assertEquals(skip, skipped);
+          pos += skipped;
+          int data = fin.read();
+          assertEquals(pos % 133, data);
+          pos += 1;
+        }
+      }
+      fin.close();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testSkipWithRemoteBlockReader() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      testSkipInner(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testSkipWithRemoteBlockReader2() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      testSkipInner(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testSkipWithLocalBlockReader() throws IOException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+    Configuration conf = new Configuration();
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      testSkipInner(cluster);
+    } finally {
+      DFSInputStream.tcpReadsDisabledForTesting = false;
+      cluster.shutdown();
+      sockDir.close();
+    }
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
new file mode 100644
index 0000000000..8ab110d5aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+public class TestRemoteBlockReader extends TestBlockReaderBase {
+
+  HdfsConfiguration createConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
+    return conf;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
new file mode 100644
index 0000000000..c23b4b78f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+public class TestRemoteBlockReader2 extends TestBlockReaderBase {
+  HdfsConfiguration createConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    return conf;
+  }
+}