diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index caa27f4d3b..469fa1f8b9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -226,6 +226,10 @@ Release 0.23.0 - Unreleased MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it. (Robert Evans via mahadev) + MAPREDUCE-2652. Enabled multiple NMs to be runnable on a single node by + making shuffle service port to be truely configurable. (Robert Evans via + vinodkv) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and @@ -652,7 +656,6 @@ Release 0.23.0 - Unreleased MAPREDUCE-279. Fix in MR-279 branch. Distributed cache bug fix to pass Terasort. (vinodkv) - MAPREDUCE-279. Fix in MR-279 branch. Fix null pointer exception in kill task attempt (mahadev) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index 5f664d1516..422f6ff6b3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AMConstants; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -198,9 +199,13 @@ public void run() { // after "launching," send launched event to task attempt to move // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must // do getRemoteTask() call first) + + //There is no port number because we are not really talking to a task + // tracker. The shuffle is just done through local files. So the + // port number is set to -1 in this case. context.getEventHandler().handle( - new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter) + new TaskAttemptContainerLaunchedEvent(attemptID, -1)); + //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter) if (numMapTasks == 0) { doneWithMaps = true; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java index bae1136246..7092b6dbc0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java @@ -63,4 +63,9 @@ public interface TaskAttempt { * yet, returns 0. */ long getFinishTime(); + + /** + * @return the port shuffle is on. + */ + public int getShufflePort(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerLaunchedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerLaunchedEvent.java new file mode 100644 index 0000000000..68cb84ccc3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerLaunchedEvent.java @@ -0,0 +1,45 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +public class TaskAttemptContainerLaunchedEvent extends TaskAttemptEvent { + private int shufflePort; + + /** + * Create a new TaskAttemptEvent. + * @param id the id of the task attempt + * @param shufflePort the port that shuffle is listening on. + */ + public TaskAttemptContainerLaunchedEvent(TaskAttemptId id, int shufflePort) { + super(id, TaskAttemptEventType.TA_CONTAINER_LAUNCHED); + this.shufflePort = shufflePort; + } + + + /** + * Get the port that the shuffle handler is listening on. This is only + * valid if the type of the event is TA_CONTAINER_LAUNCHED + * @return the port the shuffle handler is listening on. + */ + public int getShufflePort() { + return shufflePort; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java index 41b6b3ef02..3a9851a617 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java @@ -28,7 +28,12 @@ public class TaskAttemptEvent extends AbstractEvent { private TaskAttemptId attemptID; - + + /** + * Create a new TaskAttemptEvent. + * @param id the id of the task attempt + * @param type the type of event that happened. + */ public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType type) { super(type); this.attemptID = id; @@ -37,5 +42,4 @@ public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType type) { public TaskAttemptId getTaskAttemptID() { return attemptID; } - } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 87e0e082d9..9bf67ef27e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -43,7 +43,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceChildJVM; -import org.apache.hadoop.mapred.ProgressSplitsBlock; import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskAttemptContextImpl; @@ -65,7 +64,6 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.api.records.Counter; -import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -80,6 +78,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -126,7 +125,6 @@ /** * Implementation of TaskAttempt interface. */ -@SuppressWarnings("all") public abstract class TaskAttemptImpl implements org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, EventHandler { @@ -159,6 +157,7 @@ public abstract class TaskAttemptImpl implements private long launchTime; private long finishTime; private WrappedProgressSplitsBlock progressSplitBlock; + private int shufflePort = -1; private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); @@ -596,13 +595,10 @@ private ContainerLaunchContext createContainerLaunchContext() { // Add shuffle token LOG.info("Putting shuffle token in serviceData"); - DataOutputBuffer jobToken_dob = new DataOutputBuffer(); - jobToken.write(jobToken_dob); container .setServiceData( ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, - ByteBuffer.wrap(jobToken_dob.getData(), 0, - jobToken_dob.getLength())); + ShuffleHandler.serializeServiceData(jobToken)); MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath()); } catch (IOException e) { @@ -784,6 +780,17 @@ public long getFinishTime() { } } + + @Override + public int getShufflePort() { + readLock.lock(); + try { + return shufflePort; + } finally { + readLock.unlock(); + } + } + /**If container Assigned then return the node's address, otherwise null. */ @Override @@ -1153,7 +1160,11 @@ private static class LaunchedContainerTransition implements SingleArcTransition { @Override public void transition(TaskAttemptImpl taskAttempt, - TaskAttemptEvent event) { + TaskAttemptEvent evnt) { + + TaskAttemptContainerLaunchedEvent event = + (TaskAttemptContainerLaunchedEvent) evnt; + //set the launch time taskAttempt.launchTime = taskAttempt.clock.getTime(); // register it to TaskAttemptListener so that it start listening @@ -1186,6 +1197,7 @@ public void transition(TaskAttemptImpl taskAttempt, //make remoteTask reference as null as it is no more needed //and free up the memory taskAttempt.remoteTask = null; + taskAttempt.shufflePort = event.getShufflePort(); //tell the Task that attempt has started taskAttempt.eventHandler.handle(new TaskTAttemptEvent( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index f1e4c80cc3..cd2a540b97 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -559,8 +559,9 @@ private void handleTaskAttemptCompletion(TaskAttemptId attemptId, if (attempt.getNodeHttpAddress() != null) { TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class); tce.setEventId(-1); - //TODO: XXXXXX hardcoded port - tce.setMapOutputServerAddress("http://" + attempt.getNodeHttpAddress().split(":")[0] + ":8080"); + tce.setMapOutputServerAddress("http://" + + attempt.getNodeHttpAddress().split(":")[0] + ":" + + attempt.getShufflePort()); tce.setStatus(status); tce.setAttemptId(attempt.getID()); int runTime = 0; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 6ac96f5e53..bc6322cb0e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.launcher; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.HashMap; import java.util.Map; @@ -30,11 +31,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AMConstants; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -179,6 +182,7 @@ private class EventProcessor implements Runnable { this.event = event; } + @SuppressWarnings("unchecked") @Override public void run() { LOG.info("Processing the event " + event.toString()); @@ -208,15 +212,25 @@ public void run() { StartContainerRequest startRequest = recordFactory .newRecordInstance(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); - proxy.startContainer(startRequest); - - LOG.info("Returning from container-launch for " + taskAttemptID); + StartContainerResponse response = proxy.startContainer(startRequest); + ByteBuffer portInfo = response + .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); + int port = -1; + if(portInfo != null) { + port = ShuffleHandler.deserializeMetaData(portInfo); + } + LOG.info("Shuffle port returned by ContainerManager for " + + taskAttemptID + " : " + port); + + if(port < 0) { + throw new IllegalStateException("Invalid shuffle port number " + + port + " returned for " + taskAttemptID); + } // after launching, send launched event to task attempt to move // it from ASSIGNED to RUNNING state context.getEventHandler().handle( - new TaskAttemptEvent(taskAttemptID, - TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); + new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " + StringUtils.stringifyException(t); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index c1e19b9c31..073411c9b4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; @@ -295,8 +296,8 @@ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event) .getTaskAttemptID(); TaskAttemptInfo attInfo = getTaskAttemptInfo(aId); - actualHandler.handle(new TaskAttemptEvent(aId, - TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); + //TODO need to get the real port number MAPREDUCE-2666 + actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1)); // send the status update event sendStatusUpdateEvent(aId, attInfo); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index a47ba3cfb8..1151b76610 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; @@ -291,9 +292,11 @@ class MockContainerLauncher implements ContainerLauncher { public void handle(ContainerLauncherEvent event) { switch (event.getType()) { case CONTAINER_REMOTE_LAUNCH: + //We are running locally so set the shuffle port to -1 getContext().getEventHandler().handle( - new TaskAttemptEvent(event.getTaskAttemptID(), - TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); + new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(), + -1) + ); attemptLaunched(event.getTaskAttemptID()); break; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index 321dd1d22f..ce160b8f13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -25,6 +25,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; + +import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobCounter; @@ -200,6 +202,11 @@ public long getFinishTime() { return 0; } + @Override + public int getShufflePort() { + return ShuffleHandler.DEFAULT_SHUFFLE_PORT; + } + @Override public Counters getCounters() { return report.getCounters(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 7abf435ed0..37ef85858c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -515,6 +515,11 @@ public Counters getCounters() { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public int getShufflePort() { + throw new UnsupportedOperationException("Not supported yet."); + } + private float getCodeRuntime() { int taskIndex = myAttemptID.getTaskId().getId(); int attemptIndex = myAttemptID.getId(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java index 3759be511e..5f303440d0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java @@ -146,4 +146,10 @@ public long getLaunchTime() { public long getFinishTime() { return report.getFinishTime(); } + + @Override + public int getShufflePort() { + throw new UnsupportedOperationException("Not supported yet."); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java index 4155858058..efe8c3acb7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java @@ -72,6 +72,8 @@ public void init(Configuration conf) { conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, Service.class); + // Non-standard shuffle port + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083); conf.setClass(NMConfig.NM_CONTAINER_EXECUTOR_CLASS, DefaultContainerExecutor.class, ContainerExecutor.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index fae2aa0d20..00709f316b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -105,7 +105,8 @@ public static void setup() throws IOException { if (mrCluster == null) { mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName()); - mrCluster.init(new Configuration()); + Configuration conf = new Configuration(); + mrCluster.init(conf); mrCluster.start(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index e2620b3894..d63b8ca924 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -120,7 +120,8 @@ public class ShuffleHandler extends AbstractService private static final JobTokenSecretManager secretManager = new JobTokenSecretManager(); - public static final String SHUFFLE_PORT = "mapreduce.shuffle.port"; + public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; + public static final int DEFAULT_SHUFFLE_PORT = 8080; @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @@ -155,15 +156,59 @@ public ShuffleHandler() { this(DefaultMetricsSystem.instance()); } + /** + * Serialize the shuffle port into a ByteBuffer for use later on. + * @param port the port to be sent to the ApplciationMaster + * @return the serialized form of the port. + */ + static ByteBuffer serializeMetaData(int port) throws IOException { + //TODO these bytes should be versioned + DataOutputBuffer port_dob = new DataOutputBuffer(); + port_dob.writeInt(port); + return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); + } + + /** + * A helper function to deserialize the metadata returned by ShuffleHandler. + * @param meta the metadata returned by the ShuffleHandler + * @return the port the Shuffle Handler is listening on to serve shuffle data. + */ + public static int deserializeMetaData(ByteBuffer meta) throws IOException { + //TODO this should be returning a class not just an int + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(meta); + int port = in.readInt(); + return port; + } + + /** + * A helper function to serialize the JobTokenIdentifier to be sent to the + * ShuffleHandler as ServiceData. + * @param jobToken the job token to be used for authentication of + * shuffle data requests. + * @return the serialized version of the jobToken. + */ + public static ByteBuffer serializeServiceData(Token jobToken) throws IOException { + //TODO these bytes should be versioned + DataOutputBuffer jobToken_dob = new DataOutputBuffer(); + jobToken.write(jobToken_dob); + return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()); + } + + static Token deserializeServiceData(ByteBuffer secret) throws IOException { + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(secret); + Token jt = new Token(); + jt.readFields(in); + return jt; + } + @Override public void initApp(String user, ApplicationId appId, ByteBuffer secret) { // TODO these bytes should be versioned try { - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(secret); - Token jt = new Token(); - jt.readFields(in); - // TODO: Once SHuffle is out of NM, this can use MR APIs + Token jt = deserializeServiceData(secret); + // TODO: Once SHuffle is out of NM, this can use MR APIs JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); userRsrc.put(jobId.toString(), user); LOG.info("Added token for " + jobId.toString()); @@ -193,7 +238,7 @@ public synchronized void start() { Configuration conf = getConfig(); ServerBootstrap bootstrap = new ServerBootstrap(selector); bootstrap.setPipelineFactory(new HttpPipelineFactory(conf)); - port = conf.getInt("mapreduce.shuffle.port", 8080); + port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); accepted.add(bootstrap.bind(new InetSocketAddress(port))); LOG.info(getName() + " listening on port " + port); super.start(); @@ -207,6 +252,17 @@ public synchronized void stop() { super.stop(); } + @Override + public synchronized ByteBuffer getMeta() { + try { + return serializeMetaData(port); + } catch (IOException e) { + LOG.error("Error during getMeta", e); + // TODO add API to AuxiliaryServices to report failures + return null; + } + } + Shuffle createShuffle() { return new Shuffle(getConfig()); } @@ -306,7 +362,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); try { verifyRequest(jobId, ctx, request, response, - new URL("http", "", 8080, reqUri)); + new URL("http", "", port, reqUri)); } catch (IOException e) { LOG.warn("Shuffle failure ", e); sendError(ctx, e.getMessage(), UNAUTHORIZED); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index 97f0c9740a..c1526cc572 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -26,11 +26,21 @@ import org.jboss.netty.channel.ChannelFuture; import org.junit.Test; +import static org.junit.Assert.*; import static org.apache.hadoop.test.MockitoMaker.*; public class TestShuffleHandler { static final long MiB = 1024 * 1024; + @Test public void testSerializeMeta() throws Exception { + assertEquals(1, ShuffleHandler.deserializeMetaData( + ShuffleHandler.serializeMetaData(1))); + assertEquals(-1, ShuffleHandler.deserializeMetaData( + ShuffleHandler.serializeMetaData(-1))); + assertEquals(8080, ShuffleHandler.deserializeMetaData( + ShuffleHandler.serializeMetaData(8080))); + } + @Test public void testShuffleMetrics() throws Exception { MetricsSystem ms = new MetricsSystemImpl(); ShuffleHandler sh = new ShuffleHandler(ms); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java index b648eb67e3..a83c520a7c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.nio.ByteBuffer; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ContainerManager; @@ -32,5 +35,11 @@ @Public @Stable public interface StartContainerResponse { + Map getAllServiceResponse(); + ByteBuffer getServiceResponse(String key); + void addAllServiceResponse(Map serviceResponse); + void setServiceResponse(String key, ByteBuffer value); + void removeServiceResponse(String key); + void clearServiceResponse(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java index ed415c8e4f..4fbdf97c7c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java @@ -19,17 +19,26 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.List; + + import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerResponseProto; - - +import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; public class StartContainerResponsePBImpl extends ProtoBase implements StartContainerResponse { StartContainerResponseProto proto = StartContainerResponseProto.getDefaultInstance(); StartContainerResponseProto.Builder builder = null; boolean viaProto = false; - + + private Map serviceResponse = null; + public StartContainerResponsePBImpl() { builder = StartContainerResponseProto.newBuilder(); } @@ -40,20 +49,113 @@ public StartContainerResponsePBImpl(StartContainerResponseProto proto) { } public StartContainerResponseProto getProto() { + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } + private void mergeLocalToBuilder() { + if (this.serviceResponse != null) { + addServiceResponseToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + private void maybeInitBuilder() { if (viaProto || builder == null) { builder = StartContainerResponseProto.newBuilder(proto); } viaProto = false; } - + + + @Override + public Map getAllServiceResponse() { + initServiceResponse(); + return this.serviceResponse; + } + @Override + public ByteBuffer getServiceResponse(String key) { + initServiceResponse(); + return this.serviceResponse.get(key); + } + private void initServiceResponse() { + if (this.serviceResponse != null) { + return; + } + StartContainerResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getServiceResponseList(); + this.serviceResponse = new HashMap(); - - + for (StringBytesMapProto c : list) { + this.serviceResponse.put(c.getKey(), convertFromProtoFormat(c.getValue())); + } + } + + @Override + public void addAllServiceResponse(final Map serviceResponse) { + if (serviceResponse == null) + return; + initServiceResponse(); + this.serviceResponse.putAll(serviceResponse); + } + + private void addServiceResponseToProto() { + maybeInitBuilder(); + builder.clearServiceResponse(); + if (serviceResponse == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator keyIter = serviceResponse.keySet().iterator(); + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public StringBytesMapProto next() { + String key = keyIter.next(); + return StringBytesMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(serviceResponse.get(key))).build(); + } + + @Override + public boolean hasNext() { + return keyIter.hasNext(); + } + }; + } + }; + builder.addAllServiceResponse(iterable); + } + @Override + public void setServiceResponse(String key, ByteBuffer val) { + initServiceResponse(); + this.serviceResponse.put(key, val); + } + @Override + public void removeServiceResponse(String key) { + initServiceResponse(); + this.serviceResponse.remove(key); + } + @Override + public void clearServiceResponse() { + initServiceResponse(); + this.serviceResponse.clear(); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 2c07abeb18..753c6b8c9a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -143,6 +143,7 @@ message StartContainerRequestProto { } message StartContainerResponseProto { + repeated StringBytesMapProto service_response = 1; } message StopContainerRequestProto { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 4abf4a6a3e..254ff2a671 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -44,11 +44,14 @@ public class AuxServices extends AbstractService public static final String AUX_SERVICE_CLASS_FMT = "nodemanager.aux.service.%s.class"; public final Map serviceMap; + public final Map serviceMeta; public AuxServices() { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap()); + serviceMeta = + Collections.synchronizedMap(new HashMap()); // Obtain services from configuration in init() } @@ -63,6 +66,15 @@ Collection getServices() { return Collections.unmodifiableCollection(serviceMap.values()); } + /** + * @return the meta data for all registered services, that have been started. + * If a service has not been started no metadata will be available. The key + * the the name of the service as defined in the configuration. + */ + public Map getMeta() { + return Collections.unmodifiableMap(serviceMeta); + } + @Override public void init(Configuration conf) { Collection auxNames = conf.getStringCollection(AUX_SERVICES); @@ -75,7 +87,15 @@ public void init(Configuration conf) { throw new RuntimeException("No class defiend for " + sName); } AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf); - // TODO better use use s.getName()? + // TODO better use s.getName()? + if(!sName.equals(s.getName())) { + LOG.warn("The Auxilurary Service named '"+sName+"' in the " + +"configuration is for class "+sClass+" which has " + +"a name of '"+s.getName()+"'. Because these are " + +"not the same tools trying to send ServiceData and read " + +"Service Meta Data may have issues unless the refer to " + +"the name in the config."); + } addService(sName, s); s.init(conf); } catch (RuntimeException e) { @@ -90,9 +110,15 @@ public void init(Configuration conf) { public void start() { // TODO fork(?) services running as configured user // monitor for health, shutdown/restart(?) if any should die - for (Service service : serviceMap.values()) { + for (Map.Entry entry : serviceMap.entrySet()) { + AuxiliaryService service = entry.getValue(); + String name = entry.getKey(); service.start(); service.register(this); + ByteBuffer meta = service.getMeta(); + if(meta != null) { + serviceMeta.put(name, meta); + } } super.start(); } @@ -108,6 +134,7 @@ public void stop() { } } serviceMap.clear(); + serviceMeta.clear(); } } finally { super.stop(); @@ -146,6 +173,15 @@ public void handle(AuxServicesEvent event) { public interface AuxiliaryService extends Service { void initApp(String user, ApplicationId appId, ByteBuffer data); void stopApp(ApplicationId appId); + /** + * Retreive metadata for this service. This is likely going to be contact + * information so that applications can access the service remotely. Ideally + * each service should provide a method to parse out the information to a usable + * class. This will only be called after the services start method has finished. + * the result may be cached. + * @return metadata for this service that should be made avaiable to applications. + */ + ByteBuffer getMeta(); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index c4bf2c299a..9248c3ee04 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -283,6 +283,7 @@ public StartContainerResponse startContainer(StartContainerRequest request) dispatcher.getEventHandler().handle(new ApplicationInitEvent(container)); StartContainerResponse response = recordFactory.newRecordInstance(StartContainerResponse.class); + response.addAllServiceResponse(auxiluaryServices.getMeta()); metrics.launchedContainer(); metrics.allocateContainer(launchContext.getResource()); return response; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index e30374b4ae..2324708150 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -22,6 +22,7 @@ import static org.junit.Assert.*; import java.nio.ByteBuffer; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -44,10 +45,16 @@ static class LightService extends AbstractService private final int expected_appId; private int remaining_init; private int remaining_stop; + private ByteBuffer meta = null; + LightService(String name, char idef, int expected_appId) { + this(name, idef, expected_appId, null); + } + LightService(String name, char idef, int expected_appId, ByteBuffer meta) { super(name); this.idef = idef; this.expected_appId = expected_appId; + this.meta = meta; } @Override public void init(Configuration conf) { @@ -71,14 +78,18 @@ public void initApp(String user, ApplicationId appId, ByteBuffer data) { public void stopApp(ApplicationId appId) { assertEquals(expected_appId, appId.getId()); } + @Override + public ByteBuffer getMeta() { + return meta; + } } static class ServiceA extends LightService { - public ServiceA() { super("A", 'A', 65); } + public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); } } static class ServiceB extends LightService { - public ServiceB() { super("B", 'B', 66); } + public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); } } @Test @@ -139,6 +150,44 @@ public void testAuxServices() { } } + + @Test + public void testAuxServicesMeta() { + Configuration conf = new Configuration(); + conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" }); + conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"), + ServiceA.class, Service.class); + conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"), + ServiceB.class, Service.class); + final AuxServices aux = new AuxServices(); + aux.init(conf); + + int latch = 1; + for (Service s : aux.getServices()) { + assertEquals(INITED, s.getServiceState()); + if (s instanceof ServiceA) { latch *= 2; } + else if (s instanceof ServiceB) { latch *= 3; } + else fail("Unexpected service type " + s.getClass()); + } + assertEquals("Invalid mix of services", 6, latch); + aux.start(); + for (Service s : aux.getServices()) { + assertEquals(STARTED, s.getServiceState()); + } + + Map meta = aux.getMeta(); + assertEquals(2, meta.size()); + assertEquals("A", new String(meta.get("Asrv").array())); + assertEquals("B", new String(meta.get("Bsrv").array())); + + aux.stop(); + for (Service s : aux.getServices()) { + assertEquals(STOPPED, s.getServiceState()); + } + } + + + @Test public void testAuxUnexpectedStop() { Configuration conf = new Configuration();