HDFS-6071. BlockReaderLocal does not return -1 on EOF when doing a zero-length read on a short file. (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-03-10 02:41:45 +00:00
parent 94a1632fcb
commit d59dbc9e38
4 changed files with 136 additions and 4 deletions

View File

@ -729,6 +729,9 @@ Release 2.4.0 - UNRELEASED
HDFS-6078. TestIncrementalBlockReports is flaky. (Arpit Agarwal)
HDFS-6071. BlockReaderLocal doesn't return -1 on EOF when doing a
zero-length read on a short file (cmccabe)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

View File

@ -478,7 +478,7 @@ private synchronized int readWithBounceBuffer(ByteBuffer buf,
total += bb;
if (buf.remaining() == 0) return total;
}
boolean eof = false;
boolean eof = true, done = false;
do {
if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
&& ((dataPos % bytesPerChecksum) == 0)) {
@ -493,20 +493,24 @@ private synchronized int readWithBounceBuffer(ByteBuffer buf,
buf.limit(oldLimit);
}
if (nRead < maxReadaheadLength) {
eof = true;
done = true;
}
if (nRead > 0) {
eof = false;
}
total += nRead;
} else {
// Slow lane: refill bounce buffer.
if (fillDataBuf(canSkipChecksum)) {
eof = true;
done = true;
}
bb = drainDataBuf(buf); // drain bounce buffer if possible
if (bb >= 0) {
eof = false;
total += bb;
}
}
} while ((!eof) && (buf.remaining() > 0));
} while ((!done) && (buf.remaining() > 0));
return (eof && total == 0) ? -1 : total;
}

View File

@ -58,11 +58,14 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.VersionInfo;
import org.junit.Assume;
import java.io.*;
import java.net.*;
@ -1138,4 +1141,43 @@ public static long roundUpToMultiple(long val, int factor) {
long c = (val + factor - 1) / factor;
return c * factor;
}
/**
* A short-circuit test context which makes it easier to get a short-circuit
* configuration and set everything up.
*/
public static class ShortCircuitTestContext implements Closeable {
private final String testName;
private final TemporarySocketDirectory sockDir;
private boolean closed = false;
private boolean formerTcpReadsDisabled;
public ShortCircuitTestContext(String testName) {
this.testName = testName;
this.sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
formerTcpReadsDisabled = DFSInputStream.tcpReadsDisabledForTesting;
Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
}
public Configuration newConfiguration() {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
testName + "._PORT.sock").getAbsolutePath());
return conf;
}
public String getTestName() {
return testName;
}
public void close() throws IOException {
if (closed) return;
closed = true;
DFSInputStream.tcpReadsDisabledForTesting = formerTcpReadsDisabled;
sockDir.close();
}
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.nio.ByteBuffer;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil.ShortCircuitTestContext;
import org.junit.Test;
public class TestRead {
final private int BLOCK_SIZE = 512;
private void testEOF(MiniDFSCluster cluster, int fileLength) throws IOException {
FileSystem fs = cluster.getFileSystem();
Path path = new Path("testEOF." + fileLength);
DFSTestUtil.createFile(fs, path, fileLength, (short)1, 0xBEEFBEEF);
FSDataInputStream fis = fs.open(path);
ByteBuffer empty = ByteBuffer.allocate(0);
// A read into an empty bytebuffer at the beginning of the file gives 0.
Assert.assertEquals(0, fis.read(empty));
fis.seek(fileLength);
// A read into an empty bytebuffer at the end of the file gives -1.
Assert.assertEquals(-1, fis.read(empty));
if (fileLength > BLOCK_SIZE) {
fis.seek(fileLength - BLOCK_SIZE + 1);
ByteBuffer dbb = ByteBuffer.allocateDirect(BLOCK_SIZE);
Assert.assertEquals(BLOCK_SIZE - 1, fis.read(dbb));
}
fis.close();
}
@Test(timeout=60000)
public void testEOFWithBlockReaderLocal() throws Exception {
ShortCircuitTestContext testContext =
new ShortCircuitTestContext("testEOFWithBlockReaderLocal");
try {
final Configuration conf = testContext.newConfiguration();
conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
testEOF(cluster, 1);
testEOF(cluster, 14);
testEOF(cluster, 10000);
cluster.shutdown();
} finally {
testContext.close();
}
}
@Test(timeout=60000)
public void testEOFWithRemoteBlockReader() throws Exception {
final Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
testEOF(cluster, 1);
testEOF(cluster, 14);
testEOF(cluster, 10000);
cluster.shutdown();
}
}