diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 7c2dcf94b7..ecf74f7abb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1151,9 +1151,9 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, for (int i = 0; i < offsets.length; i++) { int nread = reader.readAll(buf, offsets[i], lengths[i]); updateReadStatistics(readStatistics, nread, reader); - if (nread != len) { + if (nread != lengths[i]) { throw new IOException("truncated return from reader.read(): " + - "excpected " + len + ", got " + nread); + "excpected " + lengths[i] + ", got " + nread); } } DFSClientFaultInjector.get().readFromDatanodeDelay(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 8a431b1ba8..d597407305 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.net.NetUtils; import org.apache.htrace.Span; import org.apache.htrace.Trace; @@ -50,7 +51,7 @@ * * | <- Striped Block Group -> | * blk_0 blk_1 blk_2 <- A striped block group has - * | | | {@link #groupSize} blocks + * | | | {@link #dataBlkNum} blocks * v v v * +------+ +------+ +------+ * |cell_0| |cell_1| |cell_2| <- The logical read order should be @@ -72,7 +73,7 @@ public class DFSStripedInputStream extends DFSInputStream { /** * This method plans the read portion from each block in the stripe - * @param groupSize The size / width of the striping group + * @param dataBlkNum The number of data blocks in the striping group * @param cellSize The size of each striping cell * @param startInBlk Starting offset in the striped block * @param len Length of the read request @@ -81,29 +82,29 @@ public class DFSStripedInputStream extends DFSInputStream { * for an individual block in the group */ @VisibleForTesting - static ReadPortion[] planReadPortions(final int groupSize, + static ReadPortion[] planReadPortions(final int dataBlkNum, final int cellSize, final long startInBlk, final int len, int bufOffset) { - ReadPortion[] results = new ReadPortion[groupSize]; - for (int i = 0; i < groupSize; i++) { + ReadPortion[] results = new ReadPortion[dataBlkNum]; + for (int i = 0; i < dataBlkNum; i++) { results[i] = new ReadPortion(); } // cellIdxInBlk is the index of the cell in the block // E.g., cell_3 is the 2nd cell in blk_0 - int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize)); + int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum)); // blkIdxInGroup is the index of the block in the striped block group // E.g., blk_2 is the 3rd block in the group - final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize); + final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum); results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk + startInBlk 
% cellSize; boolean crossStripe = false; - for (int i = 1; i < groupSize; i++) { - if (blkIdxInGroup + i >= groupSize && !crossStripe) { + for (int i = 1; i < dataBlkNum; i++) { + if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) { cellIdxInBlk++; crossStripe = true; } - results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock = + results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock = cellSize * cellIdxInBlk; } @@ -112,57 +113,21 @@ static ReadPortion[] planReadPortions(final int groupSize, results[blkIdxInGroup].lengths.add(firstCellLen); results[blkIdxInGroup].readLength += firstCellLen; - int i = (blkIdxInGroup + 1) % groupSize; + int i = (blkIdxInGroup + 1) % dataBlkNum; for (int done = firstCellLen; done < len; done += cellSize) { ReadPortion rp = results[i]; rp.offsetsInBuf.add(done + bufOffset); final int readLen = Math.min(len - done, cellSize); rp.lengths.add(readLen); rp.readLength += readLen; - i = (i + 1) % groupSize; + i = (i + 1) % dataBlkNum; } return results; } - /** - * This method parses a striped block group into individual blocks. - * - * @param bg The striped block group - * @param dataBlkNum the number of data blocks - * @return An array containing the blocks in the group - */ - @VisibleForTesting - static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, - int dataBlkNum, int cellSize) { - int locatedBGSize = bg.getBlockIndices().length; - // TODO not considering missing blocks for now, only identify data blocks - LocatedBlock[] lbs = new LocatedBlock[dataBlkNum]; - for (short i = 0; i < locatedBGSize; i++) { - final int idx = bg.getBlockIndices()[i]; - if (idx < dataBlkNum && lbs[idx] == null) { - lbs[idx] = constructInternalBlock(bg, i, cellSize, idx); - } - } - return lbs; - } - - private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, - int idxInReturnedLocs, int cellSize, int idxInBlockGroup) { - final ExtendedBlock blk = new ExtendedBlock(bg.getBlock()); - blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup); - // TODO: fix the numBytes computation - - return new LocatedBlock(blk, - new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, - new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, - new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, - bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(), - null); - } - - private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; - private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS; + private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS; + private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS; DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum) throws IOException { @@ -199,7 +164,7 @@ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException { "LocatedStripedBlock for a striped file"; int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize) - % groupSize); + % dataBlkNum); // If indexing information is returned, iterate through the index array // to find the entry for position idx in the group LocatedStripedBlock lsb = (LocatedStripedBlock) lb; @@ -213,7 +178,8 @@ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException { DFSClient.LOG.debug("getBlockAt for striped blocks, offset=" + blkStartOffset + ". 
Obtained block " + lb + ", idx=" + idx); } - return constructInternalBlock(lsb, i, cellSize, idx); + return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize, + dataBlkNum, idx); } private LocatedBlock getBlockGroupAt(long offset) throws IOException { @@ -240,13 +206,14 @@ protected void fetchBlockByteRange(long blockStartOffset, long start, LocatedStripedBlock blockGroup = (LocatedStripedBlock) block; // Planning the portion of I/O for each shard - ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start, + ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start, len, offset); // Parse group to get chosen DN location - LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize); + LocatedBlock[] blks = StripedBlockUtil. + parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum); - for (short i = 0; i < groupSize; i++) { + for (short i = 0; i < dataBlkNum; i++) { ReadPortion rp = readPortions[i]; if (rp.readLength <= 0) { continue; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 1d0e1beef3..f11a657d73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.util.DataChecksum; @@ -309,10 +310,7 @@ protected void closeThreads(boolean force) throws IOException { streamer.closeSocket(); if (streamer.isLeadingStreamer()) { leadingStreamer = streamer; - } else { - streamer.countTailingBlockGroupBytes(); } - } catch (InterruptedException e) { throw new IOException("Failed to shutdown streamer"); } finally { @@ -320,6 +318,7 @@ protected void closeThreads(boolean force) throws IOException { setClosed(); } } + assert leadingStreamer != null : "One streamer should be leader"; leadingStreamer.countTailingBlockGroupBytes(); } @@ -337,23 +336,28 @@ public synchronized void write(byte b[], int off, int len) } private void writeParityCellsForLastStripe() throws IOException{ - if(currentBlockGroupBytes == 0 || - currentBlockGroupBytes % stripeDataSize() == 0) + long parityBlkSize = StripedBlockUtil.getInternalBlockLength( + currentBlockGroupBytes, cellSize, blockGroupDataBlocks, + blockGroupDataBlocks + 1); + if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) { return; - int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize()); - // Size of parity cells should equal the size of the first cell, if it - // is not full. - int parityCellSize = cellSize; - int index = lastStripeLen / cellSize; - if (lastStripeLen < cellSize) { - parityCellSize = lastStripeLen; - index++; } + int parityCellSize = parityBlkSize % cellSize == 0 ? 
cellSize : + (int) (parityBlkSize % cellSize); + for (int i = 0; i < blockGroupBlocks; i++) { - if (i >= index) { + long internalBlkLen = StripedBlockUtil.getInternalBlockLength( + currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i); + // Pad zero bytes to make all cells exactly the size of parityCellSize + // If internal block is smaller than parity block, pad zero bytes. + // Also pad zero bytes to all parity cells + if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) { int position = cellBuffers[i].position(); + assert position <= parityCellSize : "If an internal block is smaller" + + " than parity block, then its last cell should be small than last" + + " parity cell"; for (int j = 0; j < parityCellSize - position; j++) { - cellBuffers[i].put((byte)0); + cellBuffers[i].put((byte) 0); } } cellBuffers[i].flip(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 710d92d736..56148529ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -19,16 +19,16 @@ package org.apache.hadoop.hdfs; import java.util.List; -import org.apache.hadoop.fs.StorageType; + import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.util.ByteArrayManager; -import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; @@ -134,19 +134,7 @@ void countTailingBlockGroupBytes () throws IOException { "putting a block to stripeBlocks, ie = " + ie); } } - } else if (!isParityStreamer()) { - if (block == null || block.getNumBytes() == 0) { - LocatedBlock finishedBlock = new LocatedBlock(null, null); - try { - boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30, - TimeUnit.SECONDS); - } catch (InterruptedException ie) { - //TODO: Handle InterruptedException (HDFS-7786) - ie.printStackTrace(); - } - } } - } @Override @@ -155,8 +143,10 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) LocatedBlock lb = null; if (isLeadingStreamer()) { if(hasCommittedBlock) { - //when committing a block group, leading streamer has to adjust - // {@link block} including the size of block group + /** + * when committing a block group, leading streamer has to adjust + * {@link block} to include the size of block group + */ for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) { try { LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30, @@ -179,7 +169,13 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) lb = super.locateFollowingBlock(excludedNodes); hasCommittedBlock = true; - LocatedBlock[] blocks = unwrapBlockGroup(lb); + assert lb instanceof LocatedStripedBlock; + DFSClient.LOG.debug("Leading streamer obtained bg " + lb); + LocatedBlock[] blocks = StripedBlockUtil. 
+ parseStripedBlockGroup((LocatedStripedBlock) lb, + HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS, + HdfsConstants.NUM_PARITY_BLOCKS + ); assert blocks.length == blockGroupSize : "Fail to get block group from namenode: blockGroupSize: " + blockGroupSize + ", blocks.length: " + blocks.length; @@ -212,30 +208,4 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) } return lb; } - - /** - * Generate other blocks in a block group according to the first one. - * - * @param firstBlockInGroup the first block in a block group - * @return other blocks in this group - */ - public static LocatedBlock[] unwrapBlockGroup( - final LocatedBlock firstBlockInGroup) { - ExtendedBlock eb = firstBlockInGroup.getBlock(); - DatanodeInfo[] locs = firstBlockInGroup.getLocations(); - String[] storageIDs = firstBlockInGroup.getStorageIDs(); - StorageType[] storageTypes = firstBlockInGroup.getStorageTypes(); - Token blockToken = firstBlockInGroup.getBlockToken(); - LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length]; - for (int i = 0; i < blocksInGroup.length; i++) { - //each block in a group has the same number of bytes and timestamp - ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(), - eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp()); - blocksInGroup[i] = new LocatedBlock(extendedBlock, - new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]}, - new StorageType[] {storageTypes[i]}); - blocksInGroup[i].setBlockToken(blockToken); - } - return blocksInGroup; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index fd8c68492d..147cc31341 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Daemon; @@ -1974,8 +1975,8 @@ public boolean processReport(final DatanodeID nodeID, metrics.addBlockReport((int) (endTime - startTime)); } blockLog.info("BLOCK* processReport: from storage {} node {}, " + - "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage - .getStorageID(), nodeID, newReport.getNumberOfBlocks(), + "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage + .getStorageID(), nodeID, newReport.getNumberOfBlocks(), node.hasStaleStorages(), (endTime - startTime)); return !node.hasStaleStorages(); } @@ -2002,8 +2003,8 @@ private void removeZombieReplicas(BlockReportContext context, assert(zombie.numBlocks() == 0); LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " + "which no longer exists on the DataNode.", - Long.toHexString(context.getReportId()), prevBlocks, - zombie.getStorageID()); + Long.toHexString(context.getReportId()), prevBlocks, + zombie.getStorageID()); } /** @@ -2487,7 +2488,22 @@ private BlockToMarkCorrupt checkReplicaCorrupt( "block is " + ucState + " and reported genstamp " 
+ reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); - } else if (storedBlock.getNumBytes() != reported.getNumBytes()) { + } + boolean wrongSize; + if (storedBlock.isStriped()) { + assert BlockIdManager.isStripedBlockID(reported.getBlockId()); + assert storedBlock.getBlockId() == + BlockIdManager.convertToStripedID(reported.getBlockId()); + BlockInfoStriped stripedBlock = (BlockInfoStriped) storedBlock; + int reportedBlkIdx = BlockIdManager.getBlockIndex(reported); + wrongSize = reported.getNumBytes() != + getInternalBlockLength(stripedBlock.getNumBytes(), + HdfsConstants.BLOCK_STRIPED_CELL_SIZE, + stripedBlock.getDataBlockNum(), reportedBlkIdx); + } else { + wrongSize = storedBlock.getNumBytes() != reported.getNumBytes(); + } + if (wrongSize) { return new BlockToMarkCorrupt(new Block(reported), storedBlock, "block is " + ucState + " and reported length " + reported.getNumBytes() + " does not match " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java new file mode 100644 index 0000000000..23680216b8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; + +/** + * Utility class for analyzing striped block groups + */ +@InterfaceAudience.Private +public class StripedBlockUtil { + + /** + * This method parses a striped block group into individual blocks. 
+ * + * @param bg The striped block group + * @param cellSize The size of a striping cell + * @param dataBlkNum The number of data blocks + * @return An array containing the blocks in the group + */ + public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, + int cellSize, int dataBlkNum, int parityBlkNum) { + int locatedBGSize = bg.getBlockIndices().length; + // TODO not considering missing blocks for now, only identify data blocks + LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum]; + for (short i = 0; i < locatedBGSize; i++) { + final int idx = bg.getBlockIndices()[i]; + if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) { + lbs[idx] = constructInternalBlock(bg, i, cellSize, + dataBlkNum, idx); + } + } + return lbs; + } + + /** + * This method creates an internal block at the given index of a block group + * + * @param idxInReturnedLocs The index in the stored locations in the + * {@link LocatedStripedBlock} object + * @param idxInBlockGroup The logical index in the striped block group + * @return The constructed internal block + */ + public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, + int idxInReturnedLocs, int cellSize, int dataBlkNum, + int idxInBlockGroup) { + final ExtendedBlock blk = new ExtendedBlock(bg.getBlock()); + blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup); + blk.setNumBytes(getInternalBlockLength(bg.getBlockSize(), + cellSize, dataBlkNum, idxInBlockGroup)); + + return new LocatedBlock(blk, + new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, + new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, + new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, + bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(), + null); + } + + /** + * Get the size of an internal block at the given index of a block group + * + * @param numBytesInGroup Size of the block group only counting data blocks + * @param cellSize The size of a striping cell + * @param dataBlkNum The number of data blocks + * @param idxInGroup The logical index in the striped block group + * @return The size of the internal block at the specified index + */ + public static long getInternalBlockLength(long numBytesInGroup, + int cellSize, int dataBlkNum, int idxInGroup) { + // Size of each stripe (only counting data blocks) + final long numBytesPerStripe = cellSize * dataBlkNum; + assert numBytesPerStripe > 0: + "getInternalBlockLength should only be called on valid striped blocks"; + // If block group ends at stripe boundary, each internal block has an equal + // share of the group + if (numBytesInGroup % numBytesPerStripe == 0) { + return numBytesInGroup / dataBlkNum; + } + + int numStripes = (int) ((numBytesInGroup - 1) / numBytesPerStripe + 1); + assert numStripes >= 1 : "There should be at least 1 stripe"; + + // All stripes but the last one are full stripes. The block should at least + // contain (numStripes - 1) full cells. + long blkSize = (numStripes - 1) * cellSize; + + long lastStripeLen = numBytesInGroup % numBytesPerStripe; + // Size of parity cells should equal the size of the first cell, if it + // is not full. 
+ long lastParityCellLen = Math.min(cellSize, lastStripeLen); + + if (idxInGroup >= dataBlkNum) { + // for parity blocks + blkSize += lastParityCellLen; + } else { + // for data blocks + blkSize += Math.min(cellSize, + Math.max(0, lastStripeLen - cellSize * idxInGroup)); + } + + return blkSize; + } + + /** + * Given a byte's offset in an internal block, calculate the offset in + * the block group + */ + public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum, + long offsetInBlk, int idxInBlockGroup) { + int cellIdxInBlk = (int) (offsetInBlk / cellSize); + return cellIdxInBlk * cellSize * dataBlkNum // n full stripes before offset + + idxInBlockGroup * cellSize // m full cells before offset + + offsetInBlk % cellSize; // partial cell + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index ed508fcbf1..0c88842f9b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -108,7 +108,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -1851,11 +1850,30 @@ public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock( return reports; } - public static void createECFile(MiniDFSCluster cluster, Path file, Path dir, - int numBlocks, int numStripesPerBlk) throws Exception { + /** + * Creates the metadata of a file in striped layout. This method only + * manipulates the NameNode state without injecting data to DataNode. 
+ * @param file Path of the file to create + * @param dir Parent path of the file + * @param numBlocks Number of striped block groups to add to the file + * @param numStripesPerBlk Number of striped cells in each block + * @param toMkdir + */ + public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir, + int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception { DistributedFileSystem dfs = cluster.getFileSystem(); - dfs.mkdirs(dir); - dfs.getClient().createErasureCodingZone(dir.toString(), null); + // If outer test already created EC zone, dir should be left as null + if (toMkdir) { + assert dir != null; + dfs.mkdirs(dir); + try { + dfs.getClient().createErasureCodingZone(dir.toString(), null); + } catch (IOException e) { + if (!e.getMessage().contains("non-empty directory")) { + throw e; + } + } + } FSDataOutputStream out = null; try { @@ -1867,7 +1885,7 @@ public static void createECFile(MiniDFSCluster cluster, Path file, Path dir, ExtendedBlock previous = null; for (int i = 0; i < numBlocks; i++) { - Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns, + Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns, file.toString(), fileNode, dfs.getClient().getClientName(), previous, numStripesPerBlk); previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock); @@ -1880,43 +1898,50 @@ public static void createECFile(MiniDFSCluster cluster, Path file, Path dir, } } - static Block createBlock(List dataNodes, DistributedFileSystem fs, - FSNamesystem ns, String file, INodeFile fileNode, String clientName, - ExtendedBlock previous, int numStripes) throws Exception { + /** + * Adds a striped block group to a file. This method only manipulates NameNode + * states of the file and the block without injecting data to DataNode. + * It does mimic block reports. + * @param dataNodes List DataNodes to host the striped block group + * @param previous Previous block in the file + * @param numStripes Number of stripes in each block group + * @return The added block group + */ + public static Block addStripedBlockToFile(List dataNodes, + DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode, + String clientName, ExtendedBlock previous, int numStripes) + throws Exception { fs.getClient().namenode.addBlock(file, clientName, previous, null, fileNode.getId(), null); final BlockInfo lastBlock = fileNode.getLastBlock(); final int groupSize = fileNode.getBlockReplication(); + assert dataNodes.size() >= groupSize; // 1. 
RECEIVING_BLOCK IBR - int i = 0; - for (DataNode dn : dataNodes) { - if (i < groupSize) { - final Block block = new Block(lastBlock.getBlockId() + i++, 0, - lastBlock.getGenerationStamp()); - DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); - StorageReceivedDeletedBlocks[] reports = DFSTestUtil - .makeReportForReceivedBlock(block, - ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage); - for (StorageReceivedDeletedBlocks report : reports) { - ns.processIncrementalBlockReport(dn.getDatanodeId(), report); - } + for (int i = 0; i < groupSize; i++) { + DataNode dn = dataNodes.get(i); + final Block block = new Block(lastBlock.getBlockId() + i, 0, + lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); } } // 2. RECEIVED_BLOCK IBR - i = 0; - for (DataNode dn : dataNodes) { - if (i < groupSize) { - final Block block = new Block(lastBlock.getBlockId() + i++, - numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp()); - DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); - StorageReceivedDeletedBlocks[] reports = DFSTestUtil - .makeReportForReceivedBlock(block, - ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); - for (StorageReceivedDeletedBlocks report : reports) { - ns.processIncrementalBlockReport(dn.getDatanodeId(), report); - } + for (int i = 0; i < groupSize; i++) { + DataNode dn = dataNodes.get(i); + final Block block = new Block(lastBlock.getBlockId() + i, + numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index c78922e47d..4a09bda6ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -2,7 +2,6 @@ import java.util.Arrays; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; @@ -14,10 +13,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import 
org.apache.hadoop.security.token.Token; @@ -39,16 +40,16 @@ public class TestDFSStripedOutputStream { private MiniDFSCluster cluster; private Configuration conf = new Configuration(); private DistributedFileSystem fs; - int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; - int blockSize = 8 * 1024 * 1024; - int cellsInBlock = blockSize / cellSize; + private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final int stripesPerBlock = 4; + int blockSize = cellSize * stripesPerBlock; private int mod = 29; @Before public void setup() throws IOException { int numDNs = dataBlocks + parityBlocks + 2; Configuration conf = new Configuration(); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.getFileSystem().getClient().createErasureCodingZone("/", null); fs = cluster.getFileSystem(); @@ -103,8 +104,7 @@ public void TestFileMoreThanOneStripe1() throws IOException { @Test public void TestFileMoreThanOneStripe2() throws IOException { - testOneFile("/MoreThanOneStripe2", - cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) + testOneFile("/MoreThanOneStripe2", cellSize * dataBlocks + cellSize * dataBlocks + 123); } @@ -113,18 +113,22 @@ public void TestFileFullBlockGroup() throws IOException { testOneFile("/FullBlockGroup", blockSize * dataBlocks); } - //TODO: The following tests will pass after HDFS-8121 fixed -// @Test + @Test public void TestFileMoreThanABlockGroup1() throws IOException { testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123); } - // @Test + @Test public void TestFileMoreThanABlockGroup2() throws IOException { - testOneFile("/MoreThanABlockGroup2", - blockSize * dataBlocks * 3 - + (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks - + 123); + testOneFile("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123); + } + + + @Test + public void TestFileMoreThanABlockGroup3() throws IOException { + testOneFile("/MoreThanABlockGroup3", + blockSize * dataBlocks * 3 + cellSize * dataBlocks + + cellSize + 123); } private int stripeDataSize() { @@ -193,7 +197,10 @@ private void testOneFile(String src, int writeBytes) LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L); for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { - LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock); + assert firstBlock instanceof LocatedStripedBlock; + LocatedBlock[] blocks = StripedBlockUtil. + parseStripedBlockGroup((LocatedStripedBlock) firstBlock, + cellSize, dataBlocks, parityBlocks); List oneGroup = Arrays.asList(blocks); blockGroupList.add(oneGroup); } @@ -205,12 +212,6 @@ private void testOneFile(String src, int writeBytes) byte[][] dataBlockBytes = new byte[dataBlocks][]; byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][]; - //calculate the size of this block group - int lenOfBlockGroup = group < blockGroupList.size() - 1 ? 
- blockSize * dataBlocks : - writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks; - int intactStripes = lenOfBlockGroup / stripeDataSize(); - int lastStripeLen = lenOfBlockGroup % stripeDataSize(); //for each block, use BlockReader to read data for (int i = 0; i < blockList.size(); i++) { @@ -223,25 +224,17 @@ private void testOneFile(String src, int writeBytes) InetSocketAddress targetAddr = NetUtils.createSocketAddr( nodes[0].getXferAddr()); - int lenOfCell = cellSize; - if (i == lastStripeLen / cellSize) { - lenOfCell = lastStripeLen % cellSize; - } else if (i > lastStripeLen / cellSize) { - lenOfCell = 0; - } - int lenOfBlock = cellSize * intactStripes + lenOfCell; - byte[] blockBytes = new byte[lenOfBlock]; + byte[] blockBytes = new byte[(int)block.getNumBytes()]; if (i < dataBlocks) { dataBlockBytes[i] = blockBytes; } else { parityBlockBytes[i - dataBlocks] = blockBytes; } - if (lenOfBlock == 0) { + if (block.getNumBytes() == 0) { continue; } - block.setNumBytes(lenOfBlock); BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)). setFileName(src). setBlock(block). @@ -276,33 +269,33 @@ public Peer newConnectedPeer(InetSocketAddress addr, } }).build(); - blockReader.readAll(blockBytes, 0, lenOfBlock); + blockReader.readAll(blockBytes, 0, (int)block.getNumBytes()); blockReader.close(); } //check if we write the data correctly - for (int i = 0; i < dataBlockBytes.length; i++) { - byte[] cells = dataBlockBytes[i]; - if (cells == null) { + for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; blkIdxInGroup++) { + byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup]; + if (actualBlkBytes == null) { continue; } - for (int j = 0; j < cells.length; j++) { + for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) { byte expected; //calculate the postion of this byte in the file - long pos = group * dataBlocks * blockSize - + (i * cellSize + j / cellSize * cellSize * dataBlocks) - + j % cellSize; - if (pos >= writeBytes) { + long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize, + dataBlocks, posInBlk, blkIdxInGroup) + + group * blockSize * dataBlocks; + if (posInFile >= writeBytes) { expected = 0; } else { - expected = getByte(pos); + expected = getByte(posInFile); } - if (expected != cells[j]) { - Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected + if (expected != actualBlkBytes[posInBlk]) { + Assert.fail("Unexpected byte " + actualBlkBytes[posInBlk] + ", expect " + expected + ". 
Block group index is " + group + - ", stripe index is " + j / cellSize + - ", cell index is " + i + ", byte index is " + j % cellSize); + ", stripe index is " + posInBlk / cellSize + + ", cell index is " + blkIdxInGroup + ", byte index is " + posInBlk % cellSize); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java index 849e12ea8e..90488c19ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java @@ -21,10 +21,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -34,10 +31,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,17 +50,18 @@ public class TestReadStripedFile { private DistributedFileSystem fs; private final Path dirPath = new Path("/striped"); private Path filePath = new Path(dirPath, "file"); - private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS; - private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS; + private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS; + private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS; + private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM; private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; private final int NUM_STRIPE_PER_BLOCK = 2; - private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE; + private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * DATA_BLK_NUM * CELLSIZE; @Before public void setup() throws IOException { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); SimulatedFSDataset.setFactory(conf); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE) + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(BLK_GROUP_SIZE) .build(); cluster.waitActive(); fs = cluster.getFileSystem(); @@ -77,72 +74,14 @@ public void tearDown() { } } - private LocatedStripedBlock createDummyLocatedBlock() { - final long blockGroupID = -1048576; - DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE]; - String[] storageIDs = new String[TOTAL_SIZE]; - StorageType[] storageTypes = new StorageType[TOTAL_SIZE]; - int[] indices = new int[TOTAL_SIZE]; - for (int i = 0; i < TOTAL_SIZE; i++) { - locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId()); - storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid(); - storageTypes[i] = StorageType.DISK; - indices[i] = (i + 2) % GROUP_SIZE; - } - return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID), - locs, storageIDs, storageTypes, indices, 0, false, null); - } - - @Test - public void 
testParseDummyStripedBlock() { - LocatedStripedBlock lsb = createDummyLocatedBlock(); - LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup( - lsb, GROUP_SIZE, CELLSIZE); - assertEquals(GROUP_SIZE, blocks.length); - for (int j = 0; j < GROUP_SIZE; j++) { - assertFalse(blocks[j].isStriped()); - assertEquals(j, - BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock())); - assertEquals(j * CELLSIZE, blocks[j].getStartOffset()); - } - } - - @Test - public void testParseStripedBlock() throws Exception { - final int numBlocks = 4; - DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, - NUM_STRIPE_PER_BLOCK); - LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( - filePath.toString(), 0, BLOCKSIZE * numBlocks); - - assertEquals(4, lbs.locatedBlockCount()); - List lbList = lbs.getLocatedBlocks(); - for (LocatedBlock lb : lbList) { - assertTrue(lb.isStriped()); - } - - for (int i = 0; i < numBlocks; i++) { - LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i)); - LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb, - GROUP_SIZE, CELLSIZE); - assertEquals(GROUP_SIZE, blks.length); - for (int j = 0; j < GROUP_SIZE; j++) { - assertFalse(blks[j].isStriped()); - assertEquals(j, - BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock())); - assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset()); - } - } - } - /** * Test {@link DFSStripedInputStream#getBlockAt(long)} */ @Test public void testGetBlock() throws Exception { final int numBlocks = 4; - DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, - NUM_STRIPE_PER_BLOCK); + DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks, + NUM_STRIPE_PER_BLOCK, true); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, BLOCKSIZE * numBlocks); final DFSStripedInputStream in = @@ -151,9 +90,9 @@ public void testGetBlock() throws Exception { List lbList = lbs.getLocatedBlocks(); for (LocatedBlock aLbList : lbList) { LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList; - LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb, - GROUP_SIZE, CELLSIZE); - for (int j = 0; j < GROUP_SIZE; j++) { + LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb, + CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM); + for (int j = 0; j < DATA_BLK_NUM; j++) { LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset()); assertEquals(blks[j].getBlock(), refreshed.getBlock()); assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset()); @@ -165,15 +104,16 @@ public void testGetBlock() throws Exception { @Test public void testPread() throws Exception { final int numBlocks = 4; - DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, - NUM_STRIPE_PER_BLOCK); + DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks, + NUM_STRIPE_PER_BLOCK, true); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, BLOCKSIZE); assert lbs.get(0) instanceof LocatedStripedBlock; LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0)); - for (int i = 0; i < GROUP_SIZE; i++) { - Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE, + for (int i = 0; i < DATA_BLK_NUM; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + NUM_STRIPE_PER_BLOCK * CELLSIZE, bg.getBlock().getGenerationStamp()); blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); cluster.injectBlocks(i, Arrays.asList(blk), diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java index 27df1cd49d..6bb1162ade 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java @@ -34,11 +34,13 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; @@ -53,6 +55,8 @@ import java.util.List; import java.util.UUID; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; import static org.junit.Assert.assertEquals; public class TestAddStripedBlocks { @@ -284,4 +288,107 @@ public void testAddUCReplica() throws Exception { Assert.assertEquals(GROUP_SIZE - i - 1, indices[i]); } } + + @Test + public void testCheckStripedReplicaCorrupt() throws Exception { + final int numBlocks = 4; + final int numStripes = 4; + final Path filePath = new Path("/corrupt"); + final FSNamesystem ns = cluster.getNameNode().getNamesystem(); + DFSTestUtil.createStripedFile(cluster, filePath, null, + numBlocks, numStripes, false); + + INodeFile fileNode = ns.getFSDirectory().getINode(filePath.toString()). 
+ asFile(); + Assert.assertTrue(fileNode.isStriped()); + BlockInfoStriped stored = fileNode.getStripedBlocksFeature().getBlocks()[0]; + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(0, ns.getCorruptReplicaBlocks()); + + // Now send a block report with correct size + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + final Block reported = new Block(stored); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(0).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(0, ns.getCorruptReplicaBlocks()); + + // Now send a block report with wrong size + reported.setBlockId(stored.getBlockId() + 1); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE - 1); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(1).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + + // Now send a parity block report with correct size + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(2).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + + // Now send a parity block report with wrong size + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 1); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(3).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(2, ns.getCorruptReplicaBlocks()); + + // Now change the size of stored block, and test verifying the last + // block size + stored.setNumBytes(stored.getNumBytes() + 10); + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS + 2); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(3).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(3, ns.getCorruptReplicaBlocks()); + + // Now send a parity block report with correct size based on adjusted + // size of stored block + /** Now stored block has {@link numStripes} full stripes + a cell + 10 */ + stored.setNumBytes(stored.getNumBytes() + BLOCK_STRIPED_CELL_SIZE); + reported.setBlockId(stored.getBlockId()); + reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + 
cluster.getDataNodes().get(0).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(3, ns.getCorruptReplicaBlocks()); + + reported.setBlockId(stored.getBlockId() + 1); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 10); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(1).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(3, ns.getCorruptReplicaBlocks()); + + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS); + reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(2).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(3, ns.getCorruptReplicaBlocks()); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java index 4292f9ad6f..ea18c3ee66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java @@ -78,7 +78,8 @@ public void tearDown() throws Exception { @Test public void testMissingStripedBlock() throws Exception { final int numBlocks = 4; - DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1); + DFSTestUtil.createStripedFile(cluster, filePath, + dirPath, numBlocks, 1, true); // make sure the file is complete in NN final INodeFile fileNode = cluster.getNamesystem().getFSDirectory() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java new file mode 100644 index 0000000000..ec0b1bbdae --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.util; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TestStripedBlockUtil { + private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS; + private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS; + private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM; + private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + + private LocatedStripedBlock createDummyLocatedBlock() { + final long blockGroupID = -1048576; + DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_SIZE]; + String[] storageIDs = new String[BLK_GROUP_SIZE]; + StorageType[] storageTypes = new StorageType[BLK_GROUP_SIZE]; + int[] indices = new int[BLK_GROUP_SIZE]; + for (int i = 0; i < BLK_GROUP_SIZE; i++) { + indices[i] = (i + 2) % DATA_BLK_NUM; + // Location port always equal to logical index of a block, + // for easier verification + locs[i] = DFSTestUtil.getLocalDatanodeInfo(indices[i]); + storageIDs[i] = locs[i].getDatanodeUuid(); + storageTypes[i] = StorageType.DISK; + } + return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID), + locs, storageIDs, storageTypes, indices, 0, false, null); + } + + @Test + public void testParseDummyStripedBlock() { + LocatedStripedBlock lsb = createDummyLocatedBlock(); + LocatedBlock[] blocks = parseStripedBlockGroup( + lsb, CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM); + assertEquals(DATA_BLK_NUM + PARITY_BLK_NUM, blocks.length); + for (int i = 0; i < DATA_BLK_NUM; i++) { + assertFalse(blocks[i].isStriped()); + assertEquals(i, + BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock())); + assertEquals(i * CELLSIZE, blocks[i].getStartOffset()); + assertEquals(1, blocks[i].getLocations().length); + assertEquals(i, blocks[i].getLocations()[0].getIpcPort()); + assertEquals(i, blocks[i].getLocations()[0].getXferPort()); + } + } + + private void verifyInternalBlocks (long numBytesInGroup, long[] expected) { + for (int i = 1; i < BLK_GROUP_SIZE; i++) { + assertEquals(expected[i], + getInternalBlockLength(numBytesInGroup, CELLSIZE, DATA_BLK_NUM, i)); + } + } + + @Test + public void testGetInternalBlockLength () { + // A small delta that is smaller than a cell + final int delta = 10; + assert delta < CELLSIZE; + + // Block group is smaller than a cell + verifyInternalBlocks(CELLSIZE - delta, + new long[] {CELLSIZE - delta, 0, 0, 0, 0, 0, + CELLSIZE - delta, CELLSIZE - delta, CELLSIZE - delta}); + + // Block group is exactly as large as a cell + verifyInternalBlocks(CELLSIZE, + new long[] {CELLSIZE, 0, 0, 0, 0, 0, + CELLSIZE, CELLSIZE, CELLSIZE}); + + // Block group is a little larger than a cell + verifyInternalBlocks(CELLSIZE + delta, + new long[] {CELLSIZE, delta, 0, 0, 0, 0, + CELLSIZE, CELLSIZE, CELLSIZE}); + + // Block group contains multiple stripes and ends at stripe boundary + verifyInternalBlocks(2 * DATA_BLK_NUM * 
CELLSIZE, + new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE}); + + // Block group contains multiple stripes and ends at cell boundary + // (not ending at stripe boundary) + verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE + CELLSIZE, + new long[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + 3 * CELLSIZE, 3 * CELLSIZE, 3 * CELLSIZE}); + + // Block group contains multiple stripes and doesn't end at cell boundary + verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE - delta, + new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE, + 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE - delta, + 2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE}); + } + +}
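
Note (illustrative appendix, not part of the patch): the sizing rule that this change centralizes in StripedBlockUtil#getInternalBlockLength can be checked in isolation with the minimal standalone sketch below. The class name, the (6 data + 3 parity) schema and the 64 KB cell size are assumptions chosen only for the example; the patch itself takes these values from HdfsConstants.

// Self-contained sketch mirroring the sizing rule of
// StripedBlockUtil#getInternalBlockLength; constants are illustrative.
public class InternalBlockLengthSketch {

  static long internalBlockLength(long numBytesInGroup, int cellSize,
      int dataBlkNum, int idxInGroup) {
    final long bytesPerStripe = (long) cellSize * dataBlkNum;
    // Group ends on a stripe boundary: every internal block gets an equal share.
    if (numBytesInGroup % bytesPerStripe == 0) {
      return numBytesInGroup / dataBlkNum;
    }
    // All stripes but the last are full; each contributes one full cell.
    final long numStripes = (numBytesInGroup - 1) / bytesPerStripe + 1;
    long blkSize = (numStripes - 1) * cellSize;
    final long lastStripeLen = numBytesInGroup % bytesPerStripe;
    if (idxInGroup >= dataBlkNum) {
      // Parity blocks: the last parity cell matches the first (possibly
      // partial) cell of the last stripe.
      blkSize += Math.min(cellSize, lastStripeLen);
    } else {
      // Data blocks: whatever part of the last stripe lands in this block.
      blkSize += Math.min(cellSize,
          Math.max(0L, lastStripeLen - (long) cellSize * idxInGroup));
    }
    return blkSize;
  }

  public static void main(String[] args) {
    final int cell = 64 * 1024;        // assumed cell size
    final int data = 6, parity = 3;    // assumed (6, 3) schema
    // Two full stripes plus one extra cell.
    final long groupBytes = 2L * data * cell + cell;
    for (int i = 0; i < data + parity; i++) {
      System.out.println("internal block " + i + " -> "
          + internalBlockLength(groupBytes, cell, data, i) / cell + " cells");
    }
  }
}

With the assumed schema, a group of two full stripes plus one extra cell yields 3 cells for blk_0, 2 cells for blk_1 through blk_5, and 3 cells for each parity block, which is the same shape of expectations exercised by TestStripedBlockUtil#testGetInternalBlockLength in this patch.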