From 71329e817b99dbee630f902fa3640c3c93f04a44 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Tue, 2 Jun 2015 15:35:49 -0700 Subject: [PATCH] HDFS-8517. Fix a decoding issue in stripped block recovering in client side. Contributed by Kai Zheng. --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../hadoop/hdfs/DFSStripedInputStream.java | 7 +- .../hadoop/hdfs/util/StripedBlockUtil.java | 57 +++-- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 14 +- .../hadoop/hdfs/StripedFileTestUtil.java | 59 +++++ .../hdfs/TestDFSStripedInputStream.java | 7 +- .../hdfs/TestReadStripedFileWithDecoding.java | 108 ++++++++ .../hadoop/hdfs/TestWriteReadStripedFile.java | 236 ++++++------------ 8 files changed, 304 insertions(+), 187 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index c3c55c731a..fa0a8e2c54 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -271,3 +271,6 @@ HDFS-8444. Erasure Coding: fix cannot rename a zone dir (Walter Su via vinayakumarb) + + HDFS-8517. Fix a decoding issue in stripped block recovering in client side. + (Kai Zheng via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index b4aa033545..228368ba49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -597,9 +597,10 @@ private void fetchOneStripe(LocatedStripedBlock blockGroup, } if (alignedStripe.missingChunksNum > 0) { - finalizeDecodeInputs(decodeInputs, alignedStripe); - decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum, - decoder); + finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum, + alignedStripe); + decodeAndFillBuffer(decodeInputs, buf, alignedStripe, dataBlkNum, + parityBlkNum, decoder); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index e75209fcc2..80321ef959 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -31,7 +31,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import java.util.*; @@ -257,7 +256,8 @@ public static byte[][] initDecodeInputs(AlignedStripe alignedStripe, new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()]; for (int i = 0; i < alignedStripe.chunks.length; i++) { if (alignedStripe.chunks[i] == null) { - alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]); + final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); + alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]); 
alignedStripe.chunks[i].offsetsInBuf.add(0); alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock()); } @@ -273,35 +273,57 @@ public static byte[][] initDecodeInputs(AlignedStripe alignedStripe, * finalize decode input buffers. */ public static void finalizeDecodeInputs(final byte[][] decodeInputs, - AlignedStripe alignedStripe) { + int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) { for (int i = 0; i < alignedStripe.chunks.length; i++) { - StripingChunk chunk = alignedStripe.chunks[i]; + final StripingChunk chunk = alignedStripe.chunks[i]; + final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); if (chunk.state == StripingChunk.FETCHED) { int posInBuf = 0; for (int j = 0; j < chunk.offsetsInBuf.size(); j++) { System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j), - decodeInputs[i], posInBuf, chunk.lengthsInBuf.get(j)); + decodeInputs[decodeIndex], posInBuf, chunk.lengthsInBuf.get(j)); posInBuf += chunk.lengthsInBuf.get(j); } } else if (chunk.state == StripingChunk.ALLZERO) { - Arrays.fill(decodeInputs[i], (byte)0); + Arrays.fill(decodeInputs[decodeIndex], (byte) 0); } else { - decodeInputs[i] = null; + decodeInputs[decodeIndex] = null; } } } + + /** + * Currently decoding requires parity chunks are before data chunks. + * The indices are opposite to what we store in NN. In future we may + * improve the decoding to make the indices order the same as in NN. + * + * @param index The index to convert + * @param dataBlkNum The number of data blocks + * @param parityBlkNum The number of parity blocks + * @return converted index + */ + public static int convertIndex4Decode(int index, int dataBlkNum, + int parityBlkNum) { + return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum; + } + + public static int convertDecodeIndexBack(int index, int dataBlkNum, + int parityBlkNum) { + return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum; + } + /** * Decode based on the given input buffers and schema. 
*/ public static void decodeAndFillBuffer(final byte[][] decodeInputs, - byte[] buf, AlignedStripe alignedStripe, int parityBlkNum, + byte[] buf, AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum, RawErasureDecoder decoder) { // Step 1: prepare indices and output buffers for missing data units int[] decodeIndices = new int[parityBlkNum]; int pos = 0; for (int i = 0; i < alignedStripe.chunks.length; i++) { if (alignedStripe.chunks[i].state == StripingChunk.MISSING){ - decodeIndices[pos++] = i; + decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum); } } decodeIndices = Arrays.copyOf(decodeIndices, pos); @@ -313,13 +335,14 @@ public static void decodeAndFillBuffer(final byte[][] decodeInputs, // Step 3: fill original application buffer with decoded data for (int i = 0; i < decodeIndices.length; i++) { - int missingBlkIdx = decodeIndices[i]; + int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i], + dataBlkNum, parityBlkNum); StripingChunk chunk = alignedStripe.chunks[missingBlkIdx]; if (chunk.state == StripingChunk.MISSING) { int srcPos = 0; for (int j = 0; j < chunk.offsetsInBuf.size(); j++) { - System.arraycopy(decodeOutputs[i], srcPos, buf, chunk.offsetsInBuf.get(j), - chunk.lengthsInBuf.get(j)); + System.arraycopy(decodeOutputs[i], srcPos, buf, + chunk.offsetsInBuf.get(j), chunk.lengthsInBuf.get(j)); srcPos += chunk.lengthsInBuf.get(j); } } @@ -330,7 +353,7 @@ public static void decodeAndFillBuffer(final byte[][] decodeInputs, * This method divides a requested byte range into an array of inclusive * {@link AlignedStripe}. * @param ecSchema The codec schema for the file, which carries the numbers - * of data / parity blocks, as well as cell size + * of data / parity blocks * @param cellSize Cell size of stripe * @param blockGroup The striped block group * @param rangeStartInBlockGroup The byte range's start offset in block group @@ -345,7 +368,6 @@ public static AlignedStripe[] divideByteRangeIntoStripes(ECSchema ecSchema, int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf, int offsetInBuf) { - // TODO: change ECSchema naming to use cell size instead of chunk size // Step 0: analyze range and calculate basic parameters int dataBlkNum = ecSchema.getNumDataUnits(); @@ -362,8 +384,7 @@ public static AlignedStripe[] divideByteRangeIntoStripes(ECSchema ecSchema, AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges); // Step 4: calculate each chunk's position in destination buffer - calcualteChunkPositionsInBuf(ecSchema, cellSize, stripes, cells, buf, - offsetInBuf); + calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf); // Step 5: prepare ALLZERO blocks prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum); @@ -508,8 +529,8 @@ private static AlignedStripe[] mergeRangesForInternalBlocks( return stripes.toArray(new AlignedStripe[stripes.size()]); } - private static void calcualteChunkPositionsInBuf(ECSchema ecSchema, - int cellSize, AlignedStripe[] stripes, StripingCell[] cells, byte[] buf, + private static void calcualteChunkPositionsInBuf(int cellSize, + AlignedStripe[] stripes, StripingCell[] cells, byte[] buf, int offsetInBuf) { /** * | <--------------- AlignedStripe --------------->| diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 82c078141f..6cd7003b3e 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -790,15 +790,21 @@ public static byte[] readFileAsBytes(File f) throws IOException { return os.toByteArray(); } - /* Write the given string to the given file */ - public static void writeFile(FileSystem fs, Path p, String s) + /* Write the given bytes to the given file */ + public static void writeFile(FileSystem fs, Path p, byte[] bytes) throws IOException { if (fs.exists(p)) { fs.delete(p, true); } - InputStream is = new ByteArrayInputStream(s.getBytes()); + InputStream is = new ByteArrayInputStream(bytes); FSDataOutputStream os = fs.create(p); - IOUtils.copyBytes(is, os, s.length(), true); + IOUtils.copyBytes(is, os, bytes.length, true); + } + + /* Write the given string to the given file */ + public static void writeFile(FileSystem fs, Path p, String s) + throws IOException { + writeFile(fs, p, s.getBytes()); } /* Append the given string to the given file */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java new file mode 100644 index 0000000000..54367d743a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +import java.io.IOException; +import java.util.Random; + +public class StripedFileTestUtil { + static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + + static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + static final int stripesPerBlock = 4; + static final int blockSize = cellSize * stripesPerBlock; + static final int numDNs = dataBlocks + parityBlocks + 2; + + static final Random r = new Random(); + + static byte[] generateBytes(int cnt) { + byte[] bytes = new byte[cnt]; + for (int i = 0; i < cnt; i++) { + bytes[i] = getByte(i); + } + return bytes; + } + + static int readAll(FSDataInputStream in, byte[] buf) throws IOException { + int readLen = 0; + int ret; + while ((ret = in.read(buf, readLen, buf.length - readLen)) >= 0 && + readLen <= buf.length) { + readLen += ret; + } + return readLen; + } + + static byte getByte(long pos) { + final int mod = 29; + return (byte) (pos % mod + 1); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index ce563254ae..b64e690e1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -208,17 +208,18 @@ public void testPreadWithDNFailure() throws Exception { // Update the expected content for decoded data for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE]; - int[] missingBlkIdx = new int[]{failedDNIdx, DATA_BLK_NUM+1, DATA_BLK_NUM+2}; + int[] missingBlkIdx = new int[]{failedDNIdx + PARITY_BLK_NUM, 1, 2}; byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE]; for (int j = 0; j < DATA_BLK_NUM; j++) { int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE; if (j != failedDNIdx) { - System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE); + System.arraycopy(expected, posInBuf, decodeInputs[j + PARITY_BLK_NUM], + 0, CELLSIZE); } } for (int k = 0; k < CELLSIZE; k++) { int posInBlk = i * CELLSIZE + k; - decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte( + decodeInputs[0][k] = SimulatedFSDataset.simulatedByte( new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk); } for (int m : missingBlkIdx) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java new file mode 100644 index 0000000000..7397caf470 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; + +public class TestReadStripedFileWithDecoding { + + private MiniDFSCluster cluster; + private FileSystem fs; + + @Before + public void setup() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testWritePreadWithDNFailure1() throws IOException { + testWritePreadWithDNFailure("/foo", 0); + } + + @Test + public void testWritePreadWithDNFailure2() throws IOException { + testWritePreadWithDNFailure("/foo", cellSize * 5); + } + + private void testWritePreadWithDNFailure(String file, int startOffsetInFile) + throws IOException { + final int failedDNIdx = 2; + final int length = cellSize * (dataBlocks + 2); + Path testPath = new Path(file); + final byte[] bytes = StripedFileTestUtil.generateBytes(length); + DFSTestUtil.writeFile(fs, testPath, bytes); + + // shut down the DN that holds the last internal data block + BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5, + cellSize); + String name = (locs[0].getNames())[failedDNIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + break; + } + } + + // pread + try (FSDataInputStream fsdis = fs.open(testPath)) { + byte[] buf = new byte[length]; + int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length); + Assert.assertEquals("The length of file should be the same to write size", + length - startOffsetInFile, readLen); + + byte[] expected = new byte[readLen]; + for (int i = startOffsetInFile; i < length; i++) { + expected[i - startOffsetInFile] = StripedFileTestUtil.getByte(i); + } + + for (int i = startOffsetInFile; i < length; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", + expected[i - startOffsetInFile], buf[i - startOffsetInFile]); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java index 
b0436a68c5..e2e52467a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -18,17 +18,13 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.web.ByteRangeInputStream; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -37,34 +33,30 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Random; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock; public class TestWriteReadStripedFile { - private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; - private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; - - private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; - private final static int stripesPerBlock = 4; - static int blockSize = cellSize * stripesPerBlock; - static int numDNs = dataBlocks + parityBlocks + 2; - private static MiniDFSCluster cluster; - private static Configuration conf; private static FileSystem fs; - - private static Random r= new Random(); + private static Configuration conf; @BeforeClass public static void setup() throws IOException { - conf = new Configuration(); + conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); - cluster.getFileSystem().getClient().createErasureCodingZone("/", null, cellSize); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); fs = cluster.getFileSystem(); } @AfterClass - public static void tearDown() { + public static void tearDown() throws IOException { if (cluster != null) { cluster.shutdown(); } @@ -152,47 +144,21 @@ public void testFileMoreThanABlockGroup3() throws IOException { + cellSize + 123); } - private byte[] generateBytes(int cnt) { - byte[] bytes = new byte[cnt]; - for (int i = 0; i < cnt; i++) { - bytes[i] = getByte(i); - } - return bytes; - } - - private int readAll(FSDataInputStream in, byte[] buf) throws IOException { - int readLen = 0; - int ret; - do { - ret = in.read(buf, readLen, buf.length - readLen); - if (ret > 0) { - readLen += ret; - } - } while (ret >= 0 && readLen < buf.length); - return readLen; - } - - private byte getByte(long pos) { - final int mod = 29; - return (byte) (pos % mod + 1); - } - private void assertSeekAndRead(FSDataInputStream fsdis, int pos, int writeBytes) throws IOException { fsdis.seek(pos); byte[] buf = new byte[writeBytes]; - int readLen = readAll(fsdis, buf); + int readLen = StripedFileTestUtil.readAll(fsdis, buf); Assert.assertEquals(readLen, 
writeBytes - pos); for (int i = 0; i < readLen; i++) { Assert.assertEquals("Byte at " + i + " should be the same", - getByte(pos + i), buf[i]); + StripedFileTestUtil.getByte(pos + i), buf[i]); } } private void testOneFileUsingDFSStripedInputStream(String src, int fileLength) throws IOException { - - final byte[] expected = generateBytes(fileLength); + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); Path srcPath = new Path(src); DFSTestUtil.writeFile(fs, srcPath, new String(expected)); @@ -215,7 +181,7 @@ private void testOneFileUsingDFSStripedInputStream(String src, int fileLength) public void testWriteReadUsingWebHdfs() throws Exception { int fileLength = blockSize * dataBlocks + cellSize + 123; - final byte[] expected = generateBytes(fileLength); + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME); Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe"); @@ -231,7 +197,6 @@ public void testWriteReadUsingWebHdfs() throws Exception { verifySeek(fs, srcPath, fileLength); verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); //webhdfs doesn't support bytebuffer read - } void verifyLength(FileSystem fs, Path srcPath, int fileLength) @@ -243,152 +208,105 @@ void verifyLength(FileSystem fs, Path srcPath, int fileLength) void verifyPread(FileSystem fs, Path srcPath, int fileLength, byte[] expected, byte[] buf) throws IOException { - FSDataInputStream in = fs.open(srcPath); - int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102, - cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102, - cellSize * dataBlocks, fileLength - 102, fileLength - 1}; - for (int startOffset : startOffsets) { - startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); - int remaining = fileLength - startOffset; - in.readFully(startOffset, buf, 0, remaining); - for (int i = 0; i < remaining; i++) { - Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + - "same", - expected[startOffset + i], buf[i]); + try (FSDataInputStream in = fs.open(srcPath)) { + int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102, + cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102, + cellSize * dataBlocks, fileLength - 102, fileLength - 1}; + for (int startOffset : startOffsets) { + startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); + int remaining = fileLength - startOffset; + in.readFully(startOffset, buf, 0, remaining); + for (int i = 0; i < remaining; i++) { + Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + + "same", expected[startOffset + i], buf[i]); + } } } - in.close(); } void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, byte[] expected, byte[] buf) throws IOException { - FSDataInputStream in = fs.open(srcPath); - final byte[] result = new byte[fileLength]; - int readLen = 0; - int ret; - do { - ret = in.read(buf, 0, buf.length); - if (ret > 0) { + try (FSDataInputStream in = fs.open(srcPath)) { + final byte[] result = new byte[fileLength]; + int readLen = 0; + int ret; + while ((ret = in.read(buf, 0, buf.length)) >= 0) { System.arraycopy(buf, 0, result, readLen, ret); readLen += ret; } - } while (ret >= 0); - Assert.assertEquals("The length of file should be the same to write size", - fileLength, readLen); - Assert.assertArrayEquals(expected, result); - in.close(); + Assert.assertEquals("The length of file should be the same to write size", + 
fileLength, readLen); + Assert.assertArrayEquals(expected, result); + } } void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, byte[] expected, ByteBuffer buf) throws IOException { - FSDataInputStream in = fs.open(srcPath); - ByteBuffer result = ByteBuffer.allocate(fileLength); - int readLen = 0; - int ret; - do { - ret = in.read(buf); - if (ret > 0) { + try (FSDataInputStream in = fs.open(srcPath)) { + ByteBuffer result = ByteBuffer.allocate(fileLength); + int readLen = 0; + int ret; + while ((ret = in.read(buf)) >= 0) { readLen += ret; buf.flip(); result.put(buf); buf.clear(); } - } while (ret >= 0); - readLen = readLen >= 0 ? readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - fileLength, readLen); - Assert.assertArrayEquals(expected, result.array()); - in.close(); + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result.array()); + } } void verifySeek(FileSystem fs, Path srcPath, int fileLength) throws IOException { - FSDataInputStream in = fs.open(srcPath); - // seek to 1/2 of content - int pos = fileLength / 2; - assertSeekAndRead(in, pos, fileLength); - - // seek to 1/3 of content - pos = fileLength / 3; - assertSeekAndRead(in, pos, fileLength); - - // seek to 0 pos - pos = 0; - assertSeekAndRead(in, pos, fileLength); - - if (fileLength > cellSize) { - // seek to cellSize boundary - pos = cellSize - 1; + try (FSDataInputStream in = fs.open(srcPath)) { + // seek to 1/2 of content + int pos = fileLength / 2; assertSeekAndRead(in, pos, fileLength); - } - if (fileLength > cellSize * dataBlocks) { - // seek to striped cell group boundary - pos = cellSize * dataBlocks - 1; + // seek to 1/3 of content + pos = fileLength / 3; assertSeekAndRead(in, pos, fileLength); - } - if (fileLength > blockSize * dataBlocks) { - // seek to striped block group boundary - pos = blockSize * dataBlocks - 1; + // seek to 0 pos + pos = 0; assertSeekAndRead(in, pos, fileLength); - } - if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){ - try { - in.seek(-1); - Assert.fail("Should be failed if seek to negative offset"); - } catch (EOFException e) { - // expected + if (fileLength > cellSize) { + // seek to cellSize boundary + pos = cellSize - 1; + assertSeekAndRead(in, pos, fileLength); } - try { - in.seek(fileLength + 1); - Assert.fail("Should be failed if seek after EOF"); - } catch (EOFException e) { - // expected + if (fileLength > cellSize * dataBlocks) { + // seek to striped cell group boundary + pos = cellSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); } - } - in.close(); - } - @Test - public void testWritePreadWithDNFailure() throws IOException { - final int failedDNIdx = 2; - final int length = cellSize * (dataBlocks + 2); - Path testPath = new Path("/foo"); - final byte[] bytes = generateBytes(length); - DFSTestUtil.writeFile(fs, testPath, new String(bytes)); - - // shut down the DN that holds the last internal data block - BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5, - cellSize); - String name = (locs[0].getNames())[failedDNIdx]; - for (DataNode dn : cluster.getDataNodes()) { - int port = dn.getXferPort(); - if (name.contains(Integer.toString(port))) { - dn.shutdown(); - break; + if (fileLength > blockSize * dataBlocks) { + // seek to striped block group boundary + pos = blockSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); } - } - // pread - int startOffsetInFile = cellSize * 5; - try 
(FSDataInputStream fsdis = fs.open(testPath)) { - byte[] buf = new byte[length]; - int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length); - Assert.assertEquals("The length of file should be the same to write size", - length - startOffsetInFile, readLen); + if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) { + try { + in.seek(-1); + Assert.fail("Should be failed if seek to negative offset"); + } catch (EOFException e) { + // expected + } - byte[] expected = new byte[readLen]; - for (int i = startOffsetInFile; i < length; i++) { - expected[i - startOffsetInFile] = getByte(i); - } - for (int i = startOffsetInFile; i < length; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", - expected[i - startOffsetInFile], buf[i - startOffsetInFile]); + try { + in.seek(fileLength + 1); + Assert.fail("Should be failed if seek after EOF"); + } catch (EOFException e) { + // expected + } } } }
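
Note (not part of the patch): the core of this change is the mapping between the internal-block ordering stored in the NameNode (data blocks first, then parity) and the ordering the raw decoder on this branch expects (parity blocks first). Below is a minimal, self-contained sketch of that mapping and its round trip. It mirrors the convertIndex4Decode/convertDecodeIndexBack helpers added in StripedBlockUtil, but it is a standalone illustration that assumes the RS-6-3 layout (6 data, 3 parity units); it is not the HDFS class itself.

public class DecodeIndexSketch {
  // Map a block-group index (data-first, as stored in the NN) to the
  // decoder's input slot (parity-first, as the decoder currently expects).
  static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
    return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
  }

  // Reverse mapping: decoder slot back to the block-group index, used when
  // copying recovered data into the application buffer.
  static int convertDecodeIndexBack(int index, int dataBlkNum, int parityBlkNum) {
    return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
  }

  public static void main(String[] args) {
    final int dataBlkNum = 6, parityBlkNum = 3; // assumed RS-6-3 schema

    // The two conversions must be inverses over the whole block group,
    // otherwise decode inputs and recovered outputs would be misplaced.
    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
      int decodeIdx = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
      int back = convertDecodeIndexBack(decodeIdx, dataBlkNum, parityBlkNum);
      System.out.printf("group index %d -> decode slot %d -> back to %d%n",
          i, decodeIdx, back);
      if (back != i) {
        throw new AssertionError("index conversion is not a bijection");
      }
    }

    // Example matching the failure scenario exercised in the tests: if data
    // block 2 is missing, its erased position handed to the decoder is
    // 2 + parityBlkNum = 5, and the recovered output is copied back to
    // group index convertDecodeIndexBack(5, 6, 3) = 2.
    int missingGroupIdx = 2;
    int erasedDecodeIdx = convertIndex4Decode(missingGroupIdx, dataBlkNum, parityBlkNum);
    System.out.println("erased decode index for data block 2: " + erasedDecodeIdx);
  }
}

This is why initDecodeInputs, finalizeDecodeInputs, and decodeAndFillBuffer in the patch all take both dataBlkNum and parityBlkNum now: every read or write of decodeInputs/decodeOutputs goes through the index conversion so that fetched cells, zero-filled cells, and erased positions all line up with the decoder's parity-first layout.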