MAPREDUCE-5152. Make MR App to simply pass through the container from RM instead of extracting and populating information itself to start any container. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1469544 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-18 20:13:40 +00:00
parent d12c463121
commit cb78a65a15
7 changed files with 121 additions and 81 deletions

View File

@ -198,6 +198,10 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-4985. Add compression option to TestDFSIO usage. MAPREDUCE-4985. Add compression option to TestDFSIO usage.
(Plamen Jeliazkov via shv) (Plamen Jeliazkov via shv)
MAPREDUCE-5152. Make MR App to simply pass through the container from RM
instead of extracting and populating information itself to start any
container. (vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -117,7 +117,6 @@
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -490,14 +489,10 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachine; stateMachine;
private ContainerId containerID; @VisibleForTesting
private NodeId containerNodeId; public Container container;
private String containerMgrAddress;
private String nodeHttpAddress;
private String nodeRackName; private String nodeRackName;
private WrappedJvmID jvmID; private WrappedJvmID jvmID;
private ContainerToken containerToken;
private Resource assignedCapability;
//this takes good amount of memory ~ 30KB. Instantiate it lazily //this takes good amount of memory ~ 30KB. Instantiate it lazily
//and make it null once task is launched. //and make it null once task is launched.
@ -825,7 +820,7 @@ static ContainerLaunchContext createContainerLaunchContext(
public ContainerId getAssignedContainerID() { public ContainerId getAssignedContainerID() {
readLock.lock(); readLock.lock();
try { try {
return containerID; return container == null ? null : container.getId();
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -835,7 +830,8 @@ public ContainerId getAssignedContainerID() {
public String getAssignedContainerMgrAddress() { public String getAssignedContainerMgrAddress() {
readLock.lock(); readLock.lock();
try { try {
return containerMgrAddress; return container == null ? null : StringInterner.weakIntern(container
.getNodeId().toString());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -895,7 +891,7 @@ public int getShufflePort() {
public NodeId getNodeId() { public NodeId getNodeId() {
readLock.lock(); readLock.lock();
try { try {
return containerNodeId; return container == null ? null : container.getNodeId();
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -907,7 +903,7 @@ public NodeId getNodeId() {
public String getNodeHttpAddress() { public String getNodeHttpAddress() {
readLock.lock(); readLock.lock();
try { try {
return nodeHttpAddress; return container == null ? null : container.getNodeHttpAddress();
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -967,8 +963,8 @@ public TaskAttemptReport getReport() {
result.setContainerId(this.getAssignedContainerID()); result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName); result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort); result.setNodeManagerHttpPort(httpPort);
if (this.containerNodeId != null) { if (this.container != null) {
result.setNodeManagerPort(this.containerNodeId.getPort()); result.setNodeManagerPort(this.container.getNodeId().getPort());
} }
return result; return result;
} finally { } finally {
@ -1093,13 +1089,17 @@ public void setAvataar(Avataar avataar) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo, public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
OutputCommitter committer, boolean recoverOutput) { OutputCommitter committer, boolean recoverOutput) {
containerID = taInfo.getContainerId(); ContainerId containerId = taInfo.getContainerId();
containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":" NodeId containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+ taInfo.getPort()); + taInfo.getPort());
containerMgrAddress = StringInterner.weakIntern( String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
containerNodeId.toString());
nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+ taInfo.getHttpPort()); + taInfo.getHttpPort());
// Resource/Priority/Tokens are only needed while launching the
// container on an NM, these are already completed tasks, so setting them to
// null
container =
BuilderUtils.newContainer(containerId, containerNodeId,
nodeHttpAddress, null, null, null);
computeRackAndLocality(); computeRackAndLocality();
launchTime = taInfo.getStartTime(); launchTime = taInfo.getStartTime();
finishTime = (taInfo.getFinishTime() != -1) ? finishTime = (taInfo.getFinishTime() != -1) ?
@ -1227,6 +1227,7 @@ private void setFinishTime() {
} }
private void computeRackAndLocality() { private void computeRackAndLocality() {
NodeId containerNodeId = container.getNodeId();
nodeRackName = RackResolver.resolve( nodeRackName = RackResolver.resolve(
containerNodeId.getHost()).getNetworkLocation(); containerNodeId.getHost()).getNetworkLocation();
@ -1331,10 +1332,10 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId() TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
.getTaskType()), attemptState.toString(), .getTaskType()), attemptState.toString(),
taskAttempt.finishTime, taskAttempt.finishTime,
taskAttempt.containerNodeId == null ? "UNKNOWN" taskAttempt.container == null ? "UNKNOWN"
: taskAttempt.containerNodeId.getHost(), : taskAttempt.container.getNodeId().getHost(),
taskAttempt.containerNodeId == null ? -1 taskAttempt.container == null ? -1
: taskAttempt.containerNodeId.getPort(), : taskAttempt.container.getNodeId().getPort(),
taskAttempt.nodeRackName == null ? "UNKNOWN" taskAttempt.nodeRackName == null ? "UNKNOWN"
: taskAttempt.nodeRackName, : taskAttempt.nodeRackName,
StringUtils.join( StringUtils.join(
@ -1353,12 +1354,12 @@ private void sendLaunchedEvents() {
eventHandler.handle(jce); eventHandler.handle(jce);
LOG.info("TaskAttempt: [" + attemptId LOG.info("TaskAttempt: [" + attemptId
+ "] using containerId: [" + containerID + " on NM: [" + "] using containerId: [" + container.getId() + " on NM: ["
+ containerMgrAddress + "]"); + StringInterner.weakIntern(container.getNodeId().toString()) + "]");
TaskAttemptStartedEvent tase = TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId), new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
launchTime, trackerName, httpPort, shufflePort, containerID, launchTime, trackerName, httpPort, shufflePort, container.getId(),
locality.toString(), avataar.toString()); locality.toString(), avataar.toString());
eventHandler.handle( eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase)); new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
@ -1490,19 +1491,14 @@ public void transition(final TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) { TaskAttemptEvent event) {
final TaskAttemptContainerAssignedEvent cEvent = final TaskAttemptContainerAssignedEvent cEvent =
(TaskAttemptContainerAssignedEvent) event; (TaskAttemptContainerAssignedEvent) event;
taskAttempt.containerID = cEvent.getContainer().getId(); Container container = cEvent.getContainer();
taskAttempt.containerNodeId = cEvent.getContainer().getNodeId(); taskAttempt.container = container;
taskAttempt.containerMgrAddress = StringInterner.weakIntern(
taskAttempt.containerNodeId.toString());
taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
cEvent.getContainer().getNodeHttpAddress());
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
// this is a _real_ Task (classic Hadoop mapred flavor): // this is a _real_ Task (classic Hadoop mapred flavor):
taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.remoteTask = taskAttempt.createRemoteTask();
taskAttempt.jvmID = new WrappedJvmID( taskAttempt.jvmID =
taskAttempt.remoteTask.getTaskID().getJobID(), new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId()
.getId());
taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.remoteTask, taskAttempt.jvmID);
@ -1514,10 +1510,9 @@ public void transition(final TaskAttemptImpl taskAttempt,
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.taskAttemptListener, taskAttempt.credentials);
taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent( taskAttempt.eventHandler
taskAttempt.attemptId, taskAttempt.containerID, .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
taskAttempt.containerMgrAddress, taskAttempt.containerToken, launchContext, container, taskAttempt.remoteTask));
launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask));
// send event to speculator that our container needs are satisfied // send event to speculator that our container needs are satisfied
taskAttempt.eventHandler.handle taskAttempt.eventHandler.handle
@ -1604,9 +1599,8 @@ public void transition(TaskAttemptImpl taskAttempt,
taskAttempt.taskAttemptListener taskAttempt.taskAttemptListener
.registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
//TODO Resolve to host / IP in case of a local address. //TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
// Costly?
taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort(); taskAttempt.httpPort = nodeHttpInetAddr.getPort();
taskAttempt.sendLaunchedEvents(); taskAttempt.sendLaunchedEvents();
@ -1713,6 +1707,10 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) { private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started. //Log finished events only if an attempt started.
if (getLaunchTime() == 0) return; if (getLaunchTime() == 0) return;
String containerHostName = this.container == null ? "UNKNOWN"
: this.container.getNodeId().getHost();
int containerNodePort =
this.container == null ? -1 : this.container.getNodeId().getPort();
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
MapAttemptFinishedEvent mfe = MapAttemptFinishedEvent mfe =
new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
@ -1720,9 +1718,8 @@ private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
state.toString(), state.toString(),
this.reportedStatus.mapFinishTime, this.reportedStatus.mapFinishTime,
finishTime, finishTime,
this.containerNodeId == null ? "UNKNOWN" containerHostName,
: this.containerNodeId.getHost(), containerNodePort,
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString, this.reportedStatus.stateString,
getCounters(), getCounters(),
@ -1737,9 +1734,8 @@ private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
this.reportedStatus.shuffleFinishTime, this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime, this.reportedStatus.sortFinishTime,
finishTime, finishTime,
this.containerNodeId == null ? "UNKNOWN" containerHostName,
: this.containerNodeId.getHost(), containerNodePort,
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString, this.reportedStatus.stateString,
getCounters(), getCounters(),
@ -1864,8 +1860,9 @@ public void transition(TaskAttemptImpl taskAttempt,
//send the cleanup event to containerLauncher //send the cleanup event to containerLauncher
taskAttempt.eventHandler.handle(new ContainerLauncherEvent( taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId, taskAttempt.attemptId,
taskAttempt.containerID, taskAttempt.containerMgrAddress, taskAttempt.container.getId(), StringInterner
taskAttempt.containerToken, .weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
} }
} }

View File

@ -59,7 +59,6 @@
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -149,16 +148,13 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
// Construct the actual Container // Construct the actual Container
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
event.getContainer(); event.getContainerLaunchContext();
org.apache.hadoop.yarn.api.records.Container container =
BuilderUtils.newContainer(containerID, null, null,
event.getResource(), null, containerToken);
// Now launch the actual container // Now launch the actual container
StartContainerRequest startRequest = Records StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class); .newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainer(container); startRequest.setContainer(event.getAllocatedContainer());
StartContainerResponse response = proxy.startContainer(startRequest); StartContainerResponse response = proxy.startContainer(startRequest);
ByteBuffer portInfo = response ByteBuffer portInfo = response

View File

@ -20,35 +20,34 @@
import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Resource;
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent { public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
private final ContainerLaunchContext container; private final Container allocatedContainer;
private final ContainerLaunchContext containerLaunchContext;
private final Task task; private final Task task;
private final Resource resource;
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID, public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
ContainerId containerID, String containerMgrAddress, ContainerLaunchContext containerLaunchContext,
ContainerToken containerToken, Container allocatedContainer, Task remoteTask) {
ContainerLaunchContext containerLaunchContext, Resource resource, super(taskAttemptID, allocatedContainer.getId(), StringInterner
Task remoteTask) { .weakIntern(allocatedContainer.getNodeId().toString()),
super(taskAttemptID, containerID, containerMgrAddress, containerToken, allocatedContainer.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH); ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
this.container = containerLaunchContext; this.allocatedContainer = allocatedContainer;
this.containerLaunchContext = containerLaunchContext;
this.task = remoteTask; this.task = remoteTask;
this.resource = resource;
} }
public ContainerLaunchContext getContainer() { public ContainerLaunchContext getContainerLaunchContext() {
return this.container; return this.containerLaunchContext;
} }
public Resource getResource() { public Container getAllocatedContainer() {
return this.resource; return this.allocatedContainer;
} }
public Task getRemoteTask() { public Task getRemoteTask() {

View File

@ -23,6 +23,7 @@
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import junit.framework.Assert; import junit.framework.Assert;
@ -46,6 +47,11 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
@ -412,6 +418,39 @@ public void checkTaskStateTypeConversion() {
} }
} }
private Container containerObtainedByContainerLauncher;
@Test
public void testContainerPassThrough() throws Exception {
MRApp app = new MRApp(0, 1, true, this.getClass().getName(), true) {
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
return new MockContainerLauncher() {
@Override
public void handle(ContainerLauncherEvent event) {
if (event instanceof ContainerRemoteLaunchEvent) {
containerObtainedByContainerLauncher =
((ContainerRemoteLaunchEvent) event).getAllocatedContainer();
}
super.handle(event);
}
};
};
};
Job job = app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
Collection<Task> tasks = job.getTasks().values();
Collection<TaskAttempt> taskAttempts =
tasks.iterator().next().getAttempts().values();
TaskAttemptImpl taskAttempt =
(TaskAttemptImpl) taskAttempts.iterator().next();
// Container from RM should pass through to the launcher. Container object
// should be the same.
Assert.assertTrue(taskAttempt.container
== containerObtainedByContainerLauncher);
}
private final class MRAppWithHistory extends MRApp { private final class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete, public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) { String testName, boolean cleanOnStart, int startCount) {

View File

@ -79,7 +79,8 @@ protected ContainerLauncher createContainerLauncher(AppContext context) {
public void handle(ContainerLauncherEvent event) { public void handle(ContainerLauncherEvent event) {
if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) { if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event; ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
ContainerLaunchContext launchContext = launchEvent.getContainer(); ContainerLaunchContext launchContext =
launchEvent.getContainerLaunchContext();
String cmdString = launchContext.getCommands().toString(); String cmdString = launchContext.getCommands().toString();
LOG.info("launchContext " + cmdString); LOG.info("launchContext " + cmdString);
myCommandLine = cmdString; myCommandLine = cmdString;

View File

@ -37,7 +37,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@ -224,10 +223,6 @@ private void waitForEvents(CustomContainerLauncher containerLauncher,
@Test @Test
public void testSlowNM() throws Exception { public void testSlowNM() throws Exception {
test();
}
private void test() throws Exception {
conf = new Configuration(); conf = new Configuration();
int maxAttempts = 1; int maxAttempts = 1;
@ -382,6 +377,15 @@ public GetContainerStatusResponse getContainerStatus(
@Override @Override
public StartContainerResponse startContainer(StartContainerRequest request) public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException { throws YarnRemoteException {
// Validate that the container is what RM is giving.
Assert.assertEquals(MRApp.NM_HOST, request.getContainer().getNodeId()
.getHost());
Assert.assertEquals(MRApp.NM_PORT, request.getContainer().getNodeId()
.getPort());
Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_HTTP_PORT, request
.getContainer().getNodeHttpAddress());
StartContainerResponse response = recordFactory StartContainerResponse response = recordFactory
.newRecordInstance(StartContainerResponse.class); .newRecordInstance(StartContainerResponse.class);
status = recordFactory.newRecordInstance(ContainerStatus.class); status = recordFactory.newRecordInstance(ContainerStatus.class);