From cd458c38a0e85c8688be75f82e6f762c73777cf6 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Fri, 17 Apr 2015 17:55:19 -0700 Subject: [PATCH] HDFS-8166. DFSStripedOutputStream should not create empty blocks. Contributed by Jing Zhao. --- .../hadoop/hdfs/DFSStripedOutputStream.java | 163 +++++++++-------- .../hadoop/hdfs/StripedDataStreamer.java | 72 +++----- .../server/blockmanagement/BlockManager.java | 17 +- .../hdfs/TestDFSStripedOutputStream.java | 164 ++++++++++++------ 4 files changed, 237 insertions(+), 179 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index f11a657d73..7dc00919b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -22,10 +22,14 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -59,12 +63,12 @@ public class DFSStripedOutputStream extends DFSOutputStream { */ private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; private ByteBuffer[] cellBuffers; - private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS + private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS; - private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS; private int curIdx = 0; /* bytes written in current block group */ - private long currentBlockGroupBytes = 0; + //private long currentBlockGroupBytes = 0; //TODO: Use ErasureCoder interface (HDFS-7781) private RawErasureEncoder encoder; @@ -73,10 +77,6 @@ private StripedDataStreamer getLeadingStreamer() { return streamers.get(0); } - private long getBlockGroupSize() { - return blockSize * HdfsConstants.NUM_DATA_BLOCKS; - } - /** Construct a new output stream for creating a file. 
*/ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, @@ -84,15 +84,13 @@ private long getBlockGroupSize() { throws IOException { super(dfsClient, src, stat, flag, progress, checksum, favoredNodes); DFSClient.LOG.info("Creating striped output stream"); - if (blockGroupBlocks <= 1) { - throw new IOException("The block group must contain more than one block."); - } + checkConfiguration(); - cellBuffers = new ByteBuffer[blockGroupBlocks]; + cellBuffers = new ByteBuffer[numAllBlocks]; List> stripeBlocks = new ArrayList<>(); - for (int i = 0; i < blockGroupBlocks; i++) { - stripeBlocks.add(new LinkedBlockingQueue(blockGroupBlocks)); + for (int i = 0; i < numAllBlocks; i++) { + stripeBlocks.add(new LinkedBlockingQueue(numAllBlocks)); try { cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize)); } catch (InterruptedException ie) { @@ -103,29 +101,38 @@ private long getBlockGroupSize() { } } encoder = new RSRawEncoder(); - encoder.initialize(blockGroupDataBlocks, - blockGroupBlocks - blockGroupDataBlocks, cellSize); + encoder.initialize(numDataBlocks, + numAllBlocks - numDataBlocks, cellSize); - streamers = new ArrayList<>(blockGroupBlocks); - for (short i = 0; i < blockGroupBlocks; i++) { + List s = new ArrayList<>(numAllBlocks); + for (short i = 0; i < numAllBlocks; i++) { StripedDataStreamer streamer = new StripedDataStreamer(stat, null, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, i, stripeBlocks); if (favoredNodes != null && favoredNodes.length != 0) { streamer.setFavoredNodes(favoredNodes); } - streamers.add(streamer); + s.add(streamer); } + streamers = Collections.unmodifiableList(s); refreshStreamer(); } + private void checkConfiguration() { + if (cellSize % bytesPerChecksum != 0) { + throw new HadoopIllegalArgumentException("Invalid values: " + + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum + + ") must divide cell size (=" + cellSize + ")."); + } + } + private void refreshStreamer() { streamer = streamers.get(curIdx); } private void moveToNextStreamer() { - curIdx = (curIdx + 1) % blockGroupBlocks; + curIdx = (curIdx + 1) % numAllBlocks; refreshStreamer(); } @@ -136,20 +143,21 @@ private void moveToNextStreamer() { * @param buffers data buffers + parity buffers */ private void encode(ByteBuffer[] buffers) { - ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks]; - ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks]; - for (int i = 0; i < blockGroupBlocks; i++) { - if (i < blockGroupDataBlocks) { + ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks]; + ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks]; + for (int i = 0; i < numAllBlocks; i++) { + if (i < numDataBlocks) { dataBuffers[i] = buffers[i]; } else { - parityBuffers[i - blockGroupDataBlocks] = buffers[i]; + parityBuffers[i - numDataBlocks] = buffers[i]; } } encoder.encode(dataBuffers, parityBuffers); } /** - * Generate packets from a given buffer + * Generate packets from a given buffer. This is only used for streamers + * writing parity blocks. 
* * @param byteBuffer the given buffer to generate packets * @return packets generated @@ -185,7 +193,6 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, throw new IOException(msg); } - // If current packet has not been enqueued for transmission, // but the cell buffer is full, we need to enqueue the packet if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) { @@ -213,13 +220,13 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, //When all data cells in a stripe are ready, we need to encode //them and generate some parity cells. These cells will be //converted to packets and put to their DataStreamer's queue. - if (curIdx == blockGroupDataBlocks) { + if (curIdx == numDataBlocks) { //encode the data cells - for (int k = 0; k < blockGroupDataBlocks; k++) { + for (int k = 0; k < numDataBlocks; k++) { cellBuffers[k].flip(); } encode(cellBuffers); - for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) { + for (int i = numDataBlocks; i < numAllBlocks; i++) { ByteBuffer parityBuffer = cellBuffers[i]; List packets = generatePackets(parityBuffer); for (DFSPacket p : packets) { @@ -245,13 +252,24 @@ private int getSizeOfCellnBuffer(int cellIndex) { } private void clearCellBuffers() { - for (int i = 0; i< blockGroupBlocks; i++) { + for (int i = 0; i< numAllBlocks; i++) { cellBuffers[i].clear(); + if (i >= numDataBlocks) { + Arrays.fill(cellBuffers[i].array(), (byte) 0); + } } } private int stripeDataSize() { - return blockGroupDataBlocks * cellSize; + return numDataBlocks * cellSize; + } + + private long getCurrentBlockGroupBytes() { + long sum = 0; + for (int i = 0; i < numDataBlocks; i++) { + sum += streamers.get(i).getBytesCurBlock(); + } + return sum; } private void notSupported(String headMsg) @@ -270,7 +288,6 @@ public void hsync() throws IOException { notSupported("hsync"); } - @Override protected synchronized void start() { for (StripedDataStreamer streamer : streamers) { @@ -302,15 +319,11 @@ boolean isClosed() { // interrupt datastreamer if force is true @Override protected void closeThreads(boolean force) throws IOException { - StripedDataStreamer leadingStreamer = null; for (StripedDataStreamer streamer : streamers) { try { streamer.close(force); streamer.join(); streamer.closeSocket(); - if (streamer.isLeadingStreamer()) { - leadingStreamer = streamer; - } } catch (InterruptedException e) { throw new IOException("Failed to shutdown streamer"); } finally { @@ -318,40 +331,26 @@ protected void closeThreads(boolean force) throws IOException { setClosed(); } } - assert leadingStreamer != null : "One streamer should be leader"; - leadingStreamer.countTailingBlockGroupBytes(); } - @Override - public synchronized void write(int b) throws IOException { - super.write(b); - currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize(); - } - - @Override - public synchronized void write(byte b[], int off, int len) - throws IOException { - super.write(b, off, len); - currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize(); - } - - private void writeParityCellsForLastStripe() throws IOException{ + private void writeParityCellsForLastStripe() throws IOException { + final long currentBlockGroupBytes = getCurrentBlockGroupBytes(); long parityBlkSize = StripedBlockUtil.getInternalBlockLength( - currentBlockGroupBytes, cellSize, blockGroupDataBlocks, - blockGroupDataBlocks + 1); + currentBlockGroupBytes, cellSize, numDataBlocks, + numDataBlocks + 1); if (parityBlkSize == 0 || currentBlockGroupBytes 
% stripeDataSize() == 0) { return; } int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize : (int) (parityBlkSize % cellSize); - for (int i = 0; i < blockGroupBlocks; i++) { + for (int i = 0; i < numAllBlocks; i++) { long internalBlkLen = StripedBlockUtil.getInternalBlockLength( - currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i); + currentBlockGroupBytes, cellSize, numDataBlocks, i); // Pad zero bytes to make all cells exactly the size of parityCellSize // If internal block is smaller than parity block, pad zero bytes. // Also pad zero bytes to all parity cells - if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) { + if (internalBlkLen < parityBlkSize || i >= numDataBlocks) { int position = cellBuffers[i].position(); assert position <= parityCellSize : "If an internal block is smaller" + " than parity block, then its last cell should be small than last" + @@ -365,9 +364,9 @@ private void writeParityCellsForLastStripe() throws IOException{ encode(cellBuffers); //write parity cells - curIdx = blockGroupDataBlocks; + curIdx = numDataBlocks; refreshStreamer(); - for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) { + for (int i = numDataBlocks; i < numAllBlocks; i++) { ByteBuffer parityBuffer = cellBuffers[i]; List packets = generatePackets(parityBuffer); for (DFSPacket p : packets) { @@ -385,7 +384,7 @@ private void writeParityCellsForLastStripe() throws IOException{ @Override void setClosed() { super.setClosed(); - for (int i = 0; i < blockGroupBlocks; i++) { + for (int i = 0; i < numAllBlocks; i++) { byteArrayManager.release(cellBuffers[i].array()); streamers.get(i).release(); } @@ -395,10 +394,11 @@ void setClosed() { protected synchronized void closeImpl() throws IOException { if (isClosed()) { IOException e = getLeadingStreamer().getLastException().getAndSet(null); - if (e == null) - return; - else + if (e != null) { throw e; + } else { + return; + } } try { @@ -408,14 +408,13 @@ protected synchronized void closeImpl() throws IOException { streamer.waitAndQueuePacket(currentPacket); currentPacket = null; } - //if the last stripe is incomplete, generate and write parity cells + // if the last stripe is incomplete, generate and write parity cells writeParityCellsForLastStripe(); - for (int i = 0; i < blockGroupBlocks; i++) { + for (int i = 0; i < numAllBlocks; i++) { curIdx = i; refreshStreamer(); - if (streamer.getBytesCurBlock()!= 0 || - currentBlockGroupBytes < getBlockGroupSize()) { + if (streamer.getBytesCurBlock() > 0) { // send an empty packet to mark the end of the block currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), true); @@ -425,9 +424,8 @@ currentBlockGroupBytes < getBlockGroupSize()) { flushInternal(); } - // get last block before destroying the streamer - ExtendedBlock lastBlock = streamers.get(0).getBlock(); closeThreads(false); + final ExtendedBlock lastBlock = getCommittedBlock(); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); try { completeFile(lastBlock); @@ -435,10 +433,35 @@ currentBlockGroupBytes < getBlockGroupSize()) { scope.close(); } dfsClient.endFileLease(fileId); - } catch (ClosedChannelException e) { + } catch (ClosedChannelException ignored) { } finally { setClosed(); } } + /** + * Generate the block which is reported and will be committed in NameNode. + * Need to go through all the streamers writing data blocks and add their + * bytesCurBlock together. Note that at this time all streamers have been + * closed. 
Also this calculation can cover streamers with writing failures. + * + * @return An ExtendedBlock with size of the whole block group. + */ + ExtendedBlock getCommittedBlock() throws IOException { + ExtendedBlock b = getLeadingStreamer().getBlock(); + if (b == null) { + return null; + } + final ExtendedBlock block = new ExtendedBlock(b); + final boolean atBlockGroupBoundary = + getLeadingStreamer().getBytesCurBlock() == 0 && + getLeadingStreamer().getBlock() != null && + getLeadingStreamer().getBlock().getNumBytes() > 0; + for (int i = 1; i < numDataBlocks; i++) { + block.setNumBytes(block.getNumBytes() + + (atBlockGroupBoundary ? streamers.get(i).getBlock().getNumBytes() : + streamers.get(i).getBytesCurBlock())); + } + return block; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 56148529ae..19c205e27b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; @@ -37,6 +36,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; + /**************************************************************************** * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}. 
* There are two kinds of StripedDataStreamer, leading streamer and ordinary @@ -47,9 +50,7 @@ ****************************************************************************/ public class StripedDataStreamer extends DataStreamer { private final short index; - private final List> stripedBlocks; - private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS - + HdfsConstants.NUM_PARITY_BLOCKS; + private final List> stripedBlocks; private boolean hasCommittedBlock = false; StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block, @@ -88,66 +89,38 @@ public boolean isLeadingStreamer () { } private boolean isParityStreamer() { - return index >= HdfsConstants.NUM_DATA_BLOCKS; + return index >= NUM_DATA_BLOCKS; } @Override protected void endBlock() { if (!isLeadingStreamer() && !isParityStreamer()) { - //before retrieving a new block, transfer the finished block to - //leading streamer + // before retrieving a new block, transfer the finished block to + // leading streamer LocatedBlock finishedBlock = new LocatedBlock( new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(), - block.getNumBytes(),block.getGenerationStamp()), null); - try{ + block.getNumBytes(), block.getGenerationStamp()), null); + try { boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30, TimeUnit.SECONDS); - }catch (InterruptedException ie) { - //TODO: Handle InterruptedException (HDFS-7786) + } catch (InterruptedException ie) { + // TODO: Handle InterruptedException (HDFS-7786) } } super.endBlock(); } - /** - * This function is called after the streamer is closed. - */ - void countTailingBlockGroupBytes () throws IOException { - if (isLeadingStreamer()) { - //when committing a block group, leading streamer has to adjust - // {@link block} including the size of block group - for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) { - try { - LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30, - TimeUnit.SECONDS); - if (finishedLocatedBlock == null) { - throw new IOException("Fail to get finished LocatedBlock " + - "from streamer, i=" + i); - } - ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock(); - long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes(); - if (block != null) { - block.setNumBytes(block.getNumBytes() + bytes); - } - } catch (InterruptedException ie) { - DFSClient.LOG.info("InterruptedException received when " + - "putting a block to stripeBlocks, ie = " + ie); - } - } - } - } - @Override protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { LocatedBlock lb = null; if (isLeadingStreamer()) { - if(hasCommittedBlock) { + if (hasCommittedBlock) { /** * when committing a block group, leading streamer has to adjust * {@link block} to include the size of block group */ - for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) { + for (int i = 1; i < NUM_DATA_BLOCKS; i++) { try { LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30, TimeUnit.SECONDS); @@ -157,7 +130,7 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) } ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock(); long bytes = finishedBlock == null ? 
0 : finishedBlock.getNumBytes(); - if(block != null) { + if (block != null) { block.setNumBytes(block.getNumBytes() + bytes); } } catch (InterruptedException ie) { @@ -171,14 +144,13 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) hasCommittedBlock = true; assert lb instanceof LocatedStripedBlock; DFSClient.LOG.debug("Leading streamer obtained bg " + lb); - LocatedBlock[] blocks = StripedBlockUtil. - parseStripedBlockGroup((LocatedStripedBlock) lb, - HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS, - HdfsConstants.NUM_PARITY_BLOCKS - ); - assert blocks.length == blockGroupSize : + LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, + NUM_PARITY_BLOCKS); + assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) : "Fail to get block group from namenode: blockGroupSize: " + - blockGroupSize + ", blocks.length: " + blocks.length; + (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " + + blocks.length; lb = blocks[0]; for (int i = 1; i < blocks.length; i++) { try { @@ -199,7 +171,7 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) } } else { try { - //wait 90 seconds to get a block from the queue + // wait 90 seconds to get a block from the queue lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS); } catch (InterruptedException ie) { DFSClient.LOG.info("InterruptedException received when retrieving " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 00ee0a7349..90507e99bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -598,8 +599,20 @@ public int getDefaultStorageNum(BlockInfo block) { } public short getMinStorageNum(BlockInfo block) { - return block.isStriped() ? 
- ((BlockInfoStriped) block).getDataBlockNum() : minReplication; + if (block.isStriped()) { + final BlockInfoStriped sblock = (BlockInfoStriped) block; + short dataBlockNum = sblock.getDataBlockNum(); + if (sblock.isComplete() || + sblock.getBlockUCState() == BlockUCState.COMMITTED) { + // if the sblock is committed/completed and its length is less than a + // full stripe, the minimum storage number needs to be adjusted + dataBlockNum = (short) Math.min(dataBlockNum, + (sblock.getNumBytes() - 1) / HdfsConstants.BLOCK_STRIPED_CELL_SIZE + 1); + } + return dataBlockNum; + } else { + return minReplication; + } } public boolean hasMinStorage(BlockInfo block) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index 4a09bda6ec..cc20f4055b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -1,5 +1,6 @@ package org.apache.hadoop.hdfs; +import java.nio.ByteBuffer; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -20,6 +21,8 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.junit.After; @@ -42,8 +45,8 @@ public class TestDFSStripedOutputStream { private DistributedFileSystem fs; private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; private final int stripesPerBlock = 4; - int blockSize = cellSize * stripesPerBlock; - private int mod = 29; + private final int blockSize = cellSize * stripesPerBlock; + private final RawErasureEncoder encoder = new RSRawEncoder(); @Before public void setup() throws IOException { @@ -53,6 +56,7 @@ public void setup() throws IOException { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.getFileSystem().getClient().createErasureCodingZone("/", null); fs = cluster.getFileSystem(); + encoder.initialize(dataBlocks, parityBlocks, cellSize); } @After @@ -144,60 +148,27 @@ private byte[] generateBytes(int cnt) { } private byte getByte(long pos) { + int mod = 29; return (byte) (pos % mod + 1); } - private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) - throws IOException { - Path TestPath = new Path(src); - byte[] bytes = generateBytes(writeBytes); - DFSTestUtil.writeFile(fs, TestPath, new String(bytes)); - - //check file length - FileStatus status = fs.getFileStatus(TestPath); - long fileLength = status.getLen(); - if (fileLength != writeBytes) { - Assert.fail("File Length error: expect=" + writeBytes - + ", actual=" + fileLength); - } - - DFSStripedInputStream dis = new DFSStripedInputStream( - fs.getClient(), src, true); - byte[] buf = new byte[writeBytes + 100]; - int readLen = dis.read(0, buf, 0, buf.length); - readLen = readLen >= 0 ? 
readLen : 0; - if (readLen != writeBytes) { - Assert.fail("The length of file is not correct."); - } - - for (int i = 0; i < writeBytes; i++) { - if (getByte(i) != buf[i]) { - Assert.fail("Byte at i = " + i + " is wrongly written."); - } - } - } - private void testOneFile(String src, int writeBytes) throws IOException { - Path TestPath = new Path(src); + Path testPath = new Path(src); - int allBlocks = dataBlocks + parityBlocks; byte[] bytes = generateBytes(writeBytes); - DFSTestUtil.writeFile(fs, TestPath, new String(bytes)); + DFSTestUtil.writeFile(fs, testPath, new String(bytes)); - //check file length - FileStatus status = fs.getFileStatus(TestPath); + // check file length + FileStatus status = fs.getFileStatus(testPath); long fileLength = status.getLen(); - if (fileLength != writeBytes) { - Assert.fail("File Length error: expect=" + writeBytes - + ", actual=" + fileLength); - } + Assert.assertEquals(writeBytes, fileLength); List> blockGroupList = new ArrayList<>(); LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L); for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { - assert firstBlock instanceof LocatedStripedBlock; + Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); LocatedBlock[] blocks = StripedBlockUtil. parseStripedBlockGroup((LocatedStripedBlock) firstBlock, cellSize, dataBlocks, parityBlocks); @@ -205,15 +176,14 @@ private void testOneFile(String src, int writeBytes) blockGroupList.add(oneGroup); } - //test each block group + // test each block group for (int group = 0; group < blockGroupList.size(); group++) { //get the data of this block List blockList = blockGroupList.get(group); byte[][] dataBlockBytes = new byte[dataBlocks][]; - byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][]; + byte[][] parityBlockBytes = new byte[parityBlocks][]; - - //for each block, use BlockReader to read data + // for each block, use BlockReader to read data for (int i = 0; i < blockList.size(); i++) { LocatedBlock lblock = blockList.get(i); if (lblock == null) { @@ -269,19 +239,20 @@ public Peer newConnectedPeer(InetSocketAddress addr, } }).build(); - blockReader.readAll(blockBytes, 0, (int)block.getNumBytes()); + blockReader.readAll(blockBytes, 0, (int) block.getNumBytes()); blockReader.close(); } - //check if we write the data correctly - for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; blkIdxInGroup++) { - byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup]; + // check if we write the data correctly + for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; + blkIdxInGroup++) { + final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup]; if (actualBlkBytes == null) { continue; } for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) { byte expected; - //calculate the postion of this byte in the file + // calculate the position of this byte in the file long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize, dataBlocks, posInBlk, blkIdxInGroup) + group * blockSize * dataBlocks; @@ -291,15 +262,94 @@ public Peer newConnectedPeer(InetSocketAddress addr, expected = getByte(posInFile); } - if (expected != actualBlkBytes[posInBlk]) { - Assert.fail("Unexpected byte " + actualBlkBytes[posInBlk] + ", expect " + expected - + ". Block group index is " + group + - ", stripe index is " + posInBlk / cellSize + - ", cell index is " + blkIdxInGroup + ", byte index is " + posInBlk % cellSize); - } + String s = "Unexpected byte " + actualBlkBytes[posInBlk] + + ", expect " + expected + + ". 
Block group index is " + group + + ", stripe index is " + posInBlk / cellSize + + ", cell index is " + blkIdxInGroup + + ", byte index is " + posInBlk % cellSize; + Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]); } } + + // verify the parity blocks + final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks]; + final long groupSize = lbs.getLocatedBlocks().get(group).getBlockSize(); + int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(groupSize, + cellSize, dataBlocks, dataBlocks); + for (int i = 0; i < parityBlocks; i++) { + parityBuffers[i] = ByteBuffer.allocate(parityBlkSize); + } + final int numStripes = (int) (groupSize - 1) / stripeDataSize() + 1; + for (int i = 0; i < numStripes; i++) { + final int parityCellSize = i < numStripes - 1 || parityBlkSize % cellSize == 0 + ? cellSize : parityBlkSize % cellSize; + ByteBuffer[] stripeBuf = new ByteBuffer[dataBlocks]; + for (int k = 0; k < stripeBuf.length; k++) { + stripeBuf[k] = ByteBuffer.allocate(cellSize); + } + for (int j = 0; j < dataBlocks; j++) { + if (dataBlockBytes[j] != null) { + int length = Math.min(cellSize, + dataBlockBytes[j].length - cellSize * i); + if (length > 0) { + stripeBuf[j].put(dataBlockBytes[j], cellSize * i, length); + } + } + final long pos = stripeBuf[j].position(); + for (int k = 0; k < parityCellSize - pos; k++) { + stripeBuf[j].put((byte) 0); + } + stripeBuf[j].flip(); + } + ByteBuffer[] parityBuf = new ByteBuffer[parityBlocks]; + for (int j = 0; j < parityBlocks; j++) { + parityBuf[j] = ByteBuffer.allocate(cellSize); + for (int k = 0; k < parityCellSize; k++) { + parityBuf[j].put((byte) 0); + } + parityBuf[j].flip(); + } + + encoder.encode(stripeBuf, parityBuf); + for (int j = 0; j < parityBlocks; j++) { + parityBuffers[j].put(parityBuf[j]); + } + } + + for (int i = 0; i < parityBlocks; i++) { + Assert.assertArrayEquals(parityBuffers[i].array(), parityBlockBytes[i]); + } } } + private void testReadWriteOneFile(String src, int writeBytes) + throws IOException { + Path TestPath = new Path(src); + byte[] bytes = generateBytes(writeBytes); + DFSTestUtil.writeFile(fs, TestPath, new String(bytes)); + + //check file length + FileStatus status = fs.getFileStatus(TestPath); + long fileLength = status.getLen(); + if (fileLength != writeBytes) { + Assert.fail("File Length error: expect=" + writeBytes + + ", actual=" + fileLength); + } + + DFSStripedInputStream dis = new DFSStripedInputStream( + fs.getClient(), src, true); + byte[] buf = new byte[writeBytes + 100]; + int readLen = dis.read(0, buf, 0, buf.length); + readLen = readLen >= 0 ? readLen : 0; + if (readLen != writeBytes) { + Assert.fail("The length of file is not correct."); + } + + for (int i = 0; i < writeBytes; i++) { + if (getByte(i) != buf[i]) { + Assert.fail("Byte at i = " + i + " is wrongly written."); + } + } + } }
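
Note (illustration only, not part of the patch): the two size calculations this change introduces — summing the data streamers' bytesCurBlock into the committed block-group length in DFSStripedOutputStream, and relaxing BlockManager#getMinStorageNum for a committed group shorter than a full stripe — reduce to the arithmetic sketched below. The cell size, data-block count, and class name are assumptions made for the sketch, not values read from HdfsConstants.

```java
import java.util.Arrays;

/**
 * Standalone sketch of the size arithmetic used by this change. The constants
 * below are assumed values for illustration; the real code reads them from
 * HdfsConstants and from the live streamers.
 */
public class StripedSizeSketch {
  static final int CELL_SIZE = 64 * 1024;   // assumed striping cell size
  static final int NUM_DATA_BLOCKS = 6;     // assumed RS(6,3) data-block count

  /**
   * Committed block-group length: the sum of the bytes each data streamer has
   * written to its current internal block (parity streamers are excluded),
   * mirroring DFSStripedOutputStream#getCurrentBlockGroupBytes.
   */
  static long committedGroupBytes(long[] bytesCurBlockPerDataStreamer) {
    return Arrays.stream(bytesCurBlockPerDataStreamer).sum();
  }

  /**
   * Minimum storages needed for a committed/complete group of the given
   * length: a group shorter than a full stripe only occupies
   * ceil(numBytes / CELL_SIZE) internal data blocks, so fewer reported
   * storages suffice (mirrors the adjusted BlockManager#getMinStorageNum).
   */
  static int minStorageNum(long numBytes) {
    return (int) Math.min(NUM_DATA_BLOCKS, (numBytes - 1) / CELL_SIZE + 1);
  }

  public static void main(String[] args) {
    // One full 6 x 64 KB stripe plus a 1 KB tail in the first data block.
    long[] bytesCurBlock = {CELL_SIZE + 1024, CELL_SIZE, CELL_SIZE,
        CELL_SIZE, CELL_SIZE, CELL_SIZE};
    long groupBytes = committedGroupBytes(bytesCurBlock);
    System.out.println(groupBytes);                // 394240
    System.out.println(minStorageNum(groupBytes)); // 6: every data block used
    System.out.println(minStorageNum(1024));       // 1: tiny group, one block
  }
}
```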