diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 892ba07359..b3627ea264 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -170,8 +170,7 @@ boolean add(Node n) {
}
if (parentNode == null) {
// create a new InnerNode
- parentNode = new InnerNode(parentName, getPath(this),
- this, this.getLevel()+1);
+ parentNode = createParentNode(parentName);
children.add(parentNode);
}
// add n to the subtree of the next ancestor node
@@ -288,7 +287,7 @@ Node getLeaf(int leafIndex, Node excludedNode) {
// calculate the total number of excluded leaf nodes
int numOfExcludedLeaves =
isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
- if (isRack()) { // children are leaves
+ if (isLeafParent()) { // children are leaves
if (isLeaf) { // excluded node is a leaf node
int excludedIndex = children.indexOf(excludedNode);
if (excludedIndex != -1 && leafIndex >= 0) {
@@ -326,6 +325,10 @@ Node getLeaf(int leafIndex, Node excludedNode) {
}
}
+ protected boolean isLeafParent() {
+ return isRack();
+ }
+
/**
* Determine if children are leaves; the default implementation calls {@link #isRack()}.
*
* To be overridden in subclasses for specific InnerNode implementations,
@@ -752,6 +755,30 @@ public String toString() {
}
return tree.toString();
}
+
+ /**
+ * Divide a network location string into two parts at the last separator,
+ * and return the first part.
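+ * <p>For example, getFirstHalf("/d1/r1/n1") returns "/d1/r1".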
+ *
+ * @param networkLocation a network location path, e.g. "/d1/r1/n1"
+ * @return the part of networkLocation before the last separator
+ */
+ public static String getFirstHalf(String networkLocation) {
+ int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+ return networkLocation.substring(0, index);
+ }
+
+ /**
+ * Divide a network location string into two parts at the last separator,
+ * and return the second part.
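+ * <p>For example, getLastHalf("/d1/r1/n1") returns "/n1".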
+ *
+ * @param networkLocation a network location path, e.g. "/d1/r1/n1"
+ * @return the part of networkLocation from the last separator on, inclusive
+ */
+ public static String getLastHalf(String networkLocation) {
+ int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+ return networkLocation.substring(index);
+ }
/** swap two array items */
static protected void swap(Node[] nodes, int i, int j) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
index 6066cd2a61..1bc21450e6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
@@ -49,7 +49,7 @@ protected Node getNodeForNetworkLocation(Node node) {
}
Node nodeGroup = getNode(node.getNetworkLocation());
if (nodeGroup == null) {
- nodeGroup = new InnerNode(node.getNetworkLocation());
+ nodeGroup = new InnerNodeWithNodeGroup(node.getNetworkLocation());
}
return getNode(nodeGroup.getNetworkLocation());
}
@@ -383,6 +383,11 @@ boolean isNodeGroup() {
}
return true;
}
+
+ @Override
+ protected boolean isLeafParent() {
+ return isNodeGroup();
+ }
@Override
protected InnerNode createParentNode(String parentName) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5f43596d3d..10752512d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -13,6 +13,9 @@ Trunk (unreleased changes)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
+ HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
+ with 4-layer network topology. (Junping Du via szetszwo)
+
IMPROVEMENTS
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 350f863eae..173198d213 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -148,6 +148,7 @@ DatanodeDescriptor[] chooseTarget(int numOfReplicas,
new ArrayList<DatanodeDescriptor>(chosenNodes);
for (Node node:chosenNodes) {
excludedNodes.put(node, node);
+ adjustExcludedNodes(excludedNodes, node);
}
if (!clusterMap.contains(writer)) {
@@ -359,10 +360,11 @@ protected DatanodeDescriptor chooseRandom(
(DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
- if (oldNode == null) { // choosendNode was not in the excluded list
+ if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
results.add(chosenNode);
+ adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
} else {
badTarget = true;
@@ -409,6 +411,7 @@ protected void chooseRandom(int numOfReplicas,
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
numOfReplicas--;
results.add(chosenNode);
+ adjustExcludedNodes(excludedNodes, chosenNode);
} else {
badTarget = true;
}
@@ -426,7 +429,21 @@ protected void chooseRandom(int numOfReplicas,
throw new NotEnoughReplicasException(detail);
}
}
-
+
+ /**
+ * After choosing a node to place a replica, adjust excluded nodes
+ * accordingly. It does nothing here, as chosenNode is already put into
+ * excludedNodes, but it can be overridden in subclasses to put more
+ * related nodes into excludedNodes.
+ *
+ * @param excludedNodes the map of nodes excluded from further selection
+ * @param chosenNode the node just chosen as a replica target
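+ * <p>For example, the node-group-aware subclass excludes every node in the
+ * same node group as chosenNode; see BlockPlacementPolicyWithNodeGroup.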
+ */
+ protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+ Node chosenNode) {
+ // do nothing here.
+ }
+
/* judge if a node is a good target.
* return true if node has enough space,
* does not have too much load, and the rack does not have too many nodes
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
new file mode 100644
index 0000000000..00c6af607a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas in an environment with a node-group layer.
+ * The replica placement strategy is adjusted to:
+ * If the writer is on a datanode, the 1st replica is placed on the local
+ * node (or local node-group); otherwise on a random datanode.
+ * The 2nd replica is placed on a datanode that is on a different rack from
+ * the node of the 1st replica.
+ * The 3rd replica is placed on a datanode which is on a different node-group
+ * but the same rack as the 2nd replica node.
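+ * <p>For example, with the 4-layer locations used in the tests below
+ * (paths of the form /d1/r1/n1, i.e. data center/rack/node-group), the
+ * three replicas of a block may land on /d1/r1/n1, /d1/r2/n3 and /d1/r2/n4.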
+ */
+public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
+
+ BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
+ NetworkTopology clusterMap) {
+ initialize(conf, stats, clusterMap);
+ }
+
+ BlockPlacementPolicyWithNodeGroup() {
+ }
+
+ public void initialize(Configuration conf, FSClusterStats stats,
+ NetworkTopology clusterMap) {
+ super.initialize(conf, stats, clusterMap);
+ }
+
+ /** Choose the local node of localMachine as the target.
+ * If localMachine is not available, choose a node in the same node group or
+ * on the same rack instead.
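+ * <p>The fallback order is: the local node, a node in the local node group,
+ * a node on the local rack, then any node in the cluster.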
+ * @return the chosen node
+ */
+ @Override
+ protected DatanodeDescriptor chooseLocalNode(
+ DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ // if no local machine, randomly choose one node
+ if (localMachine == null)
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+
+ // otherwise try local machine first
+ Node oldNode = excludedNodes.put(localMachine, localMachine);
+ if (oldNode == null) { // was not in the excluded list
+ if (isGoodTarget(localMachine, blocksize,
+ maxNodesPerRack, false, results)) {
+ results.add(localMachine);
+ // Nodes under same nodegroup should be excluded.
+ addNodeGroupToExcludedNodes(excludedNodes,
+ localMachine.getNetworkLocation());
+ return localMachine;
+ }
+ }
+
+ // try a node on local node group
+ DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+ (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ if (chosenNode != null) {
+ return chosenNode;
+ }
+ // try a node on local rack
+ return chooseLocalRack(localMachine, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+ Node chosenNode) {
+ // as a node-group-aware implementation, it should make sure that no two
+ // replicas are placed in the same node group.
+ addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
+ }
+
+ // add all nodes under specific nodegroup to excludedNodes.
+ private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
+ String nodeGroup) {
+ List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
+ for (Node node : leafNodes) {
+ excludedNodes.put(node, node);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected DatanodeDescriptor chooseLocalRack(
+ DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ // no local machine, so choose a random machine
+ if (localMachine == null) {
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+
+ // choose one from the local rack, but off-nodegroup
+ try {
+ return chooseRandom(NetworkTopology.getFirstHalf(
+ localMachine.getNetworkLocation()),
+ excludedNodes, blocksize,
+ maxNodesPerRack, results);
+ } catch (NotEnoughReplicasException e1) {
+ // find the second replica
+ DatanodeDescriptor newLocal = null;
+ for(Iterator<DatanodeDescriptor> iter=results.iterator();
+ iter.hasNext();) {
+ DatanodeDescriptor nextNode = iter.next();
+ if (nextNode != localMachine) {
+ newLocal = nextNode;
+ break;
+ }
+ }
+ if (newLocal != null) {
+ try {
+ return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results);
+ } catch(NotEnoughReplicasException e2) {
+ //otherwise randomly choose one from the network
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+ } else {
+ //otherwise randomly choose one from the network
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void chooseRemoteRack(int numOfReplicas,
+ DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxReplicasPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ int oldNumOfReplicas = results.size();
+ // randomly choose one node from remote racks
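+ // the "~" prefix asks NetworkTopology.chooseRandom to pick from all
+ // nodes outside the given scope, i.e. outside the local rack here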
+ try {
+ chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
+ localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxReplicasPerRack, results);
+ } catch (NotEnoughReplicasException e) {
+ chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+ localMachine.getNetworkLocation(), excludedNodes, blocksize,
+ maxReplicasPerRack, results);
+ }
+ }
+
+ /* Choose one node from the node group that localMachine is on.
+ * If no such node is available, choose one node from the node group where
+ * a second replica is on.
+ * If still no such node is available, choose a random node in the cluster.
+ * @return the chosen node
+ */
+ private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
+ DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+ long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ // no local machine, so choose a random machine
+ if (localMachine == null) {
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+
+ // choose one from the local node group
+ try {
+ return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results);
+ } catch (NotEnoughReplicasException e1) {
+ // find the second replica
+ DatanodeDescriptor newLocal = null;
+ for(Iterator<DatanodeDescriptor> iter=results.iterator();
+ iter.hasNext();) {
+ DatanodeDescriptor nextNode = iter.next();
+ if (nextNode != localMachine) {
+ newLocal = nextNode;
+ break;
+ }
+ }
+ if (newLocal != null) {
+ try {
+ return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results);
+ } catch(NotEnoughReplicasException e2) {
+ //otherwise randomly choose one from the network
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+ } else {
+ //otherwise randomly choose one from the network
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+ }
+ }
+
+ @Override
+ protected String getRack(final DatanodeInfo cur) {
+ String nodeGroupString = cur.getNetworkLocation();
+ return NetworkTopology.getFirstHalf(nodeGroupString);
+ }
+
+ /**
+ * Pick the replica node set from which a replica of an over-replicated
+ * block should be deleted.
+ * The first set contains replica nodes on racks with more than one
+ * replica, while the second set contains the remaining replica nodes.
+ * If first is not empty, divide it into two subsets:
+ * moreThanOne contains nodes in node groups with more than one replica,
+ * exactlyOne contains the remaining nodes of first;
+ * then return moreThanOne if it is not empty, otherwise exactlyOne.
+ * If first is empty, then pick second.
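+ * <p>For example, if first holds nodes {a, b} in node group n1 and {c} in
+ * node group n2 (hypothetical names), moreThanOne is {a, b} and its
+ * iterator is returned, so the doubled-up node group loses a replica first.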
+ */
+ @Override
+ public Iterator<DatanodeDescriptor> pickupReplicaSet(
+ Collection<DatanodeDescriptor> first,
+ Collection<DatanodeDescriptor> second) {
+ // If no replica within same rack, return directly.
+ if (first.isEmpty()) {
+ return second.iterator();
+ }
+ // Split data nodes in the first set into two sets,
+ // moreThanOne contains nodes on nodegroup with more than one replica
+ // exactlyOne contains the remaining nodes
+ Map<String, List<DatanodeDescriptor>> nodeGroupMap =
+ new HashMap<String, List<DatanodeDescriptor>>();
+
+ for(DatanodeDescriptor node : first) {
+ final String nodeGroupName =
+ NetworkTopology.getLastHalf(node.getNetworkLocation());
+ List<DatanodeDescriptor> datanodeList =
+ nodeGroupMap.get(nodeGroupName);
+ if (datanodeList == null) {
+ datanodeList = new ArrayList<DatanodeDescriptor>();
+ nodeGroupMap.put(nodeGroupName, datanodeList);
+ }
+ datanodeList.add(node);
+ }
+
+ final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+ final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+ // split nodes into two sets
+ for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+ if (datanodeList.size() == 1) {
+ // exactlyOne contains nodes on nodegroup with exactly one replica
+ exactlyOne.add(datanodeList.get(0));
+ } else {
+ // moreThanOne contains nodes on nodegroup with more than one replica
+ moreThanOne.addAll(datanodeList);
+ }
+ }
+
+ Iterator<DatanodeDescriptor> iter =
+ moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
+ return iter;
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
new file mode 100644
index 0000000000..5af87adb5c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+
+public class TestReplicationPolicyWithNodeGroup extends TestCase {
+ private static final int BLOCK_SIZE = 1024;
+ private static final int NUM_OF_DATANODES = 8;
+ private static final Configuration CONF = new HdfsConfiguration();
+ private static final NetworkTopology cluster;
+ private static final NameNode namenode;
+ private static final BlockPlacementPolicy replicator;
+ private static final String filename = "/dummyfile.txt";
+
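+ // 8 datanodes spread over 2 data centers (d1, d2), 3 racks (r1-r3) and
+ // 6 node groups (n1-n6); NODE below sits outside this topology.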
+ private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
+ DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+ DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+ DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
+ DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
+ DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+ DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
+ DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
+ DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
+ };
+
+ private final static DatanodeDescriptor NODE =
+ new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
+
+ static {
+ try {
+ FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
+ CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+ // Set properties to make HDFS aware of NodeGroup.
+ CONF.set("dfs.block.replicator.classname",
+ "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
+ CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
+ "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+ DFSTestUtil.formatNameNode(CONF);
+ namenode = new NameNode(CONF);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw (RuntimeException)new RuntimeException().initCause(e);
+ }
+ final BlockManager bm = namenode.getNamesystem().getBlockManager();
+ replicator = bm.getBlockPlacementPolicy();
+ cluster = bm.getDatanodeManager().getNetworkTopology();
+ // construct network topology
+ for(int i=0; i<NUM_OF_DATANODES; i++) {
+ cluster.add(dataNodes[i]);
+ }
+ setupDataNodeCapacity();
+ }
+
+ private static void setupDataNodeCapacity() {
+ for(int i=0; i<NUM_OF_DATANODES; i++) {
+ dataNodes[i].updateHeartbeat(
+ 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ }
+ }
+
+ /**
+ * In this testcase, client is dataNodes[0]. So the 1st replica should be
+ * placed on dataNodes[0], the 2nd replica on a different rack, and the 3rd
+ * on a different node (and node group) of the rack chosen for the 2nd.
+ * The only exception is when the numOfReplicas is 2,
+ * the 1st is on dataNodes[0] and the 2nd is on a different rack.
+ * @throws Exception
+ */
+ public void testChooseTarget1() throws Exception {
+ dataNodes[0].updateHeartbeat(
+ 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertEquals(targets[0], dataNodes[0]);
+
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertEquals(targets[0], dataNodes[0]);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertEquals(targets[0], dataNodes[0]);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
+
+ targets = replicator.chooseTarget(filename,
+ 4, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 4);
+ assertEquals(targets[0], dataNodes[0]);
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+ cluster.isOnSameRack(targets[2], targets[3]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ // Make sure no more than one replica is placed in any node group
+ verifyNoTwoTargetsOnSameNodeGroup(targets);
+
+ dataNodes[0].updateHeartbeat(
+ 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ }
+
+ private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
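+ // with NetworkTopologyWithNodeGroup, a datanode's network location is its
+ // node group path, so distinct locations mean distinct node groups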
+ Set<String> nodeGroupSet = new HashSet<String>();
+ for (DatanodeDescriptor target: targets) {
+ nodeGroupSet.add(target.getNetworkLocation());
+ }
+ assertEquals(nodeGroupSet.size(), targets.length);
+ }
+
+ /**
+ * In this testcase, client is dataNodes[0], but dataNodes[1] is
+ * not allowed to be chosen. So the 1st replica should be
+ * placed on dataNodes[0], the 2nd replica should be placed on a different
+ * rack, the 3rd should be on same rack as the 2nd replica but in different
+ * node group, and the rest should be placed on a third rack.
+ * @throws Exception
+ */
+ public void testChooseTarget2() throws Exception {
+ HashMap<Node, Node> excludedNodes;
+ DatanodeDescriptor[] targets;
+ BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+
+ excludedNodes = new HashMap<Node, Node>();
+ excludedNodes.put(dataNodes[1], dataNodes[1]);
+ targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, false,
+ excludedNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 4);
+ assertEquals(targets[0], dataNodes[0]);
+ assertTrue(cluster.isNodeGroupAware());
+ // Make sure none of the other targets share a node group with targets[0]
+ for (int i=1;i<4;i++) {
+ assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
+ }
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+ cluster.isOnSameRack(targets[2], targets[3]));
+ assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+
+ excludedNodes.clear();
+ chosenNodes.clear();
+ excludedNodes.put(dataNodes[1], dataNodes[1]);
+ chosenNodes.add(dataNodes[2]);
+ targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
+ excludedNodes, BLOCK_SIZE);
+ System.out.println("targets=" + Arrays.asList(targets));
+ assertEquals(2, targets.length);
+ // make sure that the chosen node is among the targets.
+ int i = 0;
+ for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+ assertTrue(i < targets.length);
+ }
+
+ /**
+ * In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
+ * to be chosen. So the 1st replica should be placed on dataNodes[1],
+ * the 2nd replica should be placed on a different rack,
+ * the 3rd replica should be placed on the same rack as the 2nd replica but
+ * in a different node group, and the rest should be placed on a third rack.
+ * @throws Exception
+ */
+ public void testChooseTarget3() throws Exception {
+ // make data node 0 not qualified to be chosen
+ dataNodes[0].updateHeartbeat(
+ 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertEquals(targets[0], dataNodes[1]);
+
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertEquals(targets[0], dataNodes[1]);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertEquals(targets[0], dataNodes[1]);
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(filename,
+ 4, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 4);
+ assertEquals(targets[0], dataNodes[1]);
+ assertTrue(cluster.isNodeGroupAware());
+ verifyNoTwoTargetsOnSameNodeGroup(targets);
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+ cluster.isOnSameRack(targets[2], targets[3]));
+
+ dataNodes[0].updateHeartbeat(
+ 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ }
+
+ /**
+ * In this testcase, client is dataNodes[0], but none of the nodes on rack 1
+ * is qualified to be chosen. So the 1st replica should be placed on either
+ * rack 2 or rack 3.
+ * The 2nd replica should be placed on a different rack,
+ * and the 3rd replica on the same rack as the 1st replica, but in a
+ * different node group.
+ * @throws Exception
+ */
+ public void testChooseTarget4() throws Exception {
+ // make data nodes 0-2 not qualified to be chosen: not enough disk space
+ for(int i=0; i<3; i++) {
+ dataNodes[i].updateHeartbeat(
+ 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+ }
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ for(int i=0; i<3; i++) {
+ assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+ }
+ verifyNoTwoTargetsOnSameNodeGroup(targets);
+ assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
+ cluster.isOnSameRack(targets[1], targets[2]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ }
+
+ /**
+ * In this testcase, the client is a node outside of the file system.
+ * So the 1st replica can be placed on any node.
+ * The 2nd replica should be placed on a different rack,
+ * and the 3rd replica on the same rack as the 2nd replica.
+ * @throws Exception
+ */
+ public void testChooseTarget5() throws Exception {
+ setupDataNodeCapacity();
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename,
+ 0, NODE, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename,
+ 1, NODE, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+
+ targets = replicator.chooseTarget(filename,
+ 2, NODE, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(filename,
+ 3, NODE, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+ verifyNoTwoTargetsOnSameNodeGroup(targets);
+ }
+
+ /**
+ * This testcase tests re-replication, when dataNodes[0] is already chosen.
+ * So the 1st replica can be placed on a random rack.
+ * The 2nd replica should be placed on a different node and node group, but
+ * on the same rack as the 1st replica. The 3rd replica can be placed randomly.
+ * @throws Exception
+ */
+ public void testRereplicate1() throws Exception {
+ setupDataNodeCapacity();
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ chosenNodes.add(dataNodes[0]);
+ DatanodeDescriptor[] targets;
+
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+ }
+
+ /**
+ * This testcase tests re-replication,
+ * when dataNodes[0] and dataNodes[1] are already chosen.
+ * So the 1st replica should be placed on a rack other than rack 1.
+ * The remaining replicas can be placed randomly.
+ * @throws Exception
+ */
+ public void testRereplicate2() throws Exception {
+ setupDataNodeCapacity();
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ chosenNodes.add(dataNodes[0]);
+ chosenNodes.add(dataNodes[1]);
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
+ cluster.isOnSameRack(dataNodes[0], targets[1]));
+ }
+
+ /**
+ * This testcase tests re-replication,
+ * when dataNodes[0] and dataNodes[3] are already chosen.
+ * So the 1st replica should be placed on the rack where the writer resides.
+ * The remaining replicas can be placed randomly.
+ * @throws Exception
+ */
+ public void testRereplicate3() throws Exception {
+ setupDataNodeCapacity();
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ chosenNodes.add(dataNodes[0]);
+ chosenNodes.add(dataNodes[3]);
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
+
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[3], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+ assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+ assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
+ assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+ assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[3], chosenNodes, BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+ }
+
+ /**
+ * Test that chooseReplicaToDelete picks replicas based on
+ * block locality and free space.
+ */
+ @Test
+ public void testChooseReplicaToDelete() throws Exception {
+ List<DatanodeDescriptor> replicaNodeList =
+ new ArrayList<DatanodeDescriptor>();
+ final Map<String, List<DatanodeDescriptor>> rackMap =
+ new HashMap<String, List<DatanodeDescriptor>>();
+ dataNodes[0].setRemaining(4*1024*1024);
+ replicaNodeList.add(dataNodes[0]);
+
+ dataNodes[1].setRemaining(3*1024*1024);
+ replicaNodeList.add(dataNodes[1]);
+
+ dataNodes[2].setRemaining(2*1024*1024);
+ replicaNodeList.add(dataNodes[2]);
+
+ dataNodes[5].setRemaining(1*1024*1024);
+ replicaNodeList.add(dataNodes[5]);
+
+ List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+ replicator.splitNodesWithRack(
+ replicaNodeList, rackMap, first, second);
+ assertEquals(3, first.size());
+ assertEquals(1, second.size());
+ DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+ null, null, (short)3, first, second);
+ // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
+ // dataNodes[0] and dataNodes[1] are in the same nodegroup,
+ // but dataNodes[1] is chosen because it has less free space
+ assertEquals(chosenNode, dataNodes[1]);
+
+ replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+ assertEquals(2, first.size());
+ assertEquals(1, second.size());
+ // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
+ // because it has less free space
+ chosenNode = replicator.chooseReplicaToDelete(
+ null, null, (short)2, first, second);
+ assertEquals(chosenNode, dataNodes[2]);
+
+ replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+ assertEquals(0, first.size());
+ assertEquals(2, second.size());
+ // Within the second set, dataNodes[5] has the least free space
+ chosenNode = replicator.chooseReplicaToDelete(
+ null, null, (short)1, first, second);
+ assertEquals(chosenNode, dataNodes[5]);
+ }
+
+}