diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 7b6d165195..cd9e19d847 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -344,3 +344,6 @@ HDFS-8744. Erasure Coding: the number of chunks in packet is not updated when writing parity data. (Li Bo) + + HDFS-8669. Erasure Coding: handle missing internal block locations in + DFSStripedInputStream. (jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 0a5511e16f..8f988afaa4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import java.io.Closeable; import java.io.IOException; import java.util.EnumSet; @@ -31,7 +32,7 @@ import org.apache.hadoop.util.DataChecksum; * from a single datanode. */ @InterfaceAudience.Private -public interface BlockReader extends ByteBufferReadable { +public interface BlockReader extends ByteBufferReadable, Closeable { /* same interface as inputStream java.io.InputStream#read() @@ -63,6 +64,7 @@ public interface BlockReader extends ByteBufferReadable { * * @throws IOException */ + @Override // java.io.Closeable void close() throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index a71da931da..75090036f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; @@ -113,16 +114,43 @@ public class DFSStripedInputStream extends DFSInputStream { } } - private final BlockReader[] blockReaders; - /** - * when initializing block readers, their starting offsets are set to the same - * number: the smallest internal block offsets among all the readers. This is - * because it is possible that for some internal blocks we have to read - * "backwards" for decoding purpose. We thus use this offset array to track - * offsets for all the block readers so that we can skip data if necessary. - */ - private final long[] blockReaderOffsets; - private final DatanodeInfo[] currentNodes; + private static class BlockReaderInfo { + final BlockReader reader; + final DatanodeInfo datanode; + /** + * when initializing block readers, their starting offsets are set to the same + * number: the smallest internal block offsets among all the readers. This is + * because it is possible that for some internal blocks we have to read + * "backwards" for decoding purpose. We thus use this offset array to track + * offsets for all the block readers so that we can skip data if necessary. + */ + long blockReaderOffset; + LocatedBlock targetBlock; + /** + * We use this field to indicate whether we should use this reader. In case + * we hit any issue with this reader, we set this field to true and avoid + * using it for the next stripe. + */ + boolean shouldSkip = false; + + BlockReaderInfo(BlockReader reader, LocatedBlock targetBlock, + DatanodeInfo dn, long offset) { + this.reader = reader; + this.targetBlock = targetBlock; + this.datanode = dn; + this.blockReaderOffset = offset; + } + + void setOffset(long offset) { + this.blockReaderOffset = offset; + } + + void skip() { + this.shouldSkip = true; + } + } + + private final BlockReaderInfo[] blockReaders; private final int cellSize; private final short dataBlkNum; private final short parityBlkNum; @@ -151,9 +179,7 @@ public class DFSStripedInputStream extends DFSInputStream { dataBlkNum = (short) schema.getNumDataUnits(); parityBlkNum = (short) schema.getNumParityUnits(); groupSize = dataBlkNum + parityBlkNum; - blockReaders = new BlockReader[groupSize]; - blockReaderOffsets = new long[groupSize]; - currentNodes = new DatanodeInfo[groupSize]; + blockReaders = new BlockReaderInfo[groupSize]; curStripeRange = new StripeRange(0, 0); readingService = new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); @@ -218,18 +244,26 @@ public class DFSStripedInputStream extends DFSInputStream { for (int i = 0; i < dataBlkNum; i++) { LocatedBlock targetBlock = targetBlocks[i]; if (targetBlock != null) { - DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null); - if (retval != null) { - currentNodes[i] = retval.info; - blockReaders[i] = getBlockReaderWithRetry(targetBlock, + DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null); + if (dnInfo != null) { + BlockReader reader = getBlockReaderWithRetry(targetBlock, minOffset, targetBlock.getBlockSize() - minOffset, - retval.addr, retval.storageType, retval.info, target, retry); - blockReaderOffsets[i] = minOffset; + dnInfo.addr, dnInfo.storageType, dnInfo.info, target, retry); + if (reader != null) { + blockReaders[i] = new BlockReaderInfo(reader, targetBlock, + dnInfo.info, minOffset); + } } } } } + /** + * @throws IOException only when failing to refetch block token, which happens + * when this client cannot get located block information from NameNode. This + * method returns null instead of throwing exception when failing to connect + * to the DataNode. + */ private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock, long offsetInBlock, long length, InetSocketAddress targetAddr, StorageType storageType, DatanodeInfo datanode, long offsetInFile, @@ -275,21 +309,16 @@ public class DFSStripedInputStream extends DFSInputStream { } for (int i = 0; i < groupSize; i++) { closeReader(i); - currentNodes[i] = null; + blockReaders[i] = null; } blockEnd = -1; } private void closeReader(int index) { if (blockReaders[index] != null) { - try { - blockReaders[index].close(); - } catch (IOException e) { - DFSClient.LOG.error("error closing blockReader " + index, e); - } - blockReaders[index] = null; + IOUtils.cleanup(DFSClient.LOG, blockReaders[index].reader); + blockReaders[index].skip(); } - blockReaderOffsets[index] = 0; } private long getOffsetInBlockGroup() { @@ -323,16 +352,14 @@ public class DFSStripedInputStream extends DFSInputStream { AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize, blockGroup, offsetInBlockGroup, offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf); - // TODO handle null elements in blks (e.g., NN does not know locations for - // all the internal blocks) final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( blockGroup, cellSize, dataBlkNum, parityBlkNum); // read the whole stripe for (AlignedStripe stripe : stripes) { // Parse group to get chosen DN location StripeReader sreader = new StatefulStripeReader(readingService, stripe, - blks); - sreader.readStripe(blks, corruptedBlockMap); + blks, corruptedBlockMap); + sreader.readStripe(); } curStripeBuf.position(stripeBufOffset); curStripeBuf.limit(stripeLimit); @@ -549,14 +576,13 @@ public class DFSStripedInputStream extends DFSInputStream { blockGroup, start, end, buf, offset); CompletionService readService = new ExecutorCompletionService<>( dfsClient.getStripedReadsThreadPool()); - // TODO handle null elements in blks (e.g., NN does not know locations for - // all the internal blocks) final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( blockGroup, cellSize, dataBlkNum, parityBlkNum); for (AlignedStripe stripe : stripes) { // Parse group to get chosen DN location - StripeReader preader = new PositionStripeReader(readService, stripe); - preader.readStripe(blks, corruptedBlockMap); + StripeReader preader = new PositionStripeReader(readService, stripe, + blks, corruptedBlockMap); + preader.readStripe(); } } @@ -586,43 +612,89 @@ public class DFSStripedInputStream extends DFSInputStream { final Map, Integer> futures = new HashMap<>(); final AlignedStripe alignedStripe; final CompletionService service; + final LocatedBlock[] targetBlocks; + final Map> corruptedBlockMap; - StripeReader(CompletionService service, AlignedStripe alignedStripe) { + StripeReader(CompletionService service, AlignedStripe alignedStripe, + LocatedBlock[] targetBlocks, + Map> corruptedBlockMap) { this.service = service; this.alignedStripe = alignedStripe; + this.targetBlocks = targetBlocks; + this.corruptedBlockMap = corruptedBlockMap; } - /** submit reading chunk task */ - abstract void readChunk(final CompletionService service, - final LocatedBlock block, int chunkIndex, - Map> corruptedBlockMap); + abstract boolean readChunk(final CompletionService service, + final LocatedBlock block, int chunkIndex); - /** - * When seeing first missing block, initialize decode input buffers. - * Also prepare the reading for data blocks outside of the reading range. - */ - abstract void prepareDecodeInputs() throws IOException; + /** prepare all the data chunks */ + abstract void prepareDecodeInputs(); - /** - * Prepare reading for one more parity chunk. - */ - abstract void prepareParityChunk() throws IOException; + /** prepare the parity chunk and block reader if necessary */ + abstract boolean prepareParityChunk(int index) throws IOException; abstract void decode(); abstract void updateState4SuccessRead(StripingChunkReadResult result); - /** read the whole stripe. do decoding if necessary */ - void readStripe(LocatedBlock[] blocks, - Map> corruptedBlockMap) - throws IOException { - assert alignedStripe.getSpanInBlock() > 0; - for (short i = 0; i < dataBlkNum; i++) { - if (alignedStripe.chunks[i] != null - && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { - readChunk(service, blocks[i], i, corruptedBlockMap); + private void checkMissingBlocks() throws IOException { + if (alignedStripe.missingChunksNum > parityBlkNum) { + clearFutures(futures.keySet()); + throw new IOException(alignedStripe.missingChunksNum + + " missing blocks, the stripe is: " + alignedStripe); + } + } + + /** + * We need decoding. Thus go through all the data chunks and make sure we + * submit read requests for all of them. + */ + private void readDataForDecoding() throws IOException { + prepareDecodeInputs(); + for (int i = 0; i < dataBlkNum; i++) { + Preconditions.checkNotNull(alignedStripe.chunks[i]); + if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) { + if (!readChunk(service, targetBlocks[i], i)) { + alignedStripe.missingChunksNum++; + } } } + checkMissingBlocks(); + } + + void readParityChunks(int num) throws IOException { + for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num; + i++) { + if (alignedStripe.chunks[i] == null) { + if (prepareParityChunk(i) && readChunk(service, targetBlocks[i], i)) { + j++; + } else { + alignedStripe.missingChunksNum++; + } + } + } + checkMissingBlocks(); + } + + /** read the whole stripe. do decoding if necessary */ + void readStripe() throws IOException { + for (int i = 0; i < dataBlkNum; i++) { + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { + if (!readChunk(service, targetBlocks[i], i)) { + alignedStripe.missingChunksNum++; + } + } + } + // There are missing block locations at this stage. Thus we need to read + // the full stripe and one more parity block. + if (alignedStripe.missingChunksNum > 0) { + checkMissingBlocks(); + readDataForDecoding(); + // read parity chunks + readParityChunks(alignedStripe.missingChunksNum); + } + // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks // Input buffers for potential decode operation, which remains null until // first read failure @@ -648,24 +720,15 @@ public class DFSStripedInputStream extends DFSInputStream { } } else { returnedChunk.state = StripingChunk.MISSING; - alignedStripe.missingChunksNum++; - if (alignedStripe.missingChunksNum > parityBlkNum) { - clearFutures(futures.keySet()); - throw new IOException("Too many blocks are missing: " - + alignedStripe); - } - - prepareDecodeInputs(); - prepareParityChunk(); // close the corresponding reader closeReader(r.index); - for (int i = 0; i < alignedStripe.chunks.length; i++) { - StripingChunk chunk = alignedStripe.chunks[i]; - if (chunk != null && chunk.state == StripingChunk.REQUESTED) { - readChunk(service, blocks[i], i, corruptedBlockMap); - } - } + final int missing = alignedStripe.missingChunksNum; + alignedStripe.missingChunksNum++; + checkMissingBlocks(); + + readDataForDecoding(); + readParityChunks(alignedStripe.missingChunksNum - missing); } } catch (InterruptedException ie) { String err = "Read request interrupted"; @@ -686,20 +749,24 @@ public class DFSStripedInputStream extends DFSInputStream { private byte[][] decodeInputs = null; PositionStripeReader(CompletionService service, - AlignedStripe alignedStripe) { - super(service, alignedStripe); + AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, + Map> corruptedBlockMap) { + super(service, alignedStripe, targetBlocks, corruptedBlockMap); } @Override - void readChunk(final CompletionService service, - final LocatedBlock block, int chunkIndex, - Map> corruptedBlockMap) { + boolean readChunk(final CompletionService service, + final LocatedBlock block, int chunkIndex) { + final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; + if (block == null) { + chunk.state = StripingChunk.MISSING; + return false; + } DatanodeInfo loc = block.getLocations()[0]; StorageType type = block.getStorageTypes()[0]; DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr( loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())), type); - StripingChunk chunk = alignedStripe.chunks[chunkIndex]; chunk.state = StripingChunk.PENDING; Callable readCallable = getFromOneDataNode(dnAddr, block, alignedStripe.getOffsetInBlock(), @@ -715,6 +782,7 @@ public class DFSStripedInputStream extends DFSInputStream { + alignedStripe.getSpanInBlock() - 1)); } futures.put(getFromDNRequest, chunkIndex); + return true; } @Override @@ -728,18 +796,15 @@ public class DFSStripedInputStream extends DFSInputStream { } @Override - void prepareParityChunk() { - for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) { - if (alignedStripe.chunks[i] == null) { - final int decodeIndex = convertIndex4Decode(i, - dataBlkNum, parityBlkNum); - alignedStripe.chunks[i] = - new StripingChunk(decodeInputs[decodeIndex]); - alignedStripe.chunks[i].addByteArraySlice(0, - (int) alignedStripe.getSpanInBlock()); - break; - } - } + boolean prepareParityChunk(int index) { + Preconditions.checkState(index >= dataBlkNum && + alignedStripe.chunks[index] == null); + final int decodeIndex = convertIndex4Decode(index, dataBlkNum, + parityBlkNum); + alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); + alignedStripe.chunks[index].addByteArraySlice(0, + (int) alignedStripe.getSpanInBlock()); + return true; } @Override @@ -753,39 +818,43 @@ public class DFSStripedInputStream extends DFSInputStream { class StatefulStripeReader extends StripeReader { ByteBuffer[] decodeInputs; - final LocatedBlock[] targetBlocks; StatefulStripeReader(CompletionService service, - AlignedStripe alignedStripe, LocatedBlock[] targetBlocks) { - super(service, alignedStripe); - this.targetBlocks = targetBlocks; + AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, + Map> corruptedBlockMap) { + super(service, alignedStripe, targetBlocks, corruptedBlockMap); } @Override - void readChunk(final CompletionService service, - final LocatedBlock block, int chunkIndex, Map> corruptedBlockMap) { - StripingChunk chunk = alignedStripe.chunks[chunkIndex]; + boolean readChunk(final CompletionService service, + final LocatedBlock block, int chunkIndex) { + final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; + final BlockReaderInfo readerInfo = blockReaders[chunkIndex]; + if (readerInfo == null || block == null || readerInfo.shouldSkip) { + chunk.state = StripingChunk.MISSING; + return false; + } chunk.state = StripingChunk.PENDING; ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer); - Callable readCallable = readCell(blockReaders[chunkIndex], - currentNodes[chunkIndex], blockReaderOffsets[chunkIndex], + Callable readCallable = readCell(readerInfo.reader, + readerInfo.datanode, readerInfo.blockReaderOffset, alignedStripe.getOffsetInBlock(), strategy, chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap); Future request = readingService.submit(readCallable); futures.put(request, chunkIndex); + return true; } @Override void updateState4SuccessRead(StripingChunkReadResult result) { Preconditions.checkArgument( result.state == StripingChunkReadResult.SUCCESSFUL); - blockReaderOffsets[result.index] = - alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock(); + blockReaders[result.index].setOffset(alignedStripe.getOffsetInBlock() + + alignedStripe.getSpanInBlock()); } @Override - void prepareDecodeInputs() throws IOException { + void prepareDecodeInputs() { if (decodeInputs == null) { decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum]; ByteBuffer cur = curStripeBuf.duplicate(); @@ -799,52 +868,58 @@ public class DFSStripedInputStream extends DFSInputStream { parityBlkNum); decodeInputs[decodeIndex] = cur.slice(); if (alignedStripe.chunks[i] == null) { - alignedStripe.chunks[i] = - new StripingChunk(decodeInputs[decodeIndex]); + alignedStripe.chunks[i] = new StripingChunk( + decodeInputs[decodeIndex]); } } } } @Override - void prepareParityChunk() throws IOException { - for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) { - if (alignedStripe.chunks[i] == null) { - final int decodeIndex = convertIndex4Decode(i, dataBlkNum, - parityBlkNum); - decodeInputs[decodeIndex] = ByteBuffer.allocateDirect( - (int) alignedStripe.range.spanInBlock); - alignedStripe.chunks[i] = - new StripingChunk(decodeInputs[decodeIndex]); - if (blockReaders[i] == null) { - prepareParityBlockReader(i); - } - break; - } + boolean prepareParityChunk(int index) throws IOException { + Preconditions.checkState(index >= dataBlkNum + && alignedStripe.chunks[index] == null); + if (blockReaders[index] != null && blockReaders[index].shouldSkip) { + alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING); + // we have failed the block reader before + return false; } + final int decodeIndex = convertIndex4Decode(index, dataBlkNum, + parityBlkNum); + decodeInputs[decodeIndex] = ByteBuffer.allocateDirect( + (int) alignedStripe.range.spanInBlock); + alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); + if (blockReaders[index] == null && !prepareParityBlockReader(index)) { + alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING); + return false; + } + return true; } - private void prepareParityBlockReader(int i) throws IOException { + private boolean prepareParityBlockReader(int i) throws IOException { // prepare the block reader for the parity chunk LocatedBlock targetBlock = targetBlocks[i]; if (targetBlock != null) { final long offsetInBlock = alignedStripe.getOffsetInBlock(); - DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null); - if (retval != null) { - currentNodes[i] = retval.info; - blockReaders[i] = getBlockReaderWithRetry(targetBlock, + DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null); + if (dnInfo != null) { + BlockReader reader = getBlockReaderWithRetry(targetBlock, offsetInBlock, targetBlock.getBlockSize() - offsetInBlock, - retval.addr, retval.storageType, retval.info, + dnInfo.addr, dnInfo.storageType, dnInfo.info, DFSStripedInputStream.this.getPos(), retry); - blockReaderOffsets[i] = offsetInBlock; + if (reader != null) { + blockReaders[i] = new BlockReaderInfo(reader, targetBlock, + dnInfo.info, offsetInBlock); + return true; + } } } + return false; } @Override void decode() { - // TODO no copy for data chunks. this depends on HADOOP-12047 for some - // decoders to work + // TODO no copy for data chunks. this depends on HADOOP-12047 final int span = (int) alignedStripe.getSpanInBlock(); for (int i = 0; i < alignedStripe.chunks.length; i++) { final int decodeIndex = convertIndex4Decode(i, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 579434b670..6bd5e1f010 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -83,6 +83,7 @@ public class StripedBlockUtil { LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum]; for (short i = 0; i < locatedBGSize; i++) { final int idx = bg.getBlockIndices()[i]; + // for now we do not use redundant replica of an internal block if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) { lbs[idx] = constructInternalBlock(bg, i, cellSize, dataBlkNum, idx); @@ -212,7 +213,9 @@ public class StripedBlockUtil { return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT); } } catch (ExecutionException e) { - DFSClient.LOG.warn("ExecutionException " + e); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("ExecutionException " + e); + } return new StripingChunkReadResult(futures.remove(future), StripingChunkReadResult.FAILED); } catch (CancellationException e) { @@ -623,7 +626,7 @@ public class StripedBlockUtil { cellSize, dataBlkNum, i); if (internalBlkLen <= s.getOffsetInBlock()) { Preconditions.checkState(s.chunks[i] == null); - s.chunks[i] = new StripingChunk(); // chunk state is set to ALLZERO + s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO); } } } @@ -841,10 +844,10 @@ public class StripedBlockUtil { this.byteBuffer = buf; } - public StripingChunk() { + public StripingChunk(int state) { this.byteArray = null; this.byteBuffer = null; - this.state = ALLZERO; + this.state = state; } public void addByteArraySlice(int offset, int length) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java index 23697040db..815a50d80a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -18,9 +18,16 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.web.ByteRangeInputStream; +import org.junit.Assert; +import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; public class StripedFileTestUtil { @@ -56,4 +63,125 @@ public class StripedFileTestUtil { final int mod = 29; return (byte) (pos % mod + 1); } + + static void verifyLength(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + FileStatus status = fs.getFileStatus(srcPath); + Assert.assertEquals("File length should be the same", fileLength, status.getLen()); + } + + static void verifyPread(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102, + cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102, + cellSize * dataBlocks, fileLength - 102, fileLength - 1}; + for (int startOffset : startOffsets) { + startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); + int remaining = fileLength - startOffset; + in.readFully(startOffset, buf, 0, remaining); + for (int i = 0; i < remaining; i++) { + Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + + "same", expected[startOffset + i], buf[i]); + } + } + } + } + + static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + final byte[] result = new byte[fileLength]; + int readLen = 0; + int ret; + while ((ret = in.read(buf, 0, buf.length)) >= 0) { + System.arraycopy(buf, 0, result, readLen, ret); + readLen += ret; + } + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result); + } + } + + static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, ByteBuffer buf) throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + ByteBuffer result = ByteBuffer.allocate(fileLength); + int readLen = 0; + int ret; + while ((ret = in.read(buf)) >= 0) { + readLen += ret; + buf.flip(); + result.put(buf); + buf.clear(); + } + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result.array()); + } + } + + static void verifySeek(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + try (FSDataInputStream in = fs.open(srcPath)) { + // seek to 1/2 of content + int pos = fileLength / 2; + assertSeekAndRead(in, pos, fileLength); + + // seek to 1/3 of content + pos = fileLength / 3; + assertSeekAndRead(in, pos, fileLength); + + // seek to 0 pos + pos = 0; + assertSeekAndRead(in, pos, fileLength); + + if (fileLength > cellSize) { + // seek to cellSize boundary + pos = cellSize - 1; + assertSeekAndRead(in, pos, fileLength); + } + + if (fileLength > cellSize * dataBlocks) { + // seek to striped cell group boundary + pos = cellSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } + + if (fileLength > blockSize * dataBlocks) { + // seek to striped block group boundary + pos = blockSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } + + if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) { + try { + in.seek(-1); + Assert.fail("Should be failed if seek to negative offset"); + } catch (EOFException e) { + // expected + } + + try { + in.seek(fileLength + 1); + Assert.fail("Should be failed if seek after EOF"); + } catch (EOFException e) { + // expected + } + } + } + } + + static void assertSeekAndRead(FSDataInputStream fsdis, int pos, + int writeBytes) throws IOException { + fsdis.seek(pos); + byte[] buf = new byte[writeBytes]; + int readLen = StripedFileTestUtil.readAll(fsdis, buf); + Assert.assertEquals(readLen, writeBytes - pos); + for (int i = 0; i < readLen; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", + StripedFileTestUtil.getByte(pos + i), buf[i]); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java new file mode 100644 index 0000000000..4c2438d76b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; + +/** + * Test reading a striped file when some of its blocks are missing (not included + * in the block locations returned by the NameNode). + */ +public class TestReadStripedFileWithMissingBlocks { + public static final Log LOG = LogFactory + .getLog(TestReadStripedFileWithMissingBlocks.class); + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static Configuration conf = new HdfsConfiguration(); + private final int fileLength = blockSize * dataBlocks + 123; + + @Before + public void setup() throws IOException { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testReadFileWithMissingBlocks1() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 0); + } + + @Test + public void testReadFileWithMissingBlocks2() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 1); + } + + @Test + public void testReadFileWithMissingBlocks3() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 2); + } + + @Test + public void testReadFileWithMissingBlocks4() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 0); + } + + @Test + public void testReadFileWithMissingBlocks5() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 1); + } + + @Test + public void testReadFileWithMissingBlocks6() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 3, 0); + } + + private void readFileWithMissingBlocks(Path srcPath, int fileLength, + int missingDataNum, int missingParityNum) + throws IOException { + LOG.info("readFileWithMissingBlocks: (" + missingDataNum + "," + + missingParityNum + ")"); + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); + int dataBlocks = (fileLength - 1) / cellSize + 1; + BlockLocation[] locs = fs.getFileBlockLocations(srcPath, 0, cellSize); + + int[] missingDataNodes = new int[missingDataNum + missingParityNum]; + for (int i = 0; i < missingDataNum; i++) { + missingDataNodes[i] = i; + } + for (int i = 0; i < missingParityNum; i++) { + missingDataNodes[i + missingDataNum] = i + + Math.min(StripedFileTestUtil.dataBlocks, dataBlocks); + } + stopDataNodes(locs, missingDataNodes); + + // make sure there are missing block locations + BlockLocation[] newLocs = fs.getFileBlockLocations(srcPath, 0, cellSize); + Assert.assertTrue(newLocs[0].getNames().length < locs[0].getNames().length); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + smallBuf); + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + // delete the file + fs.delete(srcPath, true); + } + + private void stopDataNodes(BlockLocation[] locs, int[] datanodes) + throws IOException { + if (locs != null && locs.length > 0) { + for (int failedDNIdx : datanodes) { + String name = (locs[0].getNames())[failedDNIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + cluster.setDataNodeDead(dn.getDatanodeId()); + LOG.info("stop datanode " + failedDNIdx); + break; + } + } + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java index 272650d358..2f9322dcdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -21,20 +21,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.web.ByteRangeInputStream; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -48,11 +43,10 @@ public class TestWriteReadStripedFile { public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class); private static MiniDFSCluster cluster; private static FileSystem fs; - private static Configuration conf; + private static Configuration conf = new HdfsConfiguration(); @Before public void setup() throws IOException { - conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.getFileSystem().getClient().createErasureCodingZone("/", @@ -175,18 +169,6 @@ public class TestWriteReadStripedFile { + cellSize + 123, true); } - private void assertSeekAndRead(FSDataInputStream fsdis, int pos, - int writeBytes) throws IOException { - fsdis.seek(pos); - byte[] buf = new byte[writeBytes]; - int readLen = StripedFileTestUtil.readAll(fsdis, buf); - Assert.assertEquals(readLen, writeBytes - pos); - for (int i = 0; i < readLen; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", - StripedFileTestUtil.getByte(pos + i), buf[i]); - } - } - private void testOneFileUsingDFSStripedInputStream(String src, int fileLength) throws IOException { testOneFileUsingDFSStripedInputStream(src, fileLength, false); @@ -198,7 +180,7 @@ public class TestWriteReadStripedFile { Path srcPath = new Path(src); DFSTestUtil.writeFile(fs, srcPath, new String(expected)); - verifyLength(fs, srcPath, fileLength); + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); if (withDataNodeFailure) { int dnIndex = 1; // TODO: StripedFileTestUtil.random.nextInt(dataBlocks); @@ -208,14 +190,16 @@ public class TestWriteReadStripedFile { byte[] smallBuf = new byte[1024]; byte[] largeBuf = new byte[fileLength + 100]; - verifyPread(fs, srcPath, fileLength, expected, largeBuf); + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); - verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); - verifySeek(fs, srcPath, fileLength); - verifyStatefulRead(fs, srcPath, fileLength, expected, + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + largeBuf); + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, ByteBuffer.allocate(fileLength + 100)); - verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); - verifyStatefulRead(fs, srcPath, fileLength, expected, + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + smallBuf); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, ByteBuffer.allocate(1024)); } @@ -241,130 +225,18 @@ public class TestWriteReadStripedFile { final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME); - Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe"); + Path srcPath = new Path("/testWriteReadUsingWebHdfs"); DFSTestUtil.writeFile(fs, srcPath, new String(expected)); - verifyLength(fs, srcPath, fileLength); + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); byte[] smallBuf = new byte[1024]; byte[] largeBuf = new byte[fileLength + 100]; - verifyPread(fs, srcPath, fileLength, expected, largeBuf); + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); - verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); - verifySeek(fs, srcPath, fileLength); - verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); - //webhdfs doesn't support bytebuffer read - } - - void verifyLength(FileSystem fs, Path srcPath, int fileLength) - throws IOException { - FileStatus status = fs.getFileStatus(srcPath); - Assert.assertEquals("File length should be the same", - fileLength, status.getLen()); - } - - void verifyPread(FileSystem fs, Path srcPath, int fileLength, - byte[] expected, byte[] buf) throws IOException { - try (FSDataInputStream in = fs.open(srcPath)) { - int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102, - cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102, - cellSize * dataBlocks, fileLength - 102, fileLength - 1}; - for (int startOffset : startOffsets) { - startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); - int remaining = fileLength - startOffset; - in.readFully(startOffset, buf, 0, remaining); - for (int i = 0; i < remaining; i++) { - Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + - "same", expected[startOffset + i], buf[i]); - } - } - } - } - - void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, - byte[] expected, byte[] buf) throws IOException { - try (FSDataInputStream in = fs.open(srcPath)) { - final byte[] result = new byte[fileLength]; - int readLen = 0; - int ret; - while ((ret = in.read(buf, 0, buf.length)) >= 0) { - System.arraycopy(buf, 0, result, readLen, ret); - readLen += ret; - } - Assert.assertEquals("The length of file should be the same to write size", - fileLength, readLen); - Assert.assertArrayEquals(expected, result); - } - } - - - void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, - byte[] expected, ByteBuffer buf) throws IOException { - try (FSDataInputStream in = fs.open(srcPath)) { - ByteBuffer result = ByteBuffer.allocate(fileLength); - int readLen = 0; - int ret; - while ((ret = in.read(buf)) >= 0) { - readLen += ret; - buf.flip(); - result.put(buf); - buf.clear(); - } - Assert.assertEquals("The length of file should be the same to write size", - fileLength, readLen); - Assert.assertArrayEquals(expected, result.array()); - } - } - - - void verifySeek(FileSystem fs, Path srcPath, int fileLength) - throws IOException { - try (FSDataInputStream in = fs.open(srcPath)) { - // seek to 1/2 of content - int pos = fileLength / 2; - assertSeekAndRead(in, pos, fileLength); - - // seek to 1/3 of content - pos = fileLength / 3; - assertSeekAndRead(in, pos, fileLength); - - // seek to 0 pos - pos = 0; - assertSeekAndRead(in, pos, fileLength); - - if (fileLength > cellSize) { - // seek to cellSize boundary - pos = cellSize - 1; - assertSeekAndRead(in, pos, fileLength); - } - - if (fileLength > cellSize * dataBlocks) { - // seek to striped cell group boundary - pos = cellSize * dataBlocks - 1; - assertSeekAndRead(in, pos, fileLength); - } - - if (fileLength > blockSize * dataBlocks) { - // seek to striped block group boundary - pos = blockSize * dataBlocks - 1; - assertSeekAndRead(in, pos, fileLength); - } - - if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) { - try { - in.seek(-1); - Assert.fail("Should be failed if seek to negative offset"); - } catch (EOFException e) { - // expected - } - - try { - in.seek(fileLength + 1); - Assert.fail("Should be failed if seek after EOF"); - } catch (EOFException e) { - // expected - } - } - } + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); + // webhdfs doesn't support bytebuffer read } }