MAPREDUCE-1740. NPE in getMatchingLevelForNodes when node locations are variable depth (ahmed via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1303076 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
40a8293d36
commit
a24d12bdec
@ -89,6 +89,9 @@ Trunk (unreleased changes)
|
|||||||
MAPREDUCE-3664. Federation Documentation has incorrect configuration example.
|
MAPREDUCE-3664. Federation Documentation has incorrect configuration example.
|
||||||
(Brandon Li via jitendra)
|
(Brandon Li via jitendra)
|
||||||
|
|
||||||
|
MAPREDUCE-1740. NPE in getMatchingLevelForNodes when node locations are
|
||||||
|
variable depth (ahmed via tucu) [IMPORTANT: this is dead code in trunk]
|
||||||
|
|
||||||
Release 0.23.3 - UNRELEASED
|
Release 0.23.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1596,16 +1596,37 @@ public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
|
|||||||
|
|
||||||
// returns the (cache)level at which the nodes matches
|
// returns the (cache)level at which the nodes matches
|
||||||
private int getMatchingLevelForNodes(Node n1, Node n2) {
|
private int getMatchingLevelForNodes(Node n1, Node n2) {
|
||||||
|
return getMatchingLevelForNodes(n1, n2, this.maxLevel);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int getMatchingLevelForNodes(Node n1, Node n2, int maxLevel) {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
|
// In the case that the two nodes are at different levels in the
|
||||||
|
// node heirarchy, walk upwards on the deeper one until the
|
||||||
|
// levels are equal. Each of these counts as "distance" since it
|
||||||
|
// assumedly is going through another rack.
|
||||||
|
int level1=n1.getLevel(), level2=n2.getLevel();
|
||||||
|
while(n1!=null && level1>level2) {
|
||||||
|
n1 = n1.getParent();
|
||||||
|
level1--;
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
while(n2!=null && level2>level1) {
|
||||||
|
n2 = n2.getParent();
|
||||||
|
level2--;
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
if (n1.equals(n2)) {
|
if (n1.equals(n2) || count >= maxLevel) {
|
||||||
return count;
|
return Math.min(count, maxLevel);
|
||||||
}
|
}
|
||||||
++count;
|
++count;
|
||||||
n1 = n1.getParent();
|
n1 = n1.getParent();
|
||||||
n2 = n2.getParent();
|
n2 = n2.getParent();
|
||||||
} while (n1 != null);
|
} while (n1 != null);
|
||||||
return this.maxLevel;
|
return maxLevel;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -29,10 +29,9 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import junit.extensions.TestSetup;
|
import static org.junit.Assert.*;
|
||||||
import junit.framework.Test;
|
import org.junit.Test;
|
||||||
import junit.framework.TestCase;
|
import org.junit.BeforeClass;
|
||||||
import junit.framework.TestSuite;
|
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@ -47,11 +46,13 @@
|
|||||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.apache.hadoop.net.NodeBase;
|
||||||
import org.apache.hadoop.net.StaticMapping;
|
import org.apache.hadoop.net.StaticMapping;
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public class TestJobInProgress extends TestCase {
|
public class TestJobInProgress {
|
||||||
static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
|
static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
|
||||||
|
|
||||||
static FakeJobTracker jobTracker;
|
static FakeJobTracker jobTracker;
|
||||||
@ -75,25 +76,21 @@ public class TestJobInProgress extends TestCase {
|
|||||||
static int numUniqueHosts = hosts.length;
|
static int numUniqueHosts = hosts.length;
|
||||||
static int clusterSize = trackers.length;
|
static int clusterSize = trackers.length;
|
||||||
|
|
||||||
public static Test suite() {
|
@BeforeClass
|
||||||
TestSetup setup = new TestSetup(new TestSuite(TestJobInProgress.class)) {
|
public static void setup() throws Exception {
|
||||||
protected void setUp() throws Exception {
|
JobConf conf = new JobConf();
|
||||||
JobConf conf = new JobConf();
|
conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
|
||||||
conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
|
conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
|
||||||
conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
|
conf.setClass("topology.node.switch.mapping.impl",
|
||||||
conf.setClass("topology.node.switch.mapping.impl",
|
StaticMapping.class, DNSToSwitchMapping.class);
|
||||||
StaticMapping.class, DNSToSwitchMapping.class);
|
jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
|
||||||
jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
|
// Set up the Topology Information
|
||||||
// Set up the Topology Information
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
for (int i = 0; i < hosts.length; i++) {
|
StaticMapping.addNodeToRack(hosts[i], racks[i]);
|
||||||
StaticMapping.addNodeToRack(hosts[i], racks[i]);
|
}
|
||||||
}
|
for (String s: trackers) {
|
||||||
for (String s: trackers) {
|
FakeObjectUtilities.establishFirstContact(jobTracker, s);
|
||||||
FakeObjectUtilities.establishFirstContact(jobTracker, s);
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return setup;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class MyFakeJobInProgress extends FakeJobInProgress {
|
static class MyFakeJobInProgress extends FakeJobInProgress {
|
||||||
@ -157,6 +154,7 @@ public TaskAttemptID findAndRunNewTask(boolean isMap,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//@Test
|
||||||
public void testPendingMapTaskCount() throws Exception {
|
public void testPendingMapTaskCount() throws Exception {
|
||||||
|
|
||||||
int numMaps = 4;
|
int numMaps = 4;
|
||||||
@ -259,6 +257,7 @@ static void testRunningTaskCount(boolean speculation) throws Exception {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//@Test
|
||||||
public void testRunningTaskCount() throws Exception {
|
public void testRunningTaskCount() throws Exception {
|
||||||
// test with spec = false
|
// test with spec = false
|
||||||
testRunningTaskCount(false);
|
testRunningTaskCount(false);
|
||||||
@ -287,6 +286,7 @@ static void checkTaskCounts(JobInProgress jip, int runningMaps,
|
|||||||
assertEquals(pendingReduces, jip.pendingReduces());
|
assertEquals(pendingReduces, jip.pendingReduces());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//@Test
|
||||||
public void testJobSummary() throws Exception {
|
public void testJobSummary() throws Exception {
|
||||||
int numMaps = 2;
|
int numMaps = 2;
|
||||||
int numReds = 2;
|
int numReds = 2;
|
||||||
@ -341,4 +341,35 @@ public void testJobSummary() throws Exception {
|
|||||||
assertEquals("firstReduceTaskLaunchTime", 3,
|
assertEquals("firstReduceTaskLaunchTime", 3,
|
||||||
jspy.getFirstTaskLaunchTimes().get(TaskType.REDUCE).longValue());
|
jspy.getFirstTaskLaunchTimes().get(TaskType.REDUCE).longValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLocality() throws Exception {
|
||||||
|
NetworkTopology nt = new NetworkTopology();
|
||||||
|
|
||||||
|
Node r1n1 = new NodeBase("/default/rack1/node1");
|
||||||
|
nt.add(r1n1);
|
||||||
|
Node r1n2 = new NodeBase("/default/rack1/node2");
|
||||||
|
nt.add(r1n2);
|
||||||
|
|
||||||
|
Node r2n3 = new NodeBase("/default/rack2/node3");
|
||||||
|
nt.add(r2n3);
|
||||||
|
|
||||||
|
Node r2n4 = new NodeBase("/default/rack2/s1/node4");
|
||||||
|
nt.add(r2n4);
|
||||||
|
|
||||||
|
LOG.debug("r1n1 parent: " + r1n1.getParent() + "\n" +
|
||||||
|
"r1n2 parent: " + r1n2.getParent() + "\n" +
|
||||||
|
"r2n3 parent: " + r2n3.getParent() + "\n" +
|
||||||
|
"r2n4 parent: " + r2n4.getParent());
|
||||||
|
|
||||||
|
// Same host
|
||||||
|
assertEquals(0, JobInProgress.getMatchingLevelForNodes(r1n1, r1n1, 3));
|
||||||
|
// Same rack
|
||||||
|
assertEquals(1, JobInProgress.getMatchingLevelForNodes(r1n1, r1n2, 3));
|
||||||
|
// Different rack
|
||||||
|
assertEquals(2, JobInProgress.getMatchingLevelForNodes(r1n1, r2n3, 3));
|
||||||
|
// Different rack at different depth
|
||||||
|
assertEquals(3, JobInProgress.getMatchingLevelForNodes(r1n1, r2n4, 3));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user