HDFS-14111. hdfsOpenFile on HDFS causes unnecessary IO from file offset 0. Contributed by Sahil Takiar.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
01ada40ea4
commit
f5a4b43a49
@ -740,6 +740,7 @@ public boolean hasCapability(String capability) {
|
|||||||
case StreamCapabilities.READAHEAD:
|
case StreamCapabilities.READAHEAD:
|
||||||
case StreamCapabilities.DROPBEHIND:
|
case StreamCapabilities.DROPBEHIND:
|
||||||
case StreamCapabilities.UNBUFFER:
|
case StreamCapabilities.UNBUFFER:
|
||||||
|
case StreamCapabilities.READBYTEBUFFER:
|
||||||
return true;
|
return true;
|
||||||
default:
|
default:
|
||||||
return false;
|
return false;
|
||||||
|
@ -0,0 +1,64 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementers of this interface provide a positioned read API that writes to a
|
||||||
|
* {@link ByteBuffer} rather than a {@code byte[]}.
|
||||||
|
*
|
||||||
|
* @see PositionedReadable
|
||||||
|
* @see ByteBufferReadable
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface ByteBufferPositionedReadable {
|
||||||
|
/**
|
||||||
|
* Reads up to {@code buf.remaining()} bytes into buf from a given position
|
||||||
|
* in the file and returns the number of bytes read. Callers should use
|
||||||
|
* {@code buf.limit(...)} to control the size of the desired read and
|
||||||
|
* {@code buf.position(...)} to control the offset into the buffer the data
|
||||||
|
* should be written to.
|
||||||
|
* <p>
|
||||||
|
* After a successful call, {@code buf.position()} will be advanced by the
|
||||||
|
* number of bytes read and {@code buf.limit()} should be unchanged.
|
||||||
|
* <p>
|
||||||
|
* In the case of an exception, the values of {@code buf.position()} and
|
||||||
|
* {@code buf.limit()} are undefined, and callers should be prepared to
|
||||||
|
* recover from this eventuality.
|
||||||
|
* <p>
|
||||||
|
* Many implementations will throw {@link UnsupportedOperationException}, so
|
||||||
|
* callers that are not confident in support for this method from the
|
||||||
|
* underlying filesystem should be prepared to handle that exception.
|
||||||
|
* <p>
|
||||||
|
* Implementations should treat 0-length requests as legitimate, and must not
|
||||||
|
* signal an error upon their receipt.
|
||||||
|
*
|
||||||
|
* @param position position within file
|
||||||
|
* @param buf the ByteBuffer to receive the results of the read operation.
|
||||||
|
* @return the number of bytes read, possibly zero, or -1 if reached
|
||||||
|
* end-of-stream
|
||||||
|
* @throws IOException if there is some error performing the read
|
||||||
|
*/
|
||||||
|
int read(long position, ByteBuffer buf) throws IOException;
|
||||||
|
}
|
@ -59,6 +59,12 @@ public interface StreamCapabilities {
|
|||||||
*/
|
*/
|
||||||
String UNBUFFER = "in:unbuffer";
|
String UNBUFFER = "in:unbuffer";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream read(ByteBuffer) capability implemented by
|
||||||
|
* {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
|
||||||
|
*/
|
||||||
|
String READBYTEBUFFER = "in:readbytebuffer";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Capabilities that a stream can support and be queried for.
|
* Capabilities that a stream can support and be queried for.
|
||||||
*/
|
*/
|
||||||
|
@ -1779,6 +1779,7 @@ public boolean hasCapability(String capability) {
|
|||||||
case StreamCapabilities.READAHEAD:
|
case StreamCapabilities.READAHEAD:
|
||||||
case StreamCapabilities.DROPBEHIND:
|
case StreamCapabilities.DROPBEHIND:
|
||||||
case StreamCapabilities.UNBUFFER:
|
case StreamCapabilities.UNBUFFER:
|
||||||
|
case StreamCapabilities.READBYTEBUFFER:
|
||||||
return true;
|
return true;
|
||||||
default:
|
default:
|
||||||
return false;
|
return false;
|
||||||
|
@ -1013,7 +1013,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
|
|||||||
return f{is|os};
|
return f{is|os};
|
||||||
*/
|
*/
|
||||||
int accmode = flags & O_ACCMODE;
|
int accmode = flags & O_ACCMODE;
|
||||||
jstring jStrBufferSize = NULL, jStrReplication = NULL;
|
jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL;
|
||||||
jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
|
jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
|
||||||
jobject jFS = (jobject)fs;
|
jobject jFS = (jobject)fs;
|
||||||
jthrowable jthr;
|
jthrowable jthr;
|
||||||
@ -1171,16 +1171,22 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
|
|||||||
file->flags = 0;
|
file->flags = 0;
|
||||||
|
|
||||||
if ((flags & O_WRONLY) == 0) {
|
if ((flags & O_WRONLY) == 0) {
|
||||||
// Try a test read to see if we can do direct reads
|
// Check the StreamCapabilities of jFile to see if we can do direct reads
|
||||||
char buf;
|
jthr = newJavaStr(env, "in:readbytebuffer", &jCapabilityString);
|
||||||
if (readDirect(fs, file, &buf, 0) == 0) {
|
if (jthr) {
|
||||||
// Success - 0-byte read should return 0
|
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
||||||
|
"hdfsOpenFile(%s): newJavaStr", path);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
jthr = invokeMethod(env, &jVal, INSTANCE, jFile, HADOOP_ISTRM,
|
||||||
|
"hasCapability", "(Ljava/lang/String;)Z", jCapabilityString);
|
||||||
|
if (jthr) {
|
||||||
|
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
||||||
|
"hdfsOpenFile(%s): FSDataInputStream#hasCapability", path);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
if (jVal.z) {
|
||||||
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
|
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
|
||||||
} else if (errno != ENOTSUP) {
|
|
||||||
// Unexpected error. Clear it, don't set the direct flag.
|
|
||||||
fprintf(stderr,
|
|
||||||
"hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
|
|
||||||
"for direct read compatibility\n", path, errno);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ret = 0;
|
ret = 0;
|
||||||
@ -1190,7 +1196,8 @@ done:
|
|||||||
destroyLocalReference(env, jStrReplication);
|
destroyLocalReference(env, jStrReplication);
|
||||||
destroyLocalReference(env, jConfiguration);
|
destroyLocalReference(env, jConfiguration);
|
||||||
destroyLocalReference(env, jPath);
|
destroyLocalReference(env, jPath);
|
||||||
destroyLocalReference(env, jFile);
|
destroyLocalReference(env, jFile);
|
||||||
|
destroyLocalReference(env, jCapabilityString);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
if (file) {
|
if (file) {
|
||||||
if (file->file) {
|
if (file->file) {
|
||||||
|
@ -503,7 +503,10 @@ TEST_F(HdfsExtTest, TestReadStats) {
|
|||||||
hdfsFileFreeReadStatistics(stats);
|
hdfsFileFreeReadStatistics(stats);
|
||||||
|
|
||||||
EXPECT_EQ(0, hdfsCloseFile(fs, file));
|
EXPECT_EQ(0, hdfsCloseFile(fs, file));
|
||||||
EXPECT_EQ(0, errno);
|
// Since libhdfs is not guaranteed to set errno to 0 on successful
|
||||||
|
// operations, we disable this check for now, see HDFS-14325 for a
|
||||||
|
// long term solution to this problem
|
||||||
|
// EXPECT_EQ(0, errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
//Testing working directory
|
//Testing working directory
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception collects all IOExceptions thrown when adding block pools and
|
||||||
|
* scanning volumes. It keeps the information about which volume is associated
|
||||||
|
* with an exception.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class AddBlockPoolException extends IOException {
|
||||||
|
private Map<FsVolumeSpi, IOException> unhealthyDataDirs;
|
||||||
|
public AddBlockPoolException(Map<FsVolumeSpi, IOException>
|
||||||
|
unhealthyDataDirs) {
|
||||||
|
this.unhealthyDataDirs = unhealthyDataDirs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<FsVolumeSpi, IOException> getFailingVolumes() {
|
||||||
|
return unhealthyDataDirs;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return getClass().getName() + ": " + unhealthyDataDirs.toString();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,269 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class tests the DFS positional read functionality on a single node
|
||||||
|
* mini-cluster. These tests are inspired from {@link TestPread}. The tests
|
||||||
|
* are much less comprehensive than other pread tests because pread already
|
||||||
|
* internally uses {@link ByteBuffer}s.
|
||||||
|
*/
|
||||||
|
public class TestByteBufferPread {
|
||||||
|
|
||||||
|
private static MiniDFSCluster cluster;
|
||||||
|
private static FileSystem fs;
|
||||||
|
private static byte[] fileContents;
|
||||||
|
private static Path testFile;
|
||||||
|
private static Random rand;
|
||||||
|
|
||||||
|
private static final long SEED = 0xDEADBEEFL;
|
||||||
|
private static final int BLOCK_SIZE = 4096;
|
||||||
|
private static final int FILE_SIZE = 12 * BLOCK_SIZE;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws IOException {
|
||||||
|
// Setup the cluster with a small block size so we can create small files
|
||||||
|
// that span multiple blocks
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
fs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
// Create a test file that spans 12 blocks, and contains a bunch of random
|
||||||
|
// bytes
|
||||||
|
fileContents = new byte[FILE_SIZE];
|
||||||
|
rand = new Random(SEED);
|
||||||
|
rand.nextBytes(fileContents);
|
||||||
|
testFile = new Path("/byte-buffer-pread-test.dat");
|
||||||
|
try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
|
||||||
|
out.write(fileContents);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test preads with {@link java.nio.HeapByteBuffer}s.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPreadWithHeapByteBuffer() throws IOException {
|
||||||
|
testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
|
||||||
|
testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
|
||||||
|
testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
|
||||||
|
testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
|
||||||
|
testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test preads with {@link java.nio.DirectByteBuffer}s.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPreadWithDirectByteBuffer() throws IOException {
|
||||||
|
testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
|
||||||
|
testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
|
||||||
|
testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
|
||||||
|
testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
|
||||||
|
testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the entire testFile using the pread API and validates that its
|
||||||
|
* contents are properly loaded into the supplied {@link ByteBuffer}.
|
||||||
|
*/
|
||||||
|
private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
|
||||||
|
int bytesRead;
|
||||||
|
int totalBytesRead = 0;
|
||||||
|
try (FSDataInputStream in = fs.open(testFile)) {
|
||||||
|
while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
|
||||||
|
totalBytesRead += bytesRead;
|
||||||
|
// Check that each call to read changes the position of the ByteBuffer
|
||||||
|
// correctly
|
||||||
|
assertEquals(totalBytesRead, buffer.position());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the buffer is full
|
||||||
|
assertFalse(buffer.hasRemaining());
|
||||||
|
// Make sure the contents of the read buffer equal the contents of the
|
||||||
|
// file
|
||||||
|
buffer.position(0);
|
||||||
|
byte[] bufferContents = new byte[FILE_SIZE];
|
||||||
|
buffer.get(bufferContents);
|
||||||
|
assertArrayEquals(bufferContents, fileContents);
|
||||||
|
buffer.position(buffer.limit());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to read the testFile into a {@link ByteBuffer} that is already
|
||||||
|
* full, and validates that doing so does not change the contents of the
|
||||||
|
* supplied {@link ByteBuffer}.
|
||||||
|
*/
|
||||||
|
private void testPreadWithFullByteBuffer(ByteBuffer buffer)
|
||||||
|
throws IOException {
|
||||||
|
// Load some dummy data into the buffer
|
||||||
|
byte[] existingBufferBytes = new byte[FILE_SIZE];
|
||||||
|
rand.nextBytes(existingBufferBytes);
|
||||||
|
buffer.put(existingBufferBytes);
|
||||||
|
// Make sure the buffer is full
|
||||||
|
assertFalse(buffer.hasRemaining());
|
||||||
|
|
||||||
|
try (FSDataInputStream in = fs.open(testFile)) {
|
||||||
|
// Attempt to read into the buffer, 0 bytes should be read since the
|
||||||
|
// buffer is full
|
||||||
|
assertEquals(0, in.read(buffer));
|
||||||
|
|
||||||
|
// Double check the buffer is still full and its contents have not
|
||||||
|
// changed
|
||||||
|
assertFalse(buffer.hasRemaining());
|
||||||
|
buffer.position(0);
|
||||||
|
byte[] bufferContents = new byte[FILE_SIZE];
|
||||||
|
buffer.get(bufferContents);
|
||||||
|
assertArrayEquals(bufferContents, existingBufferBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads half of the testFile into the {@link ByteBuffer} by setting a
|
||||||
|
* {@link ByteBuffer#limit} on the buffer. Validates that only half of the
|
||||||
|
* testFile is loaded into the buffer.
|
||||||
|
*/
|
||||||
|
private void testPreadWithLimitedByteBuffer(
|
||||||
|
ByteBuffer buffer) throws IOException {
|
||||||
|
int bytesRead;
|
||||||
|
int totalBytesRead = 0;
|
||||||
|
// Set the buffer limit to half the size of the file
|
||||||
|
buffer.limit(FILE_SIZE / 2);
|
||||||
|
|
||||||
|
try (FSDataInputStream in = fs.open(testFile)) {
|
||||||
|
while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
|
||||||
|
totalBytesRead += bytesRead;
|
||||||
|
// Check that each call to read changes the position of the ByteBuffer
|
||||||
|
// correctly
|
||||||
|
assertEquals(totalBytesRead, buffer.position());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we set the buffer limit to half the size of the file, we should
|
||||||
|
// have only read half of the file into the buffer
|
||||||
|
assertEquals(totalBytesRead, FILE_SIZE / 2);
|
||||||
|
// Check that the buffer is full and the contents equal the first half of
|
||||||
|
// the file
|
||||||
|
assertFalse(buffer.hasRemaining());
|
||||||
|
buffer.position(0);
|
||||||
|
byte[] bufferContents = new byte[FILE_SIZE / 2];
|
||||||
|
buffer.get(bufferContents);
|
||||||
|
assertArrayEquals(bufferContents,
|
||||||
|
Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads half of the testFile into the {@link ByteBuffer} by setting the
|
||||||
|
* {@link ByteBuffer#position} the half the size of the file. Validates that
|
||||||
|
* only half of the testFile is loaded into the buffer.
|
||||||
|
*/
|
||||||
|
private void testPreadWithPositionedByteBuffer(
|
||||||
|
ByteBuffer buffer) throws IOException {
|
||||||
|
int bytesRead;
|
||||||
|
int totalBytesRead = 0;
|
||||||
|
// Set the buffer position to half the size of the file
|
||||||
|
buffer.position(FILE_SIZE / 2);
|
||||||
|
|
||||||
|
try (FSDataInputStream in = fs.open(testFile)) {
|
||||||
|
while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
|
||||||
|
totalBytesRead += bytesRead;
|
||||||
|
// Check that each call to read changes the position of the ByteBuffer
|
||||||
|
// correctly
|
||||||
|
assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we set the buffer position to half the size of the file, we
|
||||||
|
// should have only read half of the file into the buffer
|
||||||
|
assertEquals(totalBytesRead, FILE_SIZE / 2);
|
||||||
|
// Check that the buffer is full and the contents equal the first half of
|
||||||
|
// the file
|
||||||
|
assertFalse(buffer.hasRemaining());
|
||||||
|
buffer.position(FILE_SIZE / 2);
|
||||||
|
byte[] bufferContents = new byte[FILE_SIZE / 2];
|
||||||
|
buffer.get(bufferContents);
|
||||||
|
assertArrayEquals(bufferContents,
|
||||||
|
Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads half of the testFile into the {@link ByteBuffer} by specifying a
|
||||||
|
* position for the pread API that is half of the file size. Validates that
|
||||||
|
* only half of the testFile is loaded into the buffer.
|
||||||
|
*/
|
||||||
|
private void testPositionedPreadWithByteBuffer(
|
||||||
|
ByteBuffer buffer) throws IOException {
|
||||||
|
int bytesRead;
|
||||||
|
int totalBytesRead = 0;
|
||||||
|
|
||||||
|
try (FSDataInputStream in = fs.open(testFile)) {
|
||||||
|
// Start reading from halfway through the file
|
||||||
|
while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
|
||||||
|
buffer)) > 0) {
|
||||||
|
totalBytesRead += bytesRead;
|
||||||
|
// Check that each call to read changes the position of the ByteBuffer
|
||||||
|
// correctly
|
||||||
|
assertEquals(totalBytesRead, buffer.position());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we starting reading halfway through the file, the buffer should
|
||||||
|
// only be half full
|
||||||
|
assertEquals(totalBytesRead, FILE_SIZE / 2);
|
||||||
|
assertEquals(buffer.position(), FILE_SIZE / 2);
|
||||||
|
assertTrue(buffer.hasRemaining());
|
||||||
|
// Check that the buffer contents equal the second half of the file
|
||||||
|
buffer.position(0);
|
||||||
|
byte[] bufferContents = new byte[FILE_SIZE / 2];
|
||||||
|
buffer.get(bufferContents);
|
||||||
|
assertArrayEquals(bufferContents,
|
||||||
|
Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void shutdown() throws IOException {
|
||||||
|
try {
|
||||||
|
fs.delete(testFile, false);
|
||||||
|
fs.close();
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user