HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement with 4-layer network topology. Contributed by Junping Du
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1357442 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
359b4efd19
commit
4d0cab2729
@ -170,8 +170,7 @@ boolean add(Node n) {
|
||||
}
|
||||
if (parentNode == null) {
|
||||
// create a new InnerNode
|
||||
parentNode = new InnerNode(parentName, getPath(this),
|
||||
this, this.getLevel()+1);
|
||||
parentNode = createParentNode(parentName);
|
||||
children.add(parentNode);
|
||||
}
|
||||
// add n to the subtree of the next ancestor node
|
||||
@ -288,7 +287,7 @@ Node getLeaf(int leafIndex, Node excludedNode) {
|
||||
// calculate the total number of excluded leaf nodes
|
||||
int numOfExcludedLeaves =
|
||||
isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
|
||||
if (isRack()) { // children are leaves
|
||||
if (isLeafParent()) { // children are leaves
|
||||
if (isLeaf) { // excluded node is a leaf node
|
||||
int excludedIndex = children.indexOf(excludedNode);
|
||||
if (excludedIndex != -1 && leafIndex >= 0) {
|
||||
@ -326,6 +325,10 @@ Node getLeaf(int leafIndex, Node excludedNode) {
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean isLeafParent() {
|
||||
return isRack();
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if children a leaves, default implementation calls {@link #isRack()}
|
||||
* <p>To be overridden in subclasses for specific InnerNode implementations,
|
||||
@ -752,6 +755,30 @@ public String toString() {
|
||||
}
|
||||
return tree.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Divide networklocation string into two parts by last separator, and get
|
||||
* the first part here.
|
||||
*
|
||||
* @param networkLocation
|
||||
* @return
|
||||
*/
|
||||
public static String getFirstHalf(String networkLocation) {
|
||||
int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
|
||||
return networkLocation.substring(0, index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Divide networklocation string into two parts by last separator, and get
|
||||
* the second part here.
|
||||
*
|
||||
* @param networkLocation
|
||||
* @return
|
||||
*/
|
||||
public static String getLastHalf(String networkLocation) {
|
||||
int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
|
||||
return networkLocation.substring(index);
|
||||
}
|
||||
|
||||
/** swap two array items */
|
||||
static protected void swap(Node[] nodes, int i, int j) {
|
||||
|
@ -49,7 +49,7 @@ protected Node getNodeForNetworkLocation(Node node) {
|
||||
}
|
||||
Node nodeGroup = getNode(node.getNetworkLocation());
|
||||
if (nodeGroup == null) {
|
||||
nodeGroup = new InnerNode(node.getNetworkLocation());
|
||||
nodeGroup = new InnerNodeWithNodeGroup(node.getNetworkLocation());
|
||||
}
|
||||
return getNode(nodeGroup.getNetworkLocation());
|
||||
}
|
||||
@ -383,6 +383,11 @@ boolean isNodeGroup() {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isLeafParent() {
|
||||
return isNodeGroup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InnerNode createParentNode(String parentName) {
|
||||
|
@ -13,6 +13,9 @@ Trunk (unreleased changes)
|
||||
|
||||
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
|
||||
|
||||
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
|
||||
with 4-layer network topology. (Junping Du via szetszwo)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
|
||||
|
@ -148,6 +148,7 @@ DatanodeDescriptor[] chooseTarget(int numOfReplicas,
|
||||
new ArrayList<DatanodeDescriptor>(chosenNodes);
|
||||
for (Node node:chosenNodes) {
|
||||
excludedNodes.put(node, node);
|
||||
adjustExcludedNodes(excludedNodes, node);
|
||||
}
|
||||
|
||||
if (!clusterMap.contains(writer)) {
|
||||
@ -359,10 +360,11 @@ protected DatanodeDescriptor chooseRandom(
|
||||
(DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
|
||||
|
||||
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
|
||||
if (oldNode == null) { // choosendNode was not in the excluded list
|
||||
if (oldNode == null) { // chosenNode was not in the excluded list
|
||||
numOfAvailableNodes--;
|
||||
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
|
||||
results.add(chosenNode);
|
||||
adjustExcludedNodes(excludedNodes, chosenNode);
|
||||
return chosenNode;
|
||||
} else {
|
||||
badTarget = true;
|
||||
@ -409,6 +411,7 @@ protected void chooseRandom(int numOfReplicas,
|
||||
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
|
||||
numOfReplicas--;
|
||||
results.add(chosenNode);
|
||||
adjustExcludedNodes(excludedNodes, chosenNode);
|
||||
} else {
|
||||
badTarget = true;
|
||||
}
|
||||
@ -426,7 +429,21 @@ protected void chooseRandom(int numOfReplicas,
|
||||
throw new NotEnoughReplicasException(detail);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* After choosing a node to place replica, adjust excluded nodes accordingly.
|
||||
* It should do nothing here as chosenNode is already put into exlcudeNodes,
|
||||
* but it can be overridden in subclass to put more related nodes into
|
||||
* excludedNodes.
|
||||
*
|
||||
* @param excludedNodes
|
||||
* @param chosenNode
|
||||
*/
|
||||
protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
|
||||
Node chosenNode) {
|
||||
// do nothing here.
|
||||
}
|
||||
|
||||
/* judge if a node is a good target.
|
||||
* return true if <i>node</i> has enough space,
|
||||
* does not have too much load, and the rack does not have too many nodes
|
||||
|
@ -0,0 +1,305 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
|
||||
/** The class is responsible for choosing the desired number of targets
|
||||
* for placing block replicas on environment with node-group layer.
|
||||
* The replica placement strategy is adjusted to:
|
||||
* If the writer is on a datanode, the 1st replica is placed on the local
|
||||
* node (or local node-group), otherwise a random datanode.
|
||||
* The 2nd replica is placed on a datanode that is on a different rack with 1st
|
||||
* replica node.
|
||||
* The 3rd replica is placed on a datanode which is on a different node-group
|
||||
* but the same rack as the second replica node.
|
||||
*/
|
||||
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
|
||||
|
||||
BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
|
||||
NetworkTopology clusterMap) {
|
||||
initialize(conf, stats, clusterMap);
|
||||
}
|
||||
|
||||
BlockPlacementPolicyWithNodeGroup() {
|
||||
}
|
||||
|
||||
public void initialize(Configuration conf, FSClusterStats stats,
|
||||
NetworkTopology clusterMap) {
|
||||
super.initialize(conf, stats, clusterMap);
|
||||
}
|
||||
|
||||
/** choose local node of localMachine as the target.
|
||||
* if localMachine is not available, choose a node on the same nodegroup or
|
||||
* rack instead.
|
||||
* @return the chosen node
|
||||
*/
|
||||
@Override
|
||||
protected DatanodeDescriptor chooseLocalNode(
|
||||
DatanodeDescriptor localMachine,
|
||||
HashMap<Node, Node> excludedNodes,
|
||||
long blocksize,
|
||||
int maxNodesPerRack,
|
||||
List<DatanodeDescriptor> results)
|
||||
throws NotEnoughReplicasException {
|
||||
// if no local machine, randomly choose one node
|
||||
if (localMachine == null)
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
|
||||
// otherwise try local machine first
|
||||
Node oldNode = excludedNodes.put(localMachine, localMachine);
|
||||
if (oldNode == null) { // was not in the excluded list
|
||||
if (isGoodTarget(localMachine, blocksize,
|
||||
maxNodesPerRack, false, results)) {
|
||||
results.add(localMachine);
|
||||
// Nodes under same nodegroup should be excluded.
|
||||
addNodeGroupToExcludedNodes(excludedNodes,
|
||||
localMachine.getNetworkLocation());
|
||||
return localMachine;
|
||||
}
|
||||
}
|
||||
|
||||
// try a node on local node group
|
||||
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
|
||||
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
if (chosenNode != null) {
|
||||
return chosenNode;
|
||||
}
|
||||
// try a node on local rack
|
||||
return chooseLocalRack(localMachine, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
|
||||
Node chosenNode) {
|
||||
// as node-group aware implementation, it should make sure no two replica
|
||||
// are placing on the same node group.
|
||||
addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
|
||||
}
|
||||
|
||||
// add all nodes under specific nodegroup to excludedNodes.
|
||||
private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
|
||||
String nodeGroup) {
|
||||
List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
|
||||
for (Node node : leafNodes) {
|
||||
excludedNodes.put(node, node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected DatanodeDescriptor chooseLocalRack(
|
||||
DatanodeDescriptor localMachine,
|
||||
HashMap<Node, Node> excludedNodes,
|
||||
long blocksize,
|
||||
int maxNodesPerRack,
|
||||
List<DatanodeDescriptor> results)
|
||||
throws NotEnoughReplicasException {
|
||||
// no local machine, so choose a random machine
|
||||
if (localMachine == null) {
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
}
|
||||
|
||||
// choose one from the local rack, but off-nodegroup
|
||||
try {
|
||||
return chooseRandom(NetworkTopology.getFirstHalf(
|
||||
localMachine.getNetworkLocation()),
|
||||
excludedNodes, blocksize,
|
||||
maxNodesPerRack, results);
|
||||
} catch (NotEnoughReplicasException e1) {
|
||||
// find the second replica
|
||||
DatanodeDescriptor newLocal=null;
|
||||
for(Iterator<DatanodeDescriptor> iter=results.iterator();
|
||||
iter.hasNext();) {
|
||||
DatanodeDescriptor nextNode = iter.next();
|
||||
if (nextNode != localMachine) {
|
||||
newLocal = nextNode;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (newLocal != null) {
|
||||
try {
|
||||
return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
|
||||
excludedNodes, blocksize, maxNodesPerRack, results);
|
||||
} catch(NotEnoughReplicasException e2) {
|
||||
//otherwise randomly choose one from the network
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
}
|
||||
} else {
|
||||
//otherwise randomly choose one from the network
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected void chooseRemoteRack(int numOfReplicas,
|
||||
DatanodeDescriptor localMachine,
|
||||
HashMap<Node, Node> excludedNodes,
|
||||
long blocksize,
|
||||
int maxReplicasPerRack,
|
||||
List<DatanodeDescriptor> results)
|
||||
throws NotEnoughReplicasException {
|
||||
int oldNumOfReplicas = results.size();
|
||||
// randomly choose one node from remote racks
|
||||
try {
|
||||
chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
|
||||
localMachine.getNetworkLocation()),
|
||||
excludedNodes, blocksize, maxReplicasPerRack, results);
|
||||
} catch (NotEnoughReplicasException e) {
|
||||
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
|
||||
localMachine.getNetworkLocation(), excludedNodes, blocksize,
|
||||
maxReplicasPerRack, results);
|
||||
}
|
||||
}
|
||||
|
||||
/* choose one node from the nodegroup that <i>localMachine</i> is on.
|
||||
* if no such node is available, choose one node from the nodegroup where
|
||||
* a second replica is on.
|
||||
* if still no such node is available, choose a random node in the cluster.
|
||||
* @return the chosen node
|
||||
*/
|
||||
private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
|
||||
DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
|
||||
int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
|
||||
// no local machine, so choose a random machine
|
||||
if (localMachine == null) {
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
}
|
||||
|
||||
// choose one from the local node group
|
||||
try {
|
||||
return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
|
||||
excludedNodes, blocksize, maxNodesPerRack, results);
|
||||
} catch (NotEnoughReplicasException e1) {
|
||||
// find the second replica
|
||||
DatanodeDescriptor newLocal=null;
|
||||
for(Iterator<DatanodeDescriptor> iter=results.iterator();
|
||||
iter.hasNext();) {
|
||||
DatanodeDescriptor nextNode = iter.next();
|
||||
if (nextNode != localMachine) {
|
||||
newLocal = nextNode;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (newLocal != null) {
|
||||
try {
|
||||
return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
|
||||
excludedNodes, blocksize, maxNodesPerRack, results);
|
||||
} catch(NotEnoughReplicasException e2) {
|
||||
//otherwise randomly choose one from the network
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
}
|
||||
} else {
|
||||
//otherwise randomly choose one from the network
|
||||
return chooseRandom(NodeBase.ROOT, excludedNodes,
|
||||
blocksize, maxNodesPerRack, results);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRack(final DatanodeInfo cur) {
|
||||
String nodeGroupString = cur.getNetworkLocation();
|
||||
return NetworkTopology.getFirstHalf(nodeGroupString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick up replica node set for deleting replica as over-replicated.
|
||||
* First set contains replica nodes on rack with more than one
|
||||
* replica while second set contains remaining replica nodes.
|
||||
* If first is not empty, divide first set into two subsets:
|
||||
* moreThanOne contains nodes on nodegroup with more than one replica
|
||||
* exactlyOne contains the remaining nodes in first set
|
||||
* then pickup priSet if not empty.
|
||||
* If first is empty, then pick second.
|
||||
*/
|
||||
@Override
|
||||
public Iterator<DatanodeDescriptor> pickupReplicaSet(
|
||||
Collection<DatanodeDescriptor> first,
|
||||
Collection<DatanodeDescriptor> second) {
|
||||
// If no replica within same rack, return directly.
|
||||
if (first.isEmpty()) {
|
||||
return second.iterator();
|
||||
}
|
||||
// Split data nodes in the first set into two sets,
|
||||
// moreThanOne contains nodes on nodegroup with more than one replica
|
||||
// exactlyOne contains the remaining nodes
|
||||
Map<String, List<DatanodeDescriptor>> nodeGroupMap =
|
||||
new HashMap<String, List<DatanodeDescriptor>>();
|
||||
|
||||
for(DatanodeDescriptor node : first) {
|
||||
final String nodeGroupName =
|
||||
NetworkTopology.getLastHalf(node.getNetworkLocation());
|
||||
List<DatanodeDescriptor> datanodeList =
|
||||
nodeGroupMap.get(nodeGroupName);
|
||||
if (datanodeList == null) {
|
||||
datanodeList = new ArrayList<DatanodeDescriptor>();
|
||||
nodeGroupMap.put(nodeGroupName, datanodeList);
|
||||
}
|
||||
datanodeList.add(node);
|
||||
}
|
||||
|
||||
final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
|
||||
final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
|
||||
// split nodes into two sets
|
||||
for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
|
||||
if (datanodeList.size() == 1 ) {
|
||||
// exactlyOne contains nodes on nodegroup with exactly one replica
|
||||
exactlyOne.add(datanodeList.get(0));
|
||||
} else {
|
||||
// moreThanOne contains nodes on nodegroup with more than one replica
|
||||
moreThanOne.addAll(datanodeList);
|
||||
}
|
||||
}
|
||||
|
||||
Iterator<DatanodeDescriptor> iter =
|
||||
moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
|
||||
return iter;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,490 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestReplicationPolicyWithNodeGroup extends TestCase {
|
||||
private static final int BLOCK_SIZE = 1024;
|
||||
private static final int NUM_OF_DATANODES = 8;
|
||||
private static final Configuration CONF = new HdfsConfiguration();
|
||||
private static final NetworkTopology cluster;
|
||||
private static final NameNode namenode;
|
||||
private static final BlockPlacementPolicy replicator;
|
||||
private static final String filename = "/dummyfile.txt";
|
||||
|
||||
private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
|
||||
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
|
||||
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
|
||||
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
|
||||
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
|
||||
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
|
||||
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
|
||||
DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
|
||||
};
|
||||
|
||||
private final static DatanodeDescriptor NODE =
|
||||
new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
|
||||
|
||||
static {
|
||||
try {
|
||||
FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
|
||||
CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||
// Set properties to make HDFS aware of NodeGroup.
|
||||
CONF.set("dfs.block.replicator.classname",
|
||||
"org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
|
||||
CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
||||
"org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
|
||||
DFSTestUtil.formatNameNode(CONF);
|
||||
namenode = new NameNode(CONF);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
throw (RuntimeException)new RuntimeException().initCause(e);
|
||||
}
|
||||
final BlockManager bm = namenode.getNamesystem().getBlockManager();
|
||||
replicator = bm.getBlockPlacementPolicy();
|
||||
cluster = bm.getDatanodeManager().getNetworkTopology();
|
||||
// construct network topology
|
||||
for(int i=0; i<NUM_OF_DATANODES; i++) {
|
||||
cluster.add(dataNodes[i]);
|
||||
}
|
||||
setupDataNodeCapacity();
|
||||
}
|
||||
|
||||
private static void setupDataNodeCapacity() {
|
||||
for(int i=0; i<NUM_OF_DATANODES; i++) {
|
||||
dataNodes[i].updateHeartbeat(
|
||||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In this testcase, client is dataNodes[0]. So the 1st replica should be
|
||||
* placed on dataNodes[0], the 2nd replica should be placed on
|
||||
* different rack and third should be placed on different node (and node group)
|
||||
* of rack chosen for 2nd node.
|
||||
* The only excpetion is when the <i>numOfReplicas</i> is 2,
|
||||
* the 1st is on dataNodes[0] and the 2nd is on a different rack.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testChooseTarget1() throws Exception {
|
||||
dataNodes[0].updateHeartbeat(
|
||||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
|
||||
|
||||
DatanodeDescriptor[] targets;
|
||||
targets = replicator.chooseTarget(filename,
|
||||
0, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
assertEquals(targets[0], dataNodes[0]);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertEquals(targets[0], dataNodes[0]);
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
3, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 3);
|
||||
assertEquals(targets[0], dataNodes[0]);
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
|
||||
assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
4, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 4);
|
||||
assertEquals(targets[0], dataNodes[0]);
|
||||
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
|
||||
cluster.isOnSameRack(targets[2], targets[3]));
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
|
||||
// Make sure no more than one replicas are on the same nodegroup
|
||||
verifyNoTwoTargetsOnSameNodeGroup(targets);
|
||||
|
||||
dataNodes[0].updateHeartbeat(
|
||||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
|
||||
}
|
||||
|
||||
private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
|
||||
Set<String> nodeGroupSet = new HashSet<String>();
|
||||
for (DatanodeDescriptor target: targets) {
|
||||
nodeGroupSet.add(target.getNetworkLocation());
|
||||
}
|
||||
assertEquals(nodeGroupSet.size(), targets.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* In this testcase, client is dataNodes[0], but the dataNodes[1] is
|
||||
* not allowed to be chosen. So the 1st replica should be
|
||||
* placed on dataNodes[0], the 2nd replica should be placed on a different
|
||||
* rack, the 3rd should be on same rack as the 2nd replica but in different
|
||||
* node group, and the rest should be placed on a third rack.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testChooseTarget2() throws Exception {
|
||||
HashMap<Node, Node> excludedNodes;
|
||||
DatanodeDescriptor[] targets;
|
||||
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
|
||||
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
|
||||
|
||||
excludedNodes = new HashMap<Node, Node>();
|
||||
excludedNodes.put(dataNodes[1], dataNodes[1]);
|
||||
targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, false,
|
||||
excludedNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 4);
|
||||
assertEquals(targets[0], dataNodes[0]);
|
||||
assertTrue(cluster.isNodeGroupAware());
|
||||
// Make sure no replicas are on the same nodegroup
|
||||
for (int i=1;i<4;i++) {
|
||||
assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
|
||||
}
|
||||
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
|
||||
cluster.isOnSameRack(targets[2], targets[3]));
|
||||
assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.put(dataNodes[1], dataNodes[1]);
|
||||
chosenNodes.add(dataNodes[2]);
|
||||
targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
|
||||
excludedNodes, BLOCK_SIZE);
|
||||
System.out.println("targets=" + Arrays.asList(targets));
|
||||
assertEquals(2, targets.length);
|
||||
//make sure that the chosen node is in the target.
|
||||
int i = 0;
|
||||
for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
|
||||
assertTrue(i < targets.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
|
||||
* to be chosen. So the 1st replica should be placed on dataNodes[1],
|
||||
* the 2nd replica should be placed on a different rack,
|
||||
* the 3rd replica should be placed on the same rack as the 2nd replica but in different nodegroup,
|
||||
* and the rest should be placed on the third rack.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testChooseTarget3() throws Exception {
|
||||
// make data node 0 to be not qualified to choose
|
||||
dataNodes[0].updateHeartbeat(
|
||||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
|
||||
|
||||
DatanodeDescriptor[] targets;
|
||||
targets = replicator.chooseTarget(filename,
|
||||
0, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
assertEquals(targets[0], dataNodes[1]);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertEquals(targets[0], dataNodes[1]);
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
3, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 3);
|
||||
assertEquals(targets[0], dataNodes[1]);
|
||||
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
4, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 4);
|
||||
assertEquals(targets[0], dataNodes[1]);
|
||||
assertTrue(cluster.isNodeGroupAware());
|
||||
verifyNoTwoTargetsOnSameNodeGroup(targets);
|
||||
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
|
||||
cluster.isOnSameRack(targets[2], targets[3]));
|
||||
|
||||
dataNodes[0].updateHeartbeat(
|
||||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* In this testcase, client is dataNodes[0], but none of the nodes on rack 1
|
||||
* is qualified to be chosen. So the 1st replica should be placed on either
|
||||
* rack 2 or rack 3.
|
||||
* the 2nd replica should be placed on a different rack,
|
||||
* the 3rd replica should be placed on the same rack as the 1st replica, but
|
||||
* in different node group.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testChooseTarget4() throws Exception {
|
||||
// make data node 0-2 to be not qualified to choose: not enough disk space
|
||||
for(int i=0; i<3; i++) {
|
||||
dataNodes[i].updateHeartbeat(
|
||||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
|
||||
}
|
||||
|
||||
DatanodeDescriptor[] targets;
|
||||
targets = replicator.chooseTarget(filename,
|
||||
0, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
3, dataNodes[0], BLOCK_SIZE);
|
||||
assertEquals(targets.length, 3);
|
||||
for(int i=0; i<3; i++) {
|
||||
assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
|
||||
}
|
||||
verifyNoTwoTargetsOnSameNodeGroup(targets);
|
||||
assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
|
||||
cluster.isOnSameRack(targets[1], targets[2]));
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
|
||||
}
|
||||
|
||||
/**
|
||||
* In this testcase, client is is a node outside of file system.
|
||||
* So the 1st replica can be placed on any node.
|
||||
* the 2nd replica should be placed on a different rack,
|
||||
* the 3rd replica should be placed on the same rack as the 2nd replica,
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testChooseTarget5() throws Exception {
|
||||
setupDataNodeCapacity();
|
||||
DatanodeDescriptor[] targets;
|
||||
targets = replicator.chooseTarget(filename,
|
||||
0, NODE, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, NODE, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, NODE, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
3, NODE, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 3);
|
||||
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
verifyNoTwoTargetsOnSameNodeGroup(targets);
|
||||
}
|
||||
|
||||
/**
|
||||
* This testcase tests re-replication, when dataNodes[0] is already chosen.
|
||||
* So the 1st replica can be placed on random rack.
|
||||
* the 2nd replica should be placed on different node and nodegroup by same rack as
|
||||
* the 1st replica. The 3rd replica can be placed randomly.
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testRereplicate1() throws Exception {
|
||||
setupDataNodeCapacity();
|
||||
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
|
||||
chosenNodes.add(dataNodes[0]);
|
||||
DatanodeDescriptor[] targets;
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
3, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 3);
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
|
||||
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
|
||||
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
|
||||
}
|
||||
|
||||
/**
|
||||
* This testcase tests re-replication,
|
||||
* when dataNodes[0] and dataNodes[1] are already chosen.
|
||||
* So the 1st replica should be placed on a different rack of rack 1.
|
||||
* the rest replicas can be placed randomly,
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testRereplicate2() throws Exception {
|
||||
setupDataNodeCapacity();
|
||||
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
|
||||
chosenNodes.add(dataNodes[0]);
|
||||
chosenNodes.add(dataNodes[1]);
|
||||
|
||||
DatanodeDescriptor[] targets;
|
||||
targets = replicator.chooseTarget(filename,
|
||||
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
|
||||
cluster.isOnSameRack(dataNodes[0], targets[1]));
|
||||
}
|
||||
|
||||
/**
|
||||
* This testcase tests re-replication,
|
||||
* when dataNodes[0] and dataNodes[3] are already chosen.
|
||||
* So the 1st replica should be placed on the rack that the writer resides.
|
||||
* the rest replicas can be placed randomly,
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testRereplicate3() throws Exception {
|
||||
setupDataNodeCapacity();
|
||||
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
|
||||
chosenNodes.add(dataNodes[0]);
|
||||
chosenNodes.add(dataNodes[3]);
|
||||
|
||||
DatanodeDescriptor[] targets;
|
||||
targets = replicator.chooseTarget(filename,
|
||||
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
|
||||
assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
1, dataNodes[3], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
|
||||
assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
|
||||
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
|
||||
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
|
||||
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
|
||||
targets = replicator.chooseTarget(filename,
|
||||
2, dataNodes[3], chosenNodes, BLOCK_SIZE);
|
||||
assertEquals(targets.length, 2);
|
||||
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for the chooseReplicaToDelete are processed based on
|
||||
* block locality and free space
|
||||
*/
|
||||
@Test
|
||||
public void testChooseReplicaToDelete() throws Exception {
|
||||
List<DatanodeDescriptor> replicaNodeList =
|
||||
new ArrayList<DatanodeDescriptor>();
|
||||
final Map<String, List<DatanodeDescriptor>> rackMap =
|
||||
new HashMap<String, List<DatanodeDescriptor>>();
|
||||
dataNodes[0].setRemaining(4*1024*1024);
|
||||
replicaNodeList.add(dataNodes[0]);
|
||||
|
||||
dataNodes[1].setRemaining(3*1024*1024);
|
||||
replicaNodeList.add(dataNodes[1]);
|
||||
|
||||
dataNodes[2].setRemaining(2*1024*1024);
|
||||
replicaNodeList.add(dataNodes[2]);
|
||||
|
||||
dataNodes[5].setRemaining(1*1024*1024);
|
||||
replicaNodeList.add(dataNodes[5]);
|
||||
|
||||
List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
|
||||
List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
|
||||
replicator.splitNodesWithRack(
|
||||
replicaNodeList, rackMap, first, second);
|
||||
assertEquals(3, first.size());
|
||||
assertEquals(1, second.size());
|
||||
DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)3, first, second);
|
||||
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
|
||||
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
|
||||
// but dataNodes[1] is chosen as less free space
|
||||
assertEquals(chosenNode, dataNodes[1]);
|
||||
|
||||
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
|
||||
assertEquals(2, first.size());
|
||||
assertEquals(1, second.size());
|
||||
// Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
|
||||
// as less free space
|
||||
chosenNode = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)2, first, second);
|
||||
assertEquals(chosenNode, dataNodes[2]);
|
||||
|
||||
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
|
||||
assertEquals(0, first.size());
|
||||
assertEquals(2, second.size());
|
||||
// Within second set, dataNodes[5] with less free space
|
||||
chosenNode = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)1, first, second);
|
||||
assertEquals(chosenNode, dataNodes[5]);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user