diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java index 57d6eb9a0d..f78fb7a5c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -21,9 +21,13 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.web.ByteRangeInputStream; +import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.junit.AfterClass; import org.junit.Assert; @@ -33,23 +37,26 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Random; public class TestWriteReadStripedFile { private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; - - private static DistributedFileSystem fs; private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; private final static int stripesPerBlock = 4; static int blockSize = cellSize * stripesPerBlock; static int numDNs = dataBlocks + parityBlocks + 2; private static MiniDFSCluster cluster; + private static Configuration conf; + private static FileSystem fs; + + private static Random r= new Random(); @BeforeClass public static void setup() throws IOException { - Configuration conf = new Configuration(); + conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.getFileSystem().getClient().createErasureCodingZone("/", null); @@ -134,7 +141,7 @@ public void testFileMoreThanABlockGroup1() throws IOException { @Test public void testFileMoreThanABlockGroup2() throws IOException { testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", - blockSize * dataBlocks + cellSize+ 123); + blockSize * dataBlocks + cellSize + 123); } @@ -171,7 +178,7 @@ private byte getByte(long pos) { } private void assertSeekAndRead(FSDataInputStream fsdis, int pos, - int writeBytes) throws IOException { + int writeBytes) throws IOException { fsdis.seek(pos); byte[] buf = new byte[writeBytes]; int readLen = readAll(fsdis, buf); @@ -182,147 +189,169 @@ private void assertSeekAndRead(FSDataInputStream fsdis, int pos, } } - private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) + private void testOneFileUsingDFSStripedInputStream(String src, int fileLength) throws IOException { - Path testPath = new Path(src); - final byte[] bytes = generateBytes(writeBytes); - DFSTestUtil.writeFile(fs, testPath, new String(bytes)); - //check file length - FileStatus status = fs.getFileStatus(testPath); - long fileLength = status.getLen(); + final byte[] expected = generateBytes(fileLength); + Path srcPath = new Path(src); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + + verifyLength(fs, srcPath, fileLength); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); + verifySeek(fs, srcPath, fileLength); + verifyStatefulRead(fs, srcPath, fileLength, expected, + ByteBuffer.allocate(fileLength + 100)); + verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); + verifyStatefulRead(fs, srcPath, fileLength, expected, + ByteBuffer.allocate(1024)); + } + + @Test + public void testWriteReadUsingWebHdfs() throws Exception { + int fileLength = blockSize * dataBlocks + cellSize + 123; + + final byte[] expected = generateBytes(fileLength); + FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, + WebHdfsConstants.WEBHDFS_SCHEME); + Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe"); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + + verifyLength(fs, srcPath, fileLength); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); + verifySeek(fs, srcPath, fileLength); + verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); + //webhdfs doesn't support bytebuffer read + + } + + void verifyLength(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + FileStatus status = fs.getFileStatus(srcPath); Assert.assertEquals("File length should be the same", - writeBytes, fileLength); + fileLength, status.getLen()); + } - // pread - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - byte[] buf = new byte[writeBytes + 100]; - int readLen = fsdis.read(0, buf, 0, buf.length); - readLen = readLen >= 0 ? readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf[i]); + void verifyPread(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + FSDataInputStream in = fs.open(srcPath); + int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102, + cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102, + cellSize * dataBlocks, fileLength - 102, fileLength - 1}; + for (int startOffset : startOffsets) { + startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); + int remaining = fileLength - startOffset; + in.readFully(startOffset, buf, 0, remaining); + for (int i = 0; i < remaining; i++) { + Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + + "same", + expected[startOffset + i], buf[i]); } } + in.close(); + } - // stateful read with byte array - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - byte[] buf = new byte[writeBytes + 100]; - int readLen = readAll(fsdis, buf); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf[i]); + void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + FSDataInputStream in = fs.open(srcPath); + final byte[] result = new byte[fileLength]; + int readLen = 0; + int ret; + do { + ret = in.read(buf, 0, buf.length); + if (ret > 0) { + System.arraycopy(buf, 0, result, readLen, ret); + readLen += ret; } + } while (ret >= 0); + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result); + in.close(); + } + + + void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, ByteBuffer buf) throws IOException { + FSDataInputStream in = fs.open(srcPath); + ByteBuffer result = ByteBuffer.allocate(fileLength); + int readLen = 0; + int ret; + do { + ret = in.read(buf); + if (ret > 0) { + readLen += ret; + buf.flip(); + result.put(buf); + buf.clear(); + } + } while (ret >= 0); + readLen = readLen >= 0 ? readLen : 0; + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result.array()); + in.close(); + } + + + void verifySeek(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + FSDataInputStream in = fs.open(srcPath); + // seek to 1/2 of content + int pos = fileLength / 2; + assertSeekAndRead(in, pos, fileLength); + + // seek to 1/3 of content + pos = fileLength / 3; + assertSeekAndRead(in, pos, fileLength); + + // seek to 0 pos + pos = 0; + assertSeekAndRead(in, pos, fileLength); + + if (fileLength > cellSize) { + // seek to cellSize boundary + pos = cellSize - 1; + assertSeekAndRead(in, pos, fileLength); } - // seek and stateful read - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - // seek to 1/2 of content - int pos = writeBytes/2; - assertSeekAndRead(fsdis, pos, writeBytes); + if (fileLength > cellSize * dataBlocks) { + // seek to striped cell group boundary + pos = cellSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } - // seek to 1/3 of content - pos = writeBytes/3; - assertSeekAndRead(fsdis, pos, writeBytes); - - // seek to 0 pos - pos = 0; - assertSeekAndRead(fsdis, pos, writeBytes); - - if (writeBytes > cellSize) { - // seek to cellSize boundary - pos = cellSize -1; - assertSeekAndRead(fsdis, pos, writeBytes); - } - - if (writeBytes > cellSize * dataBlocks) { - // seek to striped cell group boundary - pos = cellSize * dataBlocks - 1; - assertSeekAndRead(fsdis, pos, writeBytes); - } - - if (writeBytes > blockSize * dataBlocks) { - // seek to striped block group boundary - pos = blockSize * dataBlocks - 1; - assertSeekAndRead(fsdis, pos, writeBytes); - } + if (fileLength > blockSize * dataBlocks) { + // seek to striped block group boundary + pos = blockSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } + if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){ try { - fsdis.seek(-1); + in.seek(-1); Assert.fail("Should be failed if seek to negative offset"); } catch (EOFException e) { // expected } try { - fsdis.seek(writeBytes + 1); + in.seek(fileLength + 1); Assert.fail("Should be failed if seek after EOF"); } catch (EOFException e) { // expected } } - - // stateful read with ByteBuffer - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100); - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf); - if (ret > 0) { - readLen += ret; - } - } while (ret >= 0); - readLen = readLen >= 0 ? readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf.array()[i]); - } - } - - // stateful read with 1KB size byte array - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - final byte[] result = new byte[writeBytes]; - final byte[] buf = new byte[1024]; - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf, 0, buf.length); - if (ret > 0) { - System.arraycopy(buf, 0, result, readLen, ret); - readLen += ret; - } - } while (ret >= 0); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - Assert.assertArrayEquals(bytes, result); - } - - // stateful read using ByteBuffer with 1KB size - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - final ByteBuffer result = ByteBuffer.allocate(writeBytes); - final ByteBuffer buf = ByteBuffer.allocate(1024); - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf); - if (ret > 0) { - readLen += ret; - buf.flip(); - result.put(buf); - buf.clear(); - } - } while (ret >= 0); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - Assert.assertArrayEquals(bytes, result.array()); - } + in.close(); } @Test