HDFS-8120. Erasure coding: created util class to analyze striped block groups. Contributed by Zhe Zhang and Li Bo.

This commit is contained in:
Jing Zhao 2015-04-15 12:59:27 -07:00 committed by Zhe Zhang
parent ceb3d1c170
commit 5e8837dd6c
12 changed files with 562 additions and 276 deletions

View File

@ -1151,9 +1151,9 @@ void actualGetFromOneDataNode(final DNAddrPair datanode,
for (int i = 0; i < offsets.length; i++) {
int nread = reader.readAll(buf, offsets[i], lengths[i]);
updateReadStatistics(readStatistics, nread, reader);
if (nread != len) {
if (nread != lengths[i]) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
"excpected " + lengths[i] + ", got " + nread);
}
}
DFSClientFaultInjector.get().readFromDatanodeDelay();

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@ -50,7 +51,7 @@
*
* | <- Striped Block Group -> |
* blk_0 blk_1 blk_2 <- A striped block group has
* | | | {@link #groupSize} blocks
* | | | {@link #dataBlkNum} blocks
* v v v
* +------+ +------+ +------+
* |cell_0| |cell_1| |cell_2| <- The logical read order should be
@ -72,7 +73,7 @@
public class DFSStripedInputStream extends DFSInputStream {
/**
* This method plans the read portion from each block in the stripe
* @param groupSize The size / width of the striping group
* @param dataBlkNum The number of data blocks in the striping group
* @param cellSize The size of each striping cell
* @param startInBlk Starting offset in the striped block
* @param len Length of the read request
@ -81,29 +82,29 @@ public class DFSStripedInputStream extends DFSInputStream {
* for an individual block in the group
*/
@VisibleForTesting
static ReadPortion[] planReadPortions(final int groupSize,
static ReadPortion[] planReadPortions(final int dataBlkNum,
final int cellSize, final long startInBlk, final int len, int bufOffset) {
ReadPortion[] results = new ReadPortion[groupSize];
for (int i = 0; i < groupSize; i++) {
ReadPortion[] results = new ReadPortion[dataBlkNum];
for (int i = 0; i < dataBlkNum; i++) {
results[i] = new ReadPortion();
}
// cellIdxInBlk is the index of the cell in the block
// E.g., cell_3 is the 2nd cell in blk_0
int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
// blkIdxInGroup is the index of the block in the striped block group
// E.g., blk_2 is the 3rd block in the group
final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
startInBlk % cellSize;
boolean crossStripe = false;
for (int i = 1; i < groupSize; i++) {
if (blkIdxInGroup + i >= groupSize && !crossStripe) {
for (int i = 1; i < dataBlkNum; i++) {
if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
cellIdxInBlk++;
crossStripe = true;
}
results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
cellSize * cellIdxInBlk;
}
@ -112,57 +113,21 @@ static ReadPortion[] planReadPortions(final int groupSize,
results[blkIdxInGroup].lengths.add(firstCellLen);
results[blkIdxInGroup].readLength += firstCellLen;
int i = (blkIdxInGroup + 1) % groupSize;
int i = (blkIdxInGroup + 1) % dataBlkNum;
for (int done = firstCellLen; done < len; done += cellSize) {
ReadPortion rp = results[i];
rp.offsetsInBuf.add(done + bufOffset);
final int readLen = Math.min(len - done, cellSize);
rp.lengths.add(readLen);
rp.readLength += readLen;
i = (i + 1) % groupSize;
i = (i + 1) % dataBlkNum;
}
return results;
}
/**
* This method parses a striped block group into individual blocks.
*
* @param bg The striped block group
* @param dataBlkNum the number of data blocks
* @return An array containing the blocks in the group
*/
@VisibleForTesting
static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
int dataBlkNum, int cellSize) {
int locatedBGSize = bg.getBlockIndices().length;
// TODO not considering missing blocks for now, only identify data blocks
LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
for (short i = 0; i < locatedBGSize; i++) {
final int idx = bg.getBlockIndices()[i];
if (idx < dataBlkNum && lbs[idx] == null) {
lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
}
}
return lbs;
}
private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
// TODO: fix the numBytes computation
return new LocatedBlock(blk,
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
null);
}
private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
throws IOException {
@ -199,7 +164,7 @@ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
"LocatedStripedBlock for a striped file";
int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
% groupSize);
% dataBlkNum);
// If indexing information is returned, iterate through the index array
// to find the entry for position idx in the group
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@ -213,7 +178,8 @@ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+ blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
}
return constructInternalBlock(lsb, i, cellSize, idx);
return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize,
dataBlkNum, idx);
}
private LocatedBlock getBlockGroupAt(long offset) throws IOException {
@ -240,13 +206,14 @@ protected void fetchBlockByteRange(long blockStartOffset, long start,
LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
// Planning the portion of I/O for each shard
ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
len, offset);
// Parse group to get chosen DN location
LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
LocatedBlock[] blks = StripedBlockUtil.
parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
for (short i = 0; i < groupSize; i++) {
for (short i = 0; i < dataBlkNum; i++) {
ReadPortion rp = readPortions[i];
if (rp.readLength <= 0) {
continue;

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
@ -309,10 +310,7 @@ protected void closeThreads(boolean force) throws IOException {
streamer.closeSocket();
if (streamer.isLeadingStreamer()) {
leadingStreamer = streamer;
} else {
streamer.countTailingBlockGroupBytes();
}
} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
@ -320,6 +318,7 @@ protected void closeThreads(boolean force) throws IOException {
setClosed();
}
}
assert leadingStreamer != null : "One streamer should be leader";
leadingStreamer.countTailingBlockGroupBytes();
}
@ -337,23 +336,28 @@ public synchronized void write(byte b[], int off, int len)
}
private void writeParityCellsForLastStripe() throws IOException{
if(currentBlockGroupBytes == 0 ||
currentBlockGroupBytes % stripeDataSize() == 0)
long parityBlkSize = StripedBlockUtil.getInternalBlockLength(
currentBlockGroupBytes, cellSize, blockGroupDataBlocks,
blockGroupDataBlocks + 1);
if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) {
return;
int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize());
// Size of parity cells should equal the size of the first cell, if it
// is not full.
int parityCellSize = cellSize;
int index = lastStripeLen / cellSize;
if (lastStripeLen < cellSize) {
parityCellSize = lastStripeLen;
index++;
}
int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize :
(int) (parityBlkSize % cellSize);
for (int i = 0; i < blockGroupBlocks; i++) {
if (i >= index) {
long internalBlkLen = StripedBlockUtil.getInternalBlockLength(
currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i);
// Pad zero bytes to make all cells exactly the size of parityCellSize
// If internal block is smaller than parity block, pad zero bytes.
// Also pad zero bytes to all parity cells
if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) {
int position = cellBuffers[i].position();
assert position <= parityCellSize : "If an internal block is smaller" +
" than parity block, then its last cell should be small than last" +
" parity cell";
for (int j = 0; j < parityCellSize - position; j++) {
cellBuffers[i].put((byte)0);
cellBuffers[i].put((byte) 0);
}
}
cellBuffers[i].flip();

View File

@ -19,16 +19,16 @@
package org.apache.hadoop.hdfs;
import java.util.List;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
@ -134,20 +134,8 @@ void countTailingBlockGroupBytes () throws IOException {
"putting a block to stripeBlocks, ie = " + ie);
}
}
} else if (!isParityStreamer()) {
if (block == null || block.getNumBytes() == 0) {
LocatedBlock finishedBlock = new LocatedBlock(null, null);
try {
boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
TimeUnit.SECONDS);
} catch (InterruptedException ie) {
//TODO: Handle InterruptedException (HDFS-7786)
ie.printStackTrace();
}
}
}
}
@Override
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
@ -155,8 +143,10 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
LocatedBlock lb = null;
if (isLeadingStreamer()) {
if(hasCommittedBlock) {
//when committing a block group, leading streamer has to adjust
// {@link block} including the size of block group
/**
* when committing a block group, leading streamer has to adjust
* {@link block} to include the size of block group
*/
for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
try {
LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
@ -179,7 +169,13 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
lb = super.locateFollowingBlock(excludedNodes);
hasCommittedBlock = true;
LocatedBlock[] blocks = unwrapBlockGroup(lb);
assert lb instanceof LocatedStripedBlock;
DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
LocatedBlock[] blocks = StripedBlockUtil.
parseStripedBlockGroup((LocatedStripedBlock) lb,
HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS,
HdfsConstants.NUM_PARITY_BLOCKS
);
assert blocks.length == blockGroupSize :
"Fail to get block group from namenode: blockGroupSize: " +
blockGroupSize + ", blocks.length: " + blocks.length;
@ -212,30 +208,4 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
}
return lb;
}
/**
* Generate other blocks in a block group according to the first one.
*
* @param firstBlockInGroup the first block in a block group
* @return other blocks in this group
*/
public static LocatedBlock[] unwrapBlockGroup(
final LocatedBlock firstBlockInGroup) {
ExtendedBlock eb = firstBlockInGroup.getBlock();
DatanodeInfo[] locs = firstBlockInGroup.getLocations();
String[] storageIDs = firstBlockInGroup.getStorageIDs();
StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
for (int i = 0; i < blocksInGroup.length; i++) {
//each block in a group has the same number of bytes and timestamp
ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
blocksInGroup[i] = new LocatedBlock(extendedBlock,
new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
new StorageType[] {storageTypes[i]});
blocksInGroup[i].setBlockToken(blockToken);
}
return blocksInGroup;
}
}

View File

@ -83,6 +83,7 @@
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
@ -2487,7 +2488,22 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
}
boolean wrongSize;
if (storedBlock.isStriped()) {
assert BlockIdManager.isStripedBlockID(reported.getBlockId());
assert storedBlock.getBlockId() ==
BlockIdManager.convertToStripedID(reported.getBlockId());
BlockInfoStriped stripedBlock = (BlockInfoStriped) storedBlock;
int reportedBlkIdx = BlockIdManager.getBlockIndex(reported);
wrongSize = reported.getNumBytes() !=
getInternalBlockLength(stripedBlock.getNumBytes(),
HdfsConstants.BLOCK_STRIPED_CELL_SIZE,
stripedBlock.getDataBlockNum(), reportedBlkIdx);
} else {
wrongSize = storedBlock.getNumBytes() != reported.getNumBytes();
}
if (wrongSize) {
return new BlockToMarkCorrupt(new Block(reported), storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
/**
* Utility class for analyzing striped block groups
*/
@InterfaceAudience.Private
public class StripedBlockUtil {
/**
* This method parses a striped block group into individual blocks.
*
* @param bg The striped block group
* @param cellSize The size of a striping cell
* @param dataBlkNum The number of data blocks
* @return An array containing the blocks in the group
*/
public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
int cellSize, int dataBlkNum, int parityBlkNum) {
int locatedBGSize = bg.getBlockIndices().length;
// TODO not considering missing blocks for now, only identify data blocks
LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum];
for (short i = 0; i < locatedBGSize; i++) {
final int idx = bg.getBlockIndices()[i];
if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) {
lbs[idx] = constructInternalBlock(bg, i, cellSize,
dataBlkNum, idx);
}
}
return lbs;
}
/**
* This method creates an internal block at the given index of a block group
*
* @param idxInReturnedLocs The index in the stored locations in the
* {@link LocatedStripedBlock} object
* @param idxInBlockGroup The logical index in the striped block group
* @return The constructed internal block
*/
public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInReturnedLocs, int cellSize, int dataBlkNum,
int idxInBlockGroup) {
final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
blk.setNumBytes(getInternalBlockLength(bg.getBlockSize(),
cellSize, dataBlkNum, idxInBlockGroup));
return new LocatedBlock(blk,
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
null);
}
/**
* Get the size of an internal block at the given index of a block group
*
* @param numBytesInGroup Size of the block group only counting data blocks
* @param cellSize The size of a striping cell
* @param dataBlkNum The number of data blocks
* @param idxInGroup The logical index in the striped block group
* @return The size of the internal block at the specified index
*/
public static long getInternalBlockLength(long numBytesInGroup,
int cellSize, int dataBlkNum, int idxInGroup) {
// Size of each stripe (only counting data blocks)
final long numBytesPerStripe = cellSize * dataBlkNum;
assert numBytesPerStripe > 0:
"getInternalBlockLength should only be called on valid striped blocks";
// If block group ends at stripe boundary, each internal block has an equal
// share of the group
if (numBytesInGroup % numBytesPerStripe == 0) {
return numBytesInGroup / dataBlkNum;
}
int numStripes = (int) ((numBytesInGroup - 1) / numBytesPerStripe + 1);
assert numStripes >= 1 : "There should be at least 1 stripe";
// All stripes but the last one are full stripes. The block should at least
// contain (numStripes - 1) full cells.
long blkSize = (numStripes - 1) * cellSize;
long lastStripeLen = numBytesInGroup % numBytesPerStripe;
// Size of parity cells should equal the size of the first cell, if it
// is not full.
long lastParityCellLen = Math.min(cellSize, lastStripeLen);
if (idxInGroup >= dataBlkNum) {
// for parity blocks
blkSize += lastParityCellLen;
} else {
// for data blocks
blkSize += Math.min(cellSize,
Math.max(0, lastStripeLen - cellSize * idxInGroup));
}
return blkSize;
}
/**
* Given a byte's offset in an internal block, calculate the offset in
* the block group
*/
public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
long offsetInBlk, int idxInBlockGroup) {
int cellIdxInBlk = (int) (offsetInBlk / cellSize);
return cellIdxInBlk * cellSize * dataBlkNum // n full stripes before offset
+ idxInBlockGroup * cellSize // m full cells before offset
+ offsetInBlk % cellSize; // partial cell
}
}

View File

@ -108,7 +108,6 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -1851,11 +1850,30 @@ public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
return reports;
}
public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
int numBlocks, int numStripesPerBlk) throws Exception {
/**
* Creates the metadata of a file in striped layout. This method only
* manipulates the NameNode state without injecting data to DataNode.
* @param file Path of the file to create
* @param dir Parent path of the file
* @param numBlocks Number of striped block groups to add to the file
* @param numStripesPerBlk Number of striped cells in each block
* @param toMkdir
*/
public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir,
int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception {
DistributedFileSystem dfs = cluster.getFileSystem();
// If outer test already created EC zone, dir should be left as null
if (toMkdir) {
assert dir != null;
dfs.mkdirs(dir);
try {
dfs.getClient().createErasureCodingZone(dir.toString(), null);
} catch (IOException e) {
if (!e.getMessage().contains("non-empty directory")) {
throw e;
}
}
}
FSDataOutputStream out = null;
try {
@ -1867,7 +1885,7 @@ public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
ExtendedBlock previous = null;
for (int i = 0; i < numBlocks; i++) {
Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns,
Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns,
file.toString(), fileNode, dfs.getClient().getClientName(),
previous, numStripesPerBlk);
previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
@ -1880,19 +1898,29 @@ public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
}
}
static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
FSNamesystem ns, String file, INodeFile fileNode, String clientName,
ExtendedBlock previous, int numStripes) throws Exception {
/**
* Adds a striped block group to a file. This method only manipulates NameNode
* states of the file and the block without injecting data to DataNode.
* It does mimic block reports.
* @param dataNodes List DataNodes to host the striped block group
* @param previous Previous block in the file
* @param numStripes Number of stripes in each block group
* @return The added block group
*/
public static Block addStripedBlockToFile(List<DataNode> dataNodes,
DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode,
String clientName, ExtendedBlock previous, int numStripes)
throws Exception {
fs.getClient().namenode.addBlock(file, clientName, previous, null,
fileNode.getId(), null);
final BlockInfo lastBlock = fileNode.getLastBlock();
final int groupSize = fileNode.getBlockReplication();
assert dataNodes.size() >= groupSize;
// 1. RECEIVING_BLOCK IBR
int i = 0;
for (DataNode dn : dataNodes) {
if (i < groupSize) {
final Block block = new Block(lastBlock.getBlockId() + i++, 0,
for (int i = 0; i < groupSize; i++) {
DataNode dn = dataNodes.get(i);
final Block block = new Block(lastBlock.getBlockId() + i, 0,
lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
@ -1902,13 +1930,11 @@ static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
}
}
}
// 2. RECEIVED_BLOCK IBR
i = 0;
for (DataNode dn : dataNodes) {
if (i < groupSize) {
final Block block = new Block(lastBlock.getBlockId() + i++,
for (int i = 0; i < groupSize; i++) {
DataNode dn = dataNodes.get(i);
final Block block = new Block(lastBlock.getBlockId() + i,
numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
@ -1918,7 +1944,6 @@ static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
}
}
}
lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
return lastBlock;

View File

@ -2,7 +2,6 @@
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@ -14,10 +13,12 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@ -39,16 +40,16 @@ public class TestDFSStripedOutputStream {
private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
private DistributedFileSystem fs;
int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
int blockSize = 8 * 1024 * 1024;
int cellsInBlock = blockSize / cellSize;
private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int stripesPerBlock = 4;
int blockSize = cellSize * stripesPerBlock;
private int mod = 29;
@Before
public void setup() throws IOException {
int numDNs = dataBlocks + parityBlocks + 2;
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
fs = cluster.getFileSystem();
@ -103,8 +104,7 @@ public void TestFileMoreThanOneStripe1() throws IOException {
@Test
public void TestFileMoreThanOneStripe2() throws IOException {
testOneFile("/MoreThanOneStripe2",
cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1)
testOneFile("/MoreThanOneStripe2", cellSize * dataBlocks
+ cellSize * dataBlocks + 123);
}
@ -113,18 +113,22 @@ public void TestFileFullBlockGroup() throws IOException {
testOneFile("/FullBlockGroup", blockSize * dataBlocks);
}
//TODO: The following tests will pass after HDFS-8121 fixed
// @Test
@Test
public void TestFileMoreThanABlockGroup1() throws IOException {
testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
}
// @Test
@Test
public void TestFileMoreThanABlockGroup2() throws IOException {
testOneFile("/MoreThanABlockGroup2",
blockSize * dataBlocks * 3
+ (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks
+ 123);
testOneFile("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
}
@Test
public void TestFileMoreThanABlockGroup3() throws IOException {
testOneFile("/MoreThanABlockGroup3",
blockSize * dataBlocks * 3 + cellSize * dataBlocks
+ cellSize + 123);
}
private int stripeDataSize() {
@ -193,7 +197,10 @@ private void testOneFile(String src, int writeBytes)
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock);
assert firstBlock instanceof LocatedStripedBlock;
LocatedBlock[] blocks = StripedBlockUtil.
parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
cellSize, dataBlocks, parityBlocks);
List<LocatedBlock> oneGroup = Arrays.asList(blocks);
blockGroupList.add(oneGroup);
}
@ -205,12 +212,6 @@ private void testOneFile(String src, int writeBytes)
byte[][] dataBlockBytes = new byte[dataBlocks][];
byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
//calculate the size of this block group
int lenOfBlockGroup = group < blockGroupList.size() - 1 ?
blockSize * dataBlocks :
writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks;
int intactStripes = lenOfBlockGroup / stripeDataSize();
int lastStripeLen = lenOfBlockGroup % stripeDataSize();
//for each block, use BlockReader to read data
for (int i = 0; i < blockList.size(); i++) {
@ -223,25 +224,17 @@ private void testOneFile(String src, int writeBytes)
InetSocketAddress targetAddr = NetUtils.createSocketAddr(
nodes[0].getXferAddr());
int lenOfCell = cellSize;
if (i == lastStripeLen / cellSize) {
lenOfCell = lastStripeLen % cellSize;
} else if (i > lastStripeLen / cellSize) {
lenOfCell = 0;
}
int lenOfBlock = cellSize * intactStripes + lenOfCell;
byte[] blockBytes = new byte[lenOfBlock];
byte[] blockBytes = new byte[(int)block.getNumBytes()];
if (i < dataBlocks) {
dataBlockBytes[i] = blockBytes;
} else {
parityBlockBytes[i - dataBlocks] = blockBytes;
}
if (lenOfBlock == 0) {
if (block.getNumBytes() == 0) {
continue;
}
block.setNumBytes(lenOfBlock);
BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
setFileName(src).
setBlock(block).
@ -276,33 +269,33 @@ public Peer newConnectedPeer(InetSocketAddress addr,
}
}).build();
blockReader.readAll(blockBytes, 0, lenOfBlock);
blockReader.readAll(blockBytes, 0, (int)block.getNumBytes());
blockReader.close();
}
//check if we write the data correctly
for (int i = 0; i < dataBlockBytes.length; i++) {
byte[] cells = dataBlockBytes[i];
if (cells == null) {
for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; blkIdxInGroup++) {
byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
if (actualBlkBytes == null) {
continue;
}
for (int j = 0; j < cells.length; j++) {
for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
byte expected;
//calculate the postion of this byte in the file
long pos = group * dataBlocks * blockSize
+ (i * cellSize + j / cellSize * cellSize * dataBlocks)
+ j % cellSize;
if (pos >= writeBytes) {
long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
dataBlocks, posInBlk, blkIdxInGroup) +
group * blockSize * dataBlocks;
if (posInFile >= writeBytes) {
expected = 0;
} else {
expected = getByte(pos);
expected = getByte(posInFile);
}
if (expected != cells[j]) {
Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected
if (expected != actualBlkBytes[posInBlk]) {
Assert.fail("Unexpected byte " + actualBlkBytes[posInBlk] + ", expect " + expected
+ ". Block group index is " + group +
", stripe index is " + j / cellSize +
", cell index is " + i + ", byte index is " + j % cellSize);
", stripe index is " + posInBlk / cellSize +
", cell index is " + blkIdxInGroup + ", byte index is " + posInBlk % cellSize);
}
}
}

View File

@ -21,10 +21,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -34,10 +31,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -54,17 +50,18 @@ public class TestReadStripedFile {
private DistributedFileSystem fs;
private final Path dirPath = new Path("/striped");
private Path filePath = new Path(dirPath, "file");
private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS;
private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int NUM_STRIPE_PER_BLOCK = 2;
private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE;
private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * DATA_BLK_NUM * CELLSIZE;
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE)
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(BLK_GROUP_SIZE)
.build();
cluster.waitActive();
fs = cluster.getFileSystem();
@ -77,72 +74,14 @@ public void tearDown() {
}
}
private LocatedStripedBlock createDummyLocatedBlock() {
final long blockGroupID = -1048576;
DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
String[] storageIDs = new String[TOTAL_SIZE];
StorageType[] storageTypes = new StorageType[TOTAL_SIZE];
int[] indices = new int[TOTAL_SIZE];
for (int i = 0; i < TOTAL_SIZE; i++) {
locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId());
storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid();
storageTypes[i] = StorageType.DISK;
indices[i] = (i + 2) % GROUP_SIZE;
}
return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
locs, storageIDs, storageTypes, indices, 0, false, null);
}
@Test
public void testParseDummyStripedBlock() {
LocatedStripedBlock lsb = createDummyLocatedBlock();
LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup(
lsb, GROUP_SIZE, CELLSIZE);
assertEquals(GROUP_SIZE, blocks.length);
for (int j = 0; j < GROUP_SIZE; j++) {
assertFalse(blocks[j].isStriped());
assertEquals(j,
BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock()));
assertEquals(j * CELLSIZE, blocks[j].getStartOffset());
}
}
@Test
public void testParseStripedBlock() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE * numBlocks);
assertEquals(4, lbs.locatedBlockCount());
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
for (LocatedBlock lb : lbList) {
assertTrue(lb.isStriped());
}
for (int i = 0; i < numBlocks; i++) {
LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i));
LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
GROUP_SIZE, CELLSIZE);
assertEquals(GROUP_SIZE, blks.length);
for (int j = 0; j < GROUP_SIZE; j++) {
assertFalse(blks[j].isStriped());
assertEquals(j,
BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock()));
assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset());
}
}
}
/**
* Test {@link DFSStripedInputStream#getBlockAt(long)}
*/
@Test
public void testGetBlock() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK);
DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK, true);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE * numBlocks);
final DFSStripedInputStream in =
@ -151,9 +90,9 @@ public void testGetBlock() throws Exception {
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
for (LocatedBlock aLbList : lbList) {
LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
GROUP_SIZE, CELLSIZE);
for (int j = 0; j < GROUP_SIZE; j++) {
LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
for (int j = 0; j < DATA_BLK_NUM; j++) {
LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
assertEquals(blks[j].getBlock(), refreshed.getBlock());
assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
@ -165,15 +104,16 @@ public void testGetBlock() throws Exception {
@Test
public void testPread() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK);
DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK, true);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
for (int i = 0; i < GROUP_SIZE; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE,
for (int i = 0; i < DATA_BLK_NUM; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i,
NUM_STRIPE_PER_BLOCK * CELLSIZE,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),

View File

@ -34,11 +34,13 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -53,6 +55,8 @@
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
import static org.junit.Assert.assertEquals;
public class TestAddStripedBlocks {
@ -284,4 +288,107 @@ public void testAddUCReplica() throws Exception {
Assert.assertEquals(GROUP_SIZE - i - 1, indices[i]);
}
}
@Test
public void testCheckStripedReplicaCorrupt() throws Exception {
final int numBlocks = 4;
final int numStripes = 4;
final Path filePath = new Path("/corrupt");
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
DFSTestUtil.createStripedFile(cluster, filePath, null,
numBlocks, numStripes, false);
INodeFile fileNode = ns.getFSDirectory().getINode(filePath.toString()).
asFile();
Assert.assertTrue(fileNode.isStriped());
BlockInfoStriped stored = fileNode.getStripedBlocksFeature().getBlocks()[0];
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(0, ns.getCorruptReplicaBlocks());
// Now send a block report with correct size
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
final Block reported = new Block(stored);
reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE);
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(0).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(0, ns.getCorruptReplicaBlocks());
// Now send a block report with wrong size
reported.setBlockId(stored.getBlockId() + 1);
reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE - 1);
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(1).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
// Now send a parity block report with correct size
reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE);
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(2).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
// Now send a parity block report with wrong size
reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 1);
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(2, ns.getCorruptReplicaBlocks());
// Now change the size of stored block, and test verifying the last
// block size
stored.setNumBytes(stored.getNumBytes() + 10);
reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS + 2);
reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE);
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
// Now send a parity block report with correct size based on adjusted
// size of stored block
/** Now stored block has {@link numStripes} full stripes + a cell + 10 */
stored.setNumBytes(stored.getNumBytes() + BLOCK_STRIPED_CELL_SIZE);
reported.setBlockId(stored.getBlockId());
reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE);
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(0).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
reported.setBlockId(stored.getBlockId() + 1);
reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 10);
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(1).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE);
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
ns.processIncrementalBlockReport(
cluster.getDataNodes().get(2).getDatanodeId(), reports[0]);
BlockManagerTestUtil.updateState(ns.getBlockManager());
Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
}
}

View File

@ -78,7 +78,8 @@ public void tearDown() throws Exception {
@Test
public void testMissingStripedBlock() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1);
DFSTestUtil.createStripedFile(cluster, filePath,
dirPath, numBlocks, 1, true);
// make sure the file is complete in NN
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class TestStripedBlockUtil {
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private LocatedStripedBlock createDummyLocatedBlock() {
final long blockGroupID = -1048576;
DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_SIZE];
String[] storageIDs = new String[BLK_GROUP_SIZE];
StorageType[] storageTypes = new StorageType[BLK_GROUP_SIZE];
int[] indices = new int[BLK_GROUP_SIZE];
for (int i = 0; i < BLK_GROUP_SIZE; i++) {
indices[i] = (i + 2) % DATA_BLK_NUM;
// Location port always equal to logical index of a block,
// for easier verification
locs[i] = DFSTestUtil.getLocalDatanodeInfo(indices[i]);
storageIDs[i] = locs[i].getDatanodeUuid();
storageTypes[i] = StorageType.DISK;
}
return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
locs, storageIDs, storageTypes, indices, 0, false, null);
}
@Test
public void testParseDummyStripedBlock() {
LocatedStripedBlock lsb = createDummyLocatedBlock();
LocatedBlock[] blocks = parseStripedBlockGroup(
lsb, CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
assertEquals(DATA_BLK_NUM + PARITY_BLK_NUM, blocks.length);
for (int i = 0; i < DATA_BLK_NUM; i++) {
assertFalse(blocks[i].isStriped());
assertEquals(i,
BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock()));
assertEquals(i * CELLSIZE, blocks[i].getStartOffset());
assertEquals(1, blocks[i].getLocations().length);
assertEquals(i, blocks[i].getLocations()[0].getIpcPort());
assertEquals(i, blocks[i].getLocations()[0].getXferPort());
}
}
private void verifyInternalBlocks (long numBytesInGroup, long[] expected) {
for (int i = 1; i < BLK_GROUP_SIZE; i++) {
assertEquals(expected[i],
getInternalBlockLength(numBytesInGroup, CELLSIZE, DATA_BLK_NUM, i));
}
}
@Test
public void testGetInternalBlockLength () {
// A small delta that is smaller than a cell
final int delta = 10;
assert delta < CELLSIZE;
// Block group is smaller than a cell
verifyInternalBlocks(CELLSIZE - delta,
new long[] {CELLSIZE - delta, 0, 0, 0, 0, 0,
CELLSIZE - delta, CELLSIZE - delta, CELLSIZE - delta});
// Block group is exactly as large as a cell
verifyInternalBlocks(CELLSIZE,
new long[] {CELLSIZE, 0, 0, 0, 0, 0,
CELLSIZE, CELLSIZE, CELLSIZE});
// Block group is a little larger than a cell
verifyInternalBlocks(CELLSIZE + delta,
new long[] {CELLSIZE, delta, 0, 0, 0, 0,
CELLSIZE, CELLSIZE, CELLSIZE});
// Block group contains multiple stripes and ends at stripe boundary
verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE,
new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
// Block group contains multiple stripes and ends at cell boundary
// (not ending at stripe boundary)
verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE + CELLSIZE,
new long[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
3 * CELLSIZE, 3 * CELLSIZE, 3 * CELLSIZE});
// Block group contains multiple stripes and doesn't end at cell boundary
verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE - delta,
new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE - delta,
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
}
}