HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. Contributed by Walter Su.

This commit is contained in:
Zhe Zhang 2015-06-03 11:51:58 -07:00
parent 5f15084bd5
commit 673280df24
11 changed files with 452 additions and 78 deletions

View File

@ -277,3 +277,6 @@
HDFS-8453. Erasure coding: properly handle start offset for internal blocks
in a block group. (Zhe Zhang via jing9)
HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic.
(Walter Su via zhz)

View File

@ -211,6 +211,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -458,22 +459,32 @@ public class PBHelper {
}
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
return BlockWithLocationsProto.newBuilder()
.setBlock(convert(blk.getBlock()))
BlockWithLocationsProto.Builder builder = BlockWithLocationsProto
.newBuilder().setBlock(convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
.addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
.addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
.build();
.addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()));
if (blk instanceof StripedBlockWithLocations) {
StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk;
builder.setIndices(getByteString(sblk.getIndices()));
builder.setDataBlockNum(sblk.getDataBlockNum());
}
return builder.build();
}
public static BlockWithLocations convert(BlockWithLocationsProto b) {
final List<String> datanodeUuids = b.getDatanodeUuidsList();
final List<String> storageUuids = b.getStorageUuidsList();
final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
return new BlockWithLocations(convert(b.getBlock()),
BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()),
datanodeUuids.toArray(new String[datanodeUuids.size()]),
storageUuids.toArray(new String[storageUuids.size()]),
convertStorageTypes(storageTypes, storageUuids.size()));
if (b.hasIndices()) {
blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(),
(short) b.getDataBlockNum());
}
return blk;
}
public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.balancer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
@ -124,16 +127,15 @@ public class Dispatcher {
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
/**
* Get the block from the map;
* if the block is not found, create a new block and put it in the map.
* Put the block in the map if it is not already present.
* @return the block stored in the map: the existing entry if there was one,
*         otherwise the newly inserted block
*/
private DBlock get(Block b) {
DBlock block = map.get(b);
if (block == null) {
block = new DBlock(b);
map.put(b, block);
private DBlock putIfAbsent(Block blk, DBlock dblk) {
if (!map.containsKey(blk)) {
map.put(blk, dblk);
return dblk;
}
return block;
return map.get(blk);
}
/** Remove all blocks except for the moved blocks. */
@ -176,9 +178,9 @@ public class Dispatcher {
}
}
/** This class keeps track of a scheduled block move */
/** This class keeps track of a scheduled reportedBlock move */
public class PendingMove {
private DBlock block;
private DBlock reportedBlock;
private Source source;
private DDatanode proxySource;
private StorageGroup target;
@ -190,7 +192,7 @@ public class Dispatcher {
@Override
public String toString() {
final Block b = block != null ? block.getBlock() : null;
final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
: " ";
return bStr + "from " + source.getDisplayName() + " to " + target
@ -199,8 +201,8 @@ public class Dispatcher {
}
/**
* Choose a block & a proxy source for this pendingMove whose source &
* target have already been chosen.
* Choose a good block/blockGroup from source & Get reportedBlock from
* the block & Choose a proxy source for the reportedBlock.
*
* @return true if a block and its proxy are chosen; false otherwise
*/
@ -224,7 +226,11 @@ public class Dispatcher {
synchronized (block) {
synchronized (movedBlocks) {
if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
this.block = block;
if (block instanceof DBlockStriped) {
reportedBlock = ((DBlockStriped) block).getInternalBlock(source);
} else {
reportedBlock = block;
}
if (chooseProxySource()) {
movedBlocks.put(block);
if (LOG.isDebugEnabled()) {
@ -251,7 +257,7 @@ public class Dispatcher {
}
// if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) {
for (StorageGroup loc : block.getLocations()) {
for (StorageGroup loc : reportedBlock.getLocations()) {
if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
&& addTo(loc)) {
return true;
@ -259,13 +265,13 @@ public class Dispatcher {
}
}
// check if there is replica which is on the same rack with the target
for (StorageGroup loc : block.getLocations()) {
for (StorageGroup loc : reportedBlock.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
return true;
}
}
// find out a non-busy replica
for (StorageGroup loc : block.getLocations()) {
for (StorageGroup loc : reportedBlock.getLocations()) {
if (addTo(loc)) {
return true;
}
@ -273,7 +279,7 @@ public class Dispatcher {
return false;
}
/** add to a proxy source for specific block movement */
/** add to a proxy source for specific reportedBlock movement */
private boolean addTo(StorageGroup g) {
final DDatanode dn = g.getDDatanode();
if (dn.addPendingBlock(this)) {
@ -288,6 +294,7 @@ public class Dispatcher {
if (LOG.isDebugEnabled()) {
LOG.debug("Start moving " + this);
}
assert !(reportedBlock instanceof DBlockStriped);
Socket sock = new Socket();
DataOutputStream out = null;
@ -302,7 +309,7 @@ public class Dispatcher {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
block.getBlock());
reportedBlock.getBlock());
final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
@ -316,7 +323,7 @@ public class Dispatcher {
sendRequest(out, eb, accessToken);
receiveResponse(in);
nnc.getBytesMoved().addAndGet(block.getNumBytes());
nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes());
LOG.info("Successfully moved " + this);
} catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage());
@ -344,14 +351,14 @@ public class Dispatcher {
}
}
/** Send a block replace request to the output stream */
/** Send a reportedBlock replace request to the output stream */
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken) throws IOException {
new Sender(out).replaceBlock(eb, target.storageType, accessToken,
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
}
/** Receive a block copy response from the input stream */
/** Receive a reportedBlock copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(vintPrefixed(in));
@ -359,13 +366,13 @@ public class Dispatcher {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
}
String logInfo = "block move is failed";
String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
}
/** reset the object */
private void reset() {
block = null;
reportedBlock = null;
source = null;
proxySource = null;
target = null;
@ -377,6 +384,44 @@ public class Dispatcher {
public DBlock(Block block) {
super(block);
}
public long getNumBytes(StorageGroup storage) {
return super.getNumBytes();
}
}
/**
 * A {@link DBlock} for a striped block group. Besides the group block it
 * keeps, per location, the index of the internal block stored there, so the
 * internal block held by a given storage can be derived on demand.
 */
public static class DBlockStriped extends DBlock {
  /**
   * Index within the block group of the internal block at each location.
   * It is looked up by the position of a storage in {@code locations}, so
   * the two must stay position-aligned.
   */
  final byte[] indices;
  /** Number of data blocks, used when computing an internal block length. */
  final short dataBlockNum;

  public DBlockStriped(Block block, byte[] indices, short dataBlockNum) {
    super(block);
    this.indices = indices;
    this.dataBlockNum = dataBlockNum;
  }

  /**
   * Derive the internal block stored on the given storage.
   *
   * The internal block id is the group block id plus the internal block's
   * index in the group; its length comes from
   * {@code getInternalBlockLength}.
   *
   * @param storage the storage whose internal block is wanted
   * @return a new {@link DBlock} whose only location is {@code storage}, or
   *         null if {@code storage} is not one of this block's locations
   */
  public DBlock getInternalBlock(StorageGroup storage) {
    final int position = locations.indexOf(storage);
    if (position < 0) {
      // this storage does not hold any internal block of the group
      return null;
    }
    final byte groupIndex = indices[position];
    final long internalId = getBlock().getBlockId() + groupIndex;
    final long internalLength = getInternalBlockLength(getNumBytes(),
        HdfsConstants.BLOCK_STRIPED_CELL_SIZE, dataBlockNum, groupIndex);

    // copy the group block, then overwrite id and length for the internal one
    final Block internal = new Block(getBlock());
    internal.setBlockId(internalId);
    internal.setNumBytes(internalLength);

    final DBlock result = new DBlock(internal);
    result.addLocation(storage);
    return result;
  }

  /**
   * Length of the internal block stored on the given storage.
   *
   * NOTE(review): {@link #getInternalBlock} returns null for a storage that
   * is not a location of this block, in which case this method throws a
   * NullPointerException.
   */
  @Override
  public long getNumBytes(StorageGroup storage) {
    final DBlock internal = getInternalBlock(storage);
    return internal.getNumBytes();
  }
}
/** The class represents a desired move. */
@ -452,7 +497,7 @@ public class Dispatcher {
private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
if (getDDatanode().addPendingBlock(pm)) {
if (pm.markMovedIfGoodBlock(block, getStorageType())) {
incScheduledSize(pm.block.getNumBytes());
incScheduledSize(pm.reportedBlock.getNumBytes());
return pm;
} else {
getDDatanode().removePendingBlock(pm);
@ -612,19 +657,34 @@ public class Dispatcher {
*/
private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
final BlocksWithLocations newBlksLocs =
nnc.getBlocks(getDatanodeInfo(), size);
long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks.getBlocks()) {
bytesReceived += blk.getBlock().getNumBytes();
for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) {
DBlock block;
if (blkLocs instanceof StripedBlockWithLocations) {
StripedBlockWithLocations sblkLocs =
(StripedBlockWithLocations) blkLocs;
// approximate size
bytesReceived += sblkLocs.getBlock().getNumBytes() /
sblkLocs.getDataBlockNum();
block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(),
sblkLocs.getDataBlockNum());
} else{
bytesReceived += blkLocs.getBlock().getNumBytes();
block = new DBlock(blkLocs.getBlock());
}
synchronized (globalBlocks) {
final DBlock block = globalBlocks.get(blk.getBlock());
block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block);
synchronized (block) {
block.clearLocations();
// update locations
final String[] datanodeUuids = blk.getDatanodeUuids();
final StorageType[] storageTypes = blk.getStorageTypes();
final String[] datanodeUuids = blkLocs.getDatanodeUuids();
final StorageType[] storageTypes = blkLocs.getStorageTypes();
for (int i = 0; i < datanodeUuids.length; i++) {
final StorageGroup g = storageGroupMap.get(
datanodeUuids[i], storageTypes[i]);
@ -661,6 +721,8 @@ public class Dispatcher {
* target throttling has been considered. They are chosen only when they
* have the capacity to support this block move. The block should be
* dispatched immediately after this method is returned.
* If the block is a block group, only the internal block on this source
* will be dispatched.
*
* @return a move that's good for the source to dispatch immediately.
*/
@ -672,7 +734,7 @@ public class Dispatcher {
if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation
if (pendingBlock.chooseBlockAndProxy()) {
long blockSize = pendingBlock.block.getNumBytes();
long blockSize = pendingBlock.reportedBlock.getNumBytes(this);
incScheduledSize(-blockSize);
task.size -= blockSize;
if (task.size == 0) {
@ -744,7 +806,7 @@ public class Dispatcher {
blocksToReceive -= getBlockList();
continue;
} catch (IOException e) {
LOG.warn("Exception while getting block list", e);
LOG.warn("Exception while getting reportedBlock list", e);
return;
}
} else {
@ -883,7 +945,7 @@ public class Dispatcher {
}
public void executePendingMove(final PendingMove p) {
// move the block
// move the reportedBlock
moveExecutor.execute(new Runnable() {
@Override
public void run() {
@ -928,17 +990,17 @@ public class Dispatcher {
}
}
// wait for all block moving to be done
// wait for all reportedBlock moving to be done
waitForMoveCompletion(targets);
return getBytesMoved() - bytesLastMoved;
}
/** The sleeping period before checking if block move is completed again */
/** The sleeping period before checking if reportedBlock move is completed again */
static private long blockMoveWaitTime = 30000L;
/**
* Wait for all block move confirmations.
* Wait for all reportedBlock move confirmations.
* @return true if there is failed move execution
*/
public static boolean waitForMoveCompletion(
@ -965,10 +1027,10 @@ public class Dispatcher {
}
/**
* Decide if the block is a good candidate to be moved from source to target.
* A block is a good candidate if
* Decide if the block/blockGroup is a good candidate to be moved from source
* to target. A block is a good candidate if
* 1. the block is not in the process of being moved/has not been moved;
* 2. the block does not have a replica on the target;
* 2. the block does not have a replica/internalBlock on the target;
* 3. doing the move does not reduce the number of racks that the block has
*/
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
@ -985,7 +1047,7 @@ public class Dispatcher {
}
final DatanodeInfo targetDatanode = target.getDatanodeInfo();
if (source.getDatanodeInfo().equals(targetDatanode)) {
// the block is moved inside same DN
// the reportedBlock is moved inside same DN
return true;
}
@ -1068,7 +1130,7 @@ public class Dispatcher {
movedBlocks.cleanup();
}
/** set the sleeping period for block move completion check */
/** set the sleeping period for reportedBlock move completion check */
@VisibleForTesting
public static void setBlockMoveWaitTime(long time) {
blockMoveWaitTime = time;

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@ -3265,9 +3266,10 @@ public class BlockManager {
/**
* Get all valid locations of the block & add the block to results
* return the length of the added block; 0 if the block is not added
* @return the length of the added block; 0 if the block is not added. If the
* added block is a block group, return its approximate internal block size
*/
private long addBlock(Block block, List<BlockWithLocations> results) {
private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
final List<DatanodeStorageInfo> locations = getValidLocations(block);
if(locations.size() == 0) {
return 0;
@ -3281,11 +3283,25 @@ public class BlockManager {
storageIDs[i] = s.getStorageID();
storageTypes[i] = s.getStorageType();
}
results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
storageTypes));
BlockWithLocations blkWithLocs = new BlockWithLocations(block,
datanodeUuids, storageIDs, storageTypes);
if(block.isStriped()) {
BlockInfoStriped blockStriped = (BlockInfoStriped) block;
byte[] indices = new byte[locations.size()];
for (int i = 0; i < locations.size(); i++) {
indices[i] =
(byte) blockStriped.getStorageBlockIndex(locations.get(i));
}
results.add(new StripedBlockWithLocations(blkWithLocs, indices,
blockStriped.getDataBlockNum()));
// approximate size
return block.getNumBytes() / blockStriped.getDataBlockNum();
}else{
results.add(blkWithLocs);
return block.getNumBytes();
}
}
}
/**
* The given node is reporting that it received a certain block.

View File

@ -46,12 +46,15 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
@ -176,8 +179,20 @@ public class Mover {
}
}
DBlock newDBlock(Block block, List<MLocation> locations) {
final DBlock db = new DBlock(block);
DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
ECSchema ecSchema) {
Block blk = lb.getBlock().getLocalBlock();
DBlock db;
if (lb.isStriped()) {
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
byte[] indices = new byte[lsb.getBlockIndices().length];
for (int i = 0; i < indices.length; i++) {
indices[i] = (byte) lsb.getBlockIndices()[i];
}
db = new DBlockStriped(blk, indices, (short) ecSchema.getNumDataUnits());
} else {
db = new DBlock(blk);
}
for(MLocation ml : locations) {
StorageGroup source = storages.getSource(ml);
if (source != null) {
@ -358,9 +373,10 @@ public class Mover {
LOG.warn("Failed to get the storage policy of file " + fullPath);
return false;
}
final List<StorageType> types = policy.chooseStorageTypes(
List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
final ECSchema ecSchema = status.getECSchema();
final LocatedBlocks locatedBlocks = status.getBlockLocations();
boolean hasRemaining = false;
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
@ -371,10 +387,13 @@ public class Mover {
continue;
}
LocatedBlock lb = lbs.get(i);
if (lb.isStriped()) {
types = policy.chooseStorageTypes((short) lb.getLocations().length);
}
final StorageTypeDiff diff = new StorageTypeDiff(types,
lb.getStorageTypes());
if (!diff.removeOverlap(true)) {
if (scheduleMoves4Block(diff, lb)) {
if (scheduleMoves4Block(diff, lb, ecSchema)) {
hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1);
}
@ -383,10 +402,13 @@ public class Mover {
return hasRemaining;
}
boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb,
ECSchema ecSchema) {
final List<MLocation> locations = MLocation.toLocations(lb);
if (!(lb instanceof LocatedStripedBlock)) {
Collections.shuffle(locations);
final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
}
final DBlock db = newDBlock(lb, locations, ecSchema);
for (final StorageType t : diff.existing) {
for (final MLocation ml : locations) {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
@ -91,6 +92,30 @@ public class BlocksWithLocations {
}
}
/**
 * A {@link BlockWithLocations} that also carries one index byte per
 * location and the number of data blocks.
 */
public static class StripedBlockWithLocations extends BlockWithLocations {
  /** One entry per location; its length equals the number of datanode uuids. */
  final byte[] indices;
  /** Number of data blocks, as supplied to the constructor. */
  final short dataBlockNum;

  /**
   * Wrap an existing {@link BlockWithLocations}, copying its block,
   * datanode uuids, storage ids and storage types.
   *
   * @param blk the block and locations to copy from
   * @param indices one index per location of {@code blk}
   * @param dataBlockNum number of data blocks
   * @throws IllegalArgumentException if {@code indices} does not have
   *         exactly one entry per datanode uuid of {@code blk}
   */
  public StripedBlockWithLocations(BlockWithLocations blk, byte[] indices,
      short dataBlockNum) {
    super(blk.getBlock(), blk.getDatanodeUuids(), blk.getStorageIDs(),
        blk.getStorageTypes());
    final int locationCount = blk.getDatanodeUuids().length;
    Preconditions.checkArgument(locationCount == indices.length);
    this.dataBlockNum = dataBlockNum;
    this.indices = indices;
  }

  /** @return the number of data blocks */
  public short getDataBlockNum() {
    return dataBlockNum;
  }

  /** @return the per-location index array (the internal array, not a copy) */
  public byte[] getIndices() {
    return indices;
  }
}
private final BlockWithLocations[] blocks;
/** Constructor with one parameter */

View File

@ -532,6 +532,9 @@ message BlockWithLocationsProto {
repeated string datanodeUuids = 2; // Datanodes with replicas of the block
repeated string storageUuids = 3; // Storages with replicas of the block
repeated StorageTypeProto storageTypes = 4;
optional bytes indices = 5;
optional uint32 dataBlockNum = 6;
}
/**

View File

@ -1964,4 +1964,19 @@ public class DFSTestUtil {
out.flushInternal();
return out.getBlock();
}
/**
 * Assert that every located block has exactly {@code groupSize} locations
 * and that all of those locations are distinct datanodes.
 *
 * @param lbs the located blocks to check
 * @param groupSize expected number of locations per located block
 */
public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
    int groupSize) {
  for (LocatedBlock lb : lbs.getLocatedBlocks()) {
    final DatanodeInfo[] nodes = lb.getLocations();
    // a set collapses duplicates, so its size counts distinct datanodes
    final HashSet<DatanodeInfo> distinctNodes = new HashSet<>();
    for (int i = 0; i < nodes.length; i++) {
      distinctNodes.add(nodes[i]);
    }
    assertEquals(groupSize, nodes.length);
    assertEquals(groupSize, distinctNodes.size());
  }
}
}

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -189,40 +190,58 @@ public class TestPBHelper {
assertEquals(b, b2);
}
private static BlockWithLocations getBlockWithLocations(int bid) {
private static BlockWithLocations getBlockWithLocations(
int bid, boolean isStriped) {
final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
final String[] storageIDs = {"s1", "s2", "s3"};
final StorageType[] storageTypes = {
StorageType.DISK, StorageType.DISK, StorageType.DISK};
return new BlockWithLocations(new Block(bid, 0, 1),
final byte[] indices = {0, 1, 2};
final short dataBlkNum = 6;
BlockWithLocations blkLocs = new BlockWithLocations(new Block(bid, 0, 1),
datanodeUuids, storageIDs, storageTypes);
if (isStriped) {
blkLocs = new StripedBlockWithLocations(blkLocs, indices, dataBlkNum);
}
return blkLocs;
}
private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
assertEquals(locs1.getBlock(), locs2.getBlock());
assertTrue(Arrays.equals(locs1.getStorageIDs(), locs2.getStorageIDs()));
if (locs1 instanceof StripedBlockWithLocations) {
assertTrue(Arrays.equals(((StripedBlockWithLocations) locs1).getIndices(),
((StripedBlockWithLocations) locs2).getIndices()));
}
}
@Test
public void testConvertBlockWithLocations() {
BlockWithLocations locs = getBlockWithLocations(1);
boolean[] testSuite = new boolean[]{false, true};
for (int i = 0; i < testSuite.length; i++) {
BlockWithLocations locs = getBlockWithLocations(1, testSuite[i]);
BlockWithLocationsProto locsProto = PBHelper.convert(locs);
BlockWithLocations locs2 = PBHelper.convert(locsProto);
compare(locs, locs2);
}
}
@Test
public void testConvertBlocksWithLocations() {
boolean[] testSuite = new boolean[]{false, true};
for (int i = 0; i < testSuite.length; i++) {
BlockWithLocations[] list = new BlockWithLocations[]{
getBlockWithLocations(1), getBlockWithLocations(2) };
getBlockWithLocations(1, testSuite[i]),
getBlockWithLocations(2, testSuite[i])};
BlocksWithLocations locs = new BlocksWithLocations(list);
BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
BlocksWithLocations locs2 = PBHelper.convert(locsProto);
BlockWithLocations[] blocks = locs.getBlocks();
BlockWithLocations[] blocks2 = locs2.getBlocks();
assertEquals(blocks.length, blocks2.length);
for (int i = 0; i < blocks.length; i++) {
compare(blocks[i], blocks2[i]);
for (int j = 0; j < blocks.length; j++) {
compare(blocks[j], blocks2[j]);
}
}
}

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@ -133,6 +134,21 @@ public class TestBalancer {
LazyPersistTestCase.initCacheManipulator();
}
int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
int groupSize = dataBlocks + parityBlocks;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4;
static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
/**
 * Prepare a configuration for the striped-file balancer test: block size
 * set to {@code DEFAULT_STRIPE_BLOCK_SIZE}, heartbeat and replication
 * intervals set to 1, moved-window width set to 2000, and the simulated
 * dataset factory installed.
 */
static void initConfWithStripe(Configuration conf) {
  final long oneUnit = 1L;
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, oneUnit);
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, oneUnit);
  conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
  SimulatedFSDataset.setFactory(conf);
}
/* create a file with a length of <code>fileLen</code> */
static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
short replicationFactor, int nnIndex)
@ -1452,6 +1468,66 @@ public class TestBalancer {
}
}
/**
 * Write a striped file into an erasure coding zone at "/", add one empty
 * datanode on a new rack, run the balancer, and check before and after that
 * every block group has {@code groupSize} locations on distinct datanodes.
 */
@Test(timeout = 100000)
public void testBalancerWithStripedFile() throws Exception {
  Configuration conf = new Configuration();
  initConfWithStripe(conf);
  int numOfDatanodes = dataBlocks + parityBlocks + 2;
  int numOfRacks = dataBlocks;
  long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
  long[] capacities = new long[numOfDatanodes];
  for (int i = 0; i < capacities.length; i++) {
    capacities[i] = capacity;
  }
  // spread the datanodes round-robin over the racks
  String[] racks = new String[numOfDatanodes];
  for (int i = 0; i < numOfDatanodes; i++) {
    racks[i] = "/rack" + (i % numOfRacks);
  }
  cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(numOfDatanodes)
      .racks(racks)
      .simulatedCapacities(capacities)
      .build();

  try {
    cluster.waitActive();
    client = NameNodeProxies.createProxy(conf,
        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
    client.createErasureCodingZone("/", null, 0);

    long totalCapacity = sum(capacities);

    // fill up the cluster with 30% data. It'll be 45% full plus parity.
    long fileLen = totalCapacity * 3 / 10;
    long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
    FileSystem fs = cluster.getFileSystem(0);
    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());

    // verify locations of striped blocks
    LocatedBlocks locatedBlocks =
        client.getBlockLocations(fileName, 0, fileLen);
    DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);

    // add one datanode
    String newRack = "/rack" + (++numOfRacks);
    cluster.startDataNodes(conf, 1, true, null,
        new String[]{newRack}, null, new long[]{capacity});
    totalCapacity += capacity;
    cluster.triggerHeartbeats();

    // run balancer and validate results
    // (runBalancer takes the configuration directly, so the namenode URI
    // collection is not needed here)
    Balancer.Parameters p = Balancer.Parameters.DEFAULT;
    runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);

    // verify locations of striped blocks
    locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
    DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
  } finally {
    cluster.shutdown();
  }
}
/**
* @param args
*/

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -34,10 +35,16 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
@ -83,7 +90,7 @@ public class TestMover {
final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
final List<MLocation> locations = MLocation.toLocations(lb);
final MLocation ml = locations.get(0);
final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);
final DBlock db = mover.newDBlock(lb, locations, null);
final List<StorageType> storageTypes = new ArrayList<StorageType>(
Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
@ -361,4 +368,119 @@ public class TestMover {
cluster.shutdown();
}
}
int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4;
static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
/**
 * Prepare a configuration for the striped-file mover test: block size set
 * to {@code DEFAULT_STRIPE_BLOCK_SIZE}, heartbeat and replication intervals
 * set to 1, and the dispatcher's block-move wait time set to 3000.
 */
static void initConfWithStripe(Configuration conf) {
  final long oneUnit = 1L;
  Dispatcher.setBlockMoveWaitTime(3000L);
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, oneUnit);
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, oneUnit);
}
/**
 * Write a striped file under "/bar" with the HOT policy on a cluster whose
 * first ten datanodes expose DISK (and some ARCHIVE) storages, add five
 * ARCHIVE-only datanodes, switch the policy to COLD, run the Mover, and
 * check that every location ends up on ARCHIVE with the block groups still
 * spread over distinct datanodes.
 */
@Test(timeout = 300000)
public void testMoverWithStripedFile() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConfWithStripe(conf);

  // start 10 datanodes
  int numOfDatanodes = 10;
  int storagesPerDatanode = 2;
  long capacity = 10 * DEFAULT_STRIPE_BLOCK_SIZE;
  long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
  for (int i = 0; i < numOfDatanodes; i++) {
    for (int j = 0; j < storagesPerDatanode; j++) {
      capacities[i][j] = capacity;
    }
  }
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(numOfDatanodes)
      .storagesPerDatanode(storagesPerDatanode)
      .storageTypes(new StorageType[][]{
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.DISK, StorageType.ARCHIVE}})
      .storageCapacities(capacities)
      .build();

  try {
    cluster.waitActive();

    // set "/bar" directory with HOT storage policy.
    ClientProtocol client = NameNodeProxies.createProxy(conf,
        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
    String barDir = "/bar";
    // the mode must be an octal literal: decimal 777 is octal 01411, which
    // is not rwxrwxrwx
    client.mkdirs(barDir, new FsPermission((short) 0777), true);
    client.setStoragePolicy(barDir,
        HdfsServerConstants.HOT_STORAGE_POLICY_NAME);
    // set "/bar" directory with EC zone.
    client.createErasureCodingZone(barDir, null, 0);

    // write file to barDir
    final String fooFile = "/bar/foo";
    long fileLen = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
    DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
        fileLen, (short) 3, 0);

    // verify storage types and locations
    LocatedBlocks locatedBlocks =
        client.getBlockLocations(fooFile, 0, fileLen);
    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
      for (StorageType type : lb.getStorageTypes()) {
        Assert.assertEquals(StorageType.DISK, type);
      }
    }
    DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
        dataBlocks + parityBlocks);

    // start 5 more datanodes
    numOfDatanodes += 5;
    capacities = new long[5][storagesPerDatanode];
    for (int i = 0; i < 5; i++) {
      for (int j = 0; j < storagesPerDatanode; j++) {
        capacities[i][j] = capacity;
      }
    }
    cluster.startDataNodes(conf, 5,
        new StorageType[][]{
            {StorageType.ARCHIVE, StorageType.ARCHIVE},
            {StorageType.ARCHIVE, StorageType.ARCHIVE},
            {StorageType.ARCHIVE, StorageType.ARCHIVE},
            {StorageType.ARCHIVE, StorageType.ARCHIVE},
            {StorageType.ARCHIVE, StorageType.ARCHIVE}},
        true, null, null, null, capacities, null, false, false, false, null);
    cluster.triggerHeartbeats();

    // move file to ARCHIVE
    client.setStoragePolicy(barDir, "COLD");
    // run Mover
    int rc = ToolRunner.run(conf, new Mover.Cli(),
        new String[] { "-p", barDir });
    Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);

    // verify storage types and locations
    locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
      for (StorageType type : lb.getStorageTypes()) {
        Assert.assertEquals(StorageType.ARCHIVE, type);
      }
    }
    DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
        dataBlocks + parityBlocks);
  } finally {
    cluster.shutdown();
  }
}
}