HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open. Contributed by Kai Sasaki.
This commit is contained in:
parent
5eb17e5726
commit
9593776e34
@ -139,4 +139,7 @@
|
|||||||
commands from standbynode if any (vinayakumarb)
|
commands from standbynode if any (vinayakumarb)
|
||||||
|
|
||||||
HDFS-8189. ClientProtocol#createErasureCodingZone API was wrongly annotated
|
HDFS-8189. ClientProtocol#createErasureCodingZone API was wrongly annotated
|
||||||
as Idempotent (vinayakumarb)
|
as Idempotent (vinayakumarb)
|
||||||
|
|
||||||
|
HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open.
|
||||||
|
(Kai Sasaki via jing9)
|
||||||
|
@ -1193,7 +1193,12 @@ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
|
|||||||
// Get block info from namenode
|
// Get block info from namenode
|
||||||
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
|
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
|
||||||
try {
|
try {
|
||||||
return new DFSInputStream(this, src, verifyChecksum);
|
ECInfo info = getErasureCodingInfo(src);
|
||||||
|
if (info != null) {
|
||||||
|
return new DFSStripedInputStream(this, src, verifyChecksum, info);
|
||||||
|
} else {
|
||||||
|
return new DFSInputStream(this, src, verifyChecksum);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
scope.close();
|
scope.close();
|
||||||
}
|
}
|
||||||
|
@ -134,11 +134,12 @@ static ReadPortion[] planReadPortions(final int dataBlkNum,
|
|||||||
private final short parityBlkNum;
|
private final short parityBlkNum;
|
||||||
private final ECInfo ecInfo;
|
private final ECInfo ecInfo;
|
||||||
|
|
||||||
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
|
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ECInfo info)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(dfsClient, src, verifyChecksum);
|
super(dfsClient, src, verifyChecksum);
|
||||||
// ECInfo is restored from NN just before reading striped file.
|
// ECInfo is restored from NN just before reading striped file.
|
||||||
ecInfo = dfsClient.getErasureCodingInfo(src);
|
assert info != null;
|
||||||
|
ecInfo = info;
|
||||||
cellSize = ecInfo.getSchema().getChunkSize();
|
cellSize = ecInfo.getSchema().getChunkSize();
|
||||||
dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
|
dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
|
||||||
parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
|
parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
@ -167,10 +168,9 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
|
|||||||
writeBytes, fileLength);
|
writeBytes, fileLength);
|
||||||
|
|
||||||
// pread
|
// pread
|
||||||
try (DFSStripedInputStream dis =
|
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
|
||||||
new DFSStripedInputStream(fs.getClient(), src, true)) {
|
|
||||||
byte[] buf = new byte[writeBytes + 100];
|
byte[] buf = new byte[writeBytes + 100];
|
||||||
int readLen = dis.read(0, buf, 0, buf.length);
|
int readLen = fsdis.read(0, buf, 0, buf.length);
|
||||||
readLen = readLen >= 0 ? readLen : 0;
|
readLen = readLen >= 0 ? readLen : 0;
|
||||||
Assert.assertEquals("The length of file should be the same to write size",
|
Assert.assertEquals("The length of file should be the same to write size",
|
||||||
writeBytes, readLen);
|
writeBytes, readLen);
|
||||||
@ -180,13 +180,12 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// stateful read with byte array
|
// stateful read with byte array
|
||||||
try (DFSStripedInputStream dis =
|
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
|
||||||
new DFSStripedInputStream(fs.getClient(), src, true)) {
|
|
||||||
byte[] buf = new byte[writeBytes + 100];
|
byte[] buf = new byte[writeBytes + 100];
|
||||||
int readLen = 0;
|
int readLen = 0;
|
||||||
int ret;
|
int ret;
|
||||||
do {
|
do {
|
||||||
ret = dis.read(buf, readLen, buf.length - readLen);
|
ret = fsdis.read(buf, readLen, buf.length - readLen);
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
readLen += ret;
|
readLen += ret;
|
||||||
}
|
}
|
||||||
@ -201,13 +200,12 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// stateful read with ByteBuffer
|
// stateful read with ByteBuffer
|
||||||
try (DFSStripedInputStream dis =
|
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
|
||||||
new DFSStripedInputStream(fs.getClient(), src, true)) {
|
|
||||||
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
|
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
|
||||||
int readLen = 0;
|
int readLen = 0;
|
||||||
int ret;
|
int ret;
|
||||||
do {
|
do {
|
||||||
ret = dis.read(buf);
|
ret = fsdis.read(buf);
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
readLen += ret;
|
readLen += ret;
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ECInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
@ -33,6 +34,7 @@
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -52,6 +54,8 @@ public class TestReadStripedFile {
|
|||||||
private DistributedFileSystem fs;
|
private DistributedFileSystem fs;
|
||||||
private final Path dirPath = new Path("/striped");
|
private final Path dirPath = new Path("/striped");
|
||||||
private Path filePath = new Path(dirPath, "file");
|
private Path filePath = new Path(dirPath, "file");
|
||||||
|
private ECInfo info = new ECInfo(filePath.toString(),
|
||||||
|
ECSchemaManager.getSystemDefaultSchema());
|
||||||
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
|
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
|
||||||
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
|
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
|
||||||
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
||||||
@ -89,7 +93,7 @@ public void testGetBlock() throws Exception {
|
|||||||
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||||
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
|
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
|
||||||
final DFSStripedInputStream in =
|
final DFSStripedInputStream in =
|
||||||
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
|
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
|
||||||
|
|
||||||
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
|
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
|
||||||
for (LocatedBlock aLbList : lbList) {
|
for (LocatedBlock aLbList : lbList) {
|
||||||
@ -124,7 +128,8 @@ public void testPread() throws Exception {
|
|||||||
bg.getBlock().getBlockPoolId());
|
bg.getBlock().getBlockPoolId());
|
||||||
}
|
}
|
||||||
DFSStripedInputStream in =
|
DFSStripedInputStream in =
|
||||||
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
|
new DFSStripedInputStream(fs.getClient(),
|
||||||
|
filePath.toString(), false, info);
|
||||||
int readSize = BLOCK_GROUP_SIZE;
|
int readSize = BLOCK_GROUP_SIZE;
|
||||||
byte[] readBuffer = new byte[readSize];
|
byte[] readBuffer = new byte[readSize];
|
||||||
int ret = in.read(0, readBuffer, 0, readSize);
|
int ret = in.read(0, readBuffer, 0, readSize);
|
||||||
@ -170,7 +175,7 @@ private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
|
|||||||
|
|
||||||
DFSStripedInputStream in =
|
DFSStripedInputStream in =
|
||||||
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
|
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
|
||||||
false);
|
false, info);
|
||||||
|
|
||||||
byte[] expected = new byte[fileSize];
|
byte[] expected = new byte[fileSize];
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user