MAPREDUCE-2652. Enabled multiple NMs to be runnable on a single node by making the shuffle service port truly configurable. Contributed by Robert Joseph Evans.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1163585 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-08-31 11:38:32 +00:00
parent 5786efbfa6
commit ade0f0560f
23 changed files with 422 additions and 44 deletions

View File

@ -226,6 +226,10 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it. MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it.
(Robert Evans via mahadev) (Robert Evans via mahadev)
MAPREDUCE-2652. Enabled multiple NMs to be runnable on a single node by
making the shuffle service port truly configurable. (Robert Evans via
vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and MAPREDUCE-2026. Make JobTracker.getJobCounters() and
@ -652,7 +656,6 @@ Release 0.23.0 - Unreleased
MAPREDUCE-279. Fix in MR-279 branch. Distributed cache bug fix to pass Terasort. MAPREDUCE-279. Fix in MR-279 branch. Distributed cache bug fix to pass Terasort.
(vinodkv) (vinodkv)
MAPREDUCE-279. Fix in MR-279 branch. Fix null pointer exception in kill task MAPREDUCE-279. Fix in MR-279 branch. Fix null pointer exception in kill task
attempt (mahadev) attempt (mahadev)

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.AMConstants; import org.apache.hadoop.mapreduce.v2.app.AMConstants;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -198,9 +199,13 @@ public void run() {
// after "launching," send launched event to task attempt to move // after "launching," send launched event to task attempt to move
// state from ASSIGNED to RUNNING (also nukes "remoteTask", so must // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
// do getRemoteTask() call first) // do getRemoteTask() call first)
//There is no port number because we are not really talking to a task
// tracker. The shuffle is just done through local files. So the
// port number is set to -1 in this case.
context.getEventHandler().handle( context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, new TaskAttemptContainerLaunchedEvent(attemptID, -1));
TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter) //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter)
if (numMapTasks == 0) { if (numMapTasks == 0) {
doneWithMaps = true; doneWithMaps = true;

View File

@ -63,4 +63,9 @@ public interface TaskAttempt {
* yet, returns 0. * yet, returns 0.
*/ */
long getFinishTime(); long getFinishTime();
/**
* @return the port shuffle is on.
*/
public int getShufflePort();
} }

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
/**
 * A {@link TaskAttemptEvent} of type
 * {@link TaskAttemptEventType#TA_CONTAINER_LAUNCHED} that additionally carries
 * the port on which the shuffle handler is listening.
 */
public class TaskAttemptContainerLaunchedEvent extends TaskAttemptEvent {
  /** Shuffle port supplied at construction; never changes afterwards. */
  private final int shufflePort;

  /**
   * Create a new TaskAttemptContainerLaunchedEvent. The event type is always
   * {@link TaskAttemptEventType#TA_CONTAINER_LAUNCHED}.
   * @param id the id of the task attempt
   * @param shufflePort the port that shuffle is listening on.
   */
  public TaskAttemptContainerLaunchedEvent(TaskAttemptId id, int shufflePort) {
    super(id, TaskAttemptEventType.TA_CONTAINER_LAUNCHED);
    this.shufflePort = shufflePort;
  }

  /**
   * Get the port that the shuffle handler is listening on.
   * @return the shuffle port exactly as it was passed to the constructor.
   */
  public int getShufflePort() {
    return shufflePort;
  }
}

View File

@ -29,6 +29,11 @@ public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
private TaskAttemptId attemptID; private TaskAttemptId attemptID;
/**
* Create a new TaskAttemptEvent.
* @param id the id of the task attempt
* @param type the type of event that happened.
*/
public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType type) { public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType type) {
super(type); super(type);
this.attemptID = id; this.attemptID = id;
@ -37,5 +42,4 @@ public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType type) {
public TaskAttemptId getTaskAttemptID() { public TaskAttemptId getTaskAttemptID() {
return attemptID; return attemptID;
} }
} }

View File

@ -43,7 +43,6 @@
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceChildJVM; import org.apache.hadoop.mapred.MapReduceChildJVM;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptContextImpl; import org.apache.hadoop.mapred.TaskAttemptContextImpl;
@ -65,7 +64,6 @@
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -80,6 +78,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -126,7 +125,6 @@
/** /**
* Implementation of TaskAttempt interface. * Implementation of TaskAttempt interface.
*/ */
@SuppressWarnings("all")
public abstract class TaskAttemptImpl implements public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> { EventHandler<TaskAttemptEvent> {
@ -159,6 +157,7 @@ public abstract class TaskAttemptImpl implements
private long launchTime; private long launchTime;
private long finishTime; private long finishTime;
private WrappedProgressSplitsBlock progressSplitBlock; private WrappedProgressSplitsBlock progressSplitBlock;
private int shufflePort = -1;
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
new CleanupContainerTransition(); new CleanupContainerTransition();
@ -596,13 +595,10 @@ private ContainerLaunchContext createContainerLaunchContext() {
// Add shuffle token // Add shuffle token
LOG.info("Putting shuffle token in serviceData"); LOG.info("Putting shuffle token in serviceData");
DataOutputBuffer jobToken_dob = new DataOutputBuffer();
jobToken.write(jobToken_dob);
container container
.setServiceData( .setServiceData(
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ByteBuffer.wrap(jobToken_dob.getData(), 0, ShuffleHandler.serializeServiceData(jobToken));
jobToken_dob.getLength()));
MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath()); MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath());
} catch (IOException e) { } catch (IOException e) {
@ -784,6 +780,17 @@ public long getFinishTime() {
} }
} }
/**
 * Reads the {@code shufflePort} field while holding the read lock.
 * @return the value of {@code shufflePort} at the time of the call.
 */
@Override
public int getShufflePort() {
  final int currentPort;
  readLock.lock();
  try {
    currentPort = shufflePort;
  } finally {
    // Always release the lock, even if the read above were to fail.
    readLock.unlock();
  }
  return currentPort;
}
/**If container Assigned then return the node's address, otherwise null. /**If container Assigned then return the node's address, otherwise null.
*/ */
@Override @Override
@ -1153,7 +1160,11 @@ private static class LaunchedContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override @Override
public void transition(TaskAttemptImpl taskAttempt, public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) { TaskAttemptEvent evnt) {
TaskAttemptContainerLaunchedEvent event =
(TaskAttemptContainerLaunchedEvent) evnt;
//set the launch time //set the launch time
taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.launchTime = taskAttempt.clock.getTime();
// register it to TaskAttemptListener so that it start listening // register it to TaskAttemptListener so that it start listening
@ -1186,6 +1197,7 @@ public void transition(TaskAttemptImpl taskAttempt,
//make remoteTask reference as null as it is no more needed //make remoteTask reference as null as it is no more needed
//and free up the memory //and free up the memory
taskAttempt.remoteTask = null; taskAttempt.remoteTask = null;
taskAttempt.shufflePort = event.getShufflePort();
//tell the Task that attempt has started //tell the Task that attempt has started
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.eventHandler.handle(new TaskTAttemptEvent(

View File

@ -559,8 +559,9 @@ private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
if (attempt.getNodeHttpAddress() != null) { if (attempt.getNodeHttpAddress() != null) {
TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class); TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1); tce.setEventId(-1);
//TODO: XXXXXX hardcoded port tce.setMapOutputServerAddress("http://"
tce.setMapOutputServerAddress("http://" + attempt.getNodeHttpAddress().split(":")[0] + ":8080"); + attempt.getNodeHttpAddress().split(":")[0] + ":"
+ attempt.getShufflePort());
tce.setStatus(status); tce.setStatus(status);
tce.setAttemptId(attempt.getID()); tce.setAttemptId(attempt.getID());
int runTime = 0; int runTime = 0;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.launcher; package org.apache.hadoop.mapreduce.v2.app.launcher;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -30,11 +31,12 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AMConstants; import org.apache.hadoop.mapreduce.v2.app.AMConstants;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -48,6 +50,7 @@
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@ -179,6 +182,7 @@ private class EventProcessor implements Runnable {
this.event = event; this.event = event;
} }
@SuppressWarnings("unchecked")
@Override @Override
public void run() { public void run() {
LOG.info("Processing the event " + event.toString()); LOG.info("Processing the event " + event.toString());
@ -208,15 +212,25 @@ public void run() {
StartContainerRequest startRequest = recordFactory StartContainerRequest startRequest = recordFactory
.newRecordInstance(StartContainerRequest.class); .newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
proxy.startContainer(startRequest); StartContainerResponse response = proxy.startContainer(startRequest);
ByteBuffer portInfo = response
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1;
if(portInfo != null) {
port = ShuffleHandler.deserializeMetaData(portInfo);
}
LOG.info("Shuffle port returned by ContainerManager for "
+ taskAttemptID + " : " + port);
LOG.info("Returning from container-launch for " + taskAttemptID); if(port < 0) {
throw new IllegalStateException("Invalid shuffle port number "
+ port + " returned for " + taskAttemptID);
}
// after launching, send launched event to task attempt to move // after launching, send launched event to task attempt to move
// it from ASSIGNED to RUNNING state // it from ASSIGNED to RUNNING state
context.getEventHandler().handle( context.getEventHandler().handle(
new TaskAttemptEvent(taskAttemptID, new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
} catch (Throwable t) { } catch (Throwable t) {
String message = "Container launch failed for " + containerID String message = "Container launch failed for " + containerID
+ " : " + StringUtils.stringifyException(t); + " : " + StringUtils.stringifyException(t);

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
@ -295,8 +296,8 @@ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)
TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event) TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
.getTaskAttemptID(); .getTaskAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId); TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
actualHandler.handle(new TaskAttemptEvent(aId, //TODO need to get the real port number MAPREDUCE-2666
TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
// send the status update event // send the status update event
sendStatusUpdateEvent(aId, attInfo); sendStatusUpdateEvent(aId, attInfo);

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
@ -291,9 +292,11 @@ class MockContainerLauncher implements ContainerLauncher {
public void handle(ContainerLauncherEvent event) { public void handle(ContainerLauncherEvent event) {
switch (event.getType()) { switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH: case CONTAINER_REMOTE_LAUNCH:
//We are running locally so set the shuffle port to -1
getContext().getEventHandler().handle( getContext().getEventHandler().handle(
new TaskAttemptEvent(event.getTaskAttemptID(), new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); -1)
);
attemptLaunched(event.getTaskAttemptID()); attemptLaunched(event.getTaskAttemptID());
break; break;

View File

@ -25,6 +25,8 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
@ -200,6 +202,11 @@ public long getFinishTime() {
return 0; return 0;
} }
/**
 * @return the constant {@code ShuffleHandler.DEFAULT_SHUFFLE_PORT}; this
 * implementation does not compute a port of its own.
 */
@Override
public int getShufflePort() {
return ShuffleHandler.DEFAULT_SHUFFLE_PORT;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return report.getCounters(); return report.getCounters();

View File

@ -515,6 +515,11 @@ public Counters getCounters() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
/**
 * Not implemented by this class.
 * @throws UnsupportedOperationException always.
 */
@Override
public int getShufflePort() {
throw new UnsupportedOperationException("Not supported yet.");
}
private float getCodeRuntime() { private float getCodeRuntime() {
int taskIndex = myAttemptID.getTaskId().getId(); int taskIndex = myAttemptID.getTaskId().getId();
int attemptIndex = myAttemptID.getId(); int attemptIndex = myAttemptID.getId();

View File

@ -146,4 +146,10 @@ public long getLaunchTime() {
public long getFinishTime() { public long getFinishTime() {
return report.getFinishTime(); return report.getFinishTime();
} }
/**
 * Not implemented by this class.
 * @throws UnsupportedOperationException always.
 */
@Override
public int getShufflePort() {
throw new UnsupportedOperationException("Not supported yet.");
}
} }

View File

@ -72,6 +72,8 @@ public void init(Configuration conf) {
conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class); Service.class);
// Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
conf.setClass(NMConfig.NM_CONTAINER_EXECUTOR_CLASS, conf.setClass(NMConfig.NM_CONTAINER_EXECUTOR_CLASS,
DefaultContainerExecutor.class, ContainerExecutor.class); DefaultContainerExecutor.class, ContainerExecutor.class);

View File

@ -105,7 +105,8 @@ public static void setup() throws IOException {
if (mrCluster == null) { if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName()); mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
mrCluster.init(new Configuration()); Configuration conf = new Configuration();
mrCluster.init(conf);
mrCluster.start(); mrCluster.start();
} }

View File

@ -120,7 +120,8 @@ public class ShuffleHandler extends AbstractService
private static final JobTokenSecretManager secretManager = private static final JobTokenSecretManager secretManager =
new JobTokenSecretManager(); new JobTokenSecretManager();
public static final String SHUFFLE_PORT = "mapreduce.shuffle.port"; public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 8080;
@Metrics(about="Shuffle output metrics", context="mapred") @Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener { static class ShuffleMetrics implements ChannelFutureListener {
@ -155,14 +156,58 @@ public ShuffleHandler() {
this(DefaultMetricsSystem.instance()); this(DefaultMetricsSystem.instance());
} }
@Override /**
public void initApp(String user, ApplicationId appId, ByteBuffer secret) { * Serialize the shuffle port into a ByteBuffer for use later on.
// TODO these bytes should be versioned * @param port the port to be sent to the ApplicationMaster
try { * @return the serialized form of the port.
*/
static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(port);
return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
}
/**
 * Decodes the metadata returned by the ShuffleHandler by reading a single
 * int from the supplied buffer.
 * @param meta the metadata returned by the ShuffleHandler
 * @return the port the Shuffle Handler is listening on to serve shuffle data.
 * @throws IOException if the int cannot be read from the buffer.
 */
public static int deserializeMetaData(ByteBuffer meta) throws IOException {
  //TODO this should be returning a class not just an int
  final DataInputByteBuffer metaStream = new DataInputByteBuffer();
  metaStream.reset(meta);
  return metaStream.readInt();
}
/**
 * Writes the job token into a buffer so that it can be handed to the
 * ShuffleHandler as ServiceData.
 * @param jobToken the job token to be used for authentication of
 * shuffle data requests.
 * @return a ByteBuffer wrapping the bytes written by the token.
 * @throws IOException if the token cannot be written.
 */
public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
  //TODO these bytes should be versioned
  final DataOutputBuffer tokenBytes = new DataOutputBuffer();
  jobToken.write(tokenBytes);
  // Wrap only the written prefix of the backing array.
  final byte[] backing = tokenBytes.getData();
  final int written = tokenBytes.getLength();
  return ByteBuffer.wrap(backing, 0, written);
}
static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
DataInputByteBuffer in = new DataInputByteBuffer(); DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(secret); in.reset(secret);
Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(); Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
jt.readFields(in); jt.readFields(in);
return jt;
}
@Override
public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
// TODO these bytes should be versioned
try {
Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
// TODO: Once SHuffle is out of NM, this can use MR APIs // TODO: Once SHuffle is out of NM, this can use MR APIs
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
userRsrc.put(jobId.toString(), user); userRsrc.put(jobId.toString(), user);
@ -193,7 +238,7 @@ public synchronized void start() {
Configuration conf = getConfig(); Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector); ServerBootstrap bootstrap = new ServerBootstrap(selector);
bootstrap.setPipelineFactory(new HttpPipelineFactory(conf)); bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
port = conf.getInt("mapreduce.shuffle.port", 8080); port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
accepted.add(bootstrap.bind(new InetSocketAddress(port))); accepted.add(bootstrap.bind(new InetSocketAddress(port)));
LOG.info(getName() + " listening on port " + port); LOG.info(getName() + " listening on port " + port);
super.start(); super.start();
@ -207,6 +252,17 @@ public synchronized void stop() {
super.stop(); super.stop();
} }
/**
 * Serializes the current {@code port} via {@code serializeMetaData}.
 * @return the serialized port, or {@code null} if serialization threw an
 * IOException (the error is logged).
 */
@Override
public synchronized ByteBuffer getMeta() {
  ByteBuffer serializedPort = null;
  try {
    serializedPort = serializeMetaData(port);
  } catch (IOException e) {
    LOG.error("Error during getMeta", e);
    // TODO add API to AuxiliaryServices to report failures
  }
  return serializedPort;
}
Shuffle createShuffle() { Shuffle createShuffle() {
return new Shuffle(getConfig()); return new Shuffle(getConfig());
} }
@ -306,7 +362,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
try { try {
verifyRequest(jobId, ctx, request, response, verifyRequest(jobId, ctx, request, response,
new URL("http", "", 8080, reqUri)); new URL("http", "", port, reqUri));
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Shuffle failure ", e); LOG.warn("Shuffle failure ", e);
sendError(ctx, e.getMessage(), UNAUTHORIZED); sendError(ctx, e.getMessage(), UNAUTHORIZED);

View File

@ -26,11 +26,21 @@
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
import static org.apache.hadoop.test.MockitoMaker.*; import static org.apache.hadoop.test.MockitoMaker.*;
public class TestShuffleHandler { public class TestShuffleHandler {
static final long MiB = 1024 * 1024; static final long MiB = 1024 * 1024;
/**
 * Checks that a port survives a serializeMetaData / deserializeMetaData
 * round trip for the values 1, -1 and 8080.
 */
@Test public void testSerializeMeta() throws Exception {
  final int[] ports = {1, -1, 8080};
  for (int expected : ports) {
    assertEquals(expected, ShuffleHandler.deserializeMetaData(
        ShuffleHandler.serializeMetaData(expected)));
  }
}
@Test public void testShuffleMetrics() throws Exception { @Test public void testShuffleMetrics() throws Exception {
MetricsSystem ms = new MetricsSystemImpl(); MetricsSystem ms = new MetricsSystemImpl();
ShuffleHandler sh = new ShuffleHandler(ms); ShuffleHandler sh = new ShuffleHandler(ms);

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.api.protocolrecords; package org.apache.hadoop.yarn.api.protocolrecords;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
@ -32,5 +35,11 @@
@Public @Public
@Stable @Stable
public interface StartContainerResponse { public interface StartContainerResponse {
Map<String, ByteBuffer> getAllServiceResponse();
ByteBuffer getServiceResponse(String key);
void addAllServiceResponse(Map<String, ByteBuffer> serviceResponse);
void setServiceResponse(String key, ByteBuffer value);
void removeServiceResponse(String key);
void clearServiceResponse();
} }

View File

@ -19,17 +19,26 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
public class StartContainerResponsePBImpl extends ProtoBase<StartContainerResponseProto> implements StartContainerResponse { public class StartContainerResponsePBImpl extends ProtoBase<StartContainerResponseProto> implements StartContainerResponse {
StartContainerResponseProto proto = StartContainerResponseProto.getDefaultInstance(); StartContainerResponseProto proto = StartContainerResponseProto.getDefaultInstance();
StartContainerResponseProto.Builder builder = null; StartContainerResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private Map<String, ByteBuffer> serviceResponse = null;
public StartContainerResponsePBImpl() { public StartContainerResponsePBImpl() {
builder = StartContainerResponseProto.newBuilder(); builder = StartContainerResponseProto.newBuilder();
} }
@ -40,11 +49,27 @@ public StartContainerResponsePBImpl(StartContainerResponseProto proto) {
} }
public StartContainerResponseProto getProto() { public StartContainerResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
return proto; return proto;
} }
/**
 * Copies the locally cached service-response map into the builder.
 * Does nothing when the map was never materialised.
 */
private void mergeLocalToBuilder() {
  if (this.serviceResponse == null) {
    return;
  }
  addServiceResponseToProto();
}
/**
 * Flushes locally cached state into the builder and rebuilds {@code proto}
 * from it, leaving this record marked as backed by the built proto.
 */
private void mergeLocalToProto() {
// When currently backed by a built proto, get a builder seeded from it first.
if (viaProto) {
maybeInitBuilder();
}
// Push the cached service-response map (if any) into the builder.
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() { private void maybeInitBuilder() {
if (viaProto || builder == null) { if (viaProto || builder == null) {
builder = StartContainerResponseProto.newBuilder(proto); builder = StartContainerResponseProto.newBuilder(proto);
@ -53,7 +78,84 @@ private void maybeInitBuilder() {
} }
/**
 * Returns the service responses held by this record.
 * The returned map is the live internal cache, not a copy.
 */
@Override
public Map<String, ByteBuffer> getAllServiceResponse() {
  // Make sure the cache has been populated from the proto/builder.
  initServiceResponse();
  return serviceResponse;
}
/**
 * Looks up the response stored under {@code key}.
 *
 * @param key the key to look up
 * @return the stored bytes, or {@code null} when the map has no such key
 */
@Override
public ByteBuffer getServiceResponse(String key) {
  // Make sure the cache has been populated from the proto/builder.
  initServiceResponse();
  return serviceResponse.get(key);
}
/**
 * Lazily builds the local service-response map from whichever of
 * {@code proto} / {@code builder} currently backs this record.
 * Subsequent calls are no-ops once the map exists.
 */
private void initServiceResponse() {
  if (this.serviceResponse == null) {
    StartContainerResponseProtoOrBuilder source = viaProto ? proto : builder;
    List<StringBytesMapProto> entries = source.getServiceResponseList();
    this.serviceResponse = new HashMap<String, ByteBuffer>();
    for (StringBytesMapProto entry : entries) {
      String key = entry.getKey();
      ByteBuffer value = convertFromProtoFormat(entry.getValue());
      this.serviceResponse.put(key, value);
    }
  }
}
/**
 * Adds every entry of the given map to this record's service responses.
 * A {@code null} argument is ignored.
 *
 * @param serviceResponse entries to add; may be {@code null}
 */
@Override
public void addAllServiceResponse(final Map<String, ByteBuffer> serviceResponse) {
  if (serviceResponse != null) {
    // Populate the cache first so existing entries are kept alongside the new ones.
    initServiceResponse();
    this.serviceResponse.putAll(serviceResponse);
  }
}
/**
 * Replaces the builder's repeated service-response field with the contents
 * of the local map. The builder field is always cleared first; when no local
 * map exists nothing is added back.
 */
private void addServiceResponseToProto() {
  maybeInitBuilder();
  builder.clearServiceResponse();
  if (serviceResponse == null) {
    return;
  }
  // Adapter that converts map entries to proto messages one at a time as
  // the builder consumes them.
  Iterable<StringBytesMapProto> iterable = new Iterable<StringBytesMapProto>() {
    @Override
    public Iterator<StringBytesMapProto> iterator() {
      return new Iterator<StringBytesMapProto>() {
        // Walk entries rather than keys so each element needs no second lookup.
        private final Iterator<Map.Entry<String, ByteBuffer>> entryIter =
            serviceResponse.entrySet().iterator();

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }

        @Override
        public StringBytesMapProto next() {
          Map.Entry<String, ByteBuffer> entry = entryIter.next();
          return StringBytesMapProto.newBuilder()
              .setKey(entry.getKey())
              .setValue(convertToProtoFormat(entry.getValue()))
              .build();
        }

        @Override
        public boolean hasNext() {
          return entryIter.hasNext();
        }
      };
    }
  };
  builder.addAllServiceResponse(iterable);
}
/**
 * Stores {@code val} under {@code key}, replacing any previous value.
 *
 * @param key the key to store under
 * @param val the response bytes
 */
@Override
public void setServiceResponse(String key, ByteBuffer val) {
  // Make sure the cache has been populated from the proto/builder.
  initServiceResponse();
  serviceResponse.put(key, val);
}
/**
 * Removes the response stored under {@code key}, if any.
 *
 * @param key the key to remove
 */
@Override
public void removeServiceResponse(String key) {
  // Make sure the cache has been populated from the proto/builder.
  initServiceResponse();
  serviceResponse.remove(key);
}
/** Removes every service response from this record. */
@Override
public void clearServiceResponse() {
  // Initialise first so the (now empty) map exists and is non-null.
  initServiceResponse();
  serviceResponse.clear();
}
} }

View File

@ -143,6 +143,7 @@ message StartContainerRequestProto {
} }
message StartContainerResponseProto { message StartContainerResponseProto {
repeated StringBytesMapProto service_response = 1;
} }
message StopContainerRequestProto { message StopContainerRequestProto {

View File

@ -44,11 +44,14 @@ public class AuxServices extends AbstractService
public static final String AUX_SERVICE_CLASS_FMT = public static final String AUX_SERVICE_CLASS_FMT =
"nodemanager.aux.service.%s.class"; "nodemanager.aux.service.%s.class";
public final Map<String,AuxiliaryService> serviceMap; public final Map<String,AuxiliaryService> serviceMap;
public final Map<String,ByteBuffer> serviceMeta;
public AuxServices() { public AuxServices() {
super(AuxServices.class.getName()); super(AuxServices.class.getName());
serviceMap = serviceMap =
Collections.synchronizedMap(new HashMap<String,AuxiliaryService>()); Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
serviceMeta =
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
// Obtain services from configuration in init() // Obtain services from configuration in init()
} }
@ -63,6 +66,15 @@ Collection<AuxiliaryService> getServices() {
return Collections.unmodifiableCollection(serviceMap.values()); return Collections.unmodifiableCollection(serviceMap.values());
} }
/**
 * Returns a read-only view of the metadata published by the auxiliary
 * services.
 *
 * @return the metadata for all registered services that have been started.
 * If a service has not been started, no metadata will be available. The key
 * is the name of the service as defined in the configuration.
 */
public Map<String, ByteBuffer> getMeta() {
return Collections.unmodifiableMap(serviceMeta);
}
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
Collection<String> auxNames = conf.getStringCollection(AUX_SERVICES); Collection<String> auxNames = conf.getStringCollection(AUX_SERVICES);
@ -75,7 +87,15 @@ public void init(Configuration conf) {
throw new RuntimeException("No class defiend for " + sName); throw new RuntimeException("No class defiend for " + sName);
} }
AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf); AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf);
// TODO better use use s.getName()? // TODO better use s.getName()?
if(!sName.equals(s.getName())) {
LOG.warn("The Auxilurary Service named '"+sName+"' in the "
+"configuration is for class "+sClass+" which has "
+"a name of '"+s.getName()+"'. Because these are "
+"not the same tools trying to send ServiceData and read "
+"Service Meta Data may have issues unless the refer to "
+"the name in the config.");
}
addService(sName, s); addService(sName, s);
s.init(conf); s.init(conf);
} catch (RuntimeException e) { } catch (RuntimeException e) {
@ -90,9 +110,15 @@ public void init(Configuration conf) {
public void start() { public void start() {
// TODO fork(?) services running as configured user // TODO fork(?) services running as configured user
// monitor for health, shutdown/restart(?) if any should die // monitor for health, shutdown/restart(?) if any should die
for (Service service : serviceMap.values()) { for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) {
AuxiliaryService service = entry.getValue();
String name = entry.getKey();
service.start(); service.start();
service.register(this); service.register(this);
ByteBuffer meta = service.getMeta();
if(meta != null) {
serviceMeta.put(name, meta);
}
} }
super.start(); super.start();
} }
@ -108,6 +134,7 @@ public void stop() {
} }
} }
serviceMap.clear(); serviceMap.clear();
serviceMeta.clear();
} }
} finally { } finally {
super.stop(); super.stop();
@ -146,6 +173,15 @@ public void handle(AuxServicesEvent event) {
public interface AuxiliaryService extends Service { public interface AuxiliaryService extends Service {
void initApp(String user, ApplicationId appId, ByteBuffer data); void initApp(String user, ApplicationId appId, ByteBuffer data);
void stopApp(ApplicationId appId); void stopApp(ApplicationId appId);
/**
 * Retrieve metadata for this service. This is likely going to be contact
 * information so that applications can access the service remotely. Ideally
 * each service should provide a method to parse out the information to a usable
 * class. This will only be called after the service's start method has finished.
 * The result may be cached.
 * @return metadata for this service that should be made available to applications.
 */
ByteBuffer getMeta();
} }
} }

View File

@ -283,6 +283,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
dispatcher.getEventHandler().handle(new ApplicationInitEvent(container)); dispatcher.getEventHandler().handle(new ApplicationInitEvent(container));
StartContainerResponse response = StartContainerResponse response =
recordFactory.newRecordInstance(StartContainerResponse.class); recordFactory.newRecordInstance(StartContainerResponse.class);
response.addAllServiceResponse(auxiluaryServices.getMeta());
metrics.launchedContainer(); metrics.launchedContainer();
metrics.allocateContainer(launchContext.getResource()); metrics.allocateContainer(launchContext.getResource());
return response; return response;

View File

@ -22,6 +22,7 @@
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -44,10 +45,16 @@ static class LightService extends AbstractService
private final int expected_appId; private final int expected_appId;
private int remaining_init; private int remaining_init;
private int remaining_stop; private int remaining_stop;
private ByteBuffer meta = null;
LightService(String name, char idef, int expected_appId) { LightService(String name, char idef, int expected_appId) {
this(name, idef, expected_appId, null);
}
LightService(String name, char idef, int expected_appId, ByteBuffer meta) {
super(name); super(name);
this.idef = idef; this.idef = idef;
this.expected_appId = expected_appId; this.expected_appId = expected_appId;
this.meta = meta;
} }
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
@ -71,14 +78,18 @@ public void initApp(String user, ApplicationId appId, ByteBuffer data) {
public void stopApp(ApplicationId appId) { public void stopApp(ApplicationId appId) {
assertEquals(expected_appId, appId.getId()); assertEquals(expected_appId, appId.getId());
} }
/** Returns the metadata buffer supplied at construction, or null when none was given. */
@Override
public ByteBuffer getMeta() {
return meta;
}
} }
static class ServiceA extends LightService { static class ServiceA extends LightService {
public ServiceA() { super("A", 'A', 65); } public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); }
} }
static class ServiceB extends LightService { static class ServiceB extends LightService {
public ServiceB() { super("B", 'B', 66); } public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); }
} }
@Test @Test
@ -139,6 +150,44 @@ public void testAuxServices() {
} }
} }
/**
 * Verifies that, once started, AuxServices exposes each auxiliary service's
 * metadata under the name the service was given in the configuration.
 */
@Test
public void testAuxServicesMeta() {
  final Configuration conf = new Configuration();
  final String[] configuredNames = new String[] { "Asrv", "Bsrv" };
  conf.setStrings(AuxServices.AUX_SERVICES, configuredNames);
  final String classKeyA = String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv");
  final String classKeyB = String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv");
  conf.setClass(classKeyA, ServiceA.class, Service.class);
  conf.setClass(classKeyB, ServiceB.class, Service.class);

  final AuxServices aux = new AuxServices();
  aux.init(conf);

  // Each expected type contributes a distinct prime factor, so the product
  // is 6 only when exactly one ServiceA and one ServiceB were instantiated.
  int typeProduct = 1;
  for (Service service : aux.getServices()) {
    assertEquals(INITED, service.getServiceState());
    if (service instanceof ServiceA) {
      typeProduct *= 2;
    } else if (service instanceof ServiceB) {
      typeProduct *= 3;
    } else {
      fail("Unexpected service type " + service.getClass());
    }
  }
  assertEquals("Invalid mix of services", 6, typeProduct);

  aux.start();
  for (Service service : aux.getServices()) {
    assertEquals(STARTED, service.getServiceState());
  }

  // Metadata is keyed by the configured names, not by the services' own names.
  final Map<String, ByteBuffer> published = aux.getMeta();
  assertEquals(2, published.size());
  final byte[] rawA = published.get("Asrv").array();
  assertEquals("A", new String(rawA));
  final byte[] rawB = published.get("Bsrv").array();
  assertEquals("B", new String(rawB));

  aux.stop();
  for (Service service : aux.getServices()) {
    assertEquals(STOPPED, service.getServiceState());
  }
}
@Test @Test
public void testAuxUnexpectedStop() { public void testAuxUnexpectedStop() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();