diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f1580740f0..d40cd12ccb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -17,6 +17,9 @@ HDFS-6584: Archival Storage HDFS-6679. Bump NameNodeLayoutVersion and update editsStored test files. (vinayakumarb via szetszwo) + HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage + types are unavailable. (szetszwo) + Trunk (Unreleased) INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 9932bff2aa..b049a462bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -218,7 +219,8 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, boolean avoidStaleNodes = (stats != null && stats.isAvoidingStaleDataNodesForWrite()); final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy); + blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, + EnumSet.noneOf(StorageType.class), results.isEmpty()); if (!returnChosenNodes) { results.removeAll(chosenStorage); } @@ -238,7 +240,40 @@ private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) { int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; return new int[] {numOfReplicas, maxNodesPerRack}; } - + + private static List selectStorageTypes( + final BlockStoragePolicy storagePolicy, + final short replication, + final Iterable chosen, + final EnumSet unavailableStorages, + final boolean isNewBlock) { + final List storageTypes = storagePolicy.chooseStorageTypes( + replication, chosen); + final List removed = new ArrayList(); + for(int i = storageTypes.size() - 1; i >= 0; i--) { + // replace/remove unavailable storage types. + final StorageType t = storageTypes.get(i); + if (unavailableStorages.contains(t)) { + final StorageType fallback = isNewBlock? + storagePolicy.getCreationFallback(unavailableStorages) + : storagePolicy.getReplicationFallback(unavailableStorages); + if (fallback == null) { + removed.add(storageTypes.remove(i)); + } else { + storageTypes.set(i, fallback); + } + } + } + if (storageTypes.size() < replication) { + LOG.warn("Failed to place enough replicas: replication is " + replication + + " but only " + storageTypes.size() + " storage types can be selected " + + "(selected=" + storageTypes + + ", unavailable=" + unavailableStorages + + ", removed=" + removed + + ", policy=" + storagePolicy + ")"); + } + return storageTypes; + } /** * choose numOfReplicas from all data nodes * @param numOfReplicas additional number of replicas wanted @@ -257,14 +292,14 @@ private Node chooseTarget(int numOfReplicas, final int maxNodesPerRack, final List results, final boolean avoidStaleNodes, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + final EnumSet unavailableStorages, + final boolean newBlock) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return writer; } - int totalReplicasExpected = numOfReplicas + results.size(); - - int numOfResults = results.size(); - boolean newBlock = (numOfResults==0); + final int numOfResults = results.size(); + final int totalReplicasExpected = numOfReplicas + numOfResults; if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) { writer = results.get(0).getDatanodeDescriptor(); } @@ -272,12 +307,25 @@ private Node chooseTarget(int numOfReplicas, // Keep a copy of original excludedNodes final Set oldExcludedNodes = avoidStaleNodes ? new HashSet(excludedNodes) : null; - final List storageTypes = storagePolicy.chooseStorageTypes( - (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results)); + + // choose storage types; use fallbacks for unavailable storages + final List storageTypes = selectStorageTypes(storagePolicy, + (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results), + unavailableStorages, newBlock); + + StorageType curStorageType = null; try { + if ((numOfReplicas = storageTypes.size()) == 0) { + throw new NotEnoughReplicasException( + "All required storage types are unavailable: " + + " unavailableStorages=" + unavailableStorages + + ", storagePolicy=" + storagePolicy); + } + if (numOfResults == 0) { + curStorageType = storageTypes.remove(0); writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true) + maxNodesPerRack, results, avoidStaleNodes, curStorageType, true) .getDatanodeDescriptor(); if (--numOfReplicas == 0) { return writer; @@ -285,30 +333,33 @@ private Node chooseTarget(int numOfReplicas, } final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); if (numOfResults <= 1) { + curStorageType = storageTypes.remove(0); chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); if (--numOfReplicas == 0) { return writer; } } if (numOfResults <= 2) { final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor(); + curStorageType = storageTypes.remove(0); if (clusterMap.isOnSameRack(dn0, dn1)) { chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } else if (newBlock){ chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } else { chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } if (--numOfReplicas == 0) { return writer; } } + curStorageType = storageTypes.remove(0); chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0)); + maxNodesPerRack, results, avoidStaleNodes, curStorageType); } catch (NotEnoughReplicasException e) { final String message = "Failed to place enough replicas, still in need of " + (totalReplicasExpected - results.size()) + " to reach " @@ -333,7 +384,16 @@ private Node chooseTarget(int numOfReplicas, // if the NotEnoughReplicasException was thrown in chooseRandom(). numOfReplicas = totalReplicasExpected - results.size(); return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, - maxNodesPerRack, results, false, storagePolicy); + maxNodesPerRack, results, false, storagePolicy, unavailableStorages, + newBlock); + } + + if (storageTypes.size() > 0) { + // Retry chooseTarget with fallback storage types + unavailableStorages.add(curStorageType); + return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize, + maxNodesPerRack, results, false, storagePolicy, unavailableStorages, + newBlock); } } return writer;