HDFS-8136. Client gets and uses EC schema when reading and writing a striped file. Contributed by Kai Sasaki

Kai Zheng 2015-04-24 00:19:12 +08:00 committed by Zhe Zhang
parent e8df2581c3
commit fcd54ecce2
6 changed files with 209 additions and 15 deletions


@@ -119,3 +119,6 @@
HDFS-8156. Add/implement necessary APIs even though we just have the system default
schema. (Kai Zheng via Zhe Zhang)
HDFS-8136. Client gets and uses EC schema when reading and writing a striped
file. (Kai Sasaki via Kai Zheng)


@@ -21,9 +21,9 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.NetUtils;
@@ -125,13 +125,19 @@ static ReadPortion[] planReadPortions(final int dataBlkNum,
return results;
}
private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
private final ECInfo ecInfo;
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
throws IOException {
super(dfsClient, src, verifyChecksum);
// ECInfo is fetched from the NameNode just before reading a striped file.
ecInfo = dfsClient.getErasureCodingInfo(src);
cellSize = ecInfo.getSchema().getChunkSize();
dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
DFSClient.LOG.debug("Creating a striped input stream for file " + src);
}
@@ -279,9 +285,6 @@ private void waitNextCompletion(CompletionService<Void> stripedReadsService,
throw new InterruptedException("let's retry");
}
public void setCellSize(int cellSize) {
this.cellSize = cellSize;
}
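The removal of setCellSize() above is the flip side of the new constructor: callers no longer push the cell size into the stream by hand, because the stream now configures itself from the schema fetched via getErasureCodingInfo(). A minimal usage sketch under that assumption (the file path and buffer size here are illustrative):

DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), "/ec/stripedFile", true);
// No in.setCellSize(...) step any more: cellSize, dataBlkNum and
// parityBlkNum were populated from the NameNode's ECInfo at construction.
byte[] buf = new byte[1024];
int n = in.read(0, buf, 0, buf.length); // positional read across the stripe
in.close();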
/**
* This class represents the portion of I/O associated with each block in the


@@ -32,8 +32,8 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -61,11 +61,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
/**
* Size of each striping cell; it must be a multiple of bytesPerChecksum.
*/
private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final ECInfo ecInfo;
private final int cellSize;
private ByteBuffer[] cellBuffers;
private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS
+ HdfsConstants.NUM_PARITY_BLOCKS;
private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private final short numAllBlocks;
private final short numDataBlocks;
private int curIdx = 0;
/* bytes written in current block group */
//private long currentBlockGroupBytes = 0;
@@ -77,6 +79,10 @@ private StripedDataStreamer getLeadingStreamer() {
return streamers.get(0);
}
private long getBlockGroupSize() {
return blockSize * numDataBlocks;
}
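For intuition, a block group holds numDataBlocks full blocks of user data; the parity blocks add redundancy, not capacity. A hedged arithmetic sketch, assuming the default RS-6-3 schema and an illustrative 128 MB block size (neither value is taken from this diff):

long blockSize = 128L * 1024 * 1024; // illustrative 128 MB block
short numDataBlocks = 6; // RS-6-3 default: 6 data + 3 parity
long blockGroupSize = blockSize * numDataBlocks; // 768 MB of data per group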
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
@@ -84,6 +90,14 @@ private StripedDataStreamer getLeadingStreamer() {
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
DFSClient.LOG.info("Creating a striped output stream");
// ECInfo is fetched from the NameNode just before writing a striped file.
ecInfo = dfsClient.getErasureCodingInfo(src);
cellSize = ecInfo.getSchema().getChunkSize();
numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits()
+ ecInfo.getSchema().getNumParityUnits());
numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits();
checkConfiguration();
cellBuffers = new ByteBuffer[numAllBlocks];
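Because numAllBlocks is now schema-derived, the cellBuffers array is sized at runtime instead of from compile-time constants. A sketch of the resulting layout, assuming the RS-6-3 default (6 data + 3 parity, so 9 buffers); the allocation shown is illustrative, not the exact code:

// One cell-sized buffer per block in the group, data cells first,
// then parity cells.
for (int i = 0; i < numAllBlocks; i++) {
cellBuffers[i] = ByteBuffer.allocate(cellSize);
}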


@@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
public class TestDFSStripedInputStream {
private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
private static DistributedFileSystem fs;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4;
static int blockSize = cellSize * stripesPerBlock;
private int mod = 29;
static int numDNs = dataBlocks + parityBlocks + 2;
private static MiniDFSCluster cluster;
private static Configuration conf;
@BeforeClass
public static void setup() throws IOException {
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
fs = cluster.getFileSystem();
}
@AfterClass
public static void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testFileEmpty() throws IOException {
testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
}
@Test
public void testFileSmallerThanOneCell1() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell1", 1);
}
@Test
public void testFileSmallerThanOneCell2() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", cellSize - 1);
}
@Test
public void testFileEqualsWithOneCell() throws IOException {
testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
}
@Test
public void testFileSmallerThanOneStripe1() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe1", cellSize * dataBlocks - 1);
}
@Test
public void testFileSmallerThanOneStripe2() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2", cellSize + 123);
}
@Test
public void testFileEqualsWithOneStripe() throws IOException {
testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", cellSize * dataBlocks);
}
@Test
public void testFileMoreThanOneStripe1() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
}
@Test
public void testFileMoreThanOneStripe2() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", cellSize * dataBlocks
+ cellSize * dataBlocks + 123);
}
@Test
public void testFileFullBlockGroup() throws IOException {
testOneFileUsingDFSStripedInputStream("/FullBlockGroup", blockSize * dataBlocks);
}
@Test
public void testFileMoreThanABlockGroup1() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
}
@Test
public void testFileMoreThanABlockGroup2() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize + 123);
}
@Test
public void testFileMoreThanABlockGroup3() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
blockSize * dataBlocks * 3 + cellSize * dataBlocks
+ cellSize + 123);
}
private byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];
for (int i = 0; i < cnt; i++) {
bytes[i] = getByte(i);
}
return bytes;
}
private byte getByte(long pos) {
return (byte) (pos % mod + 1);
}
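The two helpers above produce a deterministic, non-zero pattern: byte values cycle through 1..29, so a zero anywhere in the read buffer always indicates dropped data rather than a legitimate payload byte. For example:

assert getByte(0) == 1;
assert getByte(28) == 29;
assert getByte(29) == 1; // the pattern repeats every 29 bytes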
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
throws IOException {
Path testPath = new Path(src);
byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
// check file length
FileStatus status = fs.getFileStatus(testPath);
long fileLength = status.getLen();
Assert.assertEquals("File length should be the same",
writeBytes, fileLength);
DFSStripedInputStream dis = new DFSStripedInputStream(
fs.getClient(), src, true);
try {
byte[] buf = new byte[writeBytes + 100];
int readLen = dis.read(0, buf, 0, buf.length);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at i should be the same",
getByte(i), buf[i]);
}
} finally {
dis.close();
}
}
}


@@ -159,7 +159,7 @@ private void testOneFile(String src, int writeBytes) throws IOException {
// check file length
FileStatus status = fs.getFileStatus(testPath);
Assert.assertEquals(writeBytes, status.getLen());
checkData(src, writeBytes);
}
@@ -236,7 +236,7 @@ void checkData(String src, int writeBytes) throws IOException {
cellSize, dataBlockBytes, parityBlockBytes);
}
}
static void verifyParity(final long size, final int cellSize,
byte[][] dataBytes, byte[][] parityBytes) {
// verify the parity blocks


@@ -121,7 +121,6 @@ public void testPread() throws Exception {
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
in.setCellSize(CELLSIZE);
int readSize = BLOCKSIZE;
byte[] readBuffer = new byte[readSize];
int ret = in.read(0, readBuffer, 0, readSize);