diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index b2faac048f..8977c46989 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -119,3 +119,6 @@
 
     HDFS-8156. Add/implement necessary APIs even we just have the system default
     schema. (Kai Zheng via Zhe Zhang)
+
+    HDFS-8136. Client gets and uses EC schema when reads and writes a stripping
+    file. (Kai Sasaki via Kai Zheng)
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d597407305..d0e2b6822d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -21,9 +21,9 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
@@ -125,13 +125,19 @@ static ReadPortion[] planReadPortions(final int dataBlkNum,
     return results;
   }
 
-  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
-  private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int cellSize;
+  private final short dataBlkNum;
+  private final short parityBlkNum;
+  private final ECInfo ecInfo;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
       throws IOException {
     super(dfsClient, src, verifyChecksum);
+    // ECInfo is restored from NN just before reading striped file.
+    ecInfo = dfsClient.getErasureCodingInfo(src);
+    cellSize = ecInfo.getSchema().getChunkSize();
+    dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
+    parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
     DFSClient.LOG.debug("Creating an striped input stream for file " + src);
   }
 
@@ -279,9 +285,6 @@ private void waitNextCompletion(CompletionService stripedReadsService,
     throw new InterruptedException("let's retry");
   }
 
-  public void setCellSize(int cellSize) {
-    this.cellSize = cellSize;
-  }
 
   /**
    * This class represents the portion of I/O associated with each block in the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 7dc00919b8..eeb9d7ea96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -32,8 +32,8 @@
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -61,11 +61,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   /**
    * Size of each striping cell, must be a multiple of bytesPerChecksum
    */
-  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final ECInfo ecInfo;
+  private final int cellSize;
   private ByteBuffer[] cellBuffers;
-  private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS
-      + HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+
+  private final short numAllBlocks;
+  private final short numDataBlocks;
+
   private int curIdx = 0;
   /* bytes written in current block group */
   //private long currentBlockGroupBytes = 0;
@@ -77,6 +79,10 @@ private StripedDataStreamer getLeadingStreamer() {
     return streamers.get(0);
   }
 
+  private long getBlockGroupSize() {
+    return blockSize * numDataBlocks;
+  }
+
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
@@ -84,6 +90,14 @@ private StripedDataStreamer getLeadingStreamer() {
       throws IOException {
     super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
     DFSClient.LOG.info("Creating striped output stream");
+
+    // ECInfo is restored from NN just before writing striped files.
+    ecInfo = dfsClient.getErasureCodingInfo(src);
+    cellSize = ecInfo.getSchema().getChunkSize();
+    numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits()
+        + ecInfo.getSchema().getNumParityUnits());
+    numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits();
+
     checkConfiguration();
 
     cellBuffers = new ByteBuffer[numAllBlocks];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
new file mode 100644
index 0000000000..6af4a7f330
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestDFSStripedInputStream {
+  private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+
+  private static DistributedFileSystem fs;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  static int blockSize = cellSize * stripesPerBlock;
+  private int mod = 29;
+  static int numDNs = dataBlocks + parityBlocks + 2;
+
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster
+        = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileEmpty() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
+  }
+
+  @Test
+  public void testFileEqualsWithOneCell() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize + 123);
+  }
+
+  @Test
+  public void testFileEqualsWithOneStripe() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", cellSize * dataBlocks);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", cellSize * dataBlocks
+        + cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileFullBlockGroup() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/FullBlockGroup", blockSize * dataBlocks);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize + 123);
+  }
+
+
+  @Test
+  public void testFileMoreThanABlockGroup3() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
+        blockSize * dataBlocks * 3 + cellSize * dataBlocks
+        + cellSize + 123);
+  }
+
+  private byte[] generateBytes(int cnt) {
+    byte[] bytes = new byte[cnt];
+    for (int i = 0; i < cnt; i++) {
+      bytes[i] = getByte(i);
+    }
+    return bytes;
+  }
+
+  private byte getByte(long pos) {
+    return (byte) (pos % mod + 1);
+  }
+
+  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+      throws IOException {
+    Path TestPath = new Path(src);
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+    //check file length
+    FileStatus status = fs.getFileStatus(TestPath);
+    long fileLength = status.getLen();
+    Assert.assertEquals("File length should be the same",
+        writeBytes, fileLength);
+
+    DFSStripedInputStream dis = new DFSStripedInputStream(
+        fs.getClient(), src, true);
+    try {
+      byte[] buf = new byte[writeBytes + 100];
+      int readLen = dis.read(0, buf, 0, buf.length);
+      readLen = readLen >= 0 ? readLen : 0;
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      for (int i = 0; i < writeBytes; i++) {
+        Assert.assertEquals("Byte at i should be the same",
+            getByte(i), buf[i]);
+      }
+    } finally {
+      dis.close();
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index c2131834b8..26f6d2c22d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -159,7 +159,7 @@ private void testOneFile(String src, int writeBytes) throws IOException {
     // check file length
     FileStatus status = fs.getFileStatus(testPath);
     Assert.assertEquals(writeBytes, status.getLen());
-    
+
     checkData(src, writeBytes);
   }
 
@@ -236,7 +236,7 @@ void checkData(String src, int writeBytes) throws IOException {
           cellSize, dataBlockBytes, parityBlockBytes);
     }
   }
-  
+
   static void verifyParity(final long size, final int cellSize,
       byte[][] dataBytes, byte[][] parityBytes) {
     // verify the parity blocks
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index 90488c19ea..b0631cedb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -121,7 +121,6 @@ public void testPread() throws Exception {
     }
     DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
-    in.setCellSize(CELLSIZE);
     int readSize = BLOCKSIZE;
     byte[] readBuffer = new byte[readSize];
     int ret = in.read(0, readBuffer, 0, readSize);