From 1438da494424193e330f24edef823bbd60dc37d2 Mon Sep 17 00:00:00 2001
From: Uma Maheswara Rao G
Date: Fri, 23 Sep 2016 13:41:29 -0700
Subject: [PATCH] HDFS-10800: [SPS]: Daemon thread in Namenode to find blocks
 placed in other storage than what the policy specifies. Contributed by Uma
 Maheswara Rao G

---
 .../java/org/apache/hadoop/hdfs/DFSUtil.java  |  41 ++
 .../server/blockmanagement/BlockManager.java  |  20 +
 .../blockmanagement/DatanodeDescriptor.java   |  38 ++
 .../blockmanagement/DatanodeManager.java      |   1 +
 .../datanode/StoragePolicySatisfyWorker.java  |  29 +-
 .../namenode/BlockStorageMovementNeeded.java  |  53 +++
 .../namenode/StoragePolicySatisfier.java      | 397 ++++++++++++++++++
 .../protocol/BlockStorageMovementCommand.java |  11 +-
 .../TestStoragePolicySatisfyWorker.java       |  24 +-
 .../namenode/TestStoragePolicySatisfier.java  | 209 +++++++++
 10 files changed, 791 insertions(+), 32 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index f7cd32b558..f5ceeafc5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -53,6 +53,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -73,6 +74,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1716,4 +1718,43 @@ public class DFSUtil {
     }
     return id;
   }
+
+  /**
+   * Remove the overlap between the expected storage types and the existing
+   * storage types.
+   *
+   * @param expected
+   *          - Expected storage types list.
+   * @param existing
+   *          - Existing storage types list.
+   * @param ignoreNonMovable
+   *          ignore non-movable storage types by removing them from both
+   *          the expected and the existing storage type lists, to prevent
+   *          non-movable storage from being moved.
+   * @return true if the existing types or the expected types list is empty
+   *         after removing the overlap.
+   */
+  public static boolean removeOverlapBetweenStorageTypes(
+      List<StorageType> expected,
+      List<StorageType> existing, boolean ignoreNonMovable) {
+    for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
+      final StorageType t = i.next();
+      if (expected.remove(t)) {
+        i.remove();
+      }
+    }
+    if (ignoreNonMovable) {
+      removeNonMovable(existing);
+      removeNonMovable(expected);
+    }
+    return expected.isEmpty() || existing.isEmpty();
+  }
+
+  private static void removeNonMovable(List<StorageType> types) {
+    for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
+      final StorageType t = i.next();
+      if (!t.isMovable()) {
+        i.remove();
+      }
+    }
+  }
 }
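A note on this helper's contract: a return value of true means no movement is required, because either the policy is already satisfied or nothing movable remains. The following standalone sketch reproduces the same set-difference logic with a stand-in enum instead of org.apache.hadoop.fs.StorageType, so it compiles without HDFS on the classpath; all names and values here are illustrative only.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class OverlapDemo {
      // Stand-in for org.apache.hadoop.fs.StorageType.
      enum Type { DISK, SSD, ARCHIVE, RAM_DISK }

      static boolean removeOverlap(List<Type> expected, List<Type> existing) {
        for (Iterator<Type> i = existing.iterator(); i.hasNext();) {
          if (expected.remove(i.next())) {
            i.remove(); // this replica already satisfies one expected type
          }
        }
        // Empty "expected" means the policy is satisfied; empty "existing"
        // means nothing is left that could be moved.
        return expected.isEmpty() || existing.isEmpty();
      }

      public static void main(String[] args) {
        // Policy COLD wants ARCHIVE x3; the block currently lives on DISK x3.
        List<Type> expected =
            new ArrayList<>(Arrays.asList(Type.ARCHIVE, Type.ARCHIVE, Type.ARCHIVE));
        List<Type> existing =
            new ArrayList<>(Arrays.asList(Type.DISK, Type.DISK, Type.DISK));
        System.out.println(removeOverlap(expected, existing)); // false -> movement needed
        System.out.println(expected); // [ARCHIVE, ARCHIVE, ARCHIVE] still to be placed
      }
    }

In the satisfier introduced later in this patch, the caller schedules movement only when this check returns false.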
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index bac89bfd64..8581e7853f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,6 +89,8 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -425,6 +427,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final BlockIdManager blockIdManager;
 
+  /** For satisfying block storage policies. */
+  private final StoragePolicySatisfier sps;
+  private final BlockStorageMovementNeeded storageMovementNeeded =
+      new BlockStorageMovementNeeded();
+
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
    */
@@ -464,6 +471,7 @@
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
         * 1000L);
 
+    sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -688,9 +696,11 @@
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
+    sps.start();
   }
 
   public void close() {
+    sps.stop();
     bmSafeMode.close();
     try {
       redundancyThread.interrupt();
@@ -4980,4 +4990,14 @@
   public ProvidedStorageMap getProvidedStorageMap() {
     return providedStorageMap;
   }
+
+  /**
+   * Mark the file block collection with the given id as needing storage
+   * movement for its blocks.
+   *
+   * @param id
+   *          - file block collection id.
+   */
+  public void satisfyStoragePolicy(long id) {
+    storageMovementNeeded.add(id);
+  }
 }
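The new satisfyStoragePolicy(long) call is deliberately fire-and-forget: it only enqueues the inode id, and the satisfier daemon picks it up on a later pass. A minimal sketch of that contract, using the BlockStorageMovementNeeded class added later in this patch (the inode id value is made up):

    // Assumes BlockStorageMovementNeeded as introduced below.
    BlockStorageMovementNeeded queue = new BlockStorageMovementNeeded();
    queue.add(16387L);        // what BlockManager#satisfyStoragePolicy does
    Long next = queue.get();  // what the satisfier thread polls in its loop
    assert next != null && next == 16387L; // FIFO; get() returns null when empty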
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 16ffb4346d..bff74240b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -206,6 +208,10 @@
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();
 
+  /** A queue of blocks whose storage placements this datanode should move. */
+  private final Queue<List<BlockMovingInfo>> storageMovementBlocks =
+      new LinkedList<>();
+
   /* Variables for maintaining number of blocks scheduled to be written to
    * this storage. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
@@ -1065,5 +1071,37 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
     return false;
   }
+
+  /**
+   * Add the block infos whose storage locations need to be moved.
+   *
+   * @param storageMismatchedBlocks
+   *          - storage mismatched block infos
+   */
+  public void addBlocksToMoveStorage(
+      List<BlockMovingInfo> storageMismatchedBlocks) {
+    storageMovementBlocks.offer(storageMismatchedBlocks);
+  }
+
+  /**
+   * @return block infos whose storage locations need to be moved.
+   */
+  public List<BlockMovingInfo> getBlocksToMoveStorages() {
+    return storageMovementBlocks.poll();
+  }
+
+  // TODO: we will remove this method once the DN side handling is integrated.
+  // The test can then be converted to check real block movements instead of
+  // this data structure.
+  @VisibleForTesting
+  public List<BlockMovingInfo> getStorageMovementPendingItems() {
+    List<BlockMovingInfo> flatList = new ArrayList<>();
+    Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
+        .iterator();
+    while (iterator.hasNext()) {
+      List<BlockMovingInfo> next = iterator.next();
+      flatList.addAll(next);
+    }
+    return flatList;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9ebc693a23..698205ac47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
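The DatanodeDescriptor change above stores movement work as a FIFO of batches, one batch per tracked file, so that addBlocksToMoveStorage and getBlocksToMoveStorages hand a whole file's moves over in a single poll. A runnable sketch of that shape with plain JDK types (strings stand in for BlockMovingInfo; the values are illustrative):

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    public class BatchQueueDemo {
      public static void main(String[] args) {
        // Same structure as storageMovementBlocks: a FIFO of *batches*, so
        // all movements for one file (one trackID) stay together.
        Queue<List<String>> queue = new LinkedList<>();
        queue.offer(Arrays.asList("blk_1001 -> ARCHIVE", "blk_1002 -> ARCHIVE"));
        queue.offer(Arrays.asList("blk_2001 -> SSD"));
        List<String> batch = queue.poll(); // first file's batch, or null if empty
        System.out.println(batch); // [blk_1001 -> ARCHIVE, blk_1002 -> ARCHIVE]
      }
    }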
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 6df4e816e7..fa408f608a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@@ -125,7 +126,7 @@ public class StoragePolicySatisfyWorker {
     return moverThreadPool;
   }
 
-  public void processBlockMovingTasks(long trackID,
+  public void processBlockMovingTasks(long trackID, String blockPoolID,
       List<BlockMovingInfo> blockMovingInfos) {
     Future<Void> moveCallable = null;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       assert blkMovingInfo
           .getSources().length == blkMovingInfo.getTargets().length;
 
       for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
-        BlockMovingTask blockMovingTask =
-            new BlockMovingTask(blkMovingInfo.getBlock(),
-                blkMovingInfo.getSources()[i],
-                blkMovingInfo.getTargets()[i],
+        BlockMovingTask blockMovingTask = new BlockMovingTask(
+            blkMovingInfo.getBlock(), blockPoolID,
+            blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
             blkMovingInfo.getTargetStorageTypes()[i]);
-        moveCallable = moverExecutorCompletionService
-            .submit(blockMovingTask);
+        moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
         moverTaskFutures.add(moveCallable);
       }
     }
@@ -163,14 +162,16 @@
    * given target.
    */
   private class BlockMovingTask implements Callable<Void> {
-    private final ExtendedBlock block;
+    private final Block block;
     private final DatanodeInfo source;
     private final DatanodeInfo target;
     private final StorageType targetStorageType;
+    private String blockPoolID;
 
-    BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
+    BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
         DatanodeInfo target, StorageType targetStorageType) {
       this.block = block;
+      this.blockPoolID = blockPoolID;
       this.source = source;
       this.target = target;
       this.targetStorageType = targetStorageType;
@@ -201,12 +202,12 @@
       OutputStream unbufOut = sock.getOutputStream();
       InputStream unbufIn = sock.getInputStream();
-
+      ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
       Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
-          block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+          extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 
       DataEncryptionKeyFactory keyFactory = datanode
-          .getDataEncryptionKeyFactoryForBlock(block);
+          .getDataEncryptionKeyFactoryForBlock(extendedBlock);
       IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
           unbufOut, unbufIn, keyFactory, accessToken, target);
       unbufOut = saslStreams.out;
@@ -215,10 +216,10 @@
           new BufferedOutputStream(unbufOut, ioFileBufferSize));
       in = new DataInputStream(
           new BufferedInputStream(unbufIn, ioFileBufferSize));
-      sendRequest(out, block, accessToken, source, targetStorageType);
+      sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
       receiveResponse(in);
 
-      LOG.debug(
+      LOG.info(
           "Successfully moved block:{} from src:{} to destin:{} for"
               + " satisfying storageType:{}",
           block, source, target, targetStorageType);
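The worker submits one Callable per replica move and tracks the Futures through an ExecutorCompletionService, so results can be consumed in completion order rather than submission order. A self-contained sketch of that pattern with plain JDK types (the real tasks speak DataTransferProtocol over a socket instead of returning strings):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class MoverPoolDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> completion =
            new ExecutorCompletionService<>(pool);
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
          final int replica = i;
          futures.add(completion.submit(() -> "moved replica " + replica));
        }
        // take() yields futures in *completion* order, so one slow move does
        // not block the handling of the ones that already finished.
        for (int i = 0; i < futures.size(); i++) {
          System.out.println(completion.take().get());
        }
        pool.shutdown();
      }
    }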
+ */ +@InterfaceAudience.Private +public class BlockStorageMovementNeeded { + private final Queue storageMovementNeeded = new LinkedList(); + + /** + * Add the block collection id to tracking list for which storage movement + * expected if necessary. + * + * @param blockCollectionID + * - block collection id, which is nothing but inode id. + */ + public synchronized void add(Long blockCollectionID) { + storageMovementNeeded.add(blockCollectionID); + } + + /** + * Gets the block collection id for which storage movements check necessary + * and make the movement if required. + * + * @return block collection ID + */ + public synchronized Long get() { + return storageMovementNeeded.poll(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java new file mode 100644 index 0000000000..b5aed37623 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.balancer.Matcher; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Setting storagePolicy on a file after the file write will only update the new + * storage policy type in Namespace, but physical block storage movement will + * not happen until user runs "Mover Tool" explicitly for such files. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
new file mode 100644
index 0000000000..b5aed37623
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.balancer.Matcher;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Setting a storage policy on a file after it has been written only updates
+ * the new storage policy type in the Namespace; the physical block storage
+ * movement does not happen until the user explicitly runs the "Mover" tool
+ * on such files. The StoragePolicySatisfier daemon thread addresses the
+ * case where users want HDFS itself to physically move the blocks instead
+ * of running the Mover tool explicitly. Calling the client API
+ * satisfyStoragePolicy on a file or directory automatically triggers the
+ * movement of its physical storage locations, in an asynchronous manner.
+ * The Namenode picks the file blocks whose storages are expected to change,
+ * builds the mapping from source block locations to the expected storage
+ * types and target locations, and then prepares the commands sent to the
+ * Datanodes that process the physical block movements.
+ */
+@InterfaceAudience.Private
+public class StoragePolicySatisfier implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(StoragePolicySatisfier.class);
+  private Daemon storagePolicySatisfierThread;
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final BlockStorageMovementNeeded storageMovementNeeded;
+
+  public StoragePolicySatisfier(final Namesystem namesystem,
+      final BlockStorageMovementNeeded storageMovementNeeded,
+      final BlockManager blkManager) {
+    this.namesystem = namesystem;
+    this.storageMovementNeeded = storageMovementNeeded;
+    this.blockManager = blkManager;
+  }
+
+  /**
+   * Start the storage policy satisfier daemon thread.
+   */
+  public void start() {
+    storagePolicySatisfierThread = new Daemon(this);
+    storagePolicySatisfierThread.setName("StoragePolicySatisfier");
+    storagePolicySatisfierThread.start();
+  }
+
+  /**
+   * Stop the storage policy satisfier daemon thread.
+   */
+  public void stop() {
+    if (storagePolicySatisfierThread == null) {
+      return;
+    }
+    storagePolicySatisfierThread.interrupt();
+    try {
+      storagePolicySatisfierThread.join(3000);
+    } catch (InterruptedException ie) {
+    }
+  }
+
+  @Override
+  public void run() {
+    while (namesystem.isRunning()) {
+      try {
+        Long blockCollectionID = storageMovementNeeded.get();
+        if (blockCollectionID != null) {
+          computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
+        }
+        // TODO: how frequently we check for block movements could be made
+        // configurable later.
+        Thread.sleep(3000);
+      } catch (Throwable t) {
+        if (!namesystem.isRunning()) {
+          LOG.info("Stopping StoragePolicySatisfier.");
+          if (!(t instanceof InterruptedException)) {
+            LOG.info("StoragePolicySatisfier received an exception"
+                + " while shutting down.", t);
+          }
+          break;
+        }
+        LOG.error("StoragePolicySatisfier thread received a runtime"
+            + " exception. Stopping storage policy satisfier work", t);
+        // TODO: Just break for now. Once the dynamic start/stop option is
+        // implemented, conditions for when to break/terminate can be added
+        // here.
+        break;
+      }
+    }
+  }
+
+  private void computeAndAssignStorageMismatchedBlocksToDNs(
+      long blockCollectionID) {
+    BlockCollection blockCollection =
+        namesystem.getBlockCollection(blockCollectionID);
+    if (blockCollection == null) {
+      return;
+    }
+    byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
+    BlockStoragePolicy existingStoragePolicy =
+        blockManager.getStoragePolicy(existingStoragePolicyID);
+    if (!blockCollection.getLastBlock().isComplete()) {
+      // Postpone: the file is currently under construction.
+      // TODO: should we add it back to the queue, or leave it to the user?
+      return;
+    }
+
+    // The first datanode will be chosen as the coordinator node for the
+    // storage movements. Later this can be optimized if needed.
+    DatanodeDescriptor coordinatorNode = null;
+    BlockInfo[] blocks = blockCollection.getBlocks();
+    List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+    for (int i = 0; i < blocks.length; i++) {
+      BlockInfo blockInfo = blocks[i];
+      List<StorageType> expectedStorageTypes =
+          existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication());
+      DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+      StorageType[] storageTypes = new StorageType[storages.length];
+      for (int j = 0; j < storages.length; j++) {
+        DatanodeStorageInfo datanodeStorageInfo = storages[j];
+        StorageType storageType = datanodeStorageInfo.getStorageType();
+        storageTypes[j] = storageType;
+      }
+      List<StorageType> existing =
+          new LinkedList<>(Arrays.asList(storageTypes));
+      if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+          existing, true)) {
+        List<StorageTypeNodePair> sourceWithStorageMap =
+            new ArrayList<>();
+        List<DatanodeStorageInfo> existingBlockStorages =
+            new ArrayList<>(Arrays.asList(storages));
+        for (StorageType existingType : existing) {
+          Iterator<DatanodeStorageInfo> iterator =
+              existingBlockStorages.iterator();
+          while (iterator.hasNext()) {
+            DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+            StorageType storageType = datanodeStorageInfo.getStorageType();
+            if (storageType == existingType) {
+              iterator.remove();
+              sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
+                  datanodeStorageInfo.getDatanodeDescriptor()));
+              break;
+            }
+          }
+        }
+
+        StorageTypeNodeMap locsForExpectedStorageTypes =
+            findTargetsForExpectedStorageTypes(expectedStorageTypes);
+
+        BlockMovingInfo blockMovingInfo =
+            findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
+                expectedStorageTypes, locsForExpectedStorageTypes);
+        if (coordinatorNode == null) {
+          // For now, the first datanode is chosen as the coordinator. Later
+          // this can be optimized if needed.
+          coordinatorNode =
+              (DatanodeDescriptor) blockMovingInfo.getSources()[0];
+        }
+        blockMovingInfos.add(blockMovingInfo);
+      }
+    }
+
+    if (blockMovingInfos.size() < 1) {
+      // TODO: Major: handle this case. Retry cases need to be implemented.
+      // The idea is: if some files never get a chance at storage movement,
+      // we can retry a limited number of times and then exit.
+      return;
+    }
+    coordinatorNode.addBlocksToMoveStorage(blockMovingInfos);
+  }
+
+  /**
+   * Find a good target node for each source node whose block storages were
+   * misplaced.
+   *
+   * @param blockInfo
+   *          - Block
+   * @param existing
+   *          - Existing storage types of the block
+   * @param sourceWithStorageList
+   *          - Source datanodes with their storages list
+   * @param expected
+   *          - Expected storage types to move to
+   * @param locsForExpectedStorageTypes
+   *          - Available DNs for the expected storage types
+   * @return list of block source and target node pairs
+   */
+  private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
+      List<StorageType> existing,
+      List<StorageTypeNodePair> sourceWithStorageList,
+      List<StorageType> expected,
+      StorageTypeNodeMap locsForExpectedStorageTypes) {
+    List<DatanodeInfo> sourceNodes = new ArrayList<>();
+    List<StorageType> sourceStorageTypes = new ArrayList<>();
+    List<DatanodeInfo> targetNodes = new ArrayList<>();
+    List<StorageType> targetStorageTypes = new ArrayList<>();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+    for (int i = 0; i < sourceWithStorageList.size(); i++) {
+      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+      StorageTypeNodePair chosenTarget =
+          chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected,
+              locsForExpectedStorageTypes, chosenNodes);
+
+      if (chosenTarget == null && blockManager.getDatanodeManager()
+          .getNetworkTopology().isNodeGroupAware()) {
+        chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
+            expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
+            chosenNodes);
+      }
+
+      // Then, match nodes on the same rack
+      if (chosenTarget == null) {
+        chosenTarget =
+            chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+                Matcher.SAME_RACK, locsForExpectedStorageTypes, chosenNodes);
+      }
+
+      if (chosenTarget == null) {
+        chosenTarget =
+            chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+                Matcher.ANY_OTHER, locsForExpectedStorageTypes, chosenNodes);
+      }
+      if (null != chosenTarget) {
+        sourceNodes.add(existingTypeNodePair.dn);
+        sourceStorageTypes.add(existingTypeNodePair.storageType);
+        targetNodes.add(chosenTarget.dn);
+        targetStorageTypes.add(chosenTarget.storageType);
+        chosenNodes.add(chosenTarget.dn);
+        // TODO: should we increment the scheduled block count for this node?
+      } else {
+        // TODO: Failed to choose target nodes, so just retry for now. Shall
+        // we proceed without this target? Then what should the final result
+        // be? We could pack an empty target, meaning a target node could not
+        // be chosen, so the result from the DN should always be
+        // RETRY_REQUIRED.
+        // Log: unable to choose a target node for the source datanode
+        sourceNodes.add(existingTypeNodePair.dn);
+        sourceStorageTypes.add(existingTypeNodePair.storageType);
+        targetNodes.add(null);
+        targetStorageTypes.add(null);
+      }
+    }
+    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
+        sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
+        targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
+        sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
+        targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
+    return blkMovingInfo;
+  }
+
+  /**
+   * Choose a target storage within the same datanode if possible.
+   *
+   * @param locsForExpectedStorageTypes
+   *          - available DNs for the expected storage types
+   * @param chosenNodes
+   *          - nodes that have already been chosen as targets
+   */
+  private StorageTypeNodePair chooseTargetTypeInSameNode(
+      DatanodeDescriptor source, List<StorageType> targetTypes,
+      StorageTypeNodeMap locsForExpectedStorageTypes,
+      List<DatanodeDescriptor> chosenNodes) {
+    for (StorageType t : targetTypes) {
+      DatanodeStorageInfo chooseStorage4Block =
+          source.chooseStorage4Block(t, 0);
+      if (chooseStorage4Block != null) {
+        return new StorageTypeNodePair(t, source);
+      }
+    }
+    return null;
+  }
+
+  private StorageTypeNodePair chooseTarget(Block block,
+      DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
+      StorageTypeNodeMap locsForExpectedStorageTypes,
+      List<DatanodeDescriptor> chosenNodes) {
+    for (StorageType t : targetTypes) {
+      List<DatanodeDescriptor> nodesWithStorages =
+          locsForExpectedStorageTypes.getNodesWithStorages(t);
+      Collections.shuffle(nodesWithStorages);
+      for (DatanodeDescriptor target : nodesWithStorages) {
+        if (!chosenNodes.contains(target) && matcher.match(
+            blockManager.getDatanodeManager().getNetworkTopology(), source,
+            target)) {
+          if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
+            return new StorageTypeNodePair(t, target);
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  private static class StorageTypeNodePair {
+    public StorageType storageType = null;
+    public DatanodeDescriptor dn = null;
+
+    public StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
+      this.storageType = storageType;
+      this.dn = dn;
+    }
+  }
+
+  private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
+      List<StorageType> expected) {
+    StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
+    List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
+        .getDatanodeListForReport(DatanodeReportType.LIVE);
+    for (DatanodeDescriptor dn : reports) {
+      StorageReport[] storageReports = dn.getStorageReports();
+      for (StorageReport storageReport : storageReports) {
+        StorageType t = storageReport.getStorage().getStorageType();
+        if (expected.contains(t)) {
+          final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
+          if (maxRemaining > 0L) {
+            targetMap.add(t, dn);
+          }
+        }
+      }
+    }
+    return targetMap;
+  }
+
+  private static long getMaxRemaining(StorageReport[] storageReports,
+      StorageType t) {
+    long max = 0L;
+    for (StorageReport r : storageReports) {
+      if (r.getStorage().getStorageType() == t) {
+        if (r.getRemaining() > max) {
+          max = r.getRemaining();
+        }
+      }
+    }
+    return max;
+  }
+
+  private static class StorageTypeNodeMap {
+    private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
+        new EnumMap<>(StorageType.class);
+
+    private void add(StorageType t, DatanodeDescriptor dn) {
+      List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
+      LinkedList<DatanodeDescriptor> value = null;
+      if (nodesWithStorages == null) {
+        value = new LinkedList<>();
+        value.add(dn);
+        typeNodeMap.put(t, value);
+      } else {
+        nodesWithStorages.add(dn);
+      }
+    }
+
+    /**
+     * @param type
+     *          - Storage type
+     * @return datanodes which have the given storage type
+     */
+    private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
+      return typeNodeMap.get(type);
+    }
+  }
+}
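The target search above walks outward through the topology: the same datanode first, then the same node group (when the topology is node-group aware), then the same rack, and finally anywhere else. A self-contained sketch of that fallback cascade, with each scope stubbed out as a lambda (all names and node labels here are illustrative stand-ins for chooseTarget and Matcher):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    public class FallbackDemo {
      public static void main(String[] args) {
        // Each "scope" either yields a target for the source node or null.
        List<Function<String, String>> scopes = Arrays.asList(
            src -> null,                      // same node: no local space
            src -> null,                      // same node group: no match
            src -> src + " -> rackmate-dn2",  // same rack: found one
            src -> src + " -> remote-dn9");   // any other (never reached here)
        String chosen = null;
        for (Function<String, String> scope : scopes) {
          chosen = scope.apply("dn1");
          if (chosen != null) {
            break; // the first (most local) scope that matches wins
          }
        }
        System.out.println(chosen); // dn1 -> rackmate-dn2
      }
    }

If every scope returns null, the satisfier records a null target and leaves the decision to a later retry, as the TODO in findSourceAndTargetToMove notes.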
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index 42ba265176..c1ab800607 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -20,8 +20,8 @@
 package org.apache.hadoop.hdfs.server.protocol;
 
 import java.util.Arrays;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * A BlockStorageMovementCommand is an instruction to a DataNode to move the
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
  * NameNode about the movement status.
  */
 public class BlockStorageMovementCommand extends DatanodeCommand {
-
   // TODO: constructor needs to be refined based on the block movement data
   // structure.
   BlockStorageMovementCommand(int action) {
@@ -46,13 +45,13 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
    * Stores block to storage info that can be used for block movement.
    */
   public static class BlockMovingInfo {
-    private ExtendedBlock blk;
+    private Block blk;
     private DatanodeInfo[] sourceNodes;
     private StorageType[] sourceStorageTypes;
     private DatanodeInfo[] targetNodes;
     private StorageType[] targetStorageTypes;
 
-    public BlockMovingInfo(ExtendedBlock block,
+    public BlockMovingInfo(Block block,
         DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
         StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
       this.blk = block;
@@ -62,11 +61,11 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
       this.targetStorageTypes = targetStorageTypes;
     }
 
-    public void addBlock(ExtendedBlock block) {
+    public void addBlock(Block block) {
       this.blk = block;
    }
 
-    public ExtendedBlock getBlock() {
+    public Block getBlock() {
      return this.blk;
    }
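BlockMovingInfo deliberately uses parallel arrays: index i pairs sources[i] with targets[i] and targetStorageTypes[i], and the satisfier records a null target when no node could be chosen. A small runnable sketch of that convention, with strings standing in for DatanodeInfo and StorageType (values are made up):

    public class ParallelArraysDemo {
      public static void main(String[] args) {
        // Index i describes one replica move; a null target means the
        // satisfier could not choose a node for that source.
        String[] sources = {"dn1", "dn2"};
        String[] targets = {"dn3", null};
        String[] targetTypes = {"ARCHIVE", null};
        for (int i = 0; i < sources.length; i++) {
          System.out.println(sources[i] + " -> " + targets[i]
              + " (" + targetTypes[i] + ")");
        }
      }
    }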
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index c7223067e1..d803f1a9ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -71,14 +71,14 @@ public class TestStoragePolicySatisfyWorker {
   public void testMoveSingleBlockToAnotherDatanode() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     initConf(conf);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(4)
-        .storageTypes(
-            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE}})
-        .build();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4)
+            .storageTypes(
+                new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+                    {StorageType.DISK, StorageType.ARCHIVE},
+                    {StorageType.ARCHIVE, StorageType.ARCHIVE},
+                    {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+            .build();
     try {
       cluster.waitActive();
       final DistributedFileSystem dfs = cluster.getFileSystem();
@@ -108,12 +108,12 @@ public class TestStoragePolicySatisfyWorker {
         src);
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
     BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
-        lb.getBlock(), lb.getLocations()[0], targetDnInfo,
+        lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
         lb.getStorageTypes()[0], StorageType.ARCHIVE);
     blockMovingInfos.add(blockMovingInfo);
     INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
     worker.processBlockMovingTasks(inode.getId(),
-        blockMovingInfos);
+        cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
     cluster.triggerHeartbeats();
 
     // Wait till NameNode notified about the block location details
@@ -150,7 +150,7 @@ public class TestStoragePolicySatisfyWorker {
     }, 100, timeout);
   }
 
-  BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block,
+  BlockMovingInfo prepareBlockMovingInfo(Block block,
       DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
       StorageType targetStorageType) {
     return new BlockMovingInfo(block, new DatanodeInfo[] {src},
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
new file mode 100644
index 0000000000..b61814dbbf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests that the StoragePolicySatisfier daemon is able to identify the
+ * blocks to be moved and to find the suggested target locations to move
+ * them to.
+ */
+public class TestStoragePolicySatisfier {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
+  private final Configuration config = new HdfsConfiguration();
+  private StorageType[][] allDiskTypes =
+      new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK}};
+  private MiniDFSCluster hdfsCluster = null;
+  final private int numOfDatanodes = 3;
+  final private int storagesPerDatanode = 2;
+  final private long capacity = 2 * 256 * 1024 * 1024;
+  final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
+  private DistributedFileSystem distributedFS = null;
+
+  @Before
+  public void setUp() throws IOException {
+    config.setLong("dfs.block.size", 1024);
+    hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
+        storagesPerDatanode, capacity);
+    distributedFS = hdfsCluster.getFileSystem();
+    writeContent(distributedFS, file);
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToCOLD()
+      throws Exception {
+
+    try {
+      // Change policy to COLD
+      distributedFS.setStoragePolicy(new Path(file), "COLD");
+      Set<DatanodeDescriptor> previousNodes =
+          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
+              .getDatanodeManager().getDatanodes();
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the namenode is notified of the block location details
+      waitExpectedStorageType(StorageType.ARCHIVE, distributedFS,
+          previousNodes, 6, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToALLSSD()
+      throws Exception {
+    try {
+      // Change policy to ALL_SSD
+      distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
+      Set<DatanodeDescriptor> previousNodes =
+          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
+              .getDatanodeManager().getDatanodes();
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK},
+              {StorageType.SSD, StorageType.DISK},
+              {StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD-based datanodes are added to the cluster.
+      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the StoragePolicySatisfier identifies that the blocks
+      // should move to SSD areas
+      waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes,
+          6, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void writeContent(final DistributedFileSystem dfs,
+      final String fileName) throws IOException {
+    // write to DISK
+    final FSDataOutputStream out = dfs.create(new Path(fileName));
+    for (int i = 0; i < 1000; i++) {
+      out.writeChars("t");
+    }
+    out.close();
+  }
+
+  private void startAdditionalDNs(final Configuration conf,
+      int newNodesRequired, int existingNodesNum, StorageType[][] newTypes,
+      int storagesPerDatanode, long capacity, final MiniDFSCluster cluster)
+      throws IOException {
+    long[][] capacities;
+    existingNodesNum += newNodesRequired;
+    capacities = new long[newNodesRequired][storagesPerDatanode];
+    for (int i = 0; i < newNodesRequired; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
+        null, capacities, null, false, false, false, null);
+    cluster.triggerHeartbeats();
+  }
+
+  private MiniDFSCluster startCluster(final Configuration conf,
+      StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
+      long nodeCapacity) throws IOException {
+    long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
+    for (int i = 0; i < numberOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDn; j++) {
+        capacities[i][j] = nodeCapacity;
+      }
+    }
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
+        .storageTypes(storageTypes).storageCapacities(capacities).build();
+    cluster.waitActive();
+    return cluster;
+  }
+
+  // TODO: this assertion can be changed to an end-to-end assertion later,
+  // when the DN side processing work is integrated into this work.
+  private void waitExpectedStorageType(final StorageType expectedStorageType,
+      final DistributedFileSystem dfs,
+      final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
+      int timeout) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
+        int archiveCount = 0;
+        while (iterator.hasNext()) {
+          DatanodeDescriptor dn = iterator.next();
+          List<BlockMovingInfo> pendingItemsToMove =
+              dn.getStorageMovementPendingItems();
+          for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
+            StorageType[] targetStorageTypes =
+                blkInfoToMoveStorage.getTargetStorageTypes();
+            for (StorageType storageType : targetStorageTypes) {
+              if (storageType == expectedStorageType) {
+                archiveCount++;
+              }
+            }
+          }
+        }
+        LOG.info(
+            expectedStorageType + " replica count, expected={} and actual={}",
+            expectedArchiveCount, archiveCount);
+        return expectedArchiveCount == archiveCount;
+      }
+    }, 100, timeout);
+  }
+}
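Both tests poll with GenericTestUtils.waitFor, which repeatedly evaluates a Guava Supplier<Boolean> until it returns true or the timeout expires. A self-contained sketch of the same idiom using the JDK's java.util.function.Supplier instead (the real helper throws a TimeoutException rather than returning false):

    import java.util.function.Supplier;

    public class WaitForDemo {
      static boolean waitFor(Supplier<Boolean> check, long intervalMs,
          long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          if (check.get()) {
            return true;           // condition met
          }
          Thread.sleep(intervalMs); // re-check after a short pause
        }
        return false;              // timed out
      }

      public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        boolean ok =
            waitFor(() -> System.currentTimeMillis() - start > 300, 100, 30000);
        System.out.println(ok); // true, after a few polls
      }
    }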