diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e85ba11940..53fb0fbeae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -4999,7 +4999,7 @@ public ProvidedStorageMap getProvidedStorageMap() { */ public void satisfyStoragePolicy(long id) { storageMovementNeeded.add(id); - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Added block collection id {} to block " + "storageMovementNeeded queue", id); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java index d31f075fda..2de88fcb1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java @@ -27,8 +27,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler; import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,15 +109,32 @@ public void run() { } } + /** + * Mark as block movement failure for the given trackId and blockId. + * + * @param trackId tracking id + * @param blockId block id + */ + void markBlockMovementFailure(long trackId, long blockId) { + LOG.debug("Mark as block movement failure for the given " + + "trackId:{} and blockId:{}", trackId, blockId); + BlockMovementResult result = new BlockMovementResult(trackId, blockId, null, + BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE); + addMovementResultToTrackIdList(result); + } + private List addMovementResultToTrackIdList( BlockMovementResult result) { long trackId = result.getTrackId(); - List perTrackIdList = movementResults.get(trackId); - if (perTrackIdList == null) { - perTrackIdList = new ArrayList<>(); - movementResults.put(trackId, perTrackIdList); + List perTrackIdList; + synchronized (movementResults) { + perTrackIdList = movementResults.get(trackId); + if (perTrackIdList == null) { + perTrackIdList = new ArrayList<>(); + movementResults.put(trackId, perTrackIdList); + } + perTrackIdList.add(result); } - perTrackIdList.add(result); return perTrackIdList; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index 1bd851eccd..a69a38beb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -151,14 +151,24 @@ public void rejectedExecution(Runnable runnable, */ public void processBlockMovingTasks(long trackID, String blockPoolID, Collection blockMovingInfos) { + LOG.debug("Received BlockMovingTasks {}", blockMovingInfos); for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { - assert blkMovingInfo - .getSources().length == blkMovingInfo.getTargets().length; - - for (int i = 0; i < blkMovingInfo.getSources().length; i++) { + // Iterating backwards. This is to ensure that all the block src location + // which doesn't have a target node will be marked as failure before + // scheduling the block movement to valid target nodes. + for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) { + if (i >= blkMovingInfo.getTargets().length) { + // Since there is no target selected for scheduling the block, + // just mark this block storage movement as failure. Later, namenode + // can take action on this. + movementTracker.markBlockMovementFailure(trackID, + blkMovingInfo.getBlock().getBlockId()); + continue; + } + DatanodeInfo target = blkMovingInfo.getTargets()[i]; BlockMovingTask blockMovingTask = new BlockMovingTask( trackID, blockPoolID, blkMovingInfo.getBlock(), - blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i], + blkMovingInfo.getSources()[i], target, blkMovingInfo.getSourceStorageTypes()[i], blkMovingInfo.getTargetStorageTypes()[i]); Future moveCallable = moverCompletionService diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java index 580d0d6178..5457dc29dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -218,4 +218,8 @@ public int resultsCount() { return storageMovementAttemptedResults.size(); } + @VisibleForTesting + public int getAttemptedItemsCount() { + return storageMovementAttemptedItems.size(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 4967a89786..617ab2c791 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -211,6 +211,14 @@ private void computeAndAssignStorageMismatchedBlocksToDNs( } } + addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos, + coordinatorNode); + } + + private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID, + List blockMovingInfos, + DatanodeDescriptor coordinatorNode) { + if (blockMovingInfos.size() < 1) { // TODO: Major: handle this case. I think we need retry cases to // be implemented. Idea is, if some files are not getting storage movement @@ -218,6 +226,20 @@ private void computeAndAssignStorageMismatchedBlocksToDNs( return; } + boolean needBlockStorageMovement = false; + for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { + // Check for atleast one block storage movement has been chosen + if (blkMovingInfo.getTargets().length > 0){ + needBlockStorageMovement = true; + break; + } + } + if (!needBlockStorageMovement) { + // Simply return as there is no targets selected for scheduling the block + // movement. + return; + } + // 'BlockCollectionId' is used as the tracking ID. All the blocks under this // blockCollectionID will be added to this datanode. coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos); @@ -251,9 +273,8 @@ private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo, List chosenNodes = new ArrayList<>(); for (int i = 0; i < sourceWithStorageList.size(); i++) { StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i); - StorageTypeNodePair chosenTarget = - chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected, - locsForExpectedStorageTypes, chosenNodes); + StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode( + existingTypeNodePair.dn, expected); if (chosenTarget == null && blockManager.getDatanodeManager() .getNetworkTopology().isNodeGroupAware()) { @@ -282,15 +303,14 @@ private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo, chosenNodes.add(chosenTarget.dn); // TODO: We can increment scheduled block count for this node? } else { - // TODO: Failed to ChooseTargetNodes...So let just retry. Shall we - // proceed without this targets? Then what should be final result? - // How about pack empty target, means target node could not be chosen , - // so result should be RETRY_REQUIRED from DN always. - // Log..unable to choose target node for source datanodeDescriptor + LOG.warn( + "Failed to choose target datanode for the required" + + " storage types {}, block:{}, existing storage type:{}", + expected, blockInfo, existingTypeNodePair.storageType); sourceNodes.add(existingTypeNodePair.dn); sourceStorageTypes.add(existingTypeNodePair.storageType); - targetNodes.add(null); - targetStorageTypes.add(null); + // Imp: Not setting the target details, empty targets. Later, this is + // used as an indicator for retrying this block movement. } } BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo, @@ -302,15 +322,13 @@ private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo, } /** - * Choose the target storage within same Datanode if possible. + * Choose the target storage within same datanode if possible. * - * @param locsForExpectedStorageTypes - * @param chosenNodes + * @param source source datanode + * @param targetTypes list of target storage types */ private StorageTypeNodePair chooseTargetTypeInSameNode( - DatanodeDescriptor source, List targetTypes, - StorageTypeNodeMap locsForExpectedStorageTypes, - List chosenNodes) { + DatanodeDescriptor source, List targetTypes) { for (StorageType t : targetTypes) { DatanodeStorageInfo chooseStorage4Block = source.chooseStorage4Block(t, 0); @@ -328,6 +346,9 @@ private StorageTypeNodePair chooseTarget(Block block, for (StorageType t : targetTypes) { List nodesWithStorages = locsForExpectedStorageTypes.getNodesWithStorages(t); + if (nodesWithStorages == null || nodesWithStorages.isEmpty()) { + continue; // no target nodes with the required storage type. + } Collections.shuffle(nodesWithStorages); for (DatanodeDescriptor target : nodesWithStorages) { if (!chosenNodes.contains(target) && matcher.match( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 901e1bae21..499fe3c56f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; + import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -44,8 +46,6 @@ import com.google.common.base.Supplier; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; - /** * Tests that StoragePolicySatisfier daemon is able to check the blocks to be * moved and finding its suggested target locations to move. @@ -79,7 +79,7 @@ public void testWhenStoragePolicySetToCOLD() throws Exception { try { - // Change policy to ALL_SSD + // Change policy to COLD dfs.setStoragePolicy(new Path(file), "COLD"); FSNamesystem namesystem = hdfsCluster.getNamesystem(); INode inode = namesystem.getFSDirectory().getINode(file); @@ -356,6 +356,108 @@ public void testSatisfyWithExceptions() throws Exception { } } + /** + * Tests to verify that for the given path, some of the blocks or block src + * locations(src nodes) under the given path will be scheduled for block + * movement. + * + * For example, there are two block for a file: + * + * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)], + * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD. + * Only one datanode is available with storage type ARCHIVE, say D. + * + * SPS will schedule block movement to the coordinator node with the details, + * blk_1[move A(DISK) -> D(ARCHIVE)], blk_2[move A(DISK) -> D(ARCHIVE)]. + */ + @Test(timeout = 300000) + public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy() + throws Exception { + try { + // Change policy to COLD + dfs.setStoragePolicy(new Path(file), "COLD"); + FSNamesystem namesystem = hdfsCluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode(file); + + StorageType[][] newtypes = + new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}}; + + // Adding ARCHIVE based datanodes. + startAdditionalDNs(config, 1, numOfDatanodes, newtypes, + storagesPerDatanode, capacity, hdfsCluster); + + namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + hdfsCluster.triggerHeartbeats(); + // Wait till StorgePolicySatisfier identified that block to move to + // ARCHIVE area. + waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000); + waitExpectedStorageType(file, StorageType.DISK, 2, 30000); + + waitForBlocksMovementResult(1, 30000); + } finally { + hdfsCluster.shutdown(); + } + } + + /** + * Tests to verify that for the given path, no blocks or block src + * locations(src nodes) under the given path will be scheduled for block + * movement as there are no available datanode with required storage type. + * + * For example, there are two block for a file: + * + * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)], + * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD. + * No datanode is available with storage type ARCHIVE. + * + * SPS won't schedule any block movement for this path. + */ + @Test(timeout = 300000) + public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() + throws Exception { + try { + // Change policy to COLD + dfs.setStoragePolicy(new Path(file), "COLD"); + FSNamesystem namesystem = hdfsCluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode(file); + + StorageType[][] newtypes = + new StorageType[][]{{StorageType.DISK, StorageType.DISK}}; + // Adding DISK based datanodes + startAdditionalDNs(config, 1, numOfDatanodes, newtypes, + storagesPerDatanode, capacity, hdfsCluster); + + namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + hdfsCluster.triggerHeartbeats(); + + // No block movement will be scheduled as there is no target node available + // with the required storage type. + waitForAttemptedItems(1, 30000); + waitExpectedStorageType(file, StorageType.DISK, 3, 30000); + // Since there is no target node the item will get timed out and then + // re-attempted. + waitForAttemptedItems(1, 30000); + } finally { + hdfsCluster.shutdown(); + } + } + + private void waitForAttemptedItems(long expectedBlkMovAttemptedCount, + int timeout) throws TimeoutException, InterruptedException { + BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", + expectedBlkMovAttemptedCount, + sps.getAttemptedItemsMonitor().getAttemptedItemsCount()); + return sps.getAttemptedItemsMonitor() + .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; + } + }, 100, timeout); + } + private void waitForBlocksMovementResult(long expectedBlkMovResultsCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();