HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). Contributed by Zhe Zhang.
This commit is contained in:
parent
6bacaa9a52
commit
8d3030f064
@ -195,3 +195,6 @@
|
|||||||
|
|
||||||
HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction.
|
HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction.
|
||||||
(Tsz Wo Nicholas Sze via jing9)
|
(Tsz Wo Nicholas Sze via jing9)
|
||||||
|
|
||||||
|
HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread).
|
||||||
|
(Zhe Zhang)
|
||||||
|
@ -21,15 +21,27 @@
|
|||||||
import org.apache.hadoop.fs.ChecksumException;
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.*;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.apache.hadoop.io.ByteBufferPool;
|
import org.apache.hadoop.io.ByteBufferPool;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
|
|
||||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||||
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||||
|
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
|
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.htrace.Span;
|
import org.apache.htrace.Span;
|
||||||
import org.apache.htrace.Trace;
|
import org.apache.htrace.Trace;
|
||||||
@ -37,10 +49,12 @@
|
|||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
@ -51,7 +65,6 @@
|
|||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
|
||||||
/******************************************************************************
|
/******************************************************************************
|
||||||
* DFSStripedInputStream reads from striped block groups, illustrated below:
|
* DFSStripedInputStream reads from striped block groups, illustrated below:
|
||||||
*
|
*
|
||||||
@ -125,6 +138,7 @@ boolean include(long pos) {
|
|||||||
private final short parityBlkNum;
|
private final short parityBlkNum;
|
||||||
/** the buffer for a complete stripe */
|
/** the buffer for a complete stripe */
|
||||||
private ByteBuffer curStripeBuf;
|
private ByteBuffer curStripeBuf;
|
||||||
|
private final ECSchema schema;
|
||||||
/**
|
/**
|
||||||
* indicate the start/end offset of the current buffered stripe in the
|
* indicate the start/end offset of the current buffered stripe in the
|
||||||
* block group
|
* block group
|
||||||
@ -137,6 +151,7 @@ boolean include(long pos) {
|
|||||||
super(dfsClient, src, verifyChecksum);
|
super(dfsClient, src, verifyChecksum);
|
||||||
|
|
||||||
assert schema != null;
|
assert schema != null;
|
||||||
|
this.schema = schema;
|
||||||
cellSize = schema.getChunkSize();
|
cellSize = schema.getChunkSize();
|
||||||
dataBlkNum = (short) schema.getNumDataUnits();
|
dataBlkNum = (short) schema.getNumDataUnits();
|
||||||
parityBlkNum = (short) schema.getNumParityUnits();
|
parityBlkNum = (short) schema.getNumParityUnits();
|
||||||
@ -472,12 +487,10 @@ private int copy(ReaderStrategy strategy, int offset, int length) {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
|
protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
|
||||||
LocatedBlock lb = super.getBlockAt(blkStartOffset);
|
LocatedBlock lb = getBlockGroupAt(blkStartOffset);
|
||||||
assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
|
|
||||||
"LocatedStripedBlock for a striped file";
|
|
||||||
|
|
||||||
int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
|
int idx = (int) ((blkStartOffset - lb.getStartOffset())
|
||||||
% dataBlkNum);
|
% (dataBlkNum + parityBlkNum));
|
||||||
// If indexing information is returned, iterate through the index array
|
// If indexing information is returned, iterate through the index array
|
||||||
// to find the entry for position idx in the group
|
// to find the entry for position idx in the group
|
||||||
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
|
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
|
||||||
@ -509,48 +522,121 @@ protected void fetchBlockByteRange(long blockStartOffset, long start,
|
|||||||
long end, byte[] buf, int offset,
|
long end, byte[] buf, int offset,
|
||||||
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Map<Future<Void>, Integer> futures = new HashMap<>();
|
|
||||||
CompletionService<Void> stripedReadsService =
|
|
||||||
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
|
|
||||||
int len = (int) (end - start + 1);
|
|
||||||
|
|
||||||
// Refresh the striped block group
|
// Refresh the striped block group
|
||||||
LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
|
LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
|
||||||
|
|
||||||
|
AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, blockGroup,
|
||||||
|
start, end, buf, offset);
|
||||||
|
for (AlignedStripe stripe : stripes) {
|
||||||
|
fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Planning the portion of I/O for each shard
|
private void fetchOneStripe(LocatedStripedBlock blockGroup,
|
||||||
ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
|
byte[] buf, AlignedStripe alignedStripe, Map<ExtendedBlock,
|
||||||
len, offset);
|
Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
|
||||||
|
Map<Future<Void>, Integer> futures = new HashMap<>();
|
||||||
|
CompletionService<Void> service =
|
||||||
|
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
|
||||||
|
if (alignedStripe.getSpanInBlock() == 0) {
|
||||||
|
DFSClient.LOG.warn("Trying to read an empty stripe from" + blockGroup);
|
||||||
|
return;
|
||||||
|
}
|
||||||
// Parse group to get chosen DN location
|
// Parse group to get chosen DN location
|
||||||
LocatedBlock[] blks = StripedBlockUtil.
|
LocatedBlock[] blks = StripedBlockUtil.
|
||||||
parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||||
|
|
||||||
for (short i = 0; i < dataBlkNum; i++) {
|
for (short i = 0; i < dataBlkNum; i++) {
|
||||||
ReadPortion rp = readPortions[i];
|
if (alignedStripe.chunks[i] != null
|
||||||
if (rp.getReadLength() <= 0) {
|
&& alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
|
||||||
continue;
|
fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
|
||||||
|
corruptedBlockMap);
|
||||||
}
|
}
|
||||||
DatanodeInfo loc = blks[i].getLocations()[0];
|
}
|
||||||
StorageType type = blks[i].getStorageTypes()[0];
|
// Input buffers for potential decode operation, which remains null until
|
||||||
|
// first read failure
|
||||||
|
byte[][] decodeInputs = null;
|
||||||
|
while (!futures.isEmpty()) {
|
||||||
|
try {
|
||||||
|
StripingChunkReadResult r = getNextCompletedStripedRead(
|
||||||
|
service, futures, 0);
|
||||||
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
|
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe);
|
||||||
|
}
|
||||||
|
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
|
||||||
|
Preconditions.checkNotNull(returnedChunk);
|
||||||
|
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
|
||||||
|
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
|
||||||
|
returnedChunk.state = StripingChunk.FETCHED;
|
||||||
|
alignedStripe.fetchedChunksNum++;
|
||||||
|
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
|
||||||
|
clearFutures(futures.keySet());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
returnedChunk.state = StripingChunk.MISSING;
|
||||||
|
alignedStripe.missingChunksNum++;
|
||||||
|
if (alignedStripe.missingChunksNum > parityBlkNum) {
|
||||||
|
clearFutures(futures.keySet());
|
||||||
|
throw new IOException("Too many blocks are missing: " + alignedStripe);
|
||||||
|
}
|
||||||
|
// When seeing first missing block, initialize decode input buffers
|
||||||
|
if (decodeInputs == null) {
|
||||||
|
decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||||
|
StripingChunk chunk = alignedStripe.chunks[i];
|
||||||
|
Preconditions.checkNotNull(chunk);
|
||||||
|
if (chunk.state == StripingChunk.REQUESTED && i <= dataBlkNum) {
|
||||||
|
fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
|
||||||
|
corruptedBlockMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
String err = "Read request interrupted";
|
||||||
|
DFSClient.LOG.error(err);
|
||||||
|
clearFutures(futures.keySet());
|
||||||
|
// Don't decode if read interrupted
|
||||||
|
throw new InterruptedIOException(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (alignedStripe.missingChunksNum > 0) {
|
||||||
|
decodeAndFillBuffer(decodeInputs, buf, alignedStripe,
|
||||||
|
dataBlkNum, parityBlkNum);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule a single read request to an internal block
|
||||||
|
* @param block The internal block
|
||||||
|
* @param index Index of the internal block in the group
|
||||||
|
* @param corruptedBlockMap Map of corrupted blocks
|
||||||
|
*/
|
||||||
|
private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
|
||||||
|
final CompletionService<Void> service, final LocatedBlock block,
|
||||||
|
final AlignedStripe alignedStripe, final int index,
|
||||||
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
|
||||||
|
DatanodeInfo loc = block.getLocations()[0];
|
||||||
|
StorageType type = block.getStorageTypes()[0];
|
||||||
DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
|
DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
|
||||||
loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
|
loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
|
||||||
type);
|
type);
|
||||||
|
StripingChunk chunk = alignedStripe.chunks[index];
|
||||||
|
chunk.state = StripingChunk.PENDING;
|
||||||
Callable<Void> readCallable = getFromOneDataNode(dnAddr,
|
Callable<Void> readCallable = getFromOneDataNode(dnAddr,
|
||||||
blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
|
block.getStartOffset(), alignedStripe.getOffsetInBlock(),
|
||||||
rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
|
alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, chunk.buf,
|
||||||
rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
|
chunk.getOffsets(), chunk.getLengths(),
|
||||||
Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
|
corruptedBlockMap, index);
|
||||||
DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
|
Future<Void> getFromDNRequest = service.submit(readCallable);
|
||||||
futures.put(getFromDNRequest, (int) i);
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
}
|
DFSClient.LOG.debug("Submitting striped read request for " + index +
|
||||||
while (!futures.isEmpty()) {
|
". Info of the block: " + block + ", offset in block is " +
|
||||||
try {
|
alignedStripe.getOffsetInBlock() + ", end is " +
|
||||||
waitNextCompletion(stripedReadsService, futures);
|
(alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1));
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
// Ignore and retry
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
futures.put(getFromDNRequest, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
|
private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
|
||||||
@ -609,4 +695,12 @@ public synchronized void releaseBuffer(ByteBuffer buffer) {
|
|||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"Not support enhanced byte buffer access.");
|
"Not support enhanced byte buffer access.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** A variation to {@link DFSInputStream#cancelAll} */
|
||||||
|
private void clearFutures(Collection<Future<Void>> futures) {
|
||||||
|
for (Future<Void> future : futures) {
|
||||||
|
future.cancel(false);
|
||||||
|
}
|
||||||
|
futures.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
|
||||||
@ -462,10 +462,10 @@ private int readMinimumStripedData4Recovery(int[] success) {
|
|||||||
int nsuccess = 0;
|
int nsuccess = 0;
|
||||||
while (!futures.isEmpty()) {
|
while (!futures.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
StripedReadResult result =
|
StripingChunkReadResult result =
|
||||||
StripedBlockUtil.getNextCompletedStripedRead(
|
StripedBlockUtil.getNextCompletedStripedRead(
|
||||||
readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
|
readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
|
||||||
if (result.state == StripedReadResult.SUCCESSFUL) {
|
if (result.state == StripingChunkReadResult.SUCCESSFUL) {
|
||||||
success[nsuccess++] = result.index;
|
success[nsuccess++] = result.index;
|
||||||
if (nsuccess >= dataBlkNum) {
|
if (nsuccess >= dataBlkNum) {
|
||||||
// cancel remaining reads if we read successfully from minimum
|
// cancel remaining reads if we read successfully from minimum
|
||||||
@ -474,14 +474,14 @@ private int readMinimumStripedData4Recovery(int[] success) {
|
|||||||
futures.clear();
|
futures.clear();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else if (result.state == StripedReadResult.FAILED) {
|
} else if (result.state == StripingChunkReadResult.FAILED) {
|
||||||
// If read failed for some source, we should not use it anymore
|
// If read failed for some source, we should not use it anymore
|
||||||
// and schedule read from a new source.
|
// and schedule read from a new source.
|
||||||
StripedReader failedReader = stripedReaders.get(result.index);
|
StripedReader failedReader = stripedReaders.get(result.index);
|
||||||
closeBlockReader(failedReader.blockReader);
|
closeBlockReader(failedReader.blockReader);
|
||||||
failedReader.blockReader = null;
|
failedReader.blockReader = null;
|
||||||
scheduleNewRead(used);
|
scheduleNewRead(used);
|
||||||
} else if (result.state == StripedReadResult.TIMEOUT) {
|
} else if (result.state == StripingChunkReadResult.TIMEOUT) {
|
||||||
// If timeout, we also schedule a new read.
|
// If timeout, we also schedule a new read.
|
||||||
scheduleNewRead(used);
|
scheduleNewRead(used);
|
||||||
}
|
}
|
||||||
|
@ -22,16 +22,18 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
@ -85,7 +87,7 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
|
|||||||
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
|
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
|
||||||
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
|
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
|
||||||
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
|
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
|
||||||
bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
|
bg.getStartOffset() + idxInBlockGroup, bg.isCorrupt(),
|
||||||
null);
|
null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,33 +240,37 @@ public static ReadPortion[] planReadPortions(final int dataBlkNum,
|
|||||||
/**
|
/**
|
||||||
* Get the next completed striped read task
|
* Get the next completed striped read task
|
||||||
*
|
*
|
||||||
* @return {@link StripedReadResult} indicating the status of the read task
|
* @return {@link StripingChunkReadResult} indicating the status of the read task
|
||||||
* succeeded, and the block index of the task. If the method times
|
* succeeded, and the block index of the task. If the method times
|
||||||
* out without getting any completed read tasks, -1 is returned as
|
* out without getting any completed read tasks, -1 is returned as
|
||||||
* block index.
|
* block index.
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public static StripedReadResult getNextCompletedStripedRead(
|
public static StripingChunkReadResult getNextCompletedStripedRead(
|
||||||
CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
|
CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
|
||||||
final long threshold) throws InterruptedException {
|
final long threshold) throws InterruptedException {
|
||||||
Preconditions.checkArgument(!futures.isEmpty());
|
Preconditions.checkArgument(!futures.isEmpty());
|
||||||
Preconditions.checkArgument(threshold > 0);
|
|
||||||
Future<Void> future = null;
|
Future<Void> future = null;
|
||||||
try {
|
try {
|
||||||
|
if (threshold > 0) {
|
||||||
future = readService.poll(threshold, TimeUnit.MILLISECONDS);
|
future = readService.poll(threshold, TimeUnit.MILLISECONDS);
|
||||||
|
} else {
|
||||||
|
future = readService.take();
|
||||||
|
}
|
||||||
if (future != null) {
|
if (future != null) {
|
||||||
future.get();
|
future.get();
|
||||||
return new StripedReadResult(futures.remove(future),
|
return new StripingChunkReadResult(futures.remove(future),
|
||||||
StripedReadResult.SUCCESSFUL);
|
StripingChunkReadResult.SUCCESSFUL);
|
||||||
} else {
|
} else {
|
||||||
return new StripedReadResult(StripedReadResult.TIMEOUT);
|
return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
|
||||||
}
|
}
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
return new StripedReadResult(futures.remove(future),
|
DFSClient.LOG.error("ExecutionException " + e);
|
||||||
StripedReadResult.FAILED);
|
return new StripingChunkReadResult(futures.remove(future),
|
||||||
|
StripingChunkReadResult.FAILED);
|
||||||
} catch (CancellationException e) {
|
} catch (CancellationException e) {
|
||||||
return new StripedReadResult(futures.remove(future),
|
return new StripingChunkReadResult(futures.remove(future),
|
||||||
StripedReadResult.CANCELLED);
|
StripingChunkReadResult.CANCELLED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -291,26 +297,247 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class represents the portion of I/O associated with each block in the
|
* Initialize the decoding input buffers based on the chunk states in an
|
||||||
* striped block group.
|
* AlignedStripe
|
||||||
*/
|
*/
|
||||||
public static class ReadPortion {
|
public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
|
||||||
|
int dataBlkNum, int parityBlkNum) {
|
||||||
|
byte[][] decodeInputs =
|
||||||
|
new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
|
||||||
|
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||||
|
StripingChunk chunk = alignedStripe.chunks[i];
|
||||||
|
if (chunk == null) {
|
||||||
|
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
|
||||||
|
alignedStripe.chunks[i].offsetsInBuf.add(0);
|
||||||
|
alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
|
||||||
|
} else if (chunk.state == StripingChunk.FETCHED) {
|
||||||
|
int posInBuf = 0;
|
||||||
|
for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
|
||||||
|
System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
|
||||||
|
decodeInputs[i], posInBuf, chunk.lengthsInBuf.get(j));
|
||||||
|
posInBuf += chunk.lengthsInBuf.get(j);
|
||||||
|
}
|
||||||
|
} else if (chunk.state == StripingChunk.ALLZERO) {
|
||||||
|
Arrays.fill(decodeInputs[i], (byte)0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return decodeInputs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* startOffsetInBlock
|
* Decode based on the given input buffers and schema
|
||||||
* |
|
*/
|
||||||
* v
|
public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf,
|
||||||
* |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
|
AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) {
|
||||||
|
int[] decodeIndices = new int[parityBlkNum];
|
||||||
|
int pos = 0;
|
||||||
|
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||||
|
if (alignedStripe.chunks[i].state != StripingChunk.FETCHED &&
|
||||||
|
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
|
||||||
|
decodeIndices[pos++] = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()];
|
||||||
|
RSRawDecoder rsRawDecoder = new RSRawDecoder();
|
||||||
|
rsRawDecoder.initialize(dataBlkNum, parityBlkNum, (int) alignedStripe.getSpanInBlock());
|
||||||
|
rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
|
||||||
|
|
||||||
|
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
|
||||||
|
StripingChunk chunk = alignedStripe.chunks[i];
|
||||||
|
if (chunk.state == StripingChunk.MISSING) {
|
||||||
|
int srcPos = 0;
|
||||||
|
for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
|
||||||
|
//TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
|
||||||
|
// System.arraycopy(outputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
|
||||||
|
// chunk.lengthsInBuf.get(j));
|
||||||
|
Arrays.fill(buf, chunk.offsetsInBuf.get(j),
|
||||||
|
chunk.offsetsInBuf.get(j) + chunk.lengthsInBuf.get(j), (byte)7);
|
||||||
|
srcPos += chunk.lengthsInBuf.get(j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method divides a requested byte range into an array of
|
||||||
|
* {@link AlignedStripe}
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* At most 5 stripes will be generated from each logical range
|
||||||
|
* TODO: cleanup and get rid of planReadPortions
|
||||||
|
*/
|
||||||
|
public static AlignedStripe[] divideByteRangeIntoStripes (
|
||||||
|
ECSchema ecSchema, LocatedStripedBlock blockGroup, long start, long end,
|
||||||
|
byte[] buf, int offsetInBuf) {
|
||||||
|
// TODO: change ECSchema naming to use cell size instead of chunk size
|
||||||
|
|
||||||
|
// Step 0: analyze range and calculate basic parameters
|
||||||
|
int cellSize = ecSchema.getChunkSize();
|
||||||
|
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||||
|
int len = (int) (end - start + 1);
|
||||||
|
int firstCellIdxInBG = (int) (start / cellSize);
|
||||||
|
int lastCellIdxInBG = (int) (end / cellSize);
|
||||||
|
int firstCellSize = Math.min(cellSize - (int) (start % cellSize), len);
|
||||||
|
long firstCellOffsetInBlk = start % cellSize;
|
||||||
|
int lastCellSize = lastCellIdxInBG == firstCellIdxInBG ?
|
||||||
|
firstCellSize : (int) (end % cellSize) + 1;
|
||||||
|
|
||||||
|
// Step 1: get the unmerged ranges on each internal block
|
||||||
|
// TODO: StripingCell should carry info on size and start offset (HDFS-8320)
|
||||||
|
VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema,
|
||||||
|
firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
|
||||||
|
lastCellSize);
|
||||||
|
|
||||||
|
// Step 2: merge into at most 5 stripes
|
||||||
|
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
|
||||||
|
|
||||||
|
// Step 3: calculate each chunk's position in destination buffer
|
||||||
|
calcualteChunkPositionsInBuf(ecSchema, blockGroup, buf, offsetInBuf,
|
||||||
|
firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
|
||||||
|
lastCellSize, stripes);
|
||||||
|
|
||||||
|
// Step 4: prepare ALLZERO blocks
|
||||||
|
prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
|
||||||
|
|
||||||
|
return stripes;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static VerticalRange[] getRangesForInternalBlocks (ECSchema ecSchema,
|
||||||
|
int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
|
||||||
|
long firstCellOffsetInBlk, int lastCellSize) {
|
||||||
|
int cellSize = ecSchema.getChunkSize();
|
||||||
|
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||||
|
|
||||||
|
StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG);
|
||||||
|
StripingCell lastCell = new StripingCell(ecSchema, lastCellIdxInBG);
|
||||||
|
|
||||||
|
VerticalRange ranges[] = new VerticalRange[dataBlkNum];
|
||||||
|
ranges[firstCell.idxInStripe] =
|
||||||
|
new VerticalRange(firstCellOffsetInBlk, firstCellSize);
|
||||||
|
for (int i = firstCellIdxInBG + 1; i < lastCellIdxInBG; i++) {
|
||||||
|
// iterate through all cells and update the list of StripeRanges
|
||||||
|
StripingCell cell = new StripingCell(ecSchema, i);
|
||||||
|
if (ranges[cell.idxInStripe] == null) {
|
||||||
|
ranges[cell.idxInStripe] = new VerticalRange(
|
||||||
|
cell.idxInInternalBlk * cellSize, cellSize);
|
||||||
|
} else {
|
||||||
|
ranges[cell.idxInStripe].spanInBlock += cellSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ranges[lastCell.idxInStripe] == null) {
|
||||||
|
ranges[lastCell.idxInStripe] = new VerticalRange(
|
||||||
|
lastCell.idxInInternalBlk * cellSize, lastCellSize);
|
||||||
|
} else if (lastCell.idxInBlkGroup != firstCell.idxInBlkGroup) {
|
||||||
|
ranges[lastCell.idxInStripe].spanInBlock += lastCellSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ranges;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema,
|
||||||
|
VerticalRange[] ranges) {
|
||||||
|
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||||
|
int parityBlkNum = ecSchema.getNumParityUnits();
|
||||||
|
List<AlignedStripe> stripes = new ArrayList<>();
|
||||||
|
SortedSet<Long> stripePoints = new TreeSet<>();
|
||||||
|
for (VerticalRange r : ranges) {
|
||||||
|
if (r != null) {
|
||||||
|
stripePoints.add(r.offsetInBlock);
|
||||||
|
stripePoints.add(r.offsetInBlock + r.spanInBlock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long prev = -1;
|
||||||
|
for (long point : stripePoints) {
|
||||||
|
if (prev >= 0) {
|
||||||
|
stripes.add(new AlignedStripe(prev, point - prev,
|
||||||
|
dataBlkNum + parityBlkNum));
|
||||||
|
}
|
||||||
|
prev = point;
|
||||||
|
}
|
||||||
|
return stripes.toArray(new AlignedStripe[stripes.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
|
||||||
|
LocatedStripedBlock blockGroup, byte[] buf, int offsetInBuf,
|
||||||
|
int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
|
||||||
|
long firstCellOffsetInBlk, int lastCellSize, AlignedStripe[] stripes) {
|
||||||
|
int cellSize = ecSchema.getChunkSize();
|
||||||
|
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||||
|
// Step 3: calculate each chunk's position in destination buffer
|
||||||
|
/**
|
||||||
|
* | <--------------- AlignedStripe --------------->|
|
||||||
|
*
|
||||||
|
* |<- length_0 ->|<-- length_1 -->|<- length_2 ->|
|
||||||
* +------------------+------------------+----------------+
|
* +------------------+------------------+----------------+
|
||||||
* | cell_0 | cell_3 | cell_6 | <- blk_0
|
* | cell_0_0_0 | cell_3_1_0 | cell_6_2_0 | <- blk_0
|
||||||
* +------------------+------------------+----------------+
|
* +------------------+------------------+----------------+
|
||||||
* _/ \_______________________
|
* _/ \_______________________
|
||||||
* | |
|
* | |
|
||||||
* v offsetsInBuf[0] v offsetsInBuf[1]
|
* v offset_0 v offset_1
|
||||||
* +------------------------------------------------------+
|
* +----------------------------------------------------------+
|
||||||
* | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
|
* | cell_0_0_0 | cell_1_0_1 and cell_2_0_2 |cell_3_1_0 ...| <- buf
|
||||||
* | (partial) | (from blk_1 and blk_2) | |
|
* | (partial) | (from blk_1 and blk_2) | |
|
||||||
* +------------------------------------------------------+
|
* +----------------------------------------------------------+
|
||||||
|
*
|
||||||
|
* Cell indexing convention defined in {@link StripingCell}
|
||||||
*/
|
*/
|
||||||
|
int done = 0;
|
||||||
|
for (int i = firstCellIdxInBG; i <= lastCellIdxInBG; i++) {
|
||||||
|
StripingCell cell = new StripingCell(ecSchema, i);
|
||||||
|
long cellStart = i == firstCellIdxInBG ?
|
||||||
|
firstCellOffsetInBlk : cell.idxInInternalBlk * cellSize;
|
||||||
|
int cellLen;
|
||||||
|
if (i == firstCellIdxInBG) {
|
||||||
|
cellLen = firstCellSize;
|
||||||
|
} else if (i == lastCellIdxInBG) {
|
||||||
|
cellLen = lastCellSize;
|
||||||
|
} else {
|
||||||
|
cellLen = cellSize;
|
||||||
|
}
|
||||||
|
long cellEnd = cellStart + cellLen - 1;
|
||||||
|
for (AlignedStripe s : stripes) {
|
||||||
|
long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
|
||||||
|
long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
|
||||||
|
long overlapEnd = Math.min(cellEnd, stripeEnd);
|
||||||
|
int overLapLen = (int) (overlapEnd - overlapStart + 1);
|
||||||
|
if (overLapLen <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (s.chunks[cell.idxInStripe] == null) {
|
||||||
|
s.chunks[cell.idxInStripe] = new StripingChunk(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
s.chunks[cell.idxInStripe].offsetsInBuf.
|
||||||
|
add((int)(offsetInBuf + done + overlapStart - cellStart));
|
||||||
|
s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen);
|
||||||
|
}
|
||||||
|
done += cellLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
|
||||||
|
byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
|
||||||
|
for (AlignedStripe s : stripes) {
|
||||||
|
for (int i = 0; i < dataBlkNum; i++) {
|
||||||
|
long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(),
|
||||||
|
cellSize, dataBlkNum, i);
|
||||||
|
if (internalBlkLen <= s.getOffsetInBlock()) {
|
||||||
|
Preconditions.checkState(s.chunks[i] == null);
|
||||||
|
s.chunks[i] = new StripingChunk(buf);
|
||||||
|
s.chunks[i].state = StripingChunk.ALLZERO;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents the portion of I/O associated with each block in the
|
||||||
|
* striped block group.
|
||||||
|
* TODO: consolidate ReadPortion with AlignedStripe
|
||||||
|
*/
|
||||||
|
public static class ReadPortion {
|
||||||
private long startOffsetInBlock = 0;
|
private long startOffsetInBlock = 0;
|
||||||
private int readLength = 0;
|
private int readLength = 0;
|
||||||
public final List<Integer> offsetsInBuf = new ArrayList<>();
|
public final List<Integer> offsetsInBuf = new ArrayList<>();
|
||||||
@ -349,12 +576,235 @@ void addReadLength(int extraLength) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The unit of encoding used in {@link DFSStripedOutputStream}
|
||||||
|
* | <------- Striped Block Group -------> |
|
||||||
|
* blk_0 blk_1 blk_2
|
||||||
|
* | | |
|
||||||
|
* v v v
|
||||||
|
* +----------+ +----------+ +----------+
|
||||||
|
* |cell_0_0_0| |cell_1_0_1| |cell_2_0_2|
|
||||||
|
* +----------+ +----------+ +----------+
|
||||||
|
* |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link idxInBlkGroup} = 5
|
||||||
|
* +----------+ +----------+ +----------+ {@link idxInInternalBlk} = 1
|
||||||
|
* {@link idxInStripe} = 2
|
||||||
|
* A StripingCell is a special instance of {@link StripingChunk} whose offset
|
||||||
|
* and size align with the cell used when writing data.
|
||||||
|
* TODO: consider parity cells
|
||||||
|
*/
|
||||||
|
public static class StripingCell {
|
||||||
|
public final ECSchema schema;
|
||||||
|
/** Logical order in a block group, used when doing I/O to a block group */
|
||||||
|
public final int idxInBlkGroup;
|
||||||
|
public final int idxInInternalBlk;
|
||||||
|
public final int idxInStripe;
|
||||||
|
|
||||||
|
public StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
|
||||||
|
this.schema = ecSchema;
|
||||||
|
this.idxInBlkGroup = idxInBlkGroup;
|
||||||
|
this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
|
||||||
|
this.idxInStripe = idxInBlkGroup -
|
||||||
|
this.idxInInternalBlk * ecSchema.getNumDataUnits();
|
||||||
|
}
|
||||||
|
|
||||||
|
public StripingCell(ECSchema ecSchema, int idxInInternalBlk,
|
||||||
|
int idxInStripe) {
|
||||||
|
this.schema = ecSchema;
|
||||||
|
this.idxInInternalBlk = idxInInternalBlk;
|
||||||
|
this.idxInStripe = idxInStripe;
|
||||||
|
this.idxInBlkGroup =
|
||||||
|
idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a requested byte range on a striped block group, an AlignedStripe
|
||||||
|
* represents a {@link VerticalRange} that is aligned with both the byte range
|
||||||
|
* and boundaries of all internal blocks. As illustrated in the diagram, any
|
||||||
|
* given byte range on a block group leads to 1~5 AlignedStripe's.
|
||||||
|
*
|
||||||
|
* |<-------- Striped Block Group -------->|
|
||||||
|
* blk_0 blk_1 blk_2 blk_3 blk_4
|
||||||
|
* +----+ | +----+ +----+
|
||||||
|
* |full| | | | | | <- AlignedStripe0:
|
||||||
|
* +----+ |~~~~| | |~~~~| |~~~~| 1st cell is partial
|
||||||
|
* |part| | | | | | | | <- AlignedStripe1: byte range
|
||||||
|
* +----+ +----+ +----+ | |~~~~| |~~~~| doesn't start at 1st block
|
||||||
|
* |full| |full| |full| | | | | |
|
||||||
|
* |cell| |cell| |cell| | | | | | <- AlignedStripe2 (full stripe)
|
||||||
|
* | | | | | | | | | | |
|
||||||
|
* +----+ +----+ +----+ | |~~~~| |~~~~|
|
||||||
|
* |full| |part| | | | | | <- AlignedStripe3: byte range
|
||||||
|
* |~~~~| +----+ | |~~~~| |~~~~| doesn't end at last block
|
||||||
|
* | | | | | | | <- AlignedStripe4:
|
||||||
|
* +----+ | +----+ +----+ last cell is partial
|
||||||
|
* |
|
||||||
|
* <---- data blocks ----> | <--- parity --->
|
||||||
|
*
|
||||||
|
* An AlignedStripe is the basic unit of reading from a striped block group,
|
||||||
|
* because within the AlignedStripe, all internal blocks can be processed in
|
||||||
|
* a uniform manner.
|
||||||
|
*
|
||||||
|
* The coverage of an AlignedStripe on an internal block is represented as a
|
||||||
|
* {@link StripingChunk}.
|
||||||
|
* To simplify the logic of reading a logical byte range from a block group,
|
||||||
|
* a StripingChunk is either completely in the requested byte range or
|
||||||
|
* completely outside the requested byte range.
|
||||||
|
*/
|
||||||
|
public static class AlignedStripe {
|
||||||
|
public VerticalRange range;
|
||||||
|
/** status of each chunk in the stripe */
|
||||||
|
public final StripingChunk[] chunks;
|
||||||
|
public int fetchedChunksNum = 0;
|
||||||
|
public int missingChunksNum = 0;
|
||||||
|
|
||||||
|
public AlignedStripe(long offsetInBlock, long length, int width) {
|
||||||
|
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
|
||||||
|
this.range = new VerticalRange(offsetInBlock, length);
|
||||||
|
this.chunks = new StripingChunk[width];
|
||||||
|
}
|
||||||
|
|
||||||
|
public AlignedStripe(VerticalRange range, int width) {
|
||||||
|
this.range = range;
|
||||||
|
this.chunks = new StripingChunk[width];
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean include(long pos) {
|
||||||
|
return range.include(pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOffsetInBlock() {
|
||||||
|
return range.offsetInBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSpanInBlock() {
|
||||||
|
return range.spanInBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock +
|
||||||
|
", fetchedChunksNum=" + fetchedChunksNum +
|
||||||
|
", missingChunksNum=" + missingChunksNum;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple utility class representing an arbitrary vertical inclusive range
|
||||||
|
* starting at {@link offsetInBlock} and lasting for {@link length} bytes in
|
||||||
|
* an internal block. Note that VerticalRange doesn't necessarily align with
|
||||||
|
* {@link StripingCell}.
|
||||||
|
*
|
||||||
|
* |<- Striped Block Group ->|
|
||||||
|
* blk_0
|
||||||
|
* |
|
||||||
|
* v
|
||||||
|
* +-----+
|
||||||
|
* |~~~~~| <-- {@link offsetInBlock}
|
||||||
|
* | | ^
|
||||||
|
* | | |
|
||||||
|
* | | | {@link spanInBlock}
|
||||||
|
* | | v
|
||||||
|
* |~~~~~| ---
|
||||||
|
* | |
|
||||||
|
* +-----+
|
||||||
|
*/
|
||||||
|
public static class VerticalRange {
|
||||||
|
/** start offset in the block group (inclusive) */
|
||||||
|
public long offsetInBlock;
|
||||||
|
/** length of the stripe range */
|
||||||
|
public long spanInBlock;
|
||||||
|
|
||||||
|
public VerticalRange(long offsetInBlock, long length) {
|
||||||
|
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
|
||||||
|
this.offsetInBlock = offsetInBlock;
|
||||||
|
this.spanInBlock = length;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** whether a position is in the range */
|
||||||
|
public boolean include(long pos) {
|
||||||
|
return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates the coverage of an {@link AlignedStripe} on an internal block,
|
||||||
|
* and the state of the chunk in the context of the read request.
|
||||||
|
*
|
||||||
|
* |<---------------- Striped Block Group --------------->|
|
||||||
|
* blk_0 blk_1 blk_2 blk_3 blk_4
|
||||||
|
* +---------+ | +----+ +----+
|
||||||
|
* null null |REQUESTED| | |null| |null| <- AlignedStripe0
|
||||||
|
* +---------+ |---------| | |----| |----|
|
||||||
|
* null |REQUESTED| |REQUESTED| | |null| |null| <- AlignedStripe1
|
||||||
|
* +---------+ +---------+ +---------+ | +----+ +----+
|
||||||
|
* |REQUESTED| |REQUESTED| ALLZERO | |null| |null| <- AlignedStripe2
|
||||||
|
* +---------+ +---------+ | +----+ +----+
|
||||||
|
* <----------- data blocks ------------> | <--- parity --->
|
||||||
|
*
|
||||||
|
* The class also carries {@link buf}, {@link offsetsInBuf}, and
|
||||||
|
* {@link lengthsInBuf} to define how read task for this chunk should deliver
|
||||||
|
* the returned data.
|
||||||
|
*/
|
||||||
|
public static class StripingChunk {
|
||||||
|
/** Chunk has been successfully fetched */
|
||||||
|
public static final int FETCHED = 0x01;
|
||||||
|
/** Chunk has encountered failed when being fetched */
|
||||||
|
public static final int MISSING = 0x02;
|
||||||
|
/** Chunk being fetched (fetching task is in-flight) */
|
||||||
|
public static final int PENDING = 0x04;
|
||||||
|
/**
|
||||||
|
* Chunk is requested either by application or for decoding, need to
|
||||||
|
* schedule read task
|
||||||
|
*/
|
||||||
|
public static final int REQUESTED = 0X08;
|
||||||
|
/**
|
||||||
|
* Internal block is short and has no overlap with chunk. Chunk considered
|
||||||
|
* all-zero bytes in codec calculations.
|
||||||
|
*/
|
||||||
|
public static final int ALLZERO = 0X0f;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If a chunk is completely in requested range, the state transition is:
|
||||||
|
* REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING}
|
||||||
|
* If a chunk is completely outside requested range (including parity
|
||||||
|
* chunks), state transition is:
|
||||||
|
* null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ...
|
||||||
|
*/
|
||||||
|
public int state = REQUESTED;
|
||||||
|
public byte[] buf;
|
||||||
|
public List<Integer> offsetsInBuf;
|
||||||
|
public List<Integer> lengthsInBuf;
|
||||||
|
|
||||||
|
public StripingChunk(byte[] buf) {
|
||||||
|
this.buf = buf;
|
||||||
|
this.offsetsInBuf = new ArrayList<>();
|
||||||
|
this.lengthsInBuf = new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int[] getOffsets() {
|
||||||
|
int[] offsets = new int[offsetsInBuf.size()];
|
||||||
|
for (int i = 0; i < offsets.length; i++) {
|
||||||
|
offsets[i] = offsetsInBuf.get(i);
|
||||||
|
}
|
||||||
|
return offsets;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int[] getLengths() {
|
||||||
|
int[] lens = new int[this.lengthsInBuf.size()];
|
||||||
|
for (int i = 0; i < lens.length; i++) {
|
||||||
|
lens[i] = this.lengthsInBuf.get(i);
|
||||||
|
}
|
||||||
|
return lens;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class represents result from a striped read request.
|
* This class represents result from a striped read request.
|
||||||
* If the task was successful or the internal computation failed,
|
* If the task was successful or the internal computation failed,
|
||||||
* an index is also returned.
|
* an index is also returned.
|
||||||
*/
|
*/
|
||||||
public static class StripedReadResult {
|
public static class StripingChunkReadResult {
|
||||||
public static final int SUCCESSFUL = 0x01;
|
public static final int SUCCESSFUL = 0x01;
|
||||||
public static final int FAILED = 0x02;
|
public static final int FAILED = 0x02;
|
||||||
public static final int TIMEOUT = 0x04;
|
public static final int TIMEOUT = 0x04;
|
||||||
@ -363,18 +813,23 @@ public static class StripedReadResult {
|
|||||||
public final int index;
|
public final int index;
|
||||||
public final int state;
|
public final int state;
|
||||||
|
|
||||||
public StripedReadResult(int state) {
|
public StripingChunkReadResult(int state) {
|
||||||
Preconditions.checkArgument(state == TIMEOUT,
|
Preconditions.checkArgument(state == TIMEOUT,
|
||||||
"Only timeout result should return negative index.");
|
"Only timeout result should return negative index.");
|
||||||
this.index = -1;
|
this.index = -1;
|
||||||
this.state = state;
|
this.state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
public StripedReadResult(int index, int state) {
|
public StripingChunkReadResult(int index, int state) {
|
||||||
Preconditions.checkArgument(state != TIMEOUT,
|
Preconditions.checkArgument(state != TIMEOUT,
|
||||||
"Timeout result should return negative index.");
|
"Timeout result should return negative index.");
|
||||||
this.index = index;
|
this.index = index;
|
||||||
this.state = state;
|
this.state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "(index=" + index + ", state =" + state + ")";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
|
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -133,8 +134,102 @@ public void testPread() throws Exception {
|
|||||||
byte[] readBuffer = new byte[readSize];
|
byte[] readBuffer = new byte[readSize];
|
||||||
int ret = in.read(0, readBuffer, 0, readSize);
|
int ret = in.read(0, readBuffer, 0, readSize);
|
||||||
|
|
||||||
|
byte[] expected = new byte[readSize];
|
||||||
|
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
|
||||||
|
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
|
||||||
|
for (int j = 0; j < DATA_BLK_NUM; j++) {
|
||||||
|
for (int k = 0; k < CELLSIZE; k++) {
|
||||||
|
int posInBlk = i * CELLSIZE + k;
|
||||||
|
int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
|
||||||
|
expected[posInFile] = SimulatedFSDataset.simulatedByte(
|
||||||
|
new Block(bg.getBlock().getBlockId() + j), posInBlk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assertEquals(readSize, ret);
|
assertEquals(readSize, ret);
|
||||||
// TODO: verify read results with patterned data from HDFS-8117
|
assertArrayEquals(expected, readBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreadWithDNFailure() throws Exception {
|
||||||
|
final int numBlocks = 4;
|
||||||
|
final int failedDNIdx = 2;
|
||||||
|
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||||
|
NUM_STRIPE_PER_BLOCK, false);
|
||||||
|
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||||
|
filePath.toString(), 0, BLOCK_GROUP_SIZE);
|
||||||
|
|
||||||
|
assert lbs.get(0) instanceof LocatedStripedBlock;
|
||||||
|
LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
|
||||||
|
for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
|
||||||
|
Block blk = new Block(bg.getBlock().getBlockId() + i,
|
||||||
|
NUM_STRIPE_PER_BLOCK * CELLSIZE,
|
||||||
|
bg.getBlock().getGenerationStamp());
|
||||||
|
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
|
||||||
|
cluster.injectBlocks(i, Arrays.asList(blk),
|
||||||
|
bg.getBlock().getBlockPoolId());
|
||||||
|
}
|
||||||
|
DFSStripedInputStream in =
|
||||||
|
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
|
||||||
|
ErasureCodingSchemaManager.getSystemDefaultSchema());
|
||||||
|
int readSize = BLOCK_GROUP_SIZE;
|
||||||
|
byte[] readBuffer = new byte[readSize];
|
||||||
|
byte[] expected = new byte[readSize];
|
||||||
|
cluster.stopDataNode(failedDNIdx);
|
||||||
|
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
|
||||||
|
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
|
||||||
|
for (int j = 0; j < DATA_BLK_NUM; j++) {
|
||||||
|
for (int k = 0; k < CELLSIZE; k++) {
|
||||||
|
int posInBlk = i * CELLSIZE + k;
|
||||||
|
int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
|
||||||
|
expected[posInFile] = SimulatedFSDataset.simulatedByte(
|
||||||
|
new Block(bg.getBlock().getBlockId() + j), posInBlk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the expected content for decoded data
|
||||||
|
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
|
||||||
|
byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
|
||||||
|
int[] missingBlkIdx = new int[]{failedDNIdx, DATA_BLK_NUM+1, DATA_BLK_NUM+2};
|
||||||
|
byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
|
||||||
|
for (int j = 0; j < DATA_BLK_NUM; j++) {
|
||||||
|
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
|
||||||
|
if (j != failedDNIdx) {
|
||||||
|
System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int k = 0; k < CELLSIZE; k++) {
|
||||||
|
int posInBlk = i * CELLSIZE + k;
|
||||||
|
decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
|
||||||
|
new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
|
||||||
|
}
|
||||||
|
// RSRawDecoder rsRawDecoder = new RSRawDecoder();
|
||||||
|
// rsRawDecoder.initialize(DATA_BLK_NUM, PARITY_BLK_NUM, CELLSIZE);
|
||||||
|
// rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
|
||||||
|
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
|
||||||
|
// System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
|
||||||
|
//TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
|
||||||
|
Arrays.fill(expected, posInBuf, posInBuf + CELLSIZE, (byte)7);
|
||||||
|
}
|
||||||
|
int delta = 10;
|
||||||
|
int done = 0;
|
||||||
|
// read a small delta, shouldn't trigger decode
|
||||||
|
// |cell_0 |
|
||||||
|
// |10 |
|
||||||
|
done += in.read(0, readBuffer, 0, delta);
|
||||||
|
assertEquals(delta, done);
|
||||||
|
// both head and trail cells are partial
|
||||||
|
// |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
|
||||||
|
// |256K - 10|missing|256K|256K|256K - 10|not in range|
|
||||||
|
done += in.read(delta, readBuffer, delta,
|
||||||
|
CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
|
||||||
|
assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
|
||||||
|
// read the rest
|
||||||
|
done += in.read(done, readBuffer, done, readSize - done);
|
||||||
|
assertEquals(readSize, done);
|
||||||
|
assertArrayEquals(expected, readBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -18,10 +18,13 @@
|
|||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -321,4 +324,50 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
|
|||||||
Assert.assertArrayEquals(bytes, result.array());
|
Assert.assertArrayEquals(bytes, result.array());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWritePreadWithDNFailure() throws IOException {
|
||||||
|
final int failedDNIdx = 2;
|
||||||
|
final int length = cellSize * (dataBlocks + 2);
|
||||||
|
Path testPath = new Path("/foo");
|
||||||
|
final byte[] bytes = generateBytes(length);
|
||||||
|
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
|
||||||
|
|
||||||
|
// shut down the DN that holds the last internal data block
|
||||||
|
BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
|
||||||
|
cellSize);
|
||||||
|
String name = (locs[0].getNames())[failedDNIdx];
|
||||||
|
for (DataNode dn : cluster.getDataNodes()) {
|
||||||
|
int port = dn.getXferPort();
|
||||||
|
if (name.contains(Integer.toString(port))) {
|
||||||
|
dn.shutdown();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pread
|
||||||
|
int startOffsetInFile = cellSize * 5;
|
||||||
|
try (FSDataInputStream fsdis = fs.open(testPath)) {
|
||||||
|
byte[] buf = new byte[length];
|
||||||
|
int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
|
||||||
|
Assert.assertEquals("The length of file should be the same to write size",
|
||||||
|
length - startOffsetInFile, readLen);
|
||||||
|
|
||||||
|
RSRawDecoder rsRawDecoder = new RSRawDecoder();
|
||||||
|
rsRawDecoder.initialize(dataBlocks, parityBlocks, 1);
|
||||||
|
byte[] expected = new byte[readLen];
|
||||||
|
for (int i = startOffsetInFile; i < length; i++) {
|
||||||
|
//TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
|
||||||
|
if ((i / cellSize) % dataBlocks == failedDNIdx) {
|
||||||
|
expected[i - startOffsetInFile] = (byte)7;
|
||||||
|
} else {
|
||||||
|
expected[i - startOffsetInFile] = getByte(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int i = startOffsetInFile; i < length; i++) {
|
||||||
|
Assert.assertEquals("Byte at " + i + " should be the same",
|
||||||
|
expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user