HDFS-5574. Remove buffer copy in BlockReader.skip. Contributed by Binglin Chang.

This commit is contained in:
Akira Ajisaka 2015-04-30 19:09:57 +09:00
parent f5b38477f9
commit e89fc53a1d
8 changed files with 309 additions and 34 deletions

View File

@ -215,6 +215,29 @@ private void fill( ) throws IOException {
if (count < 0) count = 0; if (count < 0) count = 0;
} }
/**
* Like read(byte[], int, int), but does not provide a dest buffer,
* so the read data is discarded.
* @param len maximum number of bytes to read.
* @return the number of bytes read.
* @throws IOException if an I/O error occurs.
*/
final protected synchronized int readAndDiscard(int len) throws IOException {
int total = 0;
while (total < len) {
if (pos >= count) {
count = readChecksumChunk(buf, 0, maxChunkSize);
if (count <= 0) {
break;
}
}
int rd = Math.min(count - pos, len - total);
pos += rd;
total += rd;
}
return total;
}
/* /*
* Read characters into a portion of an array, reading from the underlying * Read characters into a portion of an array, reading from the underlying
* stream at most once if necessary. * stream at most once if necessary.

View File

@ -483,6 +483,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8283. DataStreamer cleanup and some minor improvement. (szetszwo via HDFS-8283. DataStreamer cleanup and some minor improvement. (szetszwo via
jing9) jing9)
HDFS-5574. Remove buffer copy in BlockReader.skip.
(Binglin Chang via aajisaka)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -97,7 +97,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
private boolean eos = false; private boolean eos = false;
private boolean sentStatusCode = false; private boolean sentStatusCode = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null; ByteBuffer checksumBytes = null;
/** Amount of unread data in the current received packet */ /** Amount of unread data in the current received packet */
int dataLeft = 0; int dataLeft = 0;
@ -126,10 +125,7 @@ public synchronized int read(byte[] buf, int off, int len)
if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
// Skip these bytes. But don't call this.skip()! // Skip these bytes. But don't call this.skip()!
int toSkip = (int)(startOffset - firstChunkOffset); int toSkip = (int)(startOffset - firstChunkOffset);
if ( skipBuf == null ) { if ( super.readAndDiscard(toSkip) != toSkip ) {
skipBuf = new byte[bytesPerChecksum];
}
if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
// should never happen // should never happen
throw new IOException("Could not skip required number of bytes"); throw new IOException("Could not skip required number of bytes");
} }
@ -152,15 +148,11 @@ public synchronized int read(byte[] buf, int off, int len)
public synchronized long skip(long n) throws IOException { public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least /* How can we make sure we don't throw a ChecksumException, at least
* in majority of the cases?. This one throws. */ * in majority of the cases?. This one throws. */
if ( skipBuf == null ) {
skipBuf = new byte[bytesPerChecksum];
}
long nSkipped = 0; long nSkipped = 0;
while ( nSkipped < n ) { while (nSkipped < n) {
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length); int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
int ret = read(skipBuf, 0, toSkip); int ret = readAndDiscard(toSkip);
if ( ret <= 0 ) { if (ret <= 0) {
return nSkipped; return nSkipped;
} }
nSkipped += ret; nSkipped += ret;

View File

@ -123,11 +123,6 @@ public class RemoteBlockReader2 implements BlockReader {
private boolean sentStatusCode = false; private boolean sentStatusCode = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
/** Amount of unread data in the current received packet */
int dataLeft = 0;
@VisibleForTesting @VisibleForTesting
public Peer getPeer() { public Peer getPeer() {
return peer; return peer;
@ -172,7 +167,7 @@ public synchronized int read(byte[] buf, int off, int len)
@Override @Override
public int read(ByteBuffer buf) throws IOException { public synchronized int read(ByteBuffer buf) throws IOException {
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
TraceScope scope = Trace.startSpan( TraceScope scope = Trace.startSpan(
"RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
@ -258,20 +253,22 @@ private void readNextPacket() throws IOException {
public synchronized long skip(long n) throws IOException { public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least /* How can we make sure we don't throw a ChecksumException, at least
* in majority of the cases?. This one throws. */ * in majority of the cases?. This one throws. */
if ( skipBuf == null ) { long skipped = 0;
skipBuf = new byte[bytesPerChecksum]; while (skipped < n) {
} long needToSkip = n - skipped;
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
long nSkipped = 0; readNextPacket();
while ( nSkipped < n ) {
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
int ret = read(skipBuf, 0, toSkip);
if ( ret <= 0 ) {
return nSkipped;
} }
nSkipped += ret; if (curDataSlice.remaining() == 0) {
// we're at EOF now
break;
}
int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
curDataSlice.position(curDataSlice.position() + skip);
skipped += skip;
} }
return nSkipped; return skipped;
} }
private void readTrailingEmptyPacket() throws IOException { private void readTrailingEmptyPacket() throws IOException {

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
abstract public class TestBlockReaderBase {
private BlockReaderTestUtil util;
private byte[] blockData;
private BlockReader reader;
/**
* if override this, make sure return array length is less than
* block size.
*/
byte [] getBlockData() {
int length = 1 << 22;
byte[] data = new byte[length];
for (int i = 0; i < length; i++) {
data[i] = (byte) (i % 133);
}
return data;
}
private BlockReader getBlockReader(LocatedBlock block) throws Exception {
return util.getBlockReader(block, 0, blockData.length);
}
abstract HdfsConfiguration createConf();
@Before
public void setup() throws Exception {
util = new BlockReaderTestUtil(1, createConf());
blockData = getBlockData();
DistributedFileSystem fs = util.getCluster().getFileSystem();
Path testfile = new Path("/testfile");
FSDataOutputStream fout = fs.create(testfile);
fout.write(blockData);
fout.close();
LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
reader = getBlockReader(blk);
}
@After
public void shutdown() throws Exception {
util.shutdown();
}
@Test(timeout=60000)
public void testSkip() throws IOException {
Random random = new Random();
byte [] buf = new byte[1];
for (int pos = 0; pos < blockData.length;) {
long skip = random.nextInt(100) + 1;
long skipped = reader.skip(skip);
if (pos + skip >= blockData.length) {
assertEquals(blockData.length, pos + skipped);
break;
} else {
assertEquals(skip, skipped);
pos += skipped;
assertEquals(1, reader.read(buf, 0, 1));
assertEquals(blockData[pos], buf[0]);
pos += 1;
}
}
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.Assume;
import org.junit.Test;
public class TestDFSInputStream {
private void testSkipInner(MiniDFSCluster cluster) throws IOException {
DistributedFileSystem fs = cluster.getFileSystem();
DFSClient client = fs.dfs;
Path file = new Path("/testfile");
int fileLength = 1 << 22;
byte[] fileContent = new byte[fileLength];
for (int i = 0; i < fileLength; i++) {
fileContent[i] = (byte) (i % 133);
}
FSDataOutputStream fout = fs.create(file);
fout.write(fileContent);
fout.close();
Random random = new Random();
for (int i = 3; i < 18; i++) {
DFSInputStream fin = client.open("/testfile");
for (long pos = 0; pos < fileLength;) {
long skip = random.nextInt(1 << i) + 1;
long skipped = fin.skip(skip);
if (pos + skip >= fileLength) {
assertEquals(fileLength, pos + skipped);
break;
} else {
assertEquals(skip, skipped);
pos += skipped;
int data = fin.read();
assertEquals(pos % 133, data);
pos += 1;
}
}
fin.close();
}
}
@Test(timeout=60000)
public void testSkipWithRemoteBlockReader() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
testSkipInner(cluster);
} finally {
cluster.shutdown();
}
}
@Test(timeout=60000)
public void testSkipWithRemoteBlockReader2() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
testSkipInner(cluster);
} finally {
cluster.shutdown();
}
}
@Test(timeout=60000)
public void testSkipWithLocalBlockReader() throws IOException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
DFSInputStream.tcpReadsDisabledForTesting = true;
testSkipInner(cluster);
} finally {
DFSInputStream.tcpReadsDisabledForTesting = false;
cluster.shutdown();
sockDir.close();
}
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
public class TestRemoteBlockReader extends TestBlockReaderBase {
HdfsConfiguration createConf() {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
return conf;
}
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
public class TestRemoteBlockReader2 extends TestBlockReaderBase {
HdfsConfiguration createConf() {
HdfsConfiguration conf = new HdfsConfiguration();
return conf;
}
}