HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream. Contributed by Yi Liu.
This commit is contained in:
parent
ac97edd1ab
commit
a17cedb44c
@ -186,3 +186,6 @@
|
||||
|
||||
HDFS-8129. Erasure Coding: Maintain consistent naming for Erasure Coding related classes - EC/ErasureCoding
|
||||
(umamahesh)
|
||||
|
||||
HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream.
|
||||
(Yi Liu via jing9)
|
||||
|
@ -19,10 +19,13 @@
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.fs.ReadOption;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.*;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
|
||||
|
||||
@ -31,9 +34,11 @@
|
||||
import org.apache.htrace.Trace;
|
||||
import org.apache.htrace.TraceScope;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
@ -263,6 +268,10 @@ protected void closeCurrentBlockReaders() {
|
||||
}
|
||||
|
||||
private long getOffsetInBlockGroup() {
|
||||
return getOffsetInBlockGroup(pos);
|
||||
}
|
||||
|
||||
private long getOffsetInBlockGroup(long pos) {
|
||||
return pos - currentLocatedBlock.getStartOffset();
|
||||
}
|
||||
|
||||
@ -278,18 +287,22 @@ private void readOneStripe(
|
||||
// compute stripe range based on pos
|
||||
final long offsetInBlockGroup = getOffsetInBlockGroup();
|
||||
final long stripeLen = cellSize * dataBlkNum;
|
||||
int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
|
||||
curStripeRange = new StripeRange(stripeIndex * stripeLen,
|
||||
Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen),
|
||||
stripeLen));
|
||||
final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1);
|
||||
final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
|
||||
final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
|
||||
final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
|
||||
- (stripeIndex * stripeLen), stripeLen);
|
||||
curStripeRange = new StripeRange(offsetInBlockGroup,
|
||||
stripeLimit - stripeBufOffset);
|
||||
|
||||
final int startCell = stripeBufOffset / cellSize;
|
||||
final int numCell = (stripeLimit - 1) / cellSize + 1;
|
||||
|
||||
// read the whole stripe in parallel
|
||||
Map<Future<Integer>, Integer> futures = new HashMap<>();
|
||||
for (int i = 0; i < numCell; i++) {
|
||||
curStripeBuf.position(cellSize * i);
|
||||
curStripeBuf.limit((int) Math.min(cellSize * (i + 1),
|
||||
curStripeRange.length));
|
||||
for (int i = startCell; i < numCell; i++) {
|
||||
int bufPos = i == startCell ? stripeBufOffset : cellSize * i;
|
||||
curStripeBuf.position(bufPos);
|
||||
curStripeBuf.limit(Math.min(cellSize * (i + 1), stripeLimit));
|
||||
ByteBuffer buf = curStripeBuf.slice();
|
||||
ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
|
||||
final int targetLength = buf.remaining();
|
||||
@ -329,6 +342,39 @@ public Integer call() throws Exception {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Seek to a new arbitrary location
|
||||
*/
|
||||
@Override
|
||||
public synchronized void seek(long targetPos) throws IOException {
|
||||
if (targetPos > getFileLength()) {
|
||||
throw new EOFException("Cannot seek after EOF");
|
||||
}
|
||||
if (targetPos < 0) {
|
||||
throw new EOFException("Cannot seek to negative offset");
|
||||
}
|
||||
if (closed.get()) {
|
||||
throw new IOException("Stream is closed!");
|
||||
}
|
||||
if (targetPos <= blockEnd) {
|
||||
final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos);
|
||||
if (curStripeRange.include(targetOffsetInBlk)) {
|
||||
int bufOffset = getStripedBufOffset(targetOffsetInBlk);
|
||||
curStripeBuf.position(bufOffset);
|
||||
pos = targetPos;
|
||||
return;
|
||||
}
|
||||
}
|
||||
pos = targetPos;
|
||||
blockEnd = -1;
|
||||
}
|
||||
|
||||
private int getStripedBufOffset(long offsetInBlockGroup) {
|
||||
final long stripeLen = cellSize * dataBlkNum;
|
||||
// compute the position in the curStripeBuf based on "pos"
|
||||
return (int) (offsetInBlockGroup % stripeLen);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized int readWithStrategy(ReaderStrategy strategy,
|
||||
int off, int len) throws IOException {
|
||||
@ -405,10 +451,8 @@ private int readBuffer(BlockReader blockReader,
|
||||
* @return number of bytes copied
|
||||
*/
|
||||
private int copy(ReaderStrategy strategy, int offset, int length) {
|
||||
final long stripeLen = cellSize * dataBlkNum;
|
||||
final long offsetInBlk = pos - currentLocatedBlock.getStartOffset();
|
||||
// compute the position in the curStripeBuf based on "pos"
|
||||
int bufOffset = (int) (offsetInBlk % stripeLen);
|
||||
final long offsetInBlk = getOffsetInBlockGroup();
|
||||
int bufOffset = getStripedBufOffset(offsetInBlk);
|
||||
curStripeBuf.position(bufOffset);
|
||||
return strategy.copyFrom(curStripeBuf, offset,
|
||||
Math.min(length, curStripeBuf.remaining()));
|
||||
@ -546,4 +590,22 @@ private <T> void waitNextCompletion(CompletionService<T> service,
|
||||
}
|
||||
throw new InterruptedException("let's retry");
|
||||
}
|
||||
|
||||
/**
|
||||
* May need online read recovery, zero-copy read doesn't make
|
||||
* sense, so don't support it.
|
||||
*/
|
||||
@Override
|
||||
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
|
||||
int maxLength, EnumSet<ReadOption> opts)
|
||||
throws IOException, UnsupportedOperationException {
|
||||
throw new UnsupportedOperationException(
|
||||
"Not support enhanced byte buffer access.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void releaseBuffer(ByteBuffer buffer) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Not support enhanced byte buffer access.");
|
||||
}
|
||||
}
|
||||
|
@ -22,12 +22,12 @@
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@ -150,11 +150,35 @@ private byte[] generateBytes(int cnt) {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private int readAll(FSDataInputStream in, byte[] buf) throws IOException {
|
||||
int readLen = 0;
|
||||
int ret;
|
||||
do {
|
||||
ret = in.read(buf, readLen, buf.length - readLen);
|
||||
if (ret > 0) {
|
||||
readLen += ret;
|
||||
}
|
||||
} while (ret >= 0 && readLen < buf.length);
|
||||
return readLen;
|
||||
}
|
||||
|
||||
private byte getByte(long pos) {
|
||||
final int mod = 29;
|
||||
return (byte) (pos % mod + 1);
|
||||
}
|
||||
|
||||
private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
|
||||
int writeBytes) throws IOException {
|
||||
fsdis.seek(pos);
|
||||
byte[] buf = new byte[writeBytes];
|
||||
int readLen = readAll(fsdis, buf);
|
||||
Assert.assertEquals(readLen, writeBytes - pos);
|
||||
for (int i = 0; i < readLen; i++) {
|
||||
Assert.assertEquals("Byte at " + i + " should be the same",
|
||||
getByte(pos + i), buf[i]);
|
||||
}
|
||||
}
|
||||
|
||||
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
|
||||
throws IOException {
|
||||
Path testPath = new Path(src);
|
||||
@ -183,15 +207,7 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
|
||||
// stateful read with byte array
|
||||
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
|
||||
byte[] buf = new byte[writeBytes + 100];
|
||||
int readLen = 0;
|
||||
int ret;
|
||||
do {
|
||||
ret = fsdis.read(buf, readLen, buf.length - readLen);
|
||||
if (ret > 0) {
|
||||
readLen += ret;
|
||||
}
|
||||
} while (ret >= 0);
|
||||
readLen = readLen >= 0 ? readLen : 0;
|
||||
int readLen = readAll(fsdis, buf);
|
||||
Assert.assertEquals("The length of file should be the same to write size",
|
||||
writeBytes, readLen);
|
||||
for (int i = 0; i < writeBytes; i++) {
|
||||
@ -200,6 +216,53 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
|
||||
}
|
||||
}
|
||||
|
||||
// seek and stateful read
|
||||
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
|
||||
// seek to 1/2 of content
|
||||
int pos = writeBytes/2;
|
||||
assertSeekAndRead(fsdis, pos, writeBytes);
|
||||
|
||||
// seek to 1/3 of content
|
||||
pos = writeBytes/3;
|
||||
assertSeekAndRead(fsdis, pos, writeBytes);
|
||||
|
||||
// seek to 0 pos
|
||||
pos = 0;
|
||||
assertSeekAndRead(fsdis, pos, writeBytes);
|
||||
|
||||
if (writeBytes > cellSize) {
|
||||
// seek to cellSize boundary
|
||||
pos = cellSize -1;
|
||||
assertSeekAndRead(fsdis, pos, writeBytes);
|
||||
}
|
||||
|
||||
if (writeBytes > cellSize * dataBlocks) {
|
||||
// seek to striped cell group boundary
|
||||
pos = cellSize * dataBlocks - 1;
|
||||
assertSeekAndRead(fsdis, pos, writeBytes);
|
||||
}
|
||||
|
||||
if (writeBytes > blockSize * dataBlocks) {
|
||||
// seek to striped block group boundary
|
||||
pos = blockSize * dataBlocks - 1;
|
||||
assertSeekAndRead(fsdis, pos, writeBytes);
|
||||
}
|
||||
|
||||
try {
|
||||
fsdis.seek(-1);
|
||||
Assert.fail("Should be failed if seek to negative offset");
|
||||
} catch (EOFException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
try {
|
||||
fsdis.seek(writeBytes + 1);
|
||||
Assert.fail("Should be failed if seek after EOF");
|
||||
} catch (EOFException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
// stateful read with ByteBuffer
|
||||
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
|
||||
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
|
||||
|
Loading…
Reference in New Issue
Block a user