From 673280df24f0228bf01777035ceeab8807da8c40 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Wed, 3 Jun 2015 11:51:58 -0700 Subject: [PATCH] HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. Contributed by Walter Su. --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../hadoop/hdfs/protocolPB/PBHelper.java | 21 ++- .../hdfs/server/balancer/Dispatcher.java | 148 +++++++++++++----- .../server/blockmanagement/BlockManager.java | 26 ++- .../hadoop/hdfs/server/mover/Mover.java | 38 ++++- .../server/protocol/BlocksWithLocations.java | 25 +++ .../hadoop-hdfs/src/main/proto/hdfs.proto | 3 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 15 ++ .../hadoop/hdfs/protocolPB/TestPBHelper.java | 51 ++++-- .../hdfs/server/balancer/TestBalancer.java | 76 +++++++++ .../hadoop/hdfs/server/mover/TestMover.java | 124 ++++++++++++++- 11 files changed, 452 insertions(+), 78 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 278f897e42..511ebecc20 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -277,3 +277,6 @@ HDFS-8453. Erasure coding: properly handle start offset for internal blocks in a block group. (Zhe Zhang via jing9) + + HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. + (Walter Su via zhz) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index b2415fa83b..0bfc3bbf3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -211,6 +211,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -458,22 +459,32 @@ public static StripedBlockProto convert(BlockInfoStriped blk) { } public static BlockWithLocationsProto convert(BlockWithLocations blk) { - return BlockWithLocationsProto.newBuilder() - .setBlock(convert(blk.getBlock())) + BlockWithLocationsProto.Builder builder = BlockWithLocationsProto + .newBuilder().setBlock(convert(blk.getBlock())) .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())) - .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes())) - .build(); + .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes())); + if (blk instanceof StripedBlockWithLocations) { + StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk; + builder.setIndices(getByteString(sblk.getIndices())); + builder.setDataBlockNum(sblk.getDataBlockNum()); + } + return builder.build(); } public static BlockWithLocations convert(BlockWithLocationsProto b) { final List datanodeUuids = b.getDatanodeUuidsList(); final List storageUuids = b.getStorageUuidsList(); final List storageTypes = b.getStorageTypesList(); - return new BlockWithLocations(convert(b.getBlock()), + 
BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()), datanodeUuids.toArray(new String[datanodeUuids.size()]), storageUuids.toArray(new String[storageUuids.size()]), convertStorageTypes(storageTypes, storageUuids.size())); + if (b.hasIndices()) { + blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(), + (short) b.getDataBlockNum()); + } + return blk; } public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 4a8f40fa33..930001aa51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.balancer; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; @@ -124,18 +127,17 @@ private static class GlobalBlockMap { private final Map map = new HashMap(); /** - * Get the block from the map; - * if the block is not found, create a new block and put it in the map. + * Put block in the map if it's not found + * @return the block which be put in the map the first time */ - private DBlock get(Block b) { - DBlock block = map.get(b); - if (block == null) { - block = new DBlock(b); - map.put(b, block); + private DBlock putIfAbsent(Block blk, DBlock dblk) { + if (!map.containsKey(blk)) { + map.put(blk, dblk); + return dblk; } - return block; + return map.get(blk); } - + /** Remove all blocks except for the moved blocks. */ private void removeAllButRetain(MovedBlocks movedBlocks) { for (Iterator i = map.keySet().iterator(); i.hasNext();) { @@ -176,9 +178,9 @@ public Collection values() { } } - /** This class keeps track of a scheduled block move */ + /** This class keeps track of a scheduled reportedBlock move */ public class PendingMove { - private DBlock block; + private DBlock reportedBlock; private Source source; private DDatanode proxySource; private StorageGroup target; @@ -190,7 +192,7 @@ private PendingMove(Source source, StorageGroup target) { @Override public String toString() { - final Block b = block != null ? block.getBlock() : null; + final Block b = reportedBlock != null ? reportedBlock.getBlock() : null; String bStr = b != null ? 
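Editorial note on the GlobalBlockMap change above: get() becomes putIfAbsent() because the caller now decides up front whether to build a plain DBlock or a DBlockStriped from the NameNode report, and the shared map only has to retain the first instance registered for a given block. A minimal, self-contained sketch of the intended semantics (class and field names below are illustrative stand-ins, not types from the patch):

import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentSketch {
  // Illustrative stand-in for the DBlock value type.
  static class Info {
    final String kind;
    Info(String kind) { this.kind = kind; }
  }

  private final Map<Long, Info> map = new HashMap<Long, Info>();

  /** Keep the first value registered for a key; later callers get that instance back. */
  Info putIfAbsent(Long blockId, Info info) {
    Info existing = map.get(blockId);
    if (existing == null) {
      map.put(blockId, info);
      return info;       // first registration wins
    }
    return existing;     // later callers share the same instance
  }

  public static void main(String[] args) {
    PutIfAbsentSketch m = new PutIfAbsentSketch();
    Info first  = m.putIfAbsent(1L, new Info("striped"));
    Info second = m.putIfAbsent(1L, new Info("contiguous"));
    System.out.println(first == second);   // true: the first instance is retained
  }
}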
(b + " with size=" + b.getNumBytes() + " ") : " "; return bStr + "from " + source.getDisplayName() + " to " + target @@ -199,8 +201,8 @@ public String toString() { } /** - * Choose a block & a proxy source for this pendingMove whose source & - * target have already been chosen. + * Choose a good block/blockGroup from source & Get reportedBlock from + * the block & Choose a proxy source for the reportedBlock. * * @return true if a block and its proxy are chosen; false otherwise */ @@ -224,7 +226,11 @@ private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType synchronized (block) { synchronized (movedBlocks) { if (isGoodBlockCandidate(source, target, targetStorageType, block)) { - this.block = block; + if (block instanceof DBlockStriped) { + reportedBlock = ((DBlockStriped) block).getInternalBlock(source); + } else { + reportedBlock = block; + } if (chooseProxySource()) { movedBlocks.put(block); if (LOG.isDebugEnabled()) { @@ -251,7 +257,7 @@ private boolean chooseProxySource() { } // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { - for (StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : reportedBlock.getLocations()) { if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; @@ -259,13 +265,13 @@ && addTo(loc)) { } } // check if there is replica which is on the same rack with the target - for (StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : reportedBlock.getLocations()) { if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } // find out a non-busy replica - for (StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : reportedBlock.getLocations()) { if (addTo(loc)) { return true; } @@ -273,7 +279,7 @@ && addTo(loc)) { return false; } - /** add to a proxy source for specific block movement */ + /** add to a proxy source for specific reportedBlock movement */ private boolean addTo(StorageGroup g) { final DDatanode dn = g.getDDatanode(); if (dn.addPendingBlock(this)) { @@ -288,6 +294,7 @@ private void dispatch() { if (LOG.isDebugEnabled()) { LOG.debug("Start moving " + this); } + assert !(reportedBlock instanceof DBlockStriped); Socket sock = new Socket(); DataOutputStream out = null; @@ -302,7 +309,7 @@ private void dispatch() { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), - block.getBlock()); + reportedBlock.getBlock()); final KeyManager km = nnc.getKeyManager(); Token accessToken = km.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, @@ -316,7 +323,7 @@ private void dispatch() { sendRequest(out, eb, accessToken); receiveResponse(in); - nnc.getBytesMoved().addAndGet(block.getNumBytes()); + nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes()); LOG.info("Successfully moved " + this); } catch (IOException e) { LOG.warn("Failed to move " + this + ": " + e.getMessage()); @@ -344,14 +351,14 @@ private void dispatch() { } } - /** Send a block replace request to the output stream */ + /** Send a reportedBlock replace request to the output stream */ private void sendRequest(DataOutputStream out, ExtendedBlock eb, Token accessToken) throws IOException { new Sender(out).replaceBlock(eb, target.storageType, accessToken, source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); } - /** Receive a block copy response from the input stream */ + /** 
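Editorial note on chooseProxySource() above: the preference order is unchanged, it just iterates the reportedBlock's locations now: first a location in the same node group as the target, then one on the same rack, then any location that is not busy. A small sketch of that tiered-selection pattern, with hypothetical fields standing in for the topology and busy checks (not the Dispatcher's actual types):

import java.util.Arrays;
import java.util.List;

public class ProxyChoiceSketch {
  // Illustrative location type; rack/nodeGroup/busy stand in for topology and load lookups.
  static class Loc {
    final String name, rack, nodeGroup;
    final boolean busy;
    Loc(String name, String rack, String nodeGroup, boolean busy) {
      this.name = name; this.rack = rack; this.nodeGroup = nodeGroup; this.busy = busy;
    }
  }

  /** Prefer same node group, then same rack, then any location that is not busy. */
  static Loc chooseProxy(List<Loc> locations, Loc target) {
    for (Loc l : locations) {
      if (!l.busy && l.nodeGroup.equals(target.nodeGroup)) return l;
    }
    for (Loc l : locations) {
      if (!l.busy && l.rack.equals(target.rack)) return l;
    }
    for (Loc l : locations) {
      if (!l.busy) return l;
    }
    return null;  // every candidate is busy
  }

  public static void main(String[] args) {
    Loc target = new Loc("t", "r2", "g2", false);
    List<Loc> locs = Arrays.asList(
        new Loc("a", "r1", "g1", false),
        new Loc("b", "r2", "g3", false));
    System.out.println(chooseProxy(locs, target).name);  // "b": same rack as the target
  }
}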
Receive a reportedBlock copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { BlockOpResponseProto response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); @@ -359,13 +366,13 @@ private void receiveResponse(DataInputStream in) throws IOException { // read intermediate responses response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); } - String logInfo = "block move is failed"; + String logInfo = "reportedBlock move is failed"; DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); } /** reset the object */ private void reset() { - block = null; + reportedBlock = null; source = null; proxySource = null; target = null; @@ -377,6 +384,44 @@ public static class DBlock extends MovedBlocks.Locations { public DBlock(Block block) { super(block); } + + public long getNumBytes(StorageGroup storage) { + return super.getNumBytes(); + } + } + + public static class DBlockStriped extends DBlock { + + final byte[] indices; + final short dataBlockNum; + + public DBlockStriped(Block block, byte[] indices, short dataBlockNum) { + super(block); + this.indices = indices; + this.dataBlockNum = dataBlockNum; + } + + public DBlock getInternalBlock(StorageGroup storage) { + int idxInLocs = locations.indexOf(storage); + if (idxInLocs == -1) { + return null; + } + byte idxInGroup = indices[idxInLocs]; + long blkId = getBlock().getBlockId() + idxInGroup; + long numBytes = getInternalBlockLength(getNumBytes(), + HdfsConstants.BLOCK_STRIPED_CELL_SIZE, dataBlockNum, idxInGroup); + Block blk = new Block(getBlock()); + blk.setBlockId(blkId); + blk.setNumBytes(numBytes); + DBlock dblk = new DBlock(blk); + dblk.addLocation(storage); + return dblk; + } + + @Override + public long getNumBytes(StorageGroup storage) { + return getInternalBlock(storage).getNumBytes(); + } } /** The class represents a desired move. 
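Editorial note on DBlockStriped.getInternalBlock() above: it derives the physical block a datanode actually stores from the logical block group. The internal block id is the group id plus the index within the group, and the length follows the striping layout (full stripes plus a possibly partial last stripe, with parity blocks as long as the first data block). The patch delegates to StripedBlockUtil.getInternalBlockLength; the sketch below is a simplified, self-contained version of the same arithmetic, with an assumed 64 KB cell size used only as an example parameter:

public class InternalBlockSketch {

  /** Internal block id = block group id + index within the group. */
  static long internalBlockId(long groupId, int idxInGroup) {
    return groupId + idxInGroup;
  }

  /**
   * Length of internal block idxInGroup for a group holding dataSize bytes of user
   * data, striped round-robin in cells of cellSize over dataBlkNum data blocks.
   * Parity blocks (idx >= dataBlkNum) are as long as the longest data block.
   */
  static long internalBlockLength(long dataSize, int cellSize, int dataBlkNum, int idxInGroup) {
    long stripeSize = (long) cellSize * dataBlkNum;
    long fullStripes = dataSize / stripeSize;
    long lastStripe = dataSize - fullStripes * stripeSize;   // bytes in the partial stripe
    int dataIdx = idxInGroup < dataBlkNum ? idxInGroup : 0;  // parity mirrors data block 0
    long lastCell = Math.min(Math.max(lastStripe - (long) dataIdx * cellSize, 0), cellSize);
    return fullStripes * cellSize + lastCell;
  }

  public static void main(String[] args) {
    // Example: 6 data blocks, 64 KB cells, 500 KB of user data in the group.
    int cell = 64 * 1024, dataNum = 6;
    long dataSize = 500 * 1024;
    for (int i = 0; i < 9; i++) {
      // Prints 128K, 116K, 64K, 64K, 64K, 64K for data blocks and 128K for each parity block.
      System.out.println("index " + i + " -> " + internalBlockLength(dataSize, cell, dataNum, i));
    }
  }
}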
*/ @@ -452,7 +497,7 @@ synchronized void resetScheduledSize() { private PendingMove addPendingMove(DBlock block, final PendingMove pm) { if (getDDatanode().addPendingBlock(pm)) { if (pm.markMovedIfGoodBlock(block, getStorageType())) { - incScheduledSize(pm.block.getNumBytes()); + incScheduledSize(pm.reportedBlock.getNumBytes()); return pm; } else { getDDatanode().removePendingBlock(pm); @@ -612,19 +657,34 @@ Iterator getBlockIterator() { */ private long getBlockList() throws IOException { final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); + final BlocksWithLocations newBlksLocs = + nnc.getBlocks(getDatanodeInfo(), size); long bytesReceived = 0; - for (BlockWithLocations blk : newBlocks.getBlocks()) { - bytesReceived += blk.getBlock().getNumBytes(); + for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) { + + DBlock block; + if (blkLocs instanceof StripedBlockWithLocations) { + StripedBlockWithLocations sblkLocs = + (StripedBlockWithLocations) blkLocs; + // approximate size + bytesReceived += sblkLocs.getBlock().getNumBytes() / + sblkLocs.getDataBlockNum(); + block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(), + sblkLocs.getDataBlockNum()); + } else{ + bytesReceived += blkLocs.getBlock().getNumBytes(); + block = new DBlock(blkLocs.getBlock()); + } + synchronized (globalBlocks) { - final DBlock block = globalBlocks.get(blk.getBlock()); + block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block); synchronized (block) { block.clearLocations(); // update locations - final String[] datanodeUuids = blk.getDatanodeUuids(); - final StorageType[] storageTypes = blk.getStorageTypes(); + final String[] datanodeUuids = blkLocs.getDatanodeUuids(); + final StorageType[] storageTypes = blkLocs.getStorageTypes(); for (int i = 0; i < datanodeUuids.length; i++) { final StorageGroup g = storageGroupMap.get( datanodeUuids[i], storageTypes[i]); @@ -661,6 +721,8 @@ private boolean isGoodBlockCandidate(DBlock block) { * target throttling has been considered. They are chosen only when they * have the capacity to support this block move. The block should be * dispatched immediately after this method is returned. + * If the block is a block group. Only the internal block on this source + * will be dispatched. * * @return a move that's good for the source to dispatch immediately. 
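Editorial note on getBlockList() above: a reported block group contributes only an approximate share to bytesReceived. The NameNode reports the group's total data size, but the queried datanode stores just one internal block, so the Dispatcher divides by the number of data blocks (BlockManager.addBlock applies the same approximation on the NameNode side). It is deliberately approximate: parity blocks and partial stripes make the true internal block size vary slightly. A worked example with illustrative numbers:

public class ApproxSizeSketch {
  public static void main(String[] args) {
    long groupDataBytes = 48L * 1024 * 1024;  // total user data in one block group
    short dataBlockNum = 6;                   // e.g. a 6+3 schema
    // Approximate share attributed to the single internal block on this datanode:
    long approx = groupDataBytes / dataBlockNum;
    System.out.println(approx);               // 8388608 bytes (~8 MB)
  }
}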
*/ @@ -672,7 +734,7 @@ private PendingMove chooseNextMove() { if (target.addPendingBlock(pendingBlock)) { // target is not busy, so do a tentative block allocation if (pendingBlock.chooseBlockAndProxy()) { - long blockSize = pendingBlock.block.getNumBytes(); + long blockSize = pendingBlock.reportedBlock.getNumBytes(this); incScheduledSize(-blockSize); task.size -= blockSize; if (task.size == 0) { @@ -744,7 +806,7 @@ private void dispatchBlocks() { blocksToReceive -= getBlockList(); continue; } catch (IOException e) { - LOG.warn("Exception while getting block list", e); + LOG.warn("Exception while getting reportedBlock list", e); return; } } else { @@ -883,7 +945,7 @@ public DDatanode newDatanode(DatanodeInfo datanode) { } public void executePendingMove(final PendingMove p) { - // move the block + // move the reportedBlock moveExecutor.execute(new Runnable() { @Override public void run() { @@ -928,17 +990,17 @@ public void run() { } } - // wait for all block moving to be done + // wait for all reportedBlock moving to be done waitForMoveCompletion(targets); return getBytesMoved() - bytesLastMoved; } - /** The sleeping period before checking if block move is completed again */ + /** The sleeping period before checking if reportedBlock move is completed again */ static private long blockMoveWaitTime = 30000L; /** - * Wait for all block move confirmations. + * Wait for all reportedBlock move confirmations. * @return true if there is failed move execution */ public static boolean waitForMoveCompletion( @@ -965,10 +1027,10 @@ public static boolean waitForMoveCompletion( } /** - * Decide if the block is a good candidate to be moved from source to target. - * A block is a good candidate if + * Decide if the block/blockGroup is a good candidate to be moved from source + * to target. A block is a good candidate if * 1. the block is not in the process of being moved/has not been moved; - * 2. the block does not have a replica on the target; + * 2. the block does not have a replica/internalBlock on the target; * 3. 
doing the move does not reduce the number of racks that the block has */ private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, @@ -985,7 +1047,7 @@ private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, } final DatanodeInfo targetDatanode = target.getDatanodeInfo(); if (source.getDatanodeInfo().equals(targetDatanode)) { - // the block is moved inside same DN + // the reportedBlock is moved inside same DN return true; } @@ -1068,7 +1130,7 @@ void reset(Configuration conf) { movedBlocks.cleanup(); } - /** set the sleeping period for block move completion check */ + /** set the sleeping period for reportedBlock move completion check */ @VisibleForTesting public static void setBlockMoveWaitTime(long time) { blockMoveWaitTime = time; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 32757f925b..48a1b35d45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; @@ -3265,9 +3266,10 @@ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { /** * Get all valid locations of the block & add the block to results - * return the length of the added block; 0 if the block is not added + * @return the length of the added block; 0 if the block is not added. 
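Editorial note on the BlockManager change that follows: addBlock() now returns an approximate size for block groups so that the caller's size budget is not exhausted after only a handful of groups. The surrounding getBlocksWithLocations() loop is not part of this hunk; the sketch below is only a hedged illustration of the general shape of such a size-budget loop, not the NameNode's actual code:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SizeBudgetSketch {
  /** Accumulate items until their (possibly approximated) sizes reach the requested budget. */
  static List<Long> collectUpTo(Iterator<Long> sizes, long budget) {
    List<Long> picked = new ArrayList<Long>();
    long total = 0;
    while (total < budget && sizes.hasNext()) {
      long s = sizes.next();      // addBlock() analogue: the size actually accounted
      if (s > 0) {                // 0 means "not added" (no valid locations)
        picked.add(s);
        total += s;
      }
    }
    return picked;
  }

  public static void main(String[] args) {
    List<Long> reported = new ArrayList<Long>();
    reported.add(8L << 20);   // a striped group accounted as roughly one internal block
    reported.add(128L << 20); // a contiguous block accounted at full size
    reported.add(64L << 20);
    System.out.println(collectUpTo(reported.iterator(), 100L << 20));  // picks the first two
  }
}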
If the + * added block is a block group, return its approximate internal block size */ - private long addBlock(Block block, List results) { + private long addBlock(BlockInfo block, List results) { final List locations = getValidLocations(block); if(locations.size() == 0) { return 0; @@ -3281,9 +3283,23 @@ private long addBlock(Block block, List results) { storageIDs[i] = s.getStorageID(); storageTypes[i] = s.getStorageType(); } - results.add(new BlockWithLocations(block, datanodeUuids, storageIDs, - storageTypes)); - return block.getNumBytes(); + BlockWithLocations blkWithLocs = new BlockWithLocations(block, + datanodeUuids, storageIDs, storageTypes); + if(block.isStriped()) { + BlockInfoStriped blockStriped = (BlockInfoStriped) block; + byte[] indices = new byte[locations.size()]; + for (int i = 0; i < locations.size(); i++) { + indices[i] = + (byte) blockStriped.getStorageBlockIndex(locations.get(i)); + } + results.add(new StripedBlockWithLocations(blkWithLocs, indices, + blockStriped.getDataBlockNum())); + // approximate size + return block.getNumBytes() / blockStriped.getDataBlockNum(); + }else{ + results.add(blkWithLocs); + return block.getNumBytes(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 8715ce4cd9..ddfd1ea546 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -46,12 +46,15 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; + import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; @@ -176,8 +179,20 @@ private ExitStatus run() { } } - DBlock newDBlock(Block block, List locations) { - final DBlock db = new DBlock(block); + DBlock newDBlock(LocatedBlock lb, List locations, + ECSchema ecSchema) { + Block blk = lb.getBlock().getLocalBlock(); + DBlock db; + if (lb.isStriped()) { + LocatedStripedBlock lsb = (LocatedStripedBlock) lb; + byte[] indices = new byte[lsb.getBlockIndices().length]; + for (int i = 0; i < indices.length; i++) { + indices[i] = (byte) lsb.getBlockIndices()[i]; + } + db = new DBlockStriped(blk, indices, (short) ecSchema.getNumDataUnits()); + } else { + db = new DBlock(blk); + } for(MLocation ml : locations) { StorageGroup source = storages.getSource(ml); if (source != null) { @@ -358,9 +373,10 @@ private boolean processFile(String fullPath, HdfsLocatedFileStatus status) { LOG.warn("Failed to get the storage policy of file " + fullPath); return false; } - final List types = policy.chooseStorageTypes( + List types = policy.chooseStorageTypes( status.getReplication()); + final ECSchema ecSchema = status.getECSchema(); final LocatedBlocks locatedBlocks = status.getBlockLocations(); boolean hasRemaining = false; final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete(); @@ -371,10 +387,13 @@ private boolean processFile(String fullPath, 
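Editorial note on the striped branch of addBlock() and on Mover.newDBlock() above: both sides rely on the same convention, namely that getDatanodeUuids()[i], getStorageTypes()[i] and getIndices()[i] all describe the same internal block of the group. A small hedged sketch of how a consumer can resolve which internal block a particular datanode holds; the helper name is illustrative and not part of the patch:

public class IndexLookupSketch {
  /**
   * Return the internal block index stored on the given datanode, or -1 if the datanode
   * holds no internal block of this group. The arrays are parallel: datanodeUuids[i]
   * stores the internal block whose index within the group is indices[i].
   */
  static int internalIndexOn(String[] datanodeUuids, byte[] indices, String datanodeUuid) {
    for (int i = 0; i < datanodeUuids.length; i++) {
      if (datanodeUuids[i].equals(datanodeUuid)) {
        return indices[i];
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    String[] dns = {"dn1", "dn2", "dn3"};
    byte[] idx = {0, 4, 7};                                  // e.g. data blocks 0 and 4, parity block 7
    System.out.println(internalIndexOn(dns, idx, "dn2"));    // 4
    System.out.println(internalIndexOn(dns, idx, "dn9"));    // -1
  }
}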
HdfsLocatedFileStatus status) { continue; } LocatedBlock lb = lbs.get(i); + if (lb.isStriped()) { + types = policy.chooseStorageTypes((short) lb.getLocations().length); + } final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes()); if (!diff.removeOverlap(true)) { - if (scheduleMoves4Block(diff, lb)) { + if (scheduleMoves4Block(diff, lb, ecSchema)) { hasRemaining |= (diff.existing.size() > 1 && diff.expected.size() > 1); } @@ -383,10 +402,13 @@ private boolean processFile(String fullPath, HdfsLocatedFileStatus status) { return hasRemaining; } - boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { + boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb, + ECSchema ecSchema) { final List locations = MLocation.toLocations(lb); - Collections.shuffle(locations); - final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations); + if (!(lb instanceof LocatedStripedBlock)) { + Collections.shuffle(locations); + } + final DBlock db = newDBlock(lb, locations, ecSchema); for (final StorageType t : diff.existing) { for (final MLocation ml : locations) { @@ -729,4 +751,4 @@ public static void main(String[] args) { System.exit(-1); } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java index a985dbdc8c..0507faf1af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.protocol; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; @@ -91,6 +92,30 @@ private StringBuilder appendString(int i, StringBuilder b) { } } + public static class StripedBlockWithLocations extends BlockWithLocations { + final byte[] indices; + final short dataBlockNum; + + public StripedBlockWithLocations(BlockWithLocations blk, byte[] indices, + short dataBlockNum) { + super(blk.getBlock(), blk.getDatanodeUuids(), blk.getStorageIDs(), + blk.getStorageTypes()); + Preconditions.checkArgument( + blk.getDatanodeUuids().length == indices.length); + this.indices = indices; + this.dataBlockNum = dataBlockNum; + + } + + public byte[] getIndices() { + return indices; + } + + public short getDataBlockNum() { + return dataBlockNum; + } + } + private final BlockWithLocations[] blocks; /** Constructor with one parameter */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index f64cf8f025..e6db596bc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -532,6 +532,9 @@ message BlockWithLocationsProto { repeated string datanodeUuids = 2; // Datanodes with replicas of the block repeated string storageUuids = 3; // Storages with replicas of the block repeated StorageTypeProto storageTypes = 4; + + optional bytes indices = 5; + optional uint32 dataBlockNum = 6; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java 
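Editorial note on the Mover change above: for a striped block the expected storage types are chosen per internal block actually present (lb.getLocations().length) rather than per file replication factor, and scheduleMoves4Block() deliberately skips the location shuffle for striped blocks because the location order must stay aligned with the block-index array built alongside it in newDBlock(). A tiny sketch of why shuffling only one of two parallel lists is unsafe (the lists here are illustrative, not the Mover's types):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NoShuffleSketch {
  public static void main(String[] args) {
    // Parallel data: location i holds internal block indices.get(i).
    List<String> locations = new ArrayList<String>();
    List<Byte> indices = new ArrayList<Byte>();
    for (byte i = 0; i < 9; i++) {
      locations.add("dn" + i);
      indices.add(i);
    }
    // Shuffling only one of the two lists silently breaks the correspondence,
    // which is why the striped path keeps the original order.
    Collections.shuffle(locations);
    System.out.println(locations.get(0) + " is no longer guaranteed to hold index " + indices.get(0));
  }
}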
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 6cd7003b3e..db230e3266 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1964,4 +1964,19 @@ public static ExtendedBlock flushInternal(DFSStripedOutputStream out) out.flushInternal(); return out.getBlock(); } + + /** + * Verify that blocks in striped block group are on different nodes. + */ + public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, + int groupSize) { + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + HashSet locs = new HashSet<>(); + for (DatanodeInfo datanodeInfo : lb.getLocations()) { + locs.add(datanodeInfo); + } + assertEquals(groupSize, lb.getLocations().length); + assertEquals(groupSize, locs.size()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index a0b203887e..3675e63d66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -189,40 +190,58 @@ public void testConvertBlock() { assertEquals(b, b2); } - private static BlockWithLocations getBlockWithLocations(int bid) { + private static BlockWithLocations getBlockWithLocations( + int bid, boolean isStriped) { final String[] datanodeUuids = {"dn1", "dn2", "dn3"}; final String[] storageIDs = {"s1", "s2", "s3"}; final StorageType[] storageTypes = { StorageType.DISK, StorageType.DISK, StorageType.DISK}; - return new BlockWithLocations(new Block(bid, 0, 1), + final byte[] indices = {0, 1, 2}; + final short dataBlkNum = 6; + BlockWithLocations blkLocs = new BlockWithLocations(new Block(bid, 0, 1), datanodeUuids, storageIDs, storageTypes); + if (isStriped) { + blkLocs = new StripedBlockWithLocations(blkLocs, indices, dataBlkNum); + } + return blkLocs; } private void compare(BlockWithLocations locs1, BlockWithLocations locs2) { assertEquals(locs1.getBlock(), locs2.getBlock()); assertTrue(Arrays.equals(locs1.getStorageIDs(), locs2.getStorageIDs())); + if (locs1 instanceof StripedBlockWithLocations) { + assertTrue(Arrays.equals(((StripedBlockWithLocations) locs1).getIndices(), + ((StripedBlockWithLocations) locs2).getIndices())); + } } @Test public void testConvertBlockWithLocations() { - BlockWithLocations locs = getBlockWithLocations(1); - BlockWithLocationsProto locsProto = PBHelper.convert(locs); - BlockWithLocations locs2 = PBHelper.convert(locsProto); - compare(locs, locs2); + boolean[] testSuite = new boolean[]{false, true}; + for (int i = 0; i < testSuite.length; i++) { + BlockWithLocations locs = getBlockWithLocations(1, testSuite[i]); + BlockWithLocationsProto locsProto = PBHelper.convert(locs); + 
BlockWithLocations locs2 = PBHelper.convert(locsProto); + compare(locs, locs2); + } } @Test public void testConvertBlocksWithLocations() { - BlockWithLocations[] list = new BlockWithLocations[] { - getBlockWithLocations(1), getBlockWithLocations(2) }; - BlocksWithLocations locs = new BlocksWithLocations(list); - BlocksWithLocationsProto locsProto = PBHelper.convert(locs); - BlocksWithLocations locs2 = PBHelper.convert(locsProto); - BlockWithLocations[] blocks = locs.getBlocks(); - BlockWithLocations[] blocks2 = locs2.getBlocks(); - assertEquals(blocks.length, blocks2.length); - for (int i = 0; i < blocks.length; i++) { - compare(blocks[i], blocks2[i]); + boolean[] testSuite = new boolean[]{false, true}; + for (int i = 0; i < testSuite.length; i++) { + BlockWithLocations[] list = new BlockWithLocations[]{ + getBlockWithLocations(1, testSuite[i]), + getBlockWithLocations(2, testSuite[i])}; + BlocksWithLocations locs = new BlocksWithLocations(list); + BlocksWithLocationsProto locsProto = PBHelper.convert(locs); + BlocksWithLocations locs2 = PBHelper.convert(locsProto); + BlockWithLocations[] blocks = locs.getBlocks(); + BlockWithLocations[] blocks2 = locs2.getBlocks(); + assertEquals(blocks.length, blocks2.length); + for (int j = 0; j < blocks.length; j++) { + compare(blocks[j], blocks2[j]); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 92d31d00af..f6475cd82b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -133,6 +134,21 @@ static void initConfWithRamDisk(Configuration conf, LazyPersistTestCase.initCacheManipulator(); } + int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + int groupSize = dataBlocks + parityBlocks; + private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final static int stripesPerBlock = 4; + static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock; + + static void initConfWithStripe(Configuration conf) { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + SimulatedFSDataset.setFactory(conf); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); + conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); + } + /* create a file with a length of fileLen */ static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, short replicationFactor, int nnIndex) @@ -1452,6 +1468,66 @@ public void testManyBalancerSimultaneously() throws Exception { } } + @Test(timeout = 100000) + public void testBalancerWithStripedFile() throws Exception { + Configuration conf = new Configuration(); + initConfWithStripe(conf); + int numOfDatanodes = dataBlocks + parityBlocks + 2; + int numOfRacks = dataBlocks; + long capacity = 20 * 
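Editorial note on the TestBalancer setup that begins above: the test block size is derived from the striping geometry (DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock, i.e. four cells per internal block), and the cluster needs groupSize + 2 datanodes so every internal block can land on a distinct node with spare nodes left for the balancer to move onto. The sketch below works the numbers through under assumed values: a 6+3 schema and a 64 KB cell size, which are not stated in this hunk and are used for illustration only:

public class ClusterSizingSketch {
  public static void main(String[] args) {
    int dataBlocks = 6, parityBlocks = 3;            // assumed RS-6-3 defaults on this branch
    int groupSize = dataBlocks + parityBlocks;       // 9 internal blocks per group
    int cellSize = 64 * 1024;                        // illustrative cell size
    int stripesPerBlock = 4;
    int blockSize = cellSize * stripesPerBlock;      // 256 KB internal blocks in the test
    int numOfDatanodes = groupSize + 2;              // 11: one per internal block plus slack
    int numOfRacks = dataBlocks;                     // 6 racks, so some racks host two nodes
    System.out.println(blockSize + " byte blocks on " + numOfDatanodes
        + " datanodes across " + numOfRacks + " racks");
  }
}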
DEFAULT_STRIPE_BLOCK_SIZE; + long[] capacities = new long[numOfDatanodes]; + for (int i = 0; i < capacities.length; i++) { + capacities[i] = capacity; + } + String[] racks = new String[numOfDatanodes]; + for (int i = 0; i < numOfDatanodes; i++) { + racks[i] = "/rack" + (i % numOfRacks); + } + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .racks(racks) + .simulatedCapacities(capacities) + .build(); + + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), + ClientProtocol.class).getProxy(); + client.createErasureCodingZone("/", null, 0); + + long totalCapacity = sum(capacities); + + // fill up the cluster with 30% data. It'll be 45% full plus parity. + long fileLen = totalCapacity * 3 / 10; + long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks; + FileSystem fs = cluster.getFileSystem(0); + DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong()); + + // verify locations of striped blocks + LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); + DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + + // add one datanode + String newRack = "/rack" + (++numOfRacks); + cluster.startDataNodes(conf, 1, true, null, + new String[]{newRack}, null, new long[]{capacity}); + totalCapacity += capacity; + cluster.triggerHeartbeats(); + + // run balancer and validate results + Balancer.Parameters p = Balancer.Parameters.DEFAULT; + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + runBalancer(conf, totalUsedSpace, totalCapacity, p, 0); + + // verify locations of striped blocks + locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); + DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + + } finally { + cluster.shutdown(); + } + } + /** * @param args */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index f4bedabf82..74f09fdabd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -34,10 +35,16 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock; import org.apache.hadoop.hdfs.server.balancer.ExitStatus; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.mover.Mover.MLocation; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import 
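Editorial note on the test comment above, "fill up the cluster with 30% data. It'll be 45% full plus parity": the file length is 30% of raw capacity, but every byte of user data is stored together with parity at a ratio of (data + parity) / data, which is how the test computes totalUsedSpace. Worked out with the same 6+3 ratio the test uses:

public class FillRatioSketch {
  public static void main(String[] args) {
    long totalCapacity = 1000;                 // any unit; only the ratios matter
    int dataBlocks = 6, parityBlocks = 3;
    long fileLen = totalCapacity * 3 / 10;     // 30% of raw capacity as user data
    long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
    System.out.println(100.0 * totalUsedSpace / totalCapacity + "% raw space used");  // 45.0%
  }
}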
org.apache.hadoop.test.GenericTestUtils; @@ -83,7 +90,7 @@ public void testScheduleSameBlock() throws IOException { final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); final List locations = MLocation.toLocations(lb); final MLocation ml = locations.get(0); - final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations); + final DBlock db = mover.newDBlock(lb, locations, null); final List storageTypes = new ArrayList( Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT)); @@ -361,4 +368,119 @@ public void testMoverFailedRetry() throws Exception { cluster.shutdown(); } } + + int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final static int stripesPerBlock = 4; + static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock; + + static void initConfWithStripe(Configuration conf) { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); + Dispatcher.setBlockMoveWaitTime(3000L); + } + + @Test(timeout = 300000) + public void testMoverWithStripedFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConfWithStripe(conf); + + // start 10 datanodes + int numOfDatanodes =10; + int storagesPerDatanode=2; + long capacity = 10 * DEFAULT_STRIPE_BLOCK_SIZE; + long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfDatanodes; i++) { + for(int j=0;j
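Editorial note on the newDBlock() signature change visible in testScheduleSameBlock above: callers now pass the LocatedBlock itself plus the file's ECSchema, and for a contiguous block the schema argument can simply be null because newDBlock() only consults it on the striped path. A minimal sketch of that null-is-safe pattern; the types and method below are illustrative stand-ins, not the Mover's API:

public class SchemaArgSketch {
  // Minimal stand-in for an EC schema; only the control flow is the point.
  static class Schema {
    final int dataUnits;
    Schema(int dataUnits) { this.dataUnits = dataUnits; }
  }

  /** The schema is only dereferenced when the block is striped, so null is fine otherwise. */
  static String describe(boolean striped, Schema schema) {
    if (striped) {
      return "striped group, " + schema.dataUnits + " data units";
    }
    return "contiguous block, schema ignored";
  }

  public static void main(String[] args) {
    System.out.println(describe(false, null));           // contiguous: null schema is safe
    System.out.println(describe(true, new Schema(6)));   // striped: schema supplies data units
  }
}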