HDFS-6671. Change BlockPlacementPolicy to consider block storage policy in replication.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1611334 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-07-17 11:56:22 +00:00
parent 395c763aef
commit 38af6101d8
18 changed files with 330 additions and 147 deletions


@ -8,6 +8,9 @@ HDFS-6584: Archival Storage
HDFS-6670. Add block storage policy support with default HOT, WARM and COLD
policies. (szetszwo)
HDFS-6671. Change BlockPlacementPolicy to consider block storage policy
in replication. (szetszwo)
Trunk (Unreleased)
INCOMPATIBLE CHANGES


@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
</configuration>


@ -16,6 +16,7 @@
<!-- Put site-specific property overrides in this file. -->
<configuration>
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<xi:include href="blockStoragePolicy-site.xml" />
</configuration>


@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -46,6 +48,28 @@ public class BlockStoragePolicy {
public static final int ID_BIT_LENGTH = 4;
public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1;
/** A block storage policy suite. */
public static class Suite {
private final byte defaultPolicyID;
private final BlockStoragePolicy[] policies;
private Suite(byte defaultPolicyID, BlockStoragePolicy[] policies) {
this.defaultPolicyID = defaultPolicyID;
this.policies = policies;
}
/** @return the corresponding policy. */
public BlockStoragePolicy getPolicy(byte id) {
// id == 0 means policy not specified.
return id == 0? getDefaultPolicy(): policies[id];
}
/** @return the default policy. */
public BlockStoragePolicy getDefaultPolicy() {
return getPolicy(defaultPolicyID);
}
}
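Not part of the patch, but a minimal sketch of how the suite is meant to be used, assuming the default Hadoop configuration wires in the default policies (as the test later in this change does):

// Illustrative only.
Configuration conf = new HdfsConfiguration();
BlockStoragePolicy.Suite suite = BlockStoragePolicy.readBlockStorageSuite(conf);
// An unspecified policy ID (0) falls back to the suite's default policy.
assert suite.getPolicy((byte) 0) == suite.getDefaultPolicy();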
/** A 4-bit policy ID */
private final byte id;
/** Policy name */
@ -70,26 +94,48 @@ public class BlockStoragePolicy {
/**
* @return a list of {@link StorageType}s for storing the replicas of a block.
*/
StorageType[] getStoragteTypes(short replication) {
final StorageType[] types = new StorageType[replication];
public List<StorageType> chooseStorageTypes(final short replication) {
final List<StorageType> types = new LinkedList<StorageType>();
int i = 0;
for(; i < types.length && i < storageTypes.length; i++) {
types[i] = storageTypes[i];
for(; i < replication && i < storageTypes.length; i++) {
types.add(storageTypes[i]);
}
final StorageType last = storageTypes[storageTypes.length - 1];
for(; i < types.length; i++) {
types[i] = last;
for(; i < replication; i++) {
types.add(last);
}
return types;
}
/**
* Choose the storage types for storing the remaining replicas, given the
* replication number and the storage types of the chosen replicas.
*
* @param replication the replication number.
* @param chosen the storage types of the chosen replicas.
* @return a list of {@link StorageType}s for storing the replicas of a block.
*/
public List<StorageType> chooseStorageTypes(final short replication,
final Iterable<StorageType> chosen) {
final List<StorageType> types = chooseStorageTypes(replication);
//remove the chosen storage types
for(StorageType c : chosen) {
final int i = types.indexOf(c);
if (i >= 0) {
types.remove(i);
}
}
return types;
}
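A short sketch of what the two chooseStorageTypes overloads compute, not part of the patch. It assumes a WARM-style policy whose configured storage types are [DISK, ARCHIVE]; the policy ID used below is only an assumption for illustration:

// Illustrative only; assumes imports of java.util.Arrays, java.util.List,
// org.apache.hadoop.conf.Configuration and the org.apache.hadoop.hdfs classes.
Configuration conf = new HdfsConfiguration();
BlockStoragePolicy.Suite suite = BlockStoragePolicy.readBlockStorageSuite(conf);
BlockStoragePolicy warm = suite.getPolicy((byte) 8);  // assumed WARM ID

// One-argument form: the i-th replica gets the i-th configured type,
// extra replicas repeat the last configured type.
List<StorageType> all = warm.chooseStorageTypes((short) 3);
// all == [DISK, ARCHIVE, ARCHIVE]

// Two-argument form: drop the types already covered by chosen replicas.
List<StorageType> remaining =
    warm.chooseStorageTypes((short) 3, Arrays.asList(StorageType.DISK));
// remaining == [ARCHIVE, ARCHIVE]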
/** @return the fallback {@link StorageType} for creation. */
StorageType getCreationFallback(EnumSet<StorageType> unavailables) {
public StorageType getCreationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, creationFallbacks);
}
/** @return the fallback {@link StorageType} for replication. */
StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, replicationFallbacks);
}
@ -111,21 +157,28 @@ private static StorageType getFallback(EnumSet<StorageType> unavailables,
return null;
}
private static byte parseID(String string) {
final byte id = Byte.parseByte(string);
if (id < 1) {
throw new IllegalArgumentException(
"Invalid block storage policy ID: id = " + id + " < 1");
private static byte parseID(String idString, String element, Configuration conf) {
Byte id = null;
try {
id = Byte.parseByte(idString);
} catch(NumberFormatException nfe) {
throwIllegalArgumentException("Failed to parse policy ID \"" + idString
+ "\" to a " + ID_BIT_LENGTH + "-bit integer", conf);
}
if (id > 15) {
throw new IllegalArgumentException(
"Invalid block storage policy ID: id = " + id + " > MAX = " + ID_MAX);
if (id < 0) {
throwIllegalArgumentException("Invalid policy ID: id = " + id
+ " < 1 in \"" + element + "\"", conf);
} else if (id == 0) {
throw new IllegalArgumentException("Policy ID 0 is reserved: " + element);
} else if (id > ID_MAX) {
throwIllegalArgumentException("Invalid policy ID: id = " + id
+ " > MAX = " + ID_MAX + " in \"" + element + "\"", conf);
}
return id;
}
private static StorageType[] parseStorageTypes(String[] strings) {
if (strings == null) {
if (strings == null || strings.length == 0) {
return StorageType.EMPTY_ARRAY;
}
final StorageType[] types = new StorageType[strings.length];
@ -137,14 +190,24 @@ private static StorageType[] parseStorageTypes(String[] strings) {
private static StorageType[] readStorageTypes(byte id, String keyPrefix,
Configuration conf) {
final String[] values = conf.getStrings(keyPrefix + id);
return parseStorageTypes(values);
final String key = keyPrefix + id;
final String[] values = conf.getStrings(key);
try {
return parseStorageTypes(values);
} catch(Exception e) {
throw new IllegalArgumentException("Failed to parse " + key
+ " \"" + conf.get(key), e);
}
}
public static BlockStoragePolicy readBlockStoragePolicy(byte id, String name,
private static BlockStoragePolicy readBlockStoragePolicy(byte id, String name,
Configuration conf) {
final StorageType[] storageTypes = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX, conf);
if (storageTypes.length == 0) {
throw new IllegalArgumentException(
DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX + id + " is missing or is empty.");
}
final StorageType[] creationFallbacks = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX, conf);
final StorageType[] replicationFallbacks = readStorageTypes(id,
@ -153,23 +216,53 @@ public static BlockStoragePolicy readBlockStoragePolicy(byte id, String name,
replicationFallbacks);
}
public static BlockStoragePolicy[] readBlockStoragePolicies(
Configuration conf) {
final BlockStoragePolicy[] policies = new BlockStoragePolicy[ID_MAX + 1];
/** Read {@link Suite} from conf. */
public static Suite readBlockStorageSuite(Configuration conf) {
final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH];
final String[] values = conf.getStrings(DFS_BLOCK_STORAGE_POLICIES_KEY);
byte firstID = -1;
for(String v : values) {
v = v.trim();
final int i = v.indexOf(':');
final String name = v.substring(0, i);
final byte id = parseID(v.substring(i + 1));
if (i < 0) {
throwIllegalArgumentException("Failed to parse element \"" + v
+ "\" (expected format is NAME:ID)", conf);
} else if (i == 0) {
throwIllegalArgumentException("Policy name is missing in \"" + v + "\"", conf);
} else if (i == v.length() - 1) {
throwIllegalArgumentException("Policy ID is missing in \"" + v + "\"", conf);
}
final String name = v.substring(0, i).trim();
for(int j = 1; j < policies.length; j++) {
if (policies[j] != null && policies[j].name.equals(name)) {
throwIllegalArgumentException("Policy name duplication: \""
+ name + "\" appears more than once", conf);
}
}
final byte id = parseID(v.substring(i + 1).trim(), v, conf);
if (policies[id] != null) {
throw new IllegalArgumentException(
"Policy duplication: ID " + id + " appears more than once in "
+ DFS_BLOCK_STORAGE_POLICIES_KEY);
throwIllegalArgumentException("Policy duplication: ID " + id
+ " appears more than once", conf);
}
policies[id] = readBlockStoragePolicy(id, name, conf);
LOG.info(policies[id]);
String prefix = "";
if (firstID == -1) {
firstID = id;
prefix = "(default) ";
}
LOG.info(prefix + policies[id]);
}
return policies;
if (firstID == -1) {
throwIllegalArgumentException("Empty list is not allowed", conf);
}
return new Suite(firstID, policies);
}
private static void throwIllegalArgumentException(String message,
Configuration conf) {
throw new IllegalArgumentException(message + " in "
+ DFS_BLOCK_STORAGE_POLICIES_KEY + " \""
+ conf.get(DFS_BLOCK_STORAGE_POLICIES_KEY) + "\".");
}
}
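The configuration format that readBlockStorageSuite parses is a comma-separated NAME:ID list plus one storage-type list per ID. The actual property names are the DFS_BLOCK_STORAGE_* constants referenced above, whose string values are not shown in this diff, so the literal keys below are assumptions used only to illustrate the format:

// Hypothetical keys and values; only the format matters here.
Configuration conf = new Configuration();
conf.set("dfs.block.storage.policies", "HOT:12,WARM:8,COLD:4");  // first entry is the default
conf.set("dfs.block.storage.policy.12", "DISK");
conf.set("dfs.block.storage.policy.8", "DISK,ARCHIVE");
conf.set("dfs.block.storage.policy.4", "ARCHIVE");

// IDs must be in [1, 15]; 0 is reserved for "not specified". Duplicate names,
// duplicate IDs, a malformed NAME:ID element, or an empty list all fail with
// IllegalArgumentException pointing at the offending value.
BlockStoragePolicy.Suite suite = BlockStoragePolicy.readBlockStorageSuite(conf);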


@ -60,6 +60,11 @@ public interface BlockCollection {
*/
public short getBlockReplication();
/**
* @return the storage policy ID.
*/
public byte getStoragePolicyID();
/**
* Get the name of the collection.
*/


@ -42,10 +42,10 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
@ -252,6 +252,7 @@ public int getPendingDataNodeMessageCount() {
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
private final BlockStoragePolicy.Suite storagePolicySuite;
/** Check whether name system is running before terminating */
private boolean checkNSRunning = true;
@ -274,6 +275,7 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
blockplacement = BlockPlacementPolicy.getInstance(
conf, stats, datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap());
storagePolicySuite = BlockStoragePolicy.readBlockStorageSuite(conf);
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@ -443,8 +445,8 @@ public DatanodeManager getDatanodeManager() {
return datanodeManager;
}
/** @return the BlockPlacementPolicy */
public BlockPlacementPolicy getBlockPlacementPolicy() {
@VisibleForTesting
BlockPlacementPolicy getBlockPlacementPolicy() {
return blockplacement;
}
@ -725,7 +727,6 @@ private List<DatanodeStorageInfo> getValidLocations(Block block) {
final List<DatanodeStorageInfo> locations
= new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final String storageID = storage.getStorageID();
// filter invalidate replicas
if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
locations.add(storage);
@ -1351,7 +1352,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
// It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the block collection itself.
rw.chooseTargets(blockplacement, excludedNodes);
rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
}
namesystem.writeLock();
@ -1452,24 +1453,46 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
return scheduledWork;
}
/** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
DatanodeDescriptor clientnode, long blocksize) {
return blockplacement.chooseTarget(src, 1, clientnode,
Collections.<DatanodeStorageInfo>emptyList(), false, null, blocksize,
storagePolicySuite.getDefaultPolicy());
}
/** Choose target for getting additional datanodes for an existing pipeline. */
public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src,
int numAdditionalNodes,
DatanodeDescriptor clientnode,
List<DatanodeStorageInfo> chosen,
Set<Node> excludes,
long blocksize,
byte storagePolicyID) {
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
chosen, true, excludes, blocksize, storagePolicy);
}
/**
* Choose target datanodes according to the replication policy.
* Choose target datanodes for creating a new block.
*
* @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
* List, boolean, Set, long)
*/
public DatanodeStorageInfo[] chooseTarget(final String src,
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
final Set<Node> excludedNodes,
final long blocksize, List<String> favoredNodes) throws IOException {
final long blocksize,
final List<String> favoredNodes,
final byte storagePolicyID) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize,
// TODO: get storage type from file
favoredDatanodeDescriptors, StorageType.DEFAULT);
favoredDatanodeDescriptors, storagePolicy);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to "
+ targets.length + " nodes instead of minReplication (="
@ -3498,10 +3521,12 @@ public ReplicationWork(Block block,
}
private void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicy.Suite storagePolicySuite,
Set<Node> excludedNodes) {
targets = blockplacement.chooseTarget(bc.getName(),
additionalReplRequired, srcNode, liveReplicaStorages, false,
excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
excludedNodes, block.getNumBytes(),
storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
}
}


@ -27,6 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@ -75,7 +76,7 @@ public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
StorageType storageType);
BlockStoragePolicy storagePolicy);
/**
* Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
@ -89,14 +90,14 @@ DatanodeStorageInfo[] chooseTarget(String src,
Set<Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes,
StorageType storageType) {
BlockStoragePolicy storagePolicy) {
// This class does not provide the functionality of placing
// a block in favored datanodes. The implementations of this class
// are expected to provide this functionality
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storageType);
excludedNodes, blocksize, storagePolicy);
}
/**


@ -22,12 +22,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
@ -117,9 +119,9 @@ public DatanodeStorageInfo[] chooseTarget(String srcPath,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
StorageType storageType) {
final BlockStoragePolicy storagePolicy) {
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes, blocksize, storageType);
excludedNodes, blocksize, storagePolicy);
}
@Override
@ -129,17 +131,19 @@ DatanodeStorageInfo[] chooseTarget(String src,
Set<Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes,
StorageType storageType) {
BlockStoragePolicy storagePolicy) {
try {
if (favoredNodes == null || favoredNodes.size() == 0) {
// Favored nodes not specified, fall back to regular block placement.
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storageType);
excludedNodes, blocksize, storagePolicy);
}
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
new HashSet<Node>() : new HashSet<Node>(excludedNodes);
final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
(short)numOfReplicas);
// Choose favored nodes
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
@ -152,12 +156,13 @@ DatanodeStorageInfo[] chooseTarget(String src,
final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
favoriteAndExcludedNodes, blocksize,
getMaxNodesPerRack(results.size(), numOfReplicas)[1],
results, avoidStaleNodes, storageType);
results, avoidStaleNodes, storageTypes.get(0));
if (target == null) {
LOG.warn("Could not find a target for file " + src
+ " with favored node " + favoredNode);
continue;
}
storageTypes.remove(0);
favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
}
@ -166,7 +171,7 @@ DatanodeStorageInfo[] chooseTarget(String src,
numOfReplicas -= results.size();
DatanodeStorageInfo[] remainingTargets =
chooseTarget(src, numOfReplicas, writer, results,
false, favoriteAndExcludedNodes, blocksize, storageType);
false, favoriteAndExcludedNodes, blocksize, storagePolicy);
for (int i = 0; i < remainingTargets.length; i++) {
results.add(remainingTargets[i]);
}
@ -177,7 +182,7 @@ DatanodeStorageInfo[] chooseTarget(String src,
// Fall back to regular block placement disregarding favored nodes hint
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storageType);
excludedNodes, blocksize, storagePolicy);
}
}
@ -188,7 +193,7 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
StorageType storageType) {
final BlockStoragePolicy storagePolicy) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return DatanodeStorageInfo.EMPTY_ARRAY;
}
@ -213,8 +218,8 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
Node localNode = chooseTarget(numOfReplicas, writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy);
if (!returnChosenNodes) {
results.removeAll(chosenStorage);
}
@ -247,13 +252,13 @@ private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
* @return local node of writer (not chosen node)
*/
private Node chooseTarget(int numOfReplicas,
Node writer,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
StorageType storageType) {
Node writer,
final Set<Node> excludedNodes,
final long blocksize,
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
final BlockStoragePolicy storagePolicy) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
@ -268,10 +273,12 @@ private Node chooseTarget(int numOfReplicas,
// Keep a copy of original excludedNodes
final Set<Node> oldExcludedNodes = avoidStaleNodes ?
new HashSet<Node>(excludedNodes) : null;
final List<StorageType> storageTypes = chooseStorageTypes(storagePolicy,
(short)totalReplicasExpected, results);
try {
if (numOfResults == 0) {
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType)
maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0))
.getDatanodeDescriptor();
if (--numOfReplicas == 0) {
return writer;
@ -280,7 +287,7 @@ private Node chooseTarget(int numOfReplicas,
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
results, avoidStaleNodes, storageTypes.remove(0));
if (--numOfReplicas == 0) {
return writer;
}
@ -289,20 +296,20 @@ private Node chooseTarget(int numOfReplicas,
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
if (clusterMap.isOnSameRack(dn0, dn1)) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
results, avoidStaleNodes, storageTypes.remove(0));
} else if (newBlock){
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
results, avoidStaleNodes, storageTypes.remove(0));
} else {
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
results, avoidStaleNodes, storageTypes.remove(0));
}
if (--numOfReplicas == 0) {
return writer;
}
}
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType);
maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0));
} catch (NotEnoughReplicasException e) {
final String message = "Failed to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
@ -327,7 +334,7 @@ private Node chooseTarget(int numOfReplicas,
// if the NotEnoughReplicasException was thrown in chooseRandom().
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false, storageType);
maxNodesPerRack, results, false, storagePolicy);
}
}
return writer;
@ -664,7 +671,29 @@ private boolean isGoodTarget(DatanodeStorageInfo storage,
}
return true;
}
private static List<StorageType> chooseStorageTypes(
final BlockStoragePolicy storagePolicy, final short replication,
final Iterable<DatanodeStorageInfo> chosen) {
return storagePolicy.chooseStorageTypes(
replication, new Iterable<StorageType>() {
@Override
public Iterator<StorageType> iterator() {
return new Iterator<StorageType>() {
final Iterator<DatanodeStorageInfo> i = chosen.iterator();
@Override
public boolean hasNext() {return i.hasNext();}
@Override
public StorageType next() {return i.next().getStorageType();}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
});
}
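The anonymous Iterable above simply projects each chosen DatanodeStorageInfo to its StorageType without materializing a list; an equivalent eager version (a rewrite for clarity, not part of the patch) would be:

// Equivalent, eager form of the helper above: collect the storage types of the
// already-chosen replicas, then ask the policy what is still required.
private static List<StorageType> chooseStorageTypesEager(
    final BlockStoragePolicy storagePolicy, final short replication,
    final Iterable<DatanodeStorageInfo> chosen) {
  final List<StorageType> chosenTypes = new ArrayList<StorageType>();
  for (DatanodeStorageInfo storage : chosen) {
    chosenTypes.add(storage.getStorageType());
  }
  return storagePolicy.chooseStorageTypes(replication, chosenTypes);
}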
/**
* Return a pipeline of nodes.
* The pipeline is formed finding a shortest path that


@ -156,7 +156,6 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
@ -2747,8 +2746,9 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
throws LeaseExpiredException, NotReplicatedYetException,
QuotaExceededException, SafeModeException, UnresolvedLinkException,
IOException {
long blockSize;
int replication;
final long blockSize;
final int replication;
final byte storagePolicyID;
DatanodeDescriptor clientNode = null;
if(NameNode.stateChangeLog.isDebugEnabled()) {
@ -2783,13 +2783,15 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
pendingFile.getFileUnderConstructionFeature().getClientMachine());
replication = pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID();
} finally {
readUnlock();
}
// choose targets for the new block to be allocated.
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget(
src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock(
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
storagePolicyID);
// Part II.
// Allocate a new block, add it to the INode and the BlocksMap.
@ -2977,6 +2979,7 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
final DatanodeDescriptor clientnode;
final long preferredblocksize;
final byte storagePolicyID;
final List<DatanodeStorageInfo> chosen;
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
@ -3003,6 +3006,7 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
.getClientMachine();
clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
preferredblocksize = file.getPreferredBlockSize();
storagePolicyID = file.getStoragePolicyID();
//find datanode storages
final DatanodeManager dm = blockManager.getDatanodeManager();
@ -3012,10 +3016,9 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
}
// choose new datanodes.
final DatanodeStorageInfo[] targets = blockManager.getBlockPlacementPolicy(
).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
// TODO: get storage type from the file
excludes, preferredblocksize, StorageType.DEFAULT);
final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
src, numAdditionalNodes, clientnode, chosen,
excludes, preferredblocksize, storagePolicyID);
final LocatedBlock lb = new LocatedBlock(blk, targets);
blockManager.setBlockToken(lb, AccessMode.COPY);
return lb;


@ -28,6 +28,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
@ -78,7 +79,7 @@ public static INodeFile valueOf(INode inode, String path, boolean acceptNull)
static enum HeaderFormat {
PREFERRED_BLOCK_SIZE(null, 48, 1),
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
STORAGE_POLICY_ID(REPLICATION.BITS, 4, 0);
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicy.ID_BIT_LENGTH, 0);
private final LongBitFormat BITS;
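For context: with these widths the 64-bit INodeFile header packs, from the low bits up, 48 bits of preferred block size, 12 bits of replication, and a 4-bit storage policy ID, so an ID of 0 again means "policy not specified". A rough sketch of that layout, assumed from the enum order above and not the actual LongBitFormat implementation:

// Assumed layout sketch only; field order and widths follow the enum above.
static long getPreferredBlockSize(long header) {
  return header & ((1L << 48) - 1);            // low 48 bits
}
static short getReplication(long header) {
  return (short) ((header >>> 48) & 0xFFF);    // next 12 bits
}
static byte getStoragePolicyID(long header) {
  return (byte) ((header >>> 60) & 0xF);       // top 4 bits
}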


@ -26,7 +26,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@ -56,7 +55,6 @@
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -202,11 +200,8 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(getRemoteAddress());
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
.chooseTarget(path, 1, clientNode,
new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
// TODO: get storage type from the file
StorageType.DEFAULT);
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
path, clientNode, blocksize);
if (storages.length > 0) {
return storages[0].getDatanodeDescriptor();
}


@ -32,6 +32,7 @@
NAME_1:ID_1, NAME_2:ID_2, ..., NAME_n:ID_n
where ID is an integer in the range [1,15] and NAME is case insensitive.
The first element is the default policy. Empty list is not allowed.
</description>
</property>
@ -48,12 +49,12 @@
for i less than or equal to n, and
the j-th replica is stored using n-th storage type for j greater than n.
The value cannot specified as an empty list.
Empty list is not allowed.
Examples:
DISK : all replicas stored using DISK.
DISK, ARCHIVE : the first replica is stored using DISK and all the
remaining are stored using ARCHIVE.
remaining replicas are stored using ARCHIVE.
</description>
</property>


@ -19,6 +19,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@ -27,15 +28,19 @@
/** Test {@link BlockStoragePolicy} */
public class TestBlockStoragePolicy {
public static final BlockStoragePolicy.Suite POLICY_SUITE;
public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY;
static {
final Configuration conf = new HdfsConfiguration();
POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf);
DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
}
static final EnumSet<StorageType> none = EnumSet.noneOf(StorageType.class);
static final EnumSet<StorageType> archive = EnumSet.of(StorageType.ARCHIVE);
static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK);
static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE);
static {
HdfsConfiguration.init();
}
@Test
public void testDefaultPolicies() throws Exception {
final byte COLD = (byte)4;
@ -49,19 +54,19 @@ public void testDefaultPolicies() throws Exception {
expectedPolicyStrings.put(HOT,
"BlockStoragePolicy{HOT:12, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]");
final Configuration conf = new Configuration();
final BlockStoragePolicy[] policies = BlockStoragePolicy.readBlockStoragePolicies(conf);
for(int i = 0; i < policies.length; i++) {
if (policies[i] != null) {
final String s = policies[i].toString();
Assert.assertEquals(expectedPolicyStrings.get((byte)i), s);
for(byte i = 1; i < 16; i++) {
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
if (policy != null) {
final String s = policy.toString();
Assert.assertEquals(expectedPolicyStrings.get(i), s);
}
}
Assert.assertEquals(POLICY_SUITE.getPolicy(HOT), POLICY_SUITE.getDefaultPolicy());
{ // check Cold policy
final BlockStoragePolicy cold = policies[COLD];
final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
for(short replication = 1; replication < 6; replication++) {
final StorageType[] computed = cold.getStoragteTypes(replication);
final List<StorageType> computed = cold.chooseStorageTypes(replication);
assertStorageType(computed, replication, StorageType.ARCHIVE);
}
assertCreationFallback(cold, null, null, null);
@ -69,9 +74,9 @@ public void testDefaultPolicies() throws Exception {
}
{ // check Warm policy
final BlockStoragePolicy warm = policies[WARM];
final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
for(short replication = 1; replication < 6; replication++) {
final StorageType[] computed = warm.getStoragteTypes(replication);
final List<StorageType> computed = warm.chooseStorageTypes(replication);
assertStorageType(computed, replication, StorageType.DISK, StorageType.ARCHIVE);
}
assertCreationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
@ -79,9 +84,9 @@ public void testDefaultPolicies() throws Exception {
}
{ // check Hot policy
final BlockStoragePolicy hot = policies[HOT];
final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
for(short replication = 1; replication < 6; replication++) {
final StorageType[] computed = hot.getStoragteTypes(replication);
final List<StorageType> computed = hot.chooseStorageTypes(replication);
assertStorageType(computed, replication, StorageType.DISK);
}
assertCreationFallback(hot, null, null, null);
@ -89,13 +94,13 @@ public void testDefaultPolicies() throws Exception {
}
}
static void assertStorageType(StorageType[] computed, short replication,
static void assertStorageType(List<StorageType> computed, short replication,
StorageType... answers) {
Assert.assertEquals(replication, computed.length);
Assert.assertEquals(replication, computed.size());
final StorageType last = answers[answers.length - 1];
for(int i = 0; i < computed.length; i++) {
for(int i = 0; i < computed.size(); i++) {
final StorageType expected = i < answers.length? answers[i]: last;
Assert.assertEquals(expected, computed[i]);
Assert.assertEquals(expected, computed.get(i));
}
}


@ -46,14 +46,14 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@ -228,7 +228,7 @@ private static DatanodeStorageInfo[] chooseTarget(
List<DatanodeStorageInfo> chosenNodes,
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
}
/**
@ -295,7 +295,7 @@ public void testChooseTarget2() throws Exception {
excludedNodes.add(dataNodes[1]);
chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
@ -630,7 +630,7 @@ public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
BLOCK_SIZE, StorageType.DEFAULT);
BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
assertEquals(targets.length, 3);
assertFalse(isOnSameRack(targets[0], staleNodeInfo));
@ -656,7 +656,7 @@ public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
StorageType.DEFAULT);
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
assertEquals(targets.length, 3);
assertTrue(isOnSameRack(targets[0], staleNodeInfo));


@ -17,24 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.VersionInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@ -44,8 +28,24 @@
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.VersionInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestReplicationPolicyConsiderLoad {
@ -138,7 +138,7 @@ public void testChooseTargetWithDecomNodes() throws IOException {
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
dataNodes[0], new ArrayList<DatanodeStorageInfo>(), false, null,
1024, StorageType.DEFAULT);
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
assertEquals(3, targets.length);
Set<DatanodeStorageInfo> targetSet = new HashSet<DatanodeStorageInfo>(


@ -36,7 +36,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
@ -258,7 +258,7 @@ private DatanodeStorageInfo[] chooseTarget(
List<DatanodeStorageInfo> chosenNodes,
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
}
/**
@ -340,7 +340,7 @@ public void testChooseTarget2() throws Exception {
Set<Node> excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
assertEquals(targets.length, 4);
assertEquals(storages[0], targets[0]);
@ -358,7 +358,7 @@ public void testChooseTarget2() throws Exception {
excludedNodes.add(dataNodes[1]);
chosenNodes.add(storages[2]);
targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.


@ -120,9 +120,9 @@ public DatanodeStorageInfo[] answer(InvocationOnMock invocation)
}
return ret;
}
}).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
}).when(spyBM).chooseTarget4NewBlock(Mockito.anyString(), Mockito.anyInt(),
Mockito.<DatanodeDescriptor>any(), Mockito.<HashSet<Node>>any(),
Mockito.anyLong(), Mockito.<List<String>>any());
Mockito.anyLong(), Mockito.<List<String>>any(), Mockito.anyByte());
// create file
nn.create(src, FsPermission.getFileDefault(),


@ -27,11 +27,11 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@ -107,10 +107,10 @@ public DatanodeStorageInfo[] chooseTarget(String srcPath,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
StorageType storageType) {
final BlockStoragePolicy storagePolicy) {
DatanodeStorageInfo[] results = super.chooseTarget(srcPath,
numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes,
blocksize, storageType);
blocksize, storagePolicy);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {}