From 8d3030f064116a657c2cbb7c7560af6bed1d5586 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Mon, 11 May 2015 21:10:23 -0700 Subject: [PATCH] HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). Contributed by Zhe Zhang. --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../hadoop/hdfs/DFSStripedInputStream.java | 164 ++++-- .../erasurecode/ErasureCodingWorker.java | 10 +- .../hadoop/hdfs/util/StripedBlockUtil.java | 517 ++++++++++++++++-- .../hdfs/TestDFSStripedInputStream.java | 97 +++- .../hadoop/hdfs/TestWriteReadStripedFile.java | 49 ++ 6 files changed, 768 insertions(+), 72 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index c7d01c70c1..0acf746194 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -195,3 +195,6 @@ HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction. (Tsz Wo Nicholas Sze via jing9) + + HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). + (Zhe Zhang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 7425e75344..7678fae067 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -21,15 +21,27 @@ import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.ByteBufferPool; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; import org.apache.hadoop.io.erasurecode.ECSchema; + import org.apache.hadoop.net.NetUtils; import org.apache.htrace.Span; import org.apache.htrace.Trace; @@ -37,10 +49,12 @@ import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import java.util.Collection; import java.util.Map; import java.util.HashMap; import java.util.concurrent.CompletionService; @@ -51,7 +65,6 
@@ import java.util.concurrent.Callable; import java.util.concurrent.Future; - /****************************************************************************** * DFSStripedInputStream reads from striped block groups, illustrated below: * @@ -125,6 +138,7 @@ boolean include(long pos) { private final short parityBlkNum; /** the buffer for a complete stripe */ private ByteBuffer curStripeBuf; + private final ECSchema schema; /** * indicate the start/end offset of the current buffered stripe in the * block group @@ -137,6 +151,7 @@ boolean include(long pos) { super(dfsClient, src, verifyChecksum); assert schema != null; + this.schema = schema; cellSize = schema.getChunkSize(); dataBlkNum = (short) schema.getNumDataUnits(); parityBlkNum = (short) schema.getNumParityUnits(); @@ -472,12 +487,10 @@ private int copy(ReaderStrategy strategy, int offset, int length) { */ @Override protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException { - LocatedBlock lb = super.getBlockAt(blkStartOffset); - assert lb instanceof LocatedStripedBlock : "NameNode should return a " + - "LocatedStripedBlock for a striped file"; + LocatedBlock lb = getBlockGroupAt(blkStartOffset); - int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize) - % dataBlkNum); + int idx = (int) ((blkStartOffset - lb.getStartOffset()) + % (dataBlkNum + parityBlkNum)); // If indexing information is returned, iterate through the index array // to find the entry for position idx in the group LocatedStripedBlock lsb = (LocatedStripedBlock) lb; @@ -509,48 +522,121 @@ protected void fetchBlockByteRange(long blockStartOffset, long start, long end, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { - Map, Integer> futures = new HashMap<>(); - CompletionService stripedReadsService = - new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); - int len = (int) (end - start + 1); - // Refresh the striped block group LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset); + AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, blockGroup, + start, end, buf, offset); + for (AlignedStripe stripe : stripes) { + fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap); + } + } - // Planning the portion of I/O for each shard - ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start, - len, offset); - + private void fetchOneStripe(LocatedStripedBlock blockGroup, + byte[] buf, AlignedStripe alignedStripe, Map> corruptedBlockMap) throws IOException { + Map, Integer> futures = new HashMap<>(); + CompletionService service = + new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); + if (alignedStripe.getSpanInBlock() == 0) { + DFSClient.LOG.warn("Trying to read an empty stripe from" + blockGroup); + return; + } // Parse group to get chosen DN location LocatedBlock[] blks = StripedBlockUtil. 
parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum); - for (short i = 0; i < dataBlkNum; i++) { - ReadPortion rp = readPortions[i]; - if (rp.getReadLength() <= 0) { - continue; + if (alignedStripe.chunks[i] != null + && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { + fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i, + corruptedBlockMap); } - DatanodeInfo loc = blks[i].getLocations()[0]; - StorageType type = blks[i].getStorageTypes()[0]; - DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr( - loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())), - type); - Callable readCallable = getFromOneDataNode(dnAddr, - blks[i].getStartOffset(), rp.getStartOffsetInBlock(), - rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf, - rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i); - Future getFromDNRequest = stripedReadsService.submit(readCallable); - DFSClient.LOG.debug("Submitting striped read request for " + blks[i]); - futures.put(getFromDNRequest, (int) i); } + // Input buffers for potential decode operation, which remains null until + // first read failure + byte[][] decodeInputs = null; while (!futures.isEmpty()) { try { - waitNextCompletion(stripedReadsService, futures); + StripingChunkReadResult r = getNextCompletedStripedRead( + service, futures, 0); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe); + } + StripingChunk returnedChunk = alignedStripe.chunks[r.index]; + Preconditions.checkNotNull(returnedChunk); + Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING); + if (r.state == StripingChunkReadResult.SUCCESSFUL) { + returnedChunk.state = StripingChunk.FETCHED; + alignedStripe.fetchedChunksNum++; + if (alignedStripe.fetchedChunksNum == dataBlkNum) { + clearFutures(futures.keySet()); + break; + } + } else { + returnedChunk.state = StripingChunk.MISSING; + alignedStripe.missingChunksNum++; + if (alignedStripe.missingChunksNum > parityBlkNum) { + clearFutures(futures.keySet()); + throw new IOException("Too many blocks are missing: " + alignedStripe); + } + // When seeing first missing block, initialize decode input buffers + if (decodeInputs == null) { + decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum); + } + for (int i = 0; i < alignedStripe.chunks.length; i++) { + StripingChunk chunk = alignedStripe.chunks[i]; + Preconditions.checkNotNull(chunk); + if (chunk.state == StripingChunk.REQUESTED && i <= dataBlkNum) { + fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i, + corruptedBlockMap); + } + } + } } catch (InterruptedException ie) { - // Ignore and retry + String err = "Read request interrupted"; + DFSClient.LOG.error(err); + clearFutures(futures.keySet()); + // Don't decode if read interrupted + throw new InterruptedIOException(err); } } + + if (alignedStripe.missingChunksNum > 0) { + decodeAndFillBuffer(decodeInputs, buf, alignedStripe, + dataBlkNum, parityBlkNum); + } + } + + /** + * Schedule a single read request to an internal block + * @param block The internal block + * @param index Index of the internal block in the group + * @param corruptedBlockMap Map of corrupted blocks + */ + private void fetchOneStripingChunk(Map, Integer> futures, + final CompletionService service, final LocatedBlock block, + final AlignedStripe alignedStripe, final int index, + Map> corruptedBlockMap) { + DatanodeInfo loc = block.getLocations()[0]; + StorageType type = 
block.getStorageTypes()[0]; + DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr( + loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())), + type); + StripingChunk chunk = alignedStripe.chunks[index]; + chunk.state = StripingChunk.PENDING; + Callable readCallable = getFromOneDataNode(dnAddr, + block.getStartOffset(), alignedStripe.getOffsetInBlock(), + alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, chunk.buf, + chunk.getOffsets(), chunk.getLengths(), + corruptedBlockMap, index); + Future getFromDNRequest = service.submit(readCallable); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Submitting striped read request for " + index + + ". Info of the block: " + block + ", offset in block is " + + alignedStripe.getOffsetInBlock() + ", end is " + + (alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1)); + } + futures.put(getFromDNRequest, index); } private Callable getFromOneDataNode(final DNAddrPair datanode, @@ -609,4 +695,12 @@ public synchronized void releaseBuffer(ByteBuffer buffer) { throw new UnsupportedOperationException( "Not support enhanced byte buffer access."); } + + /** A variation to {@link DFSInputStream#cancelAll} */ + private void clearFutures(Collection> futures) { + for (Future future : futures) { + future.cancel(false); + } + futures.clear(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 5ede508b63..eedb1914ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; @@ -462,10 +462,10 @@ private int readMinimumStripedData4Recovery(int[] success) { int nsuccess = 0; while (!futures.isEmpty()) { try { - StripedReadResult result = + StripingChunkReadResult result = StripedBlockUtil.getNextCompletedStripedRead( readService, futures, STRIPED_READ_THRESHOLD_MILLIS); - if (result.state == StripedReadResult.SUCCESSFUL) { + if (result.state == StripingChunkReadResult.SUCCESSFUL) { success[nsuccess++] = result.index; if (nsuccess >= dataBlkNum) { // cancel remaining reads if we read successfully from minimum @@ -474,14 +474,14 @@ private int readMinimumStripedData4Recovery(int[] success) { futures.clear(); break; } - } else if (result.state == StripedReadResult.FAILED) { + } else if (result.state == StripingChunkReadResult.FAILED) { // If read failed for some source, we should not use it anymore // and schedule read from a new source. 
StripedReader failedReader = stripedReaders.get(result.index); closeBlockReader(failedReader.blockReader); failedReader.blockReader = null; scheduleNewRead(used); - } else if (result.state == StripedReadResult.TIMEOUT) { + } else if (result.state == StripingChunkReadResult.TIMEOUT) { // If timeout, we also schedule a new read. scheduleNewRead(used); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 45bbf6bf6d..f7ae88a473 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -22,16 +22,18 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSStripedOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import com.google.common.base.Preconditions; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -85,7 +87,7 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, - bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(), + bg.getStartOffset() + idxInBlockGroup, bg.isCorrupt(), null); } @@ -238,33 +240,37 @@ public static ReadPortion[] planReadPortions(final int dataBlkNum, /** * Get the next completed striped read task * - * @return {@link StripedReadResult} indicating the status of the read task + * @return {@link StripingChunkReadResult} indicating the status of the read task * succeeded, and the block index of the task. If the method times * out without getting any completed read tasks, -1 is returned as * block index. 
* @throws InterruptedException */ - public static StripedReadResult getNextCompletedStripedRead( + public static StripingChunkReadResult getNextCompletedStripedRead( CompletionService readService, Map, Integer> futures, final long threshold) throws InterruptedException { Preconditions.checkArgument(!futures.isEmpty()); - Preconditions.checkArgument(threshold > 0); Future future = null; try { - future = readService.poll(threshold, TimeUnit.MILLISECONDS); + if (threshold > 0) { + future = readService.poll(threshold, TimeUnit.MILLISECONDS); + } else { + future = readService.take(); + } if (future != null) { future.get(); - return new StripedReadResult(futures.remove(future), - StripedReadResult.SUCCESSFUL); + return new StripingChunkReadResult(futures.remove(future), + StripingChunkReadResult.SUCCESSFUL); } else { - return new StripedReadResult(StripedReadResult.TIMEOUT); + return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT); } } catch (ExecutionException e) { - return new StripedReadResult(futures.remove(future), - StripedReadResult.FAILED); + DFSClient.LOG.error("ExecutionException " + e); + return new StripingChunkReadResult(futures.remove(future), + StripingChunkReadResult.FAILED); } catch (CancellationException e) { - return new StripedReadResult(futures.remove(future), - StripedReadResult.CANCELLED); + return new StripingChunkReadResult(futures.remove(future), + StripingChunkReadResult.CANCELLED); } } @@ -291,26 +297,247 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes, } /** - * This class represents the portion of I/O associated with each block in the - * striped block group. + * Initialize the decoding input buffers based on the chunk states in an + * AlignedStripe */ - public static class ReadPortion { + public static byte[][] initDecodeInputs(AlignedStripe alignedStripe, + int dataBlkNum, int parityBlkNum) { + byte[][] decodeInputs = + new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()]; + for (int i = 0; i < alignedStripe.chunks.length; i++) { + StripingChunk chunk = alignedStripe.chunks[i]; + if (chunk == null) { + alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]); + alignedStripe.chunks[i].offsetsInBuf.add(0); + alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock()); + } else if (chunk.state == StripingChunk.FETCHED) { + int posInBuf = 0; + for (int j = 0; j < chunk.offsetsInBuf.size(); j++) { + System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j), + decodeInputs[i], posInBuf, chunk.lengthsInBuf.get(j)); + posInBuf += chunk.lengthsInBuf.get(j); + } + } else if (chunk.state == StripingChunk.ALLZERO) { + Arrays.fill(decodeInputs[i], (byte)0); + } + } + return decodeInputs; + } + + /** + * Decode based on the given input buffers and schema + */ + public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf, + AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) { + int[] decodeIndices = new int[parityBlkNum]; + int pos = 0; + for (int i = 0; i < alignedStripe.chunks.length; i++) { + if (alignedStripe.chunks[i].state != StripingChunk.FETCHED && + alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { + decodeIndices[pos++] = i; + } + } + + byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()]; + RSRawDecoder rsRawDecoder = new RSRawDecoder(); + rsRawDecoder.initialize(dataBlkNum, parityBlkNum, (int) alignedStripe.getSpanInBlock()); + rsRawDecoder.decode(decodeInputs, decodeIndices, outputs); + + for (int i = 0; i < dataBlkNum + 
parityBlkNum; i++) { + StripingChunk chunk = alignedStripe.chunks[i]; + if (chunk.state == StripingChunk.MISSING) { + int srcPos = 0; + for (int j = 0; j < chunk.offsetsInBuf.size(); j++) { + //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938 +// System.arraycopy(outputs[i], srcPos, buf, chunk.offsetsInBuf.get(j), +// chunk.lengthsInBuf.get(j)); + Arrays.fill(buf, chunk.offsetsInBuf.get(j), + chunk.offsetsInBuf.get(j) + chunk.lengthsInBuf.get(j), (byte)7); + srcPos += chunk.lengthsInBuf.get(j); + } + } + } + } + + /** + * This method divides a requested byte range into an array of + * {@link AlignedStripe} + * + * + * At most 5 stripes will be generated from each logical range + * TODO: cleanup and get rid of planReadPortions + */ + public static AlignedStripe[] divideByteRangeIntoStripes ( + ECSchema ecSchema, LocatedStripedBlock blockGroup, long start, long end, + byte[] buf, int offsetInBuf) { + // TODO: change ECSchema naming to use cell size instead of chunk size + + // Step 0: analyze range and calculate basic parameters + int cellSize = ecSchema.getChunkSize(); + int dataBlkNum = ecSchema.getNumDataUnits(); + int len = (int) (end - start + 1); + int firstCellIdxInBG = (int) (start / cellSize); + int lastCellIdxInBG = (int) (end / cellSize); + int firstCellSize = Math.min(cellSize - (int) (start % cellSize), len); + long firstCellOffsetInBlk = start % cellSize; + int lastCellSize = lastCellIdxInBG == firstCellIdxInBG ? + firstCellSize : (int) (end % cellSize) + 1; + + // Step 1: get the unmerged ranges on each internal block + // TODO: StripingCell should carry info on size and start offset (HDFS-8320) + VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, + firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk, + lastCellSize); + + // Step 2: merge into at most 5 stripes + AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges); + + // Step 3: calculate each chunk's position in destination buffer + calcualteChunkPositionsInBuf(ecSchema, blockGroup, buf, offsetInBuf, + firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk, + lastCellSize, stripes); + + // Step 4: prepare ALLZERO blocks + prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum); + + return stripes; + } + + private static VerticalRange[] getRangesForInternalBlocks (ECSchema ecSchema, + int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize, + long firstCellOffsetInBlk, int lastCellSize) { + int cellSize = ecSchema.getChunkSize(); + int dataBlkNum = ecSchema.getNumDataUnits(); + + StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG); + StripingCell lastCell = new StripingCell(ecSchema, lastCellIdxInBG); + + VerticalRange ranges[] = new VerticalRange[dataBlkNum]; + ranges[firstCell.idxInStripe] = + new VerticalRange(firstCellOffsetInBlk, firstCellSize); + for (int i = firstCellIdxInBG + 1; i < lastCellIdxInBG; i++) { + // iterate through all cells and update the list of StripeRanges + StripingCell cell = new StripingCell(ecSchema, i); + if (ranges[cell.idxInStripe] == null) { + ranges[cell.idxInStripe] = new VerticalRange( + cell.idxInInternalBlk * cellSize, cellSize); + } else { + ranges[cell.idxInStripe].spanInBlock += cellSize; + } + } + if (ranges[lastCell.idxInStripe] == null) { + ranges[lastCell.idxInStripe] = new VerticalRange( + lastCell.idxInInternalBlk * cellSize, lastCellSize); + } else if (lastCell.idxInBlkGroup != firstCell.idxInBlkGroup) { + ranges[lastCell.idxInStripe].spanInBlock += 
lastCellSize; + } + + return ranges; + } + + private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema, + VerticalRange[] ranges) { + int dataBlkNum = ecSchema.getNumDataUnits(); + int parityBlkNum = ecSchema.getNumParityUnits(); + List stripes = new ArrayList<>(); + SortedSet stripePoints = new TreeSet<>(); + for (VerticalRange r : ranges) { + if (r != null) { + stripePoints.add(r.offsetInBlock); + stripePoints.add(r.offsetInBlock + r.spanInBlock); + } + } + + long prev = -1; + for (long point : stripePoints) { + if (prev >= 0) { + stripes.add(new AlignedStripe(prev, point - prev, + dataBlkNum + parityBlkNum)); + } + prev = point; + } + return stripes.toArray(new AlignedStripe[stripes.size()]); + } + + private static void calcualteChunkPositionsInBuf(ECSchema ecSchema, + LocatedStripedBlock blockGroup, byte[] buf, int offsetInBuf, + int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize, + long firstCellOffsetInBlk, int lastCellSize, AlignedStripe[] stripes) { + int cellSize = ecSchema.getChunkSize(); + int dataBlkNum = ecSchema.getNumDataUnits(); + // Step 3: calculate each chunk's position in destination buffer /** - * startOffsetInBlock - * | - * v - * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->| + * | <--------------- AlignedStripe --------------->| + * + * |<- length_0 ->|<-- length_1 -->|<- length_2 ->| * +------------------+------------------+----------------+ - * | cell_0 | cell_3 | cell_6 | <- blk_0 + * | cell_0_0_0 | cell_3_1_0 | cell_6_2_0 | <- blk_0 * +------------------+------------------+----------------+ * _/ \_______________________ * | | - * v offsetsInBuf[0] v offsetsInBuf[1] - * +------------------------------------------------------+ - * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf - * | (partial) | (from blk_1 and blk_2) | | - * +------------------------------------------------------+ + * v offset_0 v offset_1 + * +----------------------------------------------------------+ + * | cell_0_0_0 | cell_1_0_1 and cell_2_0_2 |cell_3_1_0 ...| <- buf + * | (partial) | (from blk_1 and blk_2) | | + * +----------------------------------------------------------+ + * + * Cell indexing convention defined in {@link StripingCell} */ + int done = 0; + for (int i = firstCellIdxInBG; i <= lastCellIdxInBG; i++) { + StripingCell cell = new StripingCell(ecSchema, i); + long cellStart = i == firstCellIdxInBG ? + firstCellOffsetInBlk : cell.idxInInternalBlk * cellSize; + int cellLen; + if (i == firstCellIdxInBG) { + cellLen = firstCellSize; + } else if (i == lastCellIdxInBG) { + cellLen = lastCellSize; + } else { + cellLen = cellSize; + } + long cellEnd = cellStart + cellLen - 1; + for (AlignedStripe s : stripes) { + long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1; + long overlapStart = Math.max(cellStart, s.getOffsetInBlock()); + long overlapEnd = Math.min(cellEnd, stripeEnd); + int overLapLen = (int) (overlapEnd - overlapStart + 1); + if (overLapLen <= 0) { + continue; + } + if (s.chunks[cell.idxInStripe] == null) { + s.chunks[cell.idxInStripe] = new StripingChunk(buf); + } + + s.chunks[cell.idxInStripe].offsetsInBuf. 
+ add((int)(offsetInBuf + done + overlapStart - cellStart)); + s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen); + } + done += cellLen; + } + } + + private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup, + byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) { + for (AlignedStripe s : stripes) { + for (int i = 0; i < dataBlkNum; i++) { + long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(), + cellSize, dataBlkNum, i); + if (internalBlkLen <= s.getOffsetInBlock()) { + Preconditions.checkState(s.chunks[i] == null); + s.chunks[i] = new StripingChunk(buf); + s.chunks[i].state = StripingChunk.ALLZERO; + } + } + } + } + + /** + * This class represents the portion of I/O associated with each block in the + * striped block group. + * TODO: consolidate ReadPortion with AlignedStripe + */ + public static class ReadPortion { private long startOffsetInBlock = 0; private int readLength = 0; public final List offsetsInBuf = new ArrayList<>(); @@ -349,12 +576,235 @@ void addReadLength(int extraLength) { } } + /** + * The unit of encoding used in {@link DFSStripedOutputStream} + * | <------- Striped Block Group -------> | + * blk_0 blk_1 blk_2 + * | | | + * v v v + * +----------+ +----------+ +----------+ + * |cell_0_0_0| |cell_1_0_1| |cell_2_0_2| + * +----------+ +----------+ +----------+ + * |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link idxInBlkGroup} = 5 + * +----------+ +----------+ +----------+ {@link idxInInternalBlk} = 1 + * {@link idxInStripe} = 2 + * A StripingCell is a special instance of {@link StripingChunk} whose offset + * and size align with the cell used when writing data. + * TODO: consider parity cells + */ + public static class StripingCell { + public final ECSchema schema; + /** Logical order in a block group, used when doing I/O to a block group */ + public final int idxInBlkGroup; + public final int idxInInternalBlk; + public final int idxInStripe; + + public StripingCell(ECSchema ecSchema, int idxInBlkGroup) { + this.schema = ecSchema; + this.idxInBlkGroup = idxInBlkGroup; + this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits(); + this.idxInStripe = idxInBlkGroup - + this.idxInInternalBlk * ecSchema.getNumDataUnits(); + } + + public StripingCell(ECSchema ecSchema, int idxInInternalBlk, + int idxInStripe) { + this.schema = ecSchema; + this.idxInInternalBlk = idxInInternalBlk; + this.idxInStripe = idxInStripe; + this.idxInBlkGroup = + idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe; + } + } + + /** + * Given a requested byte range on a striped block group, an AlignedStripe + * represents a {@link VerticalRange} that is aligned with both the byte range + * and boundaries of all internal blocks. As illustrated in the diagram, any + * given byte range on a block group leads to 1~5 AlignedStripe's. 
+ * + * |<-------- Striped Block Group -------->| + * blk_0 blk_1 blk_2 blk_3 blk_4 + * +----+ | +----+ +----+ + * |full| | | | | | <- AlignedStripe0: + * +----+ |~~~~| | |~~~~| |~~~~| 1st cell is partial + * |part| | | | | | | | <- AlignedStripe1: byte range + * +----+ +----+ +----+ | |~~~~| |~~~~| doesn't start at 1st block + * |full| |full| |full| | | | | | + * |cell| |cell| |cell| | | | | | <- AlignedStripe2 (full stripe) + * | | | | | | | | | | | + * +----+ +----+ +----+ | |~~~~| |~~~~| + * |full| |part| | | | | | <- AlignedStripe3: byte range + * |~~~~| +----+ | |~~~~| |~~~~| doesn't end at last block + * | | | | | | | <- AlignedStripe4: + * +----+ | +----+ +----+ last cell is partial + * | + * <---- data blocks ----> | <--- parity ---> + * + * An AlignedStripe is the basic unit of reading from a striped block group, + * because within the AlignedStripe, all internal blocks can be processed in + * a uniform manner. + * + * The coverage of an AlignedStripe on an internal block is represented as a + * {@link StripingChunk}. + * To simplify the logic of reading a logical byte range from a block group, + * a StripingChunk is either completely in the requested byte range or + * completely outside the requested byte range. + */ + public static class AlignedStripe { + public VerticalRange range; + /** status of each chunk in the stripe */ + public final StripingChunk[] chunks; + public int fetchedChunksNum = 0; + public int missingChunksNum = 0; + + public AlignedStripe(long offsetInBlock, long length, int width) { + Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); + this.range = new VerticalRange(offsetInBlock, length); + this.chunks = new StripingChunk[width]; + } + + public AlignedStripe(VerticalRange range, int width) { + this.range = range; + this.chunks = new StripingChunk[width]; + } + + public boolean include(long pos) { + return range.include(pos); + } + + public long getOffsetInBlock() { + return range.offsetInBlock; + } + + public long getSpanInBlock() { + return range.spanInBlock; + } + + @Override + public String toString() { + return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock + + ", fetchedChunksNum=" + fetchedChunksNum + + ", missingChunksNum=" + missingChunksNum; + } + } + + /** + * A simple utility class representing an arbitrary vertical inclusive range + * starting at {@link offsetInBlock} and lasting for {@link length} bytes in + * an internal block. Note that VerticalRange doesn't necessarily align with + * {@link StripingCell}. + * + * |<- Striped Block Group ->| + * blk_0 + * | + * v + * +-----+ + * |~~~~~| <-- {@link offsetInBlock} + * | | ^ + * | | | + * | | | {@link spanInBlock} + * | | v + * |~~~~~| --- + * | | + * +-----+ + */ + public static class VerticalRange { + /** start offset in the block group (inclusive) */ + public long offsetInBlock; + /** length of the stripe range */ + public long spanInBlock; + + public VerticalRange(long offsetInBlock, long length) { + Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0); + this.offsetInBlock = offsetInBlock; + this.spanInBlock = length; + } + + /** whether a position is in the range */ + public boolean include(long pos) { + return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock; + } + } + + /** + * Indicates the coverage of an {@link AlignedStripe} on an internal block, + * and the state of the chunk in the context of the read request. 
+ * + * |<---------------- Striped Block Group --------------->| + * blk_0 blk_1 blk_2 blk_3 blk_4 + * +---------+ | +----+ +----+ + * null null |REQUESTED| | |null| |null| <- AlignedStripe0 + * +---------+ |---------| | |----| |----| + * null |REQUESTED| |REQUESTED| | |null| |null| <- AlignedStripe1 + * +---------+ +---------+ +---------+ | +----+ +----+ + * |REQUESTED| |REQUESTED| ALLZERO | |null| |null| <- AlignedStripe2 + * +---------+ +---------+ | +----+ +----+ + * <----------- data blocks ------------> | <--- parity ---> + * + * The class also carries {@link buf}, {@link offsetsInBuf}, and + * {@link lengthsInBuf} to define how read task for this chunk should deliver + * the returned data. + */ + public static class StripingChunk { + /** Chunk has been successfully fetched */ + public static final int FETCHED = 0x01; + /** Chunk has encountered failed when being fetched */ + public static final int MISSING = 0x02; + /** Chunk being fetched (fetching task is in-flight) */ + public static final int PENDING = 0x04; + /** + * Chunk is requested either by application or for decoding, need to + * schedule read task + */ + public static final int REQUESTED = 0X08; + /** + * Internal block is short and has no overlap with chunk. Chunk considered + * all-zero bytes in codec calculations. + */ + public static final int ALLZERO = 0X0f; + + /** + * If a chunk is completely in requested range, the state transition is: + * REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING} + * If a chunk is completely outside requested range (including parity + * chunks), state transition is: + * null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ... + */ + public int state = REQUESTED; + public byte[] buf; + public List offsetsInBuf; + public List lengthsInBuf; + + public StripingChunk(byte[] buf) { + this.buf = buf; + this.offsetsInBuf = new ArrayList<>(); + this.lengthsInBuf = new ArrayList<>(); + } + + public int[] getOffsets() { + int[] offsets = new int[offsetsInBuf.size()]; + for (int i = 0; i < offsets.length; i++) { + offsets[i] = offsetsInBuf.get(i); + } + return offsets; + } + + public int[] getLengths() { + int[] lens = new int[this.lengthsInBuf.size()]; + for (int i = 0; i < lens.length; i++) { + lens[i] = this.lengthsInBuf.get(i); + } + return lens; + } + } + /** * This class represents result from a striped read request. * If the task was successful or the internal computation failed, * an index is also returned. 
*/ - public static class StripedReadResult { + public static class StripingChunkReadResult { public static final int SUCCESSFUL = 0x01; public static final int FAILED = 0x02; public static final int TIMEOUT = 0x04; @@ -363,18 +813,23 @@ public static class StripedReadResult { public final int index; public final int state; - public StripedReadResult(int state) { + public StripingChunkReadResult(int state) { Preconditions.checkArgument(state == TIMEOUT, "Only timeout result should return negative index."); this.index = -1; this.state = state; } - public StripedReadResult(int index, int state) { + public StripingChunkReadResult(int index, int state) { Preconditions.checkArgument(state != TIMEOUT, "Timeout result should return negative index."); this.index = index; this.state = state; } + + @Override + public String toString() { + return "(index=" + index + ", state =" + state + ")"; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 3f79933a32..452cc2b9e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -133,8 +134,102 @@ public void testPread() throws Exception { byte[] readBuffer = new byte[readSize]; int ret = in.read(0, readBuffer, 0, readSize); + byte[] expected = new byte[readSize]; + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + for (int j = 0; j < DATA_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; + expected[posInFile] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } + } + } + assertEquals(readSize, ret); - // TODO: verify read results with patterned data from HDFS-8117 + assertArrayEquals(expected, readBuffer); + } + + @Test + public void testPreadWithDNFailure() throws Exception { + final int numBlocks = 4; + final int failedDNIdx = 2; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + NUM_STRIPE_PER_BLOCK, false); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, BLOCK_GROUP_SIZE); + + assert lbs.get(0) instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0)); + for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + NUM_STRIPE_PER_BLOCK * CELLSIZE, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + DFSStripedInputStream in = + new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, + ErasureCodingSchemaManager.getSystemDefaultSchema()); + int readSize = BLOCK_GROUP_SIZE; + byte[] readBuffer = new byte[readSize]; + byte[] expected = new byte[readSize]; + 
cluster.stopDataNode(failedDNIdx); + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + for (int j = 0; j < DATA_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; + expected[posInFile] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } + } + } + + // Update the expected content for decoded data + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE]; + int[] missingBlkIdx = new int[]{failedDNIdx, DATA_BLK_NUM+1, DATA_BLK_NUM+2}; + byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE]; + for (int j = 0; j < DATA_BLK_NUM; j++) { + int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE; + if (j != failedDNIdx) { + System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE); + } + } + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk); + } +// RSRawDecoder rsRawDecoder = new RSRawDecoder(); +// rsRawDecoder.initialize(DATA_BLK_NUM, PARITY_BLK_NUM, CELLSIZE); +// rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs); + int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE; +// System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE); + //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938 + Arrays.fill(expected, posInBuf, posInBuf + CELLSIZE, (byte)7); + } + int delta = 10; + int done = 0; + // read a small delta, shouldn't trigger decode + // |cell_0 | + // |10 | + done += in.read(0, readBuffer, 0, delta); + assertEquals(delta, done); + // both head and trail cells are partial + // |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 | + // |256K - 10|missing|256K|256K|256K - 10|not in range| + done += in.read(delta, readBuffer, delta, + CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta); + assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done); + // read the rest + done += in.read(done, readBuffer, done, readSize - done); + assertEquals(readSize, done); + assertArrayEquals(expected, readBuffer); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java index 5c6f449f5f..57d6eb9a0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -321,4 +324,50 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) Assert.assertArrayEquals(bytes, result.array()); } } + + @Test + public void testWritePreadWithDNFailure() throws IOException { + final int 
failedDNIdx = 2; + final int length = cellSize * (dataBlocks + 2); + Path testPath = new Path("/foo"); + final byte[] bytes = generateBytes(length); + DFSTestUtil.writeFile(fs, testPath, new String(bytes)); + + // shut down the DN that holds the last internal data block + BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5, + cellSize); + String name = (locs[0].getNames())[failedDNIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + break; + } + } + + // pread + int startOffsetInFile = cellSize * 5; + try (FSDataInputStream fsdis = fs.open(testPath)) { + byte[] buf = new byte[length]; + int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length); + Assert.assertEquals("The length of file should be the same to write size", + length - startOffsetInFile, readLen); + + RSRawDecoder rsRawDecoder = new RSRawDecoder(); + rsRawDecoder.initialize(dataBlocks, parityBlocks, 1); + byte[] expected = new byte[readLen]; + for (int i = startOffsetInFile; i < length; i++) { + //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938 + if ((i / cellSize) % dataBlocks == failedDNIdx) { + expected[i - startOffsetInFile] = (byte)7; + } else { + expected[i - startOffsetInFile] = getByte(i); + } + } + for (int i = startOffsetInFile; i < length; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", + expected[i - startOffsetInFile], buf[i - startOffsetInFile]); + } + } + } }
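The index arithmetic that divideByteRangeIntoStripes builds on can be exercised in isolation. The sketch below walks a requested byte range cell by cell and prints which internal block and stripe each cell maps to, following the StripingCell convention (idxInInternalBlk = idxInBlkGroup / dataBlkNum, idxInStripe = idxInBlkGroup % dataBlkNum). It is a minimal standalone illustration; the class name and the 64 KB cell size / 6-wide schema are assumed values, not taken from this patch.

/**
 * Standalone illustration of the cell-to-internal-block mapping that
 * divideByteRangeIntoStripes relies on. The constants below are assumed
 * values; the real ones come from the ECSchema (getChunkSize(),
 * getNumDataUnits()).
 */
public class StripingCellMathSketch {
  static final int CELL_SIZE = 64 * 1024;   // illustrative cell size
  static final int DATA_BLK_NUM = 6;        // illustrative schema width

  public static void main(String[] args) {
    long start = 100_000L;                   // first requested byte (inclusive)
    long end = 500_000L;                     // last requested byte (inclusive)

    int firstCellIdxInBG = (int) (start / CELL_SIZE);
    int lastCellIdxInBG = (int) (end / CELL_SIZE);

    for (int idxInBG = firstCellIdxInBG; idxInBG <= lastCellIdxInBG; idxInBG++) {
      int idxInInternalBlk = idxInBG / DATA_BLK_NUM;   // row (stripe) index
      int idxInStripe = idxInBG % DATA_BLK_NUM;        // column (internal block)

      // Portion of the cell that overlaps the request, in block-group offsets.
      long cellStart = (long) idxInBG * CELL_SIZE;
      long overlapStart = Math.max(start, cellStart);
      long overlapEnd = Math.min(end, cellStart + CELL_SIZE - 1);

      // Same portion translated to an offset inside internal block idxInStripe.
      long offsetInBlock = (long) idxInInternalBlk * CELL_SIZE
          + (overlapStart - cellStart);

      System.out.printf("cell %d -> blk_%d, stripe %d, block offset %d, len %d%n",
          idxInBG, idxInStripe, idxInInternalBlk, offsetInBlock,
          overlapEnd - overlapStart + 1);
    }
  }
}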
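mergeRangesForInternalBlocks produces at most five AlignedStripes for any single request by cutting at every boundary of the per-block vertical ranges. A minimal sketch of that cut-and-merge step, with plain long[] pairs standing in for VerticalRange and AlignedStripe (all names and sizes here are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Sketch of the cut-and-merge step: every internal block contributes the two
 * boundaries of the vertical range it serves, and each pair of adjacent
 * boundaries becomes one aligned stripe.
 */
public class StripeMergeSketch {

  /** ranges[i] = {offsetInBlock, spanInBlock} for internal block i, or null. */
  static List<long[]> mergeIntoStripes(long[][] ranges) {
    SortedSet<Long> points = new TreeSet<>();
    for (long[] r : ranges) {
      if (r != null) {                 // blocks outside the request stay null
        points.add(r[0]);
        points.add(r[0] + r[1]);
      }
    }
    List<long[]> stripes = new ArrayList<>();
    long prev = -1;
    for (long p : points) {
      if (prev >= 0) {
        stripes.add(new long[]{prev, p - prev});
      }
      prev = p;
    }
    return stripes;                    // at most 5 stripes for any one request
  }

  public static void main(String[] args) {
    int cell = 64 * 1024;
    // Read from logical byte 10 through the 7th cell: blk_0 serves bytes
    // [10, 2*cell) of itself (two cells), blk_1..blk_5 each serve [0, cell).
    long[][] ranges = new long[6][];
    ranges[0] = new long[]{10, 2L * cell - 10};
    for (int i = 1; i < 6; i++) {
      ranges[i] = new long[]{0, cell};
    }
    for (long[] s : mergeIntoStripes(ranges)) {
      System.out.println("stripe @" + s[0] + " span " + s[1]);
    }
    // Prints three stripes: [0,10), [10,64K), [64K,128K).
  }
}

Cutting at every boundary is what guarantees that, within one aligned stripe, each internal block's chunk is either completely inside or completely outside the request, which is what lets the read and decode paths treat all chunks of a stripe uniformly.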
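fetchOneStripe drives the pread with a small state machine: stop once dataBlkNum chunks are fetched, give up once more than parityBlkNum chunks are missing, and on the first miss start reading the extra (parity) chunks needed for decoding. The sketch below simulates that loop with stand-in read tasks on an ExecutorCompletionService; it is schematic only and does not use the real block readers or decodeAndFillBuffer.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Schematic version of the pread control loop: submit reads for the data
 * chunks of one stripe, and widen to parity chunks as soon as one fails.
 */
public class StripeReadLoopSketch {
  static final int DATA = 6, PARITY = 3;   // illustrative schema

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(DATA + PARITY);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
    Map<Future<Integer>, Integer> futures = new HashMap<>();

    // Submit the data chunks first (indices 0..DATA-1); index 2 will "fail".
    for (int i = 0; i < DATA; i++) {
      submit(service, futures, i, i == 2);
    }

    int fetched = 0, missing = 0;
    boolean decodeScheduled = false;
    while (!futures.isEmpty()) {
      Future<Integer> done = service.take();
      int index = futures.remove(done);
      try {
        done.get();                          // success: chunk is in the buffer
        if (++fetched == DATA) {
          futures.keySet().forEach(f -> f.cancel(false));
          break;
        }
      } catch (ExecutionException e) {       // failure: chunk is missing
        System.out.println("chunk " + index + " failed: " + e.getCause().getMessage());
        if (++missing > PARITY) {
          throw new IOException("Too many missing chunks in stripe");
        }
        if (!decodeScheduled) {              // first failure: pull in parity
          for (int p = DATA; p < DATA + PARITY; p++) {
            submit(service, futures, p, false);
          }
          decodeScheduled = true;
        }
      }
    }
    if (missing > 0) {
      System.out.println("decode " + missing + " chunk(s) from parity");
    }
    pool.shutdown();
  }

  static void submit(CompletionService<Integer> service,
      Map<Future<Integer>, Integer> futures, int index, boolean fail) {
    futures.put(service.submit(() -> {
      if (fail) throw new IOException("simulated read failure for chunk " + index);
      return index;
    }), index);
  }
}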
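For reference, the buffers handed to the decoder in decodeAndFillBuffer are laid out as one spanInBlock-sized input per internal block (data plus parity) and one output per erased position; the patch temporarily fills a fixed byte instead of copying real decoder output (see the HADOOP-11938 TODO). A small sketch of the buffer shapes, with assumed sizes:

import java.util.Arrays;

/**
 * Shapes of the decode buffers. Sizes are illustrative; the real span comes
 * from AlignedStripe.getSpanInBlock().
 */
public class DecodeBufferSketch {
  public static void main(String[] args) {
    int dataBlkNum = 6, parityBlkNum = 3;
    int spanInBlock = 1024;                  // bytes covered by this stripe

    // One input per internal block; never-requested chunks stay all-zero,
    // matching the ALLZERO handling in initDecodeInputs.
    byte[][] decodeInputs = new byte[dataBlkNum + parityBlkNum][spanInBlock];
    Arrays.fill(decodeInputs[5], (byte) 0);

    // Indices of the chunks to reconstruct (at most parityBlkNum of them).
    int[] decodeIndices = new int[]{2};
    byte[][] outputs = new byte[parityBlkNum][spanInBlock];

    // The real code calls:
    //   RSRawDecoder rsRawDecoder = new RSRawDecoder();
    //   rsRawDecoder.initialize(dataBlkNum, parityBlkNum, spanInBlock);
    //   rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
    // and copies each output into the user buffer at the offsets/lengths
    // recorded on the missing StripingChunk. This patch temporarily fills a
    // fixed byte instead (the HADOOP-11938 workaround):
    Arrays.fill(outputs[0], (byte) 7);
    System.out.println("reconstructed " + outputs[0].length
        + " bytes for chunk " + decodeIndices[0]);
  }
}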