diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 1549930518..3170e9bc0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -223,3 +223,5 @@ HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN. (Yi Liu via jing9) + + HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 8f15edad75..744d5863af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -23,19 +23,18 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.ByteBufferPool; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; @@ -65,30 +64,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.Future; -/****************************************************************************** - * DFSStripedInputStream reads from striped block groups, illustrated below: - * - * | <- Striped Block Group -> | - * blk_0 blk_1 blk_2 <- A striped block group has - * | | | {@link #dataBlkNum} blocks - * v v v - * +------+ +------+ +------+ - * |cell_0| |cell_1| |cell_2| <- The logical read order should be - * +------+ +------+ +------+ cell_0, cell_1, ... - * |cell_3| |cell_4| |cell_5| - * +------+ +------+ +------+ - * |cell_6| |cell_7| |cell_8| - * +------+ +------+ +------+ - * |cell_9| - * +------+ <- A cell contains {@link #cellSize} bytes of data - * - * Three styles of read will eventually be supported: - * 1. Stateful read - * 2. pread without decode support - * This is implemented by calculating the portion of read from each block and - * issuing requests to each DataNode in parallel. - * 3. 
pread with decode support: TODO: will be supported after HDFS-7678 - *****************************************************************************/ +/** + * DFSStripedInputStream reads from striped block groups + */ public class DFSStripedInputStream extends DFSInputStream { private static class ReaderRetryPolicy { @@ -207,22 +185,24 @@ private synchronized void blockSeekTo(long target) throws IOException { currentLocatedBlock = targetBlockGroup; final long offsetIntoBlockGroup = getOffsetInBlockGroup(); - LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup( + LocatedBlock[] targetBlocks = parseStripedBlockGroup( targetBlockGroup, cellSize, dataBlkNum, parityBlkNum); - // The purpose is to get start offset into each block - ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, - offsetIntoBlockGroup, 0, 0); + // The purpose is to get start offset into each block. + long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema, + targetBlockGroup, offsetIntoBlockGroup); + Preconditions.checkNotNull(offsetsForInternalBlocks); final ReaderRetryPolicy retry = new ReaderRetryPolicy(); for (int i = 0; i < groupSize; i++) { LocatedBlock targetBlock = targetBlocks[i]; if (targetBlock != null) { + long offsetInBlock = offsetsForInternalBlocks[i] < 0 ? + 0 : offsetsForInternalBlocks[i]; DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null); if (retval != null) { currentNodes[i] = retval.info; blockReaders[i] = getBlockReaderWithRetry(targetBlock, - readPortions[i].getStartOffsetInBlock(), - targetBlock.getBlockSize() - readPortions[i].getStartOffsetInBlock(), + offsetInBlock, targetBlock.getBlockSize() - offsetInBlock, retval.addr, retval.storageType, retval.info, target, retry); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 7b3c24d35a..a1c0f722ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -301,12 +301,12 @@ private class ReconstructAndTransferBlock implements Runnable { } private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) { - return StripedBlockUtil.constructStripedBlock(blockGroup, cellSize, + return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize, dataBlkNum, i); } private long getBlockLen(ExtendedBlock blockGroup, int i) { - return StripedBlockUtil.getStripedBlockLength(blockGroup.getNumBytes(), + return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(), cellSize, dataBlkNum, i); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 81c0c95d5e..2fa3fdf6ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -41,7 +41,28 @@ import java.util.concurrent.TimeUnit; /** - * Utility class for analyzing striped block groups + * When accessing a file in striped layout, operations on logical byte ranges + * in the file need to be mapped to physical byte ranges on 
block files stored + * on DataNodes. This utility class facilities this mapping by defining and + * exposing a number of striping-related concepts. The most basic ones are + * illustrated in the following diagram. Unless otherwise specified, all + * range-related calculations are inclusive (the end offset of the previous + * range should be 1 byte lower than the start offset of the next one). + * + * | <---- Block Group ----> | <- Block Group: logical unit composing + * | | striped HDFS files. + * blk_0 blk_1 blk_2 <- Internal Blocks: each internal block + * | | | represents a physically stored local + * v v v block file + * +------+ +------+ +------+ + * |cell_0| |cell_1| |cell_2| <- {@link StripingCell} represents the + * +------+ +------+ +------+ logical order that a Block Group should + * |cell_3| |cell_4| |cell_5| be accessed: cell_0, cell_1, ... + * +------+ +------+ +------+ + * |cell_6| |cell_7| |cell_8| + * +------+ +------+ +------+ + * |cell_9| + * +------+ <- A cell contains cellSize bytes of data */ @InterfaceAudience.Private public class StripedBlockUtil { @@ -103,31 +124,6 @@ public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup, cellSize, dataBlkNum, idxInBlockGroup)); return block; } - - /** - * This method creates an internal {@link ExtendedBlock} at the given index - * of a block group, for both data and parity block. - */ - public static ExtendedBlock constructStripedBlock(ExtendedBlock blockGroup, - int cellSize, int dataBlkNum, int idxInBlockGroup) { - ExtendedBlock block = new ExtendedBlock(blockGroup); - block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup); - block.setNumBytes(getStripedBlockLength(blockGroup.getNumBytes(), cellSize, - dataBlkNum, idxInBlockGroup)); - return block; - } - - /** - * Returns an internal block length at the given index of a block group, - * for both data and parity block. - */ - public static long getStripedBlockLength(long numBytes, int cellSize, - int dataBlkNum, int idxInBlockGroup) { - // parity block length is the same as the first striped block length. - return StripedBlockUtil.getInternalBlockLength( - numBytes, cellSize, dataBlkNum, - idxInBlockGroup < dataBlkNum ? 
idxInBlockGroup : 0); - } /** * Get the size of an internal block at the given index of a block group @@ -157,7 +153,7 @@ public static long getInternalBlockLength(long dataSize, return (numStripes - 1L)*cellSize + lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i); } - + private static int lastCellSize(int size, int cellSize, int numDataBlocks, int i) { if (i < numDataBlocks) { @@ -183,60 +179,6 @@ public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum, + offsetInBlk % cellSize; // partial cell } - /** - * This method plans the read portion from each block in the stripe - * @param dataBlkNum The number of data blocks in the striping group - * @param cellSize The size of each striping cell - * @param startInBlk Starting offset in the striped block - * @param len Length of the read request - * @param bufOffset Initial offset in the result buffer - * @return array of {@link ReadPortion}, each representing the portion of I/O - * for an individual block in the group - */ - @VisibleForTesting - public static ReadPortion[] planReadPortions(final int dataBlkNum, - final int cellSize, final long startInBlk, final int len, int bufOffset) { - ReadPortion[] results = new ReadPortion[dataBlkNum]; - for (int i = 0; i < dataBlkNum; i++) { - results[i] = new ReadPortion(); - } - - // cellIdxInBlk is the index of the cell in the block - // E.g., cell_3 is the 2nd cell in blk_0 - int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum)); - - // blkIdxInGroup is the index of the block in the striped block group - // E.g., blk_2 is the 3rd block in the group - final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum); - results[blkIdxInGroup].setStartOffsetInBlock(cellSize * cellIdxInBlk + - startInBlk % cellSize); - boolean crossStripe = false; - for (int i = 1; i < dataBlkNum; i++) { - if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) { - cellIdxInBlk++; - crossStripe = true; - } - results[(blkIdxInGroup + i) % dataBlkNum].setStartOffsetInBlock( - cellSize * cellIdxInBlk); - } - - int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len); - results[blkIdxInGroup].offsetsInBuf.add(bufOffset); - results[blkIdxInGroup].lengths.add(firstCellLen); - results[blkIdxInGroup].addReadLength(firstCellLen); - - int i = (blkIdxInGroup + 1) % dataBlkNum; - for (int done = firstCellLen; done < len; done += cellSize) { - ReadPortion rp = results[i]; - rp.offsetsInBuf.add(done + bufOffset); - final int readLen = Math.min(len - done, cellSize); - rp.lengths.add(readLen); - rp.addReadLength(readLen); - i = (i + 1) % dataBlkNum; - } - return results; - } - /** * Get the next completed striped read task * @@ -360,84 +302,167 @@ public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf, } /** - * This method divides a requested byte range into an array of - * {@link AlignedStripe} + * This method divides a requested byte range into an array of inclusive + * {@link AlignedStripe}. 
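The index arithmetic that the removed planReadPortions encoded inline, and that the new StripingCell-based helpers centralize, reduces to a few divisions and modulos. The sketch below is illustrative only (it is not code from this patch); the class name and the 6-data-block, 64KB-cell numbers are assumptions chosen for the example.

public class StripingOffsetExample {
  public static void main(String[] args) {
    final int dataBlkNum = 6;          // assumed number of data blocks
    final int cellSize = 64 * 1024;    // assumed cell size in bytes
    long offsetInBlockGroup = 3L * cellSize + 100;  // a logical offset inside the block group

    // Which cell, in logical round-robin order, does the offset fall into?
    long cellIdxInBG = offsetInBlockGroup / cellSize;
    // Which internal block stores that cell (column), and on which stripe (row)?
    int idxInStripe = (int) (cellIdxInBG % dataBlkNum);
    long idxInInternalBlk = cellIdxInBG / dataBlkNum;
    // Physical start offset inside that internal block file.
    long offsetInBlock = idxInInternalBlk * cellSize + offsetInBlockGroup % cellSize;

    System.out.println("cellIdxInBG=" + cellIdxInBG + " idxInStripe=" + idxInStripe
        + " idxInInternalBlk=" + idxInInternalBlk + " offsetInBlock=" + offsetInBlock);
    // With the assumed numbers: cellIdxInBG=3, idxInStripe=3, idxInInternalBlk=0, offsetInBlock=100
  }
}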
+ * @param ecSchema The codec schema for the file, which carries the numbers + * of data / parity blocks, as well as cell size + * @param blockGroup The striped block group + * @param rangeStartInBlockGroup The byte range's start offset in block group + * @param rangeEndInBlockGroup The byte range's end offset in block group + * @param buf Destination buffer of the read operation for the byte range + * @param offsetInBuf Start offset into the destination buffer * - * - * At most 5 stripes will be generated from each logical range - * TODO: cleanup and get rid of planReadPortions + * At most 5 stripes will be generated from each logical range, as + * demonstrated in the header of {@link AlignedStripe}. */ public static AlignedStripe[] divideByteRangeIntoStripes ( - ECSchema ecSchema, LocatedStripedBlock blockGroup, long start, long end, - byte[] buf, int offsetInBuf) { + ECSchema ecSchema, LocatedStripedBlock blockGroup, + long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf, + int offsetInBuf) { // TODO: change ECSchema naming to use cell size instead of chunk size // Step 0: analyze range and calculate basic parameters int cellSize = ecSchema.getChunkSize(); int dataBlkNum = ecSchema.getNumDataUnits(); - int len = (int) (end - start + 1); - int firstCellIdxInBG = (int) (start / cellSize); - int lastCellIdxInBG = (int) (end / cellSize); - int firstCellSize = Math.min(cellSize - (int) (start % cellSize), len); - long firstCellOffsetInBlk = firstCellIdxInBG / dataBlkNum * cellSize + - start % cellSize; - int lastCellSize = lastCellIdxInBG == firstCellIdxInBG ? - firstCellSize : (int) (end % cellSize) + 1; - // Step 1: get the unmerged ranges on each internal block - // TODO: StripingCell should carry info on size and start offset (HDFS-8320) - VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, - firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk, - lastCellSize); + // Step 1: map the byte range to StripingCells + StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, blockGroup, + rangeStartInBlockGroup, rangeEndInBlockGroup); - // Step 2: merge into at most 5 stripes + // Step 2: get the unmerged ranges on each internal block + VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cells); + + // Step 3: merge into at most 5 stripes AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges); - // Step 3: calculate each chunk's position in destination buffer - calcualteChunkPositionsInBuf(ecSchema, blockGroup, buf, offsetInBuf, - firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk, - lastCellSize, stripes); + // Step 4: calculate each chunk's position in destination buffer + calcualteChunkPositionsInBuf(ecSchema, stripes, cells, buf, offsetInBuf); - // Step 4: prepare ALLZERO blocks + // Step 5: prepare ALLZERO blocks prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum); return stripes; } - private static VerticalRange[] getRangesForInternalBlocks (ECSchema ecSchema, - int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize, - long firstCellOffsetInBlk, int lastCellSize) { + /** + * Map the logical byte range to a set of inclusive {@link StripingCell} + * instances, each representing the overlap of the byte range to a cell + * used by {@link DFSStripedOutputStream} in encoding + */ + @VisibleForTesting + private static StripingCell[] getStripingCellsOfByteRange(ECSchema ecSchema, + LocatedStripedBlock blockGroup, + long rangeStartInBlockGroup, long rangeEndInBlockGroup) { + 
Preconditions.checkArgument( + rangeStartInBlockGroup <= rangeEndInBlockGroup && + rangeEndInBlockGroup < blockGroup.getBlockSize()); int cellSize = ecSchema.getChunkSize(); + int len = (int) (rangeEndInBlockGroup - rangeStartInBlockGroup + 1); + int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize); + int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize); + int numCells = lastCellIdxInBG - firstCellIdxInBG + 1; + StripingCell[] cells = new StripingCell[numCells]; + cells[0] = new StripingCell(ecSchema, firstCellIdxInBG); + cells[numCells - 1] = new StripingCell(ecSchema, lastCellIdxInBG); + + cells[0].offset = (int) (rangeStartInBlockGroup % cellSize); + cells[0].size = + Math.min(cellSize - (int) (rangeStartInBlockGroup % cellSize), len); + if (lastCellIdxInBG != firstCellIdxInBG) { + cells[numCells - 1].size = (int) (rangeEndInBlockGroup % cellSize) + 1; + } + + for (int i = 1; i < numCells - 1; i++) { + cells[i] = new StripingCell(ecSchema, i + firstCellIdxInBG); + } + + return cells; + } + + /** + * Given a logical start offset in a block group, calculate the physical + * start offset into each stored internal block. + */ + public static long[] getStartOffsetsForInternalBlocks( + ECSchema ecSchema, LocatedStripedBlock blockGroup, + long rangeStartInBlockGroup) { + Preconditions.checkArgument( + rangeStartInBlockGroup < blockGroup.getBlockSize()); int dataBlkNum = ecSchema.getNumDataUnits(); - + int parityBlkNum = ecSchema.getNumParityUnits(); + int cellSize = ecSchema.getChunkSize(); + long[] startOffsets = new long[dataBlkNum + parityBlkNum]; + Arrays.fill(startOffsets, -1L); + int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize); StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG); - StripingCell lastCell = new StripingCell(ecSchema, lastCellIdxInBG); - - VerticalRange ranges[] = new VerticalRange[dataBlkNum]; - ranges[firstCell.idxInStripe] = - new VerticalRange(firstCellOffsetInBlk, firstCellSize); - for (int i = firstCellIdxInBG + 1; i < lastCellIdxInBG; i++) { - // iterate through all cells and update the list of StripeRanges - StripingCell cell = new StripingCell(ecSchema, i); - if (ranges[cell.idxInStripe] == null) { - ranges[cell.idxInStripe] = new VerticalRange( - cell.idxInInternalBlk * cellSize, cellSize); - } else { - ranges[cell.idxInStripe].spanInBlock += cellSize; + firstCell.offset = (int) (rangeStartInBlockGroup % cellSize); + startOffsets[firstCell.idxInStripe] = + firstCell.idxInInternalBlk * cellSize + firstCell.offset; + long earliestStart = startOffsets[firstCell.idxInStripe]; + for (int i = 1; i < dataBlkNum; i++) { + int idx = firstCellIdxInBG + i; + if (idx * cellSize >= blockGroup.getBlockSize()) { + break; + } + StripingCell cell = new StripingCell(ecSchema, idx); + startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * cellSize; + if (startOffsets[cell.idxInStripe] < earliestStart) { + earliestStart = startOffsets[cell.idxInStripe]; } } - if (ranges[lastCell.idxInStripe] == null) { - ranges[lastCell.idxInStripe] = new VerticalRange( - lastCell.idxInInternalBlk * cellSize, lastCellSize); - } else if (lastCell.idxInBlkGroup != firstCell.idxInBlkGroup) { - ranges[lastCell.idxInStripe].spanInBlock += lastCellSize; + for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) { + startOffsets[i] = earliestStart; + } + return startOffsets; + } + + /** + * Given a logical byte range, mapped to each {@link StripingCell}, calculate + * the physical byte range (inclusive) on each stored internal block. 
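To make the behavior of getStartOffsetsForInternalBlocks concrete, the following standalone sketch reproduces its arithmetic outside Hadoop (the class name and the (6, 3) layout, cell size, group size and seek position are all assumptions for illustration, not values from this patch): untouched blocks keep -1, the block holding the seek position starts mid-cell, the other data blocks start at the next cell boundary they own, and every parity block starts at the earliest data-block offset.

public class StartOffsetsExample {
  public static void main(String[] args) {
    final int dataBlkNum = 6, parityBlkNum = 3;   // assumed (6, 3) layout
    final int cellSize = 64 * 1024;               // assumed cell size
    final long blockGroupSize = 20L * cellSize;   // assumed logical size of the block group
    final long rangeStart = 7L * cellSize + 42;   // assumed seek position inside the group

    long[] startOffsets = new long[dataBlkNum + parityBlkNum];
    java.util.Arrays.fill(startOffsets, -1L);

    long firstCellIdxInBG = rangeStart / cellSize;
    // The internal block holding the seek position starts mid-cell ...
    int firstIdxInStripe = (int) (firstCellIdxInBG % dataBlkNum);
    startOffsets[firstIdxInStripe] =
        (firstCellIdxInBG / dataBlkNum) * cellSize + rangeStart % cellSize;
    long earliestStart = startOffsets[firstIdxInStripe];
    // ... while the remaining data blocks start at the next cell boundary they own.
    for (int i = 1; i < dataBlkNum; i++) {
      long cellIdx = firstCellIdxInBG + i;
      if (cellIdx * cellSize >= blockGroupSize) {
        break;  // past the logical end of the block group
      }
      int idxInStripe = (int) (cellIdx % dataBlkNum);
      startOffsets[idxInStripe] = (cellIdx / dataBlkNum) * cellSize;
      earliestStart = Math.min(earliestStart, startOffsets[idxInStripe]);
    }
    // Parity blocks must cover the widest data range, so they start at the earliest offset.
    for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
      startOffsets[i] = earliestStart;
    }
    System.out.println(java.util.Arrays.toString(startOffsets));
  }
}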
+ */ + @VisibleForTesting + private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema, + StripingCell[] cells) { + int cellSize = ecSchema.getChunkSize(); + int dataBlkNum = ecSchema.getNumDataUnits(); + int parityBlkNum = ecSchema.getNumParityUnits(); + + VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum]; + + long earliestStart = Long.MAX_VALUE; + long latestEnd = -1; + for (StripingCell cell : cells) { + // iterate through all cells and update the list of StripeRanges + if (ranges[cell.idxInStripe] == null) { + ranges[cell.idxInStripe] = new VerticalRange( + cell.idxInInternalBlk * cellSize + cell.offset, cell.size); + } else { + ranges[cell.idxInStripe].spanInBlock += cell.size; + } + VerticalRange range = ranges[cell.idxInStripe]; + if (range.offsetInBlock < earliestStart) { + earliestStart = range.offsetInBlock; + } + if (range.offsetInBlock + range.spanInBlock - 1 > latestEnd) { + latestEnd = range.offsetInBlock + range.spanInBlock - 1; + } + } + + // Each parity block should be fetched at maximum range of all data blocks + for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) { + ranges[i] = new VerticalRange(earliestStart, + latestEnd - earliestStart + 1); } return ranges; } - private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema, - VerticalRange[] ranges) { + /** + * Merge byte ranges on each internal block into a set of inclusive + * {@link AlignedStripe} instances. + */ + private static AlignedStripe[] mergeRangesForInternalBlocks( + ECSchema ecSchema, VerticalRange[] ranges) { int dataBlkNum = ecSchema.getNumDataUnits(); int parityBlkNum = ecSchema.getNumParityUnits(); List stripes = new ArrayList<>(); @@ -461,12 +486,8 @@ private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema, } private static void calcualteChunkPositionsInBuf(ECSchema ecSchema, - LocatedStripedBlock blockGroup, byte[] buf, int offsetInBuf, - int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize, - long firstCellOffsetInBlk, int lastCellSize, AlignedStripe[] stripes) { - int cellSize = ecSchema.getChunkSize(); - int dataBlkNum = ecSchema.getNumDataUnits(); - // Step 3: calculate each chunk's position in destination buffer + AlignedStripe[] stripes, StripingCell[] cells, byte[] buf, + int offsetInBuf) { /** * | <--------------- AlignedStripe --------------->| * @@ -484,20 +505,11 @@ private static void calcualteChunkPositionsInBuf(ECSchema ecSchema, * * Cell indexing convention defined in {@link StripingCell} */ + int cellSize = ecSchema.getChunkSize(); int done = 0; - for (int i = firstCellIdxInBG; i <= lastCellIdxInBG; i++) { - StripingCell cell = new StripingCell(ecSchema, i); - long cellStart = i == firstCellIdxInBG ? 
- firstCellOffsetInBlk : cell.idxInInternalBlk * cellSize; - int cellLen; - if (i == firstCellIdxInBG) { - cellLen = firstCellSize; - } else if (i == lastCellIdxInBG) { - cellLen = lastCellSize; - } else { - cellLen = cellSize; - } - long cellEnd = cellStart + cellLen - 1; + for (StripingCell cell : cells) { + long cellStart = cell.idxInInternalBlk * cellSize + cell.offset; + long cellEnd = cellStart + cell.size - 1; for (AlignedStripe s : stripes) { long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1; long overlapStart = Math.max(cellStart, s.getOffsetInBlock()); @@ -514,10 +526,14 @@ private static void calcualteChunkPositionsInBuf(ECSchema ecSchema, add((int)(offsetInBuf + done + overlapStart - cellStart)); s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen); } - done += cellLen; + done += cell.size; } } + /** + * If a {@link StripingChunk} maps to a byte range beyond an internal block's + * size, the chunk should be treated as zero bytes in decoding. + */ private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup, byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) { for (AlignedStripe s : stripes) { @@ -534,51 +550,13 @@ private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup, } /** - * This class represents the portion of I/O associated with each block in the - * striped block group. - * TODO: consolidate ReadPortion with AlignedStripe - */ - public static class ReadPortion { - private long startOffsetInBlock = 0; - private int readLength = 0; - public final List offsetsInBuf = new ArrayList<>(); - public final List lengths = new ArrayList<>(); - - public int[] getOffsets() { - int[] offsets = new int[offsetsInBuf.size()]; - for (int i = 0; i < offsets.length; i++) { - offsets[i] = offsetsInBuf.get(i); - } - return offsets; - } - - public int[] getLengths() { - int[] lens = new int[this.lengths.size()]; - for (int i = 0; i < lens.length; i++) { - lens[i] = this.lengths.get(i); - } - return lens; - } - - public long getStartOffsetInBlock() { - return startOffsetInBlock; - } - - public int getReadLength() { - return readLength; - } - - public void setStartOffsetInBlock(long startOffsetInBlock) { - this.startOffsetInBlock = startOffsetInBlock; - } - - void addReadLength(int extraLength) { - this.readLength += extraLength; - } - } - - /** - * The unit of encoding used in {@link DFSStripedOutputStream} + * Cell is the unit of encoding used in {@link DFSStripedOutputStream}. This + * size impacts how a logical offset in the file or block group translates + * to physical byte offset in a stored internal block. The StripingCell util + * class facilitates this calculation. Each StripingCell is inclusive with + * its start and end offsets -- e.g., the end logical offset of cell_0_0_0 + * should be 1 byte lower than the start logical offset of cell_1_0_1. 
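As a quick illustration of the cell_<idxInBlkGroup>_<idxInInternalBlk>_<idxInStripe> naming used in the diagram that follows, the snippet below derives the two secondary indices from the logical cell index alone; it is a standalone sketch, and the 3 data blocks match the diagram rather than any real schema.

public class StripingCellIndexExample {
  public static void main(String[] args) {
    final int numDataUnits = 3;  // matches the 3-block diagram below, assumed for the example
    for (int idxInBlkGroup = 0; idxInBlkGroup <= 5; idxInBlkGroup++) {
      int idxInInternalBlk = idxInBlkGroup / numDataUnits;                // stripe (row) index
      int idxInStripe = idxInBlkGroup - idxInInternalBlk * numDataUnits;  // internal block (column) index
      System.out.println("cell_" + idxInBlkGroup + "_" + idxInInternalBlk + "_" + idxInStripe);
    }
    // Prints cell_0_0_0 through cell_5_1_2: cell 5 sits on stripe 1 of blk_2, as in the diagram.
  }
}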
+ * * | <------- Striped Block Group -------> | * blk_0 blk_1 blk_2 * | | | @@ -586,43 +564,57 @@ void addReadLength(int extraLength) { * +----------+ +----------+ +----------+ * |cell_0_0_0| |cell_1_0_1| |cell_2_0_2| * +----------+ +----------+ +----------+ - * |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link idxInBlkGroup} = 5 - * +----------+ +----------+ +----------+ {@link idxInInternalBlk} = 1 - * {@link idxInStripe} = 2 + * |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link #idxInBlkGroup} = 5 + * +----------+ +----------+ +----------+ {@link #idxInInternalBlk} = 1 + * {@link #idxInStripe} = 2 * A StripingCell is a special instance of {@link StripingChunk} whose offset * and size align with the cell used when writing data. * TODO: consider parity cells */ - public static class StripingCell { + @VisibleForTesting + static class StripingCell { public final ECSchema schema; /** Logical order in a block group, used when doing I/O to a block group */ - public final int idxInBlkGroup; - public final int idxInInternalBlk; - public final int idxInStripe; + final int idxInBlkGroup; + final int idxInInternalBlk; + final int idxInStripe; + /** + * When a logical byte range is mapped to a set of cells, it might + * partially overlap with the first and last cells. This field and the + * {@link #size} variable represent the start offset and size of the + * overlap. + */ + int offset; + int size; - public StripingCell(ECSchema ecSchema, int idxInBlkGroup) { + StripingCell(ECSchema ecSchema, int idxInBlkGroup) { this.schema = ecSchema; this.idxInBlkGroup = idxInBlkGroup; this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits(); this.idxInStripe = idxInBlkGroup - this.idxInInternalBlk * ecSchema.getNumDataUnits(); + this.offset = 0; + this.size = ecSchema.getChunkSize(); } - public StripingCell(ECSchema ecSchema, int idxInInternalBlk, + StripingCell(ECSchema ecSchema, int idxInInternalBlk, int idxInStripe) { this.schema = ecSchema; this.idxInInternalBlk = idxInInternalBlk; this.idxInStripe = idxInStripe; this.idxInBlkGroup = idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe; + this.offset = 0; + this.size = ecSchema.getChunkSize(); } } /** * Given a requested byte range on a striped block group, an AlignedStripe - * represents a {@link VerticalRange} that is aligned with both the byte range - * and boundaries of all internal blocks. As illustrated in the diagram, any - * given byte range on a block group leads to 1~5 AlignedStripe's. + * represents an inclusive {@link VerticalRange} that is aligned with both + * the byte range and boundaries of all internal blocks. As illustrated in + * the diagram, any given byte range on a block group leads to 1~5 + * AlignedStripe's. * * |<-------- Striped Block Group -------->| * blk_0 blk_1 blk_2 blk_3 blk_4 @@ -648,6 +640,7 @@ public StripingCell(ECSchema ecSchema, int idxInInternalBlk, * * The coverage of an AlignedStripe on an internal block is represented as a * {@link StripingChunk}. + * * To simplify the logic of reading a logical byte range from a block group, * a StripingChunk is either completely in the requested byte range or * completely outside the requested byte range. @@ -692,19 +685,19 @@ public String toString() { /** * A simple utility class representing an arbitrary vertical inclusive range - * starting at {@link offsetInBlock} and lasting for {@link length} bytes in - * an internal block. Note that VerticalRange doesn't necessarily align with - * {@link StripingCell}. 
+ * starting at {@link #offsetInBlock} and lasting for {@link #spanInBlock} + * bytes in an internal block. Note that VerticalRange doesn't necessarily + * align with {@link StripingCell}. * * |<- Striped Block Group ->| * blk_0 * | * v * +-----+ - * |~~~~~| <-- {@link offsetInBlock} + * |~~~~~| <-- {@link #offsetInBlock} * | | ^ * | | | - * | | | {@link spanInBlock} + * | | | {@link #spanInBlock} * | | v * |~~~~~| --- * | | @@ -743,9 +736,9 @@ public boolean include(long pos) { * +---------+ +---------+ | +----+ +----+ * <----------- data blocks ------------> | <--- parity ---> * - * The class also carries {@link buf}, {@link offsetsInBuf}, and - * {@link lengthsInBuf} to define how read task for this chunk should deliver - * the returned data. + * The class also carries {@link #buf}, {@link #offsetsInBuf}, and + * {@link #lengthsInBuf} to define how read task for this chunk should + * deliver the returned data. */ public static class StripingChunk { /** Chunk has been successfully fetched */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java deleted file mode 100644 index 75d05879f2..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.junit.Test; - -import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; -import static org.junit.Assert.*; - -public class TestPlanReadPortions { - - // We only support this as num of data blocks. It might be good enough for now - // for the purpose, even not flexible yet for any number in a schema. 
- private final short GROUP_SIZE = 3; - private final int CELLSIZE = 128 * 1024; - - private void testPlanReadPortions(int startInBlk, int length, - int bufferOffset, int[] readLengths, int[] offsetsInBlock, - int[][] bufferOffsets, int[][] bufferLengths) { - ReadPortion[] results = StripedBlockUtil.planReadPortions(GROUP_SIZE, - CELLSIZE, startInBlk, length, bufferOffset); - assertEquals(GROUP_SIZE, results.length); - - for (int i = 0; i < GROUP_SIZE; i++) { - assertEquals(readLengths[i], results[i].getReadLength()); - assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock()); - final int[] bOffsets = results[i].getOffsets(); - assertArrayEquals(bufferOffsets[i], bOffsets); - final int[] bLengths = results[i].getLengths(); - assertArrayEquals(bufferLengths[i], bLengths); - } - } - - /** - * Test {@link StripedBlockUtil#planReadPortions} - */ - @Test - public void testPlanReadPortions() { - /** - * start block offset is 0, read cellSize - 10 - */ - testPlanReadPortions(0, CELLSIZE - 10, 0, - new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0}, - new int[][]{new int[]{0}, new int[]{}, new int[]{}}, - new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}}); - - /** - * start block offset is 0, read 3 * cellSize - */ - testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0, - new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0}, - new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}}, - new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}}); - - /** - * start block offset is 0, read cellSize + 10 - */ - testPlanReadPortions(0, CELLSIZE + 10, 0, - new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0}, - new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}}, - new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}}); - - /** - * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100 - */ - testPlanReadPortions(0, 5 * CELLSIZE + 10, 100, - new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0}, - new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE}, - new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4}, - new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}}, - new int[][]{new int[]{CELLSIZE, CELLSIZE}, - new int[]{CELLSIZE, CELLSIZE}, - new int[]{CELLSIZE, 10}}); - - /** - * start block offset is 2, read 3 * cellSize - */ - testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100, - new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, - new int[]{2, 0, 0}, - new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2}, - new int[]{100 + CELLSIZE - 2}, - new int[]{100 + CELLSIZE * 2 - 2}}, - new int[][]{new int[]{CELLSIZE - 2, 2}, - new int[]{CELLSIZE}, - new int[]{CELLSIZE}}); - - /** - * start block offset is 2, read 3 * cellSize + 10 - */ - testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0, - new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE}, - new int[]{2, 0, 0}, - new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2}, - new int[]{CELLSIZE - 2}, - new int[]{CELLSIZE * 2 - 2}}, - new int[][]{new int[]{CELLSIZE - 2, 12}, - new int[]{CELLSIZE}, - new int[]{CELLSIZE}}); - - /** - * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10 - */ - testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0, - new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2}, - new int[]{CELLSIZE, CELLSIZE - 1, 0}, - new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1}, - new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}, - new int[]{1, 3 * CELLSIZE + 1}}, - new int[][]{new int[]{CELLSIZE, CELLSIZE}, - new int[]{1, CELLSIZE, 9}, - new 
int[]{CELLSIZE, CELLSIZE}}); - - /** - * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10 - */ - testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0, - new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1}, - new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1}, - new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1}, - new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1}, - new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}}, - new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, - new int[]{CELLSIZE, CELLSIZE, 9}, - new int[]{1, CELLSIZE, CELLSIZE}}); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java index b4f05d46b9..dfdcee29f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java @@ -189,13 +189,13 @@ private void assertFileBlocksRecovery(String fileName, int fileLen, deadDnIndices[i] = dnMap.get(dataDNs[i]); // Check the block replica file on deadDn before it dead. - blocks[i] = StripedBlockUtil.constructStripedBlock( + blocks[i] = StripedBlockUtil.constructInternalBlock( lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]); replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]); metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]); // the block replica on the datanode should be the same as expected assertEquals(replicas[i].length(), - StripedBlockUtil.getStripedBlockLength( + StripedBlockUtil.getInternalBlockLength( lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]])); assertTrue(metadatas[i].getName(). endsWith(blocks[i].getGenerationStamp() + ".meta")); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java index ec0b1bbdae..6f29d69048 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.util; +import com.google.common.base.Preconditions; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -26,26 +27,107 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.*; + +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.junit.Before; import org.junit.Test; +import java.util.Random; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +/** + * Need to cover the following combinations: + * 1. 
Block group size: + * 1.1 One byte + * 1.2 Smaller than cell + * 1.3 One full cell + * 1.4 x full cells, where x is smaller than number of data blocks + * 1.5 x full cells plus a partial cell + * 1.6 One full stripe + * 1.7 One full stripe plus a partial cell + * 1.8 One full stripe plus x full cells + * 1.9 One full stripe plus x full cells plus a partial cell + * 1.10 y full stripes, but smaller than full block group size + * 1.11 Full block group size + * + * 2. Byte range start + * 2.1 Zero + * 2.2 Within first cell + * 2.3 End of first cell + * 2.4 Start of a middle* cell in the first stripe (* neither first nor last) + * 2.5 End of middle cell in the first stripe + * 2.6 Within a middle cell in the first stripe + * 2.7 Start of the last cell in the first stripe + * 2.8 Within the last cell in the first stripe + * 2.9 End of the last cell in the first stripe + * 2.10 Start of a middle stripe + * 2.11 Within a middle stripe + * 2.12 End of a middle stripe + * 2.13 Start of the last stripe + * 2.14 Within the last stripe + * 2.15 End of the last stripe (last byte) + * + * 3. Byte range length: same settings as block group size + * + * We should test in total 11 x 15 x 11 = 1815 combinations + * TODO: test parity block logic + */ public class TestStripedBlockUtil { private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS; private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS; - private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM; + private final short BLK_GROUP_WIDTH = DATA_BLK_NUM + PARITY_BLK_NUM; private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final int FULL_STRIPE_SIZE = DATA_BLK_NUM * CELLSIZE; + /** number of full stripes in a full block group */ + private final int BLK_GROUP_STRIPE_NUM = 16; + private final ECSchema SCEHMA = ErasureCodingSchemaManager.
+ getSystemDefaultSchema(); + private final Random random = new Random(); - private LocatedStripedBlock createDummyLocatedBlock() { + private int[] blockGroupSizes; + private int[] byteRangeStartOffsets; + private int[] byteRangeSizes; + + @Before + public void setup(){ + blockGroupSizes = new int[]{1, getDelta(CELLSIZE), CELLSIZE, + getDelta(DATA_BLK_NUM) * CELLSIZE, + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE), + FULL_STRIPE_SIZE, FULL_STRIPE_SIZE + getDelta(CELLSIZE), + FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE, + FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE), + getDelta(BLK_GROUP_STRIPE_NUM) * FULL_STRIPE_SIZE, + BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE}; + byteRangeStartOffsets = new int[] {0, getDelta(CELLSIZE), CELLSIZE - 1}; + byteRangeSizes = new int[]{1, getDelta(CELLSIZE), CELLSIZE, + getDelta(DATA_BLK_NUM) * CELLSIZE, + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE), + FULL_STRIPE_SIZE, FULL_STRIPE_SIZE + getDelta(CELLSIZE), + FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE, + FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE), + getDelta(BLK_GROUP_STRIPE_NUM) * FULL_STRIPE_SIZE, + BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE}; + } + + private int getDelta(int size) { + return 1 + random.nextInt(size - 2); + } + private byte hashIntToByte(int i) { + int BYTE_MASK = 0xff; + return (byte) (((i + 13) * 29) & BYTE_MASK); + } + + private LocatedStripedBlock createDummyLocatedBlock(int bgSize) { final long blockGroupID = -1048576; - DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_SIZE]; - String[] storageIDs = new String[BLK_GROUP_SIZE]; - StorageType[] storageTypes = new StorageType[BLK_GROUP_SIZE]; - int[] indices = new int[BLK_GROUP_SIZE]; - for (int i = 0; i < BLK_GROUP_SIZE; i++) { + DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_WIDTH]; + String[] storageIDs = new String[BLK_GROUP_WIDTH]; + StorageType[] storageTypes = new StorageType[BLK_GROUP_WIDTH]; + int[] indices = new int[BLK_GROUP_WIDTH]; + for (int i = 0; i < BLK_GROUP_WIDTH; i++) { indices[i] = (i + 2) % DATA_BLK_NUM; // Location port always equal to logical index of a block, // for easier verification @@ -53,13 +135,40 @@ private LocatedStripedBlock createDummyLocatedBlock() { storageIDs[i] = locs[i].getDatanodeUuid(); storageTypes[i] = StorageType.DISK; } - return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID), - locs, storageIDs, storageTypes, indices, 0, false, null); + return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID, + bgSize, 1001), locs, storageIDs, storageTypes, indices, 0, false, + null); + } + + private byte[][] createInternalBlkBuffers(int bgSize) { + byte[][] bufs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][]; + int[] pos = new int[DATA_BLK_NUM + PARITY_BLK_NUM]; + for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) { + int bufSize = (int) getInternalBlockLength( + bgSize, CELLSIZE, DATA_BLK_NUM, i); + bufs[i] = new byte[bufSize]; + pos[i] = 0; + } + int done = 0; + while (done < bgSize) { + Preconditions.checkState(done % CELLSIZE == 0); + StripingCell cell = new StripingCell(SCEHMA, done / CELLSIZE); + int idxInStripe = cell.idxInStripe; + int size = Math.min(CELLSIZE, bgSize - done); + for (int i = 0; i < size; i++) { + bufs[idxInStripe][pos[idxInStripe] + i] = hashIntToByte(done + i); + } + done += size; + pos[idxInStripe] += size; + } + + return bufs; } @Test public void testParseDummyStripedBlock() { - LocatedStripedBlock lsb = createDummyLocatedBlock(); + LocatedStripedBlock 
lsb = createDummyLocatedBlock( + BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE); LocatedBlock[] blocks = parseStripedBlockGroup( lsb, CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM); assertEquals(DATA_BLK_NUM + PARITY_BLK_NUM, blocks.length); @@ -68,14 +177,15 @@ public void testParseDummyStripedBlock() { assertEquals(i, BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock())); assertEquals(i * CELLSIZE, blocks[i].getStartOffset()); + /** TODO: properly define {@link LocatedBlock#offset} for internal blocks */ assertEquals(1, blocks[i].getLocations().length); assertEquals(i, blocks[i].getLocations()[0].getIpcPort()); assertEquals(i, blocks[i].getLocations()[0].getXferPort()); } } - private void verifyInternalBlocks (long numBytesInGroup, long[] expected) { - for (int i = 1; i < BLK_GROUP_SIZE; i++) { + private void verifyInternalBlocks (int numBytesInGroup, int[] expected) { + for (int i = 1; i < BLK_GROUP_WIDTH; i++) { assertEquals(expected[i], getInternalBlockLength(numBytesInGroup, CELLSIZE, DATA_BLK_NUM, i)); } @@ -85,41 +195,85 @@ private void verifyInternalBlocks (long numBytesInGroup, long[] expected) { public void testGetInternalBlockLength () { // A small delta that is smaller than a cell final int delta = 10; - assert delta < CELLSIZE; // Block group is smaller than a cell verifyInternalBlocks(CELLSIZE - delta, - new long[] {CELLSIZE - delta, 0, 0, 0, 0, 0, + new int[] {CELLSIZE - delta, 0, 0, 0, 0, 0, CELLSIZE - delta, CELLSIZE - delta, CELLSIZE - delta}); // Block group is exactly as large as a cell verifyInternalBlocks(CELLSIZE, - new long[] {CELLSIZE, 0, 0, 0, 0, 0, + new int[] {CELLSIZE, 0, 0, 0, 0, 0, CELLSIZE, CELLSIZE, CELLSIZE}); // Block group is a little larger than a cell verifyInternalBlocks(CELLSIZE + delta, - new long[] {CELLSIZE, delta, 0, 0, 0, 0, + new int[] {CELLSIZE, delta, 0, 0, 0, 0, CELLSIZE, CELLSIZE, CELLSIZE}); // Block group contains multiple stripes and ends at stripe boundary verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE, - new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + new int[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE}); // Block group contains multiple stripes and ends at cell boundary // (not ending at stripe boundary) verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE + CELLSIZE, - new long[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + new int[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 3 * CELLSIZE, 3 * CELLSIZE, 3 * CELLSIZE}); // Block group contains multiple stripes and doesn't end at cell boundary verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE - delta, - new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + new int[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE - delta, 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE}); } + /** + * Test dividing a byte range into aligned stripes and verify the aligned + * ranges can be translated back to the byte range. 
+ */ + @Test + public void testDivideByteRangeIntoStripes() { + byte[] assembled = new byte[BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE]; + for (int bgSize : blockGroupSizes) { + LocatedStripedBlock blockGroup = createDummyLocatedBlock(bgSize); + byte[][] internalBlkBufs = createInternalBlkBuffers(bgSize); + for (int brStart : byteRangeStartOffsets) { + for (int brSize : byteRangeSizes) { + if (brStart + brSize > bgSize) { + continue; + } + AlignedStripe[] stripes = divideByteRangeIntoStripes(SCEHMA, + blockGroup, brStart, brStart + brSize - 1, assembled, 0); + + for (AlignedStripe stripe : stripes) { + for (int i = 0; i < DATA_BLK_NUM; i++) { + StripingChunk chunk = stripe.chunks[i]; + if (chunk == null || chunk.state != StripingChunk.REQUESTED) { + continue; + } + int done = 0; + for (int j = 0; j < chunk.getLengths().length; j++) { + System.arraycopy(internalBlkBufs[i], + (int) stripe.getOffsetInBlock() + done, assembled, + chunk.getOffsets()[j], chunk.getLengths()[j]); + done += chunk.getLengths()[j]; + } + } + } + for (int i = 0; i < brSize; i++) { + if (hashIntToByte(brStart + i) != assembled[i]) { + System.out.println("Oops"); + } + assertEquals("Byte at " + (brStart + i) + " should be the same", + hashIntToByte(brStart + i), assembled[i]); + } + } + } + } + } + }
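As a closing illustration of the internal-block-length rule exercised by testGetInternalBlockLength above, the sketch below re-derives the lengths for one case (two full stripes plus one extra cell). It is an assumption-labelled re-derivation, not the real StripedBlockUtil.getInternalBlockLength implementation, and the (6, 3) layout and cell size are chosen only for the example: data cells are dealt round-robin to the data blocks, and each parity block is as long as the first (longest) data block.

public class InternalBlockLengthExample {
  // Re-derived length of internal block idx for a block group holding groupSize bytes of data.
  static long internalBlockLength(long groupSize, int cellSize, int dataBlkNum, int idx) {
    long fullStripes = groupSize / ((long) cellSize * dataBlkNum);
    long lastStripeLen = groupSize - fullStripes * cellSize * dataBlkNum;
    if (idx >= dataBlkNum) {
      idx = 0;  // parity blocks mirror the first (longest) data block
    }
    long lastCellStart = (long) idx * cellSize;
    long lastCell = lastStripeLen <= lastCellStart
        ? 0 : Math.min(cellSize, lastStripeLen - lastCellStart);
    return fullStripes * cellSize + lastCell;
  }

  public static void main(String[] args) {
    final int cellSize = 64 * 1024, dataBlkNum = 6, parityBlkNum = 3;  // assumed (6, 3) layout
    long groupSize = 2L * dataBlkNum * cellSize + cellSize;  // two full stripes plus one cell
    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
      long len = internalBlockLength(groupSize, cellSize, dataBlkNum, i);
      System.out.println("block " + i + ": " + len / cellSize + " cells");
    }
    // Expect: block 0 -> 3 cells, blocks 1..5 -> 2 cells, parity blocks 6..8 -> 3 cells,
    // matching the "ends at cell boundary (not ending at stripe boundary)" case in the test above.
  }
}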