diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d2732492bd..d5993698de 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -89,6 +89,9 @@ Trunk (unreleased changes)
     MAPREDUCE-3664. Federation Documentation has incorrect configuration
     example. (Brandon Li via jitendra)
 
+    MAPREDUCE-1740. NPE in getMatchingLevelForNodes when node locations are
+    variable depth (ahmed via tucu) [IMPORTANT: this is dead code in trunk]
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java
index 1bac8de3fe..9f92707077 100644
--- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java
+++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java
@@ -1596,16 +1596,37 @@ public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
 
   // returns the (cache)level at which the nodes matches
   private int getMatchingLevelForNodes(Node n1, Node n2) {
+    return getMatchingLevelForNodes(n1, n2, this.maxLevel);
+  }
+
+  static int getMatchingLevelForNodes(Node n1, Node n2, int maxLevel) {
     int count = 0;
+
+    // In the case that the two nodes are at different levels in the
+    // node hierarchy, walk upwards on the deeper one until the
+    // levels are equal. Each of these counts as "distance" since it
+    // presumably is going through another rack.
+    int level1 = n1.getLevel(), level2 = n2.getLevel();
+    while (n1 != null && level1 > level2) {
+      n1 = n1.getParent();
+      level1--;
+      count++;
+    }
+    while (n2 != null && level2 > level1) {
+      n2 = n2.getParent();
+      level2--;
+      count++;
+    }
+
     do {
-      if (n1.equals(n2)) {
-        return count;
+      if (n1.equals(n2) || count >= maxLevel) {
+        return Math.min(count, maxLevel);
       }
       ++count;
       n1 = n1.getParent();
       n2 = n2.getParent();
     } while (n1 != null);
-    return this.maxLevel;
+    return maxLevel;
   }
 
   /**
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
index fc74528a01..ea100aab08 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
+++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
@@ -29,10 +29,9 @@
 import java.util.Map;
 import java.util.Set;
 
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.BeforeClass;
 import static org.mockito.Mockito.*;
 
 import org.apache.commons.logging.Log;
@@ -47,11 +46,13 @@
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.StaticMapping;
 
 @SuppressWarnings("deprecation")
-public class TestJobInProgress extends TestCase {
+public class TestJobInProgress {
   static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
 
   static FakeJobTracker jobTracker;
@@ -75,25 +76,21 @@ public class TestJobInProgress {
   static int numUniqueHosts = hosts.length;
   static int clusterSize = trackers.length;
 
-  public static Test suite() {
-    TestSetup setup = new TestSetup(new TestSuite(TestJobInProgress.class)) {
-      protected void setUp() throws Exception {
-        JobConf conf = new JobConf();
-        conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
-        conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
-        conf.setClass("topology.node.switch.mapping.impl",
-            StaticMapping.class, DNSToSwitchMapping.class);
-        jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
-        // Set up the Topology Information
-        for (int i = 0; i < hosts.length; i++) {
-          StaticMapping.addNodeToRack(hosts[i], racks[i]);
-        }
-        for (String s: trackers) {
-          FakeObjectUtilities.establishFirstContact(jobTracker, s);
-        }
-      }
-    };
-    return setup;
+  @BeforeClass
+  public static void setup() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+    conf.setClass("topology.node.switch.mapping.impl",
+        StaticMapping.class, DNSToSwitchMapping.class);
+    jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
+    // Set up the Topology Information
+    for (int i = 0; i < hosts.length; i++) {
+      StaticMapping.addNodeToRack(hosts[i], racks[i]);
+    }
+    for (String s: trackers) {
+      FakeObjectUtilities.establishFirstContact(jobTracker, s);
+    }
   }
 
   static class MyFakeJobInProgress extends FakeJobInProgress {
@@ -157,6 +154,7 @@ public TaskAttemptID findAndRunNewTask(boolean isMap,
     }
   }
 
+  //@Test
   public void testPendingMapTaskCount() throws Exception {
 
     int numMaps = 4;
@@ -259,6 +257,7 @@ static void testRunningTaskCount(boolean speculation) throws Exception {
 
   }
 
+  //@Test
   public void testRunningTaskCount() throws Exception {
     // test with spec = false
     testRunningTaskCount(false);
@@ -287,6 +286,7 @@ static void checkTaskCounts(JobInProgress jip, int runningMaps,
     assertEquals(pendingReduces, jip.pendingReduces());
   }
 
+  //@Test
   public void testJobSummary() throws Exception {
     int numMaps = 2;
     int numReds = 2;
@@ -341,4 +341,35 @@ public void testJobSummary() throws Exception {
     assertEquals("firstReduceTaskLaunchTime", 3,
         jspy.getFirstTaskLaunchTimes().get(TaskType.REDUCE).longValue());
   }
+
+  @Test
+  public void testLocality() throws Exception {
+    NetworkTopology nt = new NetworkTopology();
+
+    Node r1n1 = new NodeBase("/default/rack1/node1");
+    nt.add(r1n1);
+    Node r1n2 = new NodeBase("/default/rack1/node2");
+    nt.add(r1n2);
+
+    Node r2n3 = new NodeBase("/default/rack2/node3");
+    nt.add(r2n3);
+
+    Node r2n4 = new NodeBase("/default/rack2/s1/node4");
+    nt.add(r2n4);
+
+    LOG.debug("r1n1 parent: " + r1n1.getParent() + "\n" +
+        "r1n2 parent: " + r1n2.getParent() + "\n" +
+        "r2n3 parent: " + r2n3.getParent() + "\n" +
+        "r2n4 parent: " + r2n4.getParent());
+
+    // Same host
+    assertEquals(0, JobInProgress.getMatchingLevelForNodes(r1n1, r1n1, 3));
+    // Same rack
+    assertEquals(1, JobInProgress.getMatchingLevelForNodes(r1n1, r1n2, 3));
+    // Different rack
+    assertEquals(2, JobInProgress.getMatchingLevelForNodes(r1n1, r2n3, 3));
+    // Different rack at different depth
+    assertEquals(3, JobInProgress.getMatchingLevelForNodes(r1n1, r2n4, 3));
+  }
+
 }
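
A note on coverage, not part of the patch: testLocality only ever passes the shallower node as the first argument. As far as the removed lines show, the pre-patch NullPointerException fired in the opposite order, when the second node's parent chain ran out of levels first and getParent() was invoked on null during the lock-step walk. The sketch below is a hedged companion test reusing the same NetworkTopology/NodeBase setup as testLocality to pin down that reversed-argument case; the method name testLocalityReversedArguments is illustrative only and the expected values follow from tracing the patched level-equalizing walk.

  @Test
  public void testLocalityReversedArguments() throws Exception {
    NetworkTopology nt = new NetworkTopology();

    Node r1n1 = new NodeBase("/default/rack1/node1");    // depth 3
    nt.add(r1n1);
    Node r2n4 = new NodeBase("/default/rack2/s1/node4"); // depth 4
    nt.add(r2n4);

    // Deeper node first: the patched method walks the deeper node up until the
    // levels match, then caps the answer at maxLevel instead of walking off the
    // end of the shallower parent chain as the old lock-step loop did.
    assertEquals(3, JobInProgress.getMatchingLevelForNodes(r2n4, r1n1, 3));
    // The result is symmetric with the argument order the committed test covers.
    assertEquals(3, JobInProgress.getMatchingLevelForNodes(r1n1, r2n4, 3));
  }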