From f4d80e91ae314d316100baa7770b9d73ea853d9c Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 18 Jun 2013 06:20:37 +0000 Subject: [PATCH] YARN-841. Move Auxiliary service to yarn-api, annotate and document it. Contributed by Vinod Kumar Vavilapalli. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1494031 13f79535-47bb-0310-9956-ffa450edef68 --- .../app/launcher/ContainerLauncherImpl.java | 2 +- .../launcher/TestContainerLauncherImpl.java | 8 +- .../apache/hadoop/mapred/ShuffleHandler.java | 19 +++-- .../hadoop/mapred/TestShuffleHandler.java | 7 +- hadoop-yarn-project/CHANGES.txt | 3 + .../StartContainerResponse.java | 35 +++++--- .../impl/pb/StartContainerResponsePBImpl.java | 46 +++++------ .../api/records/ContainerLaunchContext.java | 29 ++++++- .../api/ApplicationInitializationContext.java | 81 ++++++++++++++++++ .../api/ApplicationTerminationContext.java | 52 ++++++++++++ .../yarn/server/api/AuxiliaryService.java | 82 +++++++++++++++++++ .../hadoop/yarn/server/api/package-info.java | 21 +++++ .../src/main/proto/yarn_service_protos.proto | 2 +- .../yarn/client/api/impl/NMClientImpl.java | 2 +- .../containermanager/AuxServices.java | 43 ++++------ .../containermanager/AuxServicesEvent.java | 1 - .../ContainerManagerImpl.java | 3 +- .../containermanager/TestAuxServices.java | 25 +++--- 18 files changed, 366 insertions(+), 95 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationInitializationContext.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationTerminationContext.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/package-info.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 02f15f14ed..283fad0768 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -158,7 +158,7 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) { StartContainerResponse response = proxy.startContainer(startRequest); ByteBuffer portInfo = - response.getAllServiceResponse().get( + response.getAllServicesMetaData().get( ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); int port = -1; if(portInfo != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index c75ef92df9..dcd18dc8bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -164,7 +164,7 @@ public void testHandle() throws Exception { String cmAddress = "127.0.0.1:8000"; StartContainerResponse startResp = recordFactory.newRecordInstance(StartContainerResponse.class); - startResp.setAllServiceResponse(serviceResponse); + startResp.setAllServicesMetaData(serviceResponse); LOG.info("inserting launch event"); @@ -230,7 +230,7 @@ public void testOutOfOrder() throws Exception { String cmAddress = "127.0.0.1:8000"; StartContainerResponse startResp = recordFactory.newRecordInstance(StartContainerResponse.class); - startResp.setAllServiceResponse(serviceResponse); + startResp.setAllServicesMetaData(serviceResponse); LOG.info("inserting cleanup event"); ContainerLauncherEvent mockCleanupEvent = @@ -296,7 +296,7 @@ public void testMyShutdown() throws Exception { String cmAddress = "127.0.0.1:8000"; StartContainerResponse startResp = recordFactory.newRecordInstance(StartContainerResponse.class); - startResp.setAllServiceResponse(serviceResponse); + startResp.setAllServicesMetaData(serviceResponse); LOG.info("inserting launch event"); ContainerRemoteLaunchEvent mockLaunchEvent = @@ -355,7 +355,7 @@ public void testContainerCleaned() throws Exception { String cmAddress = "127.0.0.1:8000"; StartContainerResponse startResp = recordFactory.newRecordInstance(StartContainerResponse.class); - startResp.setAllServiceResponse(serviceResponse); + startResp.setAllServicesMetaData(serviceResponse); LOG.info("inserting launch event"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 39611bbe56..27b57dd262 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -73,10 +73,11 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.util.ConverterUtils; import org.jboss.netty.bootstrap.ServerBootstrap; @@ -112,8 +113,7 @@ import com.google.common.base.Charsets; import com.google.common.util.concurrent.ThreadFactoryBuilder; -public class ShuffleHandler extends AbstractService - implements AuxServices.AuxiliaryService { +public class ShuffleHandler extends AuxiliaryService { private static final Log LOG = LogFactory.getLog(ShuffleHandler.class); @@ -244,7 +244,11 @@ static Token deserializeServiceData(ByteBuffer secret) throw } @Override - public void initApp(String user, ApplicationId appId, ByteBuffer secret) { + public void initializeApplication(ApplicationInitializationContext context) { + + String user = context.getUser(); + ApplicationId appId = context.getApplicationId(); + ByteBuffer secret = context.getApplicationDataForService(); // TODO these bytes should be versioned try { Token jt = deserializeServiceData(secret); @@ -260,7 +264,8 @@ public void initApp(String user, ApplicationId appId, ByteBuffer secret) { } @Override - public void stopApp(ApplicationId appId) { + public void stopApplication(ApplicationTerminationContext context) { + ApplicationId appId = context.getApplicationId(); JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); secretManager.removeTokenForJob(jobId.toString()); userRsrc.remove(jobId.toString()); @@ -328,7 +333,7 @@ protected void serviceStop() throws Exception { } @Override - public synchronized ByteBuffer getMeta() { + public synchronized ByteBuffer getMetaData() { try { return serializeMetaData(port); } catch (IOException e) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index c93a884c8b..c164e95a4b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -64,6 +64,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; @@ -317,8 +318,10 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService")); jt.write(outputBuffer); - shuffleHandler.initApp(user, appId, - ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); URL url = new URL( "http://127.0.0.1:" diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ca484c2c05..61d9ce52ea 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -193,6 +193,9 @@ Release 2.1.0-beta - UNRELEASED YARN-840. Moved ProtoUtils to yarn.api.records.pb.impl. (Jian He via acmurthy) + YARN-841. Move Auxiliary service to yarn-api, annotate and document it. + (vinodkv) + NEW FEATURES YARN-482. FS: Extend SchedulingMode to intermediate queues. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java index e788305c7c..8df11db9b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java @@ -42,32 +42,45 @@ public abstract class StartContainerResponse { @Private @Unstable public static StartContainerResponse newInstance( - Map serviceResponses) { + Map servicesMetaData) { StartContainerResponse response = Records.newRecord(StartContainerResponse.class); - response.setAllServiceResponse(serviceResponses); + response.setAllServicesMetaData(servicesMetaData); return response; } /** - *

Get the responses from all auxiliary services running on the - * NodeManager.

- *

The responses are returned as a Map between the auxiliary service names - * and their corresponding opaque blob ByteBuffers

- * @return a Map between the auxiliary service names and their outputs + *

+ * Get the meta-data from all auxiliary services running on the + * NodeManager. + *

+ *

+ * The meta-data is returned as a Map between the auxiliary service names and + * their corresponding per service meta-data as an opaque blob + * ByteBuffer + *

+ * + *

+ * To be able to interpret the per-service meta-data, you should consult the + * documentation for the Auxiliary-service configured on the NodeManager + *

+ * + * @return a Map between the names of auxiliary services and their + * corresponding meta-data */ @Public @Stable - public abstract Map getAllServiceResponse(); + public abstract Map getAllServicesMetaData(); /** * Set to the list of auxiliary services which have been started on the * NodeManager. This is done only once when the * NodeManager starts up - * @param serviceResponses A map from auxiliary service names to the opaque - * blob ByteBuffers for that auxiliary service + * @param allServicesMetaData A map from auxiliary service names to the opaque + * blob ByteBuffer for that auxiliary service */ @Private @Unstable - public abstract void setAllServiceResponse(Map serviceResponses); + public abstract void setAllServicesMetaData( + Map allServicesMetaData); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java index dba83ce442..8edbfa7869 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java @@ -42,7 +42,7 @@ public class StartContainerResponsePBImpl extends StartContainerResponse { StartContainerResponseProto.Builder builder = null; boolean viaProto = false; - private Map serviceResponse = null; + private Map servicesMetaData = null; public StartContainerResponsePBImpl() { builder = StartContainerResponseProto.newBuilder(); @@ -81,8 +81,8 @@ public String toString() { } private synchronized void mergeLocalToBuilder() { - if (this.serviceResponse != null) { - addServiceResponseToProto(); + if (this.servicesMetaData != null) { + addServicesMetaDataToProto(); } } @@ -112,38 +112,38 @@ private synchronized void maybeInitBuilder() { @Override - public synchronized Map getAllServiceResponse() { - initServiceResponse(); - return this.serviceResponse; + public synchronized Map getAllServicesMetaData() { + initServicesMetaData(); + return this.servicesMetaData; } @Override - public synchronized void setAllServiceResponse( - Map serviceResponses) { - if(serviceResponses == null) { + public synchronized void setAllServicesMetaData( + Map servicesMetaData) { + if(servicesMetaData == null) { return; } - initServiceResponse(); - this.serviceResponse.clear(); - this.serviceResponse.putAll(serviceResponses); + initServicesMetaData(); + this.servicesMetaData.clear(); + this.servicesMetaData.putAll(servicesMetaData); } - private synchronized void initServiceResponse() { - if (this.serviceResponse != null) { + private synchronized void initServicesMetaData() { + if (this.servicesMetaData != null) { return; } StartContainerResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getServiceResponseList(); - this.serviceResponse = new HashMap(); + List list = p.getServicesMetaDataList(); + this.servicesMetaData = new HashMap(); for (StringBytesMapProto c : list) { - this.serviceResponse.put(c.getKey(), convertFromProtoFormat(c.getValue())); + this.servicesMetaData.put(c.getKey(), convertFromProtoFormat(c.getValue())); } } - private synchronized void addServiceResponseToProto() { + private synchronized void addServicesMetaDataToProto() { maybeInitBuilder(); - builder.clearServiceResponse(); - if (serviceResponse == null) + builder.clearServicesMetaData(); + if (servicesMetaData == null) return; Iterable iterable = new Iterable() { @@ -151,7 +151,7 @@ private synchronized void addServiceResponseToProto() { public synchronized Iterator iterator() { return new Iterator() { - Iterator keyIter = serviceResponse.keySet().iterator(); + Iterator keyIter = servicesMetaData.keySet().iterator(); @Override public synchronized void remove() { @@ -161,7 +161,7 @@ public synchronized void remove() { @Override public synchronized StringBytesMapProto next() { String key = keyIter.next(); - return StringBytesMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(serviceResponse.get(key))).build(); + return StringBytesMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(servicesMetaData.get(key))).build(); } @Override @@ -171,6 +171,6 @@ public synchronized boolean hasNext() { }; } }; - builder.addAllServiceResponse(iterable); + builder.addAllServicesMetaData(iterable); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index da2a164dfb..54c1d07bac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.util.Records; /** @@ -109,7 +111,19 @@ public static ContainerLaunchContext newInstance( public abstract void setLocalResources(Map localResources); /** - * Get application-specific binary service data. + *

+ * Get application-specific binary service data. This is a map keyed + * by the name of each {@link AuxiliaryService} that is configured on a + * NodeManager and value correspond to the application specific data targeted + * for the keyed {@link AuxiliaryService}. + *

+ * + *

+ * This will be used to initialize this application on the specific + * {@link AuxiliaryService} running on the NodeManager by calling + * {@link AuxiliaryService#initializeApplication(ApplicationInitializationContext)} + *

+ * * @return application-specific binary service data */ @Public @@ -117,9 +131,16 @@ public static ContainerLaunchContext newInstance( public abstract Map getServiceData(); /** - * Set application-specific binary service data. All pre-existing Map - * entries are preserved. - * @param serviceData application-specific binary service data + *

+ * Get application-specific binary service data. This is a map keyed + * by the name of each {@link AuxiliaryService} that is configured on a + * NodeManager and value correspond to the application specific data targeted + * for the keyed {@link AuxiliaryService}. All pre-existing Map entries are + * preserved. + *

+ * + * @param serviceData + * application-specific binary service data */ @Public @Stable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationInitializationContext.java new file mode 100644 index 0000000000..b6ec2e1ab2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationInitializationContext.java @@ -0,0 +1,81 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; + +/** + * Initialization context for {@link AuxiliaryService} when starting an + * application. + */ +@Public +@Evolving +public class ApplicationInitializationContext { + + private final String user; + private final ApplicationId applicationId; + private ByteBuffer appDataForService; + + @Private + @Unstable + public ApplicationInitializationContext(String user, ApplicationId applicationId, + ByteBuffer appDataForService) { + this.user = user; + this.applicationId = applicationId; + this.appDataForService = appDataForService; + } + + /** + * Get the user-name of the application-submitter + * + * @return user-name + */ + public String getUser() { + return this.user; + } + + /** + * Get {@link ApplicationId} of the application + * + * @return applications ID + */ + public ApplicationId getApplicationId() { + return this.applicationId; + } + + /** + * Get the data sent to the NodeManager via + * {@link ContainerManagementProtocol#startContainer(StartContainerRequest)} + * as part of {@link ContainerLaunchContext#getServiceData()} + * + * @return the servicesData for this application. + */ + public ByteBuffer getApplicationDataForService() { + return this.appDataForService; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationTerminationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationTerminationContext.java new file mode 100644 index 0000000000..8187511a96 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ApplicationTerminationContext.java @@ -0,0 +1,52 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * Initialization context for {@link AuxiliaryService} when stopping an + * application. + * + */ +@Public +@Evolving +public class ApplicationTerminationContext { + + private final ApplicationId applicationId; + + @Private + @Unstable + public ApplicationTerminationContext(ApplicationId applicationId) { + this.applicationId = applicationId; + } + + /** + * Get {@link ApplicationId} of the application being stopped. + * + * @return applications ID + */ + public ApplicationId getApplicationId() { + return this.applicationId; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java new file mode 100644 index 0000000000..e50098fc44 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java @@ -0,0 +1,82 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * A generic service that will be started by the NodeManager. This is a service + * that administrators have to configure on each node by setting + * {@link YarnConfiguration#NM_AUX_SERVICES}. + * + */ +@Public +@Evolving +public abstract class AuxiliaryService extends AbstractService { + + protected AuxiliaryService(String name) { + super(name); + } + + /** + * A new application is started on this NodeManager. This is a signal to + * this {@link AuxiliaryService} about the application initialization. + * + * @param initAppContext context for the application's initialization + */ + public abstract void initializeApplication( + ApplicationInitializationContext initAppContext); + + /** + * An application is finishing on this NodeManager. This is a signal to this + * {@link AuxiliaryService} about the same. + * + * @param stopAppContext context for the application termination + */ + public abstract void stopApplication( + ApplicationTerminationContext stopAppContext); + + /** + * Retrieve meta-data for this {@link AuxiliaryService}. Applications using + * this {@link AuxiliaryService} SHOULD know the format of the meta-data - + * ideally each service should provide a method to parse out the information + * to the applications. One example of meta-data is contact information so + * that applications can access the service remotely. This will only be called + * after the service's {@link #start()} method has finished. the result may be + * cached. + * + *

+ * The information is passed along to applications via + * {@link StartContainerResponse#getAllServicesMetaData()} that is returned by + * {@link ContainerManagementProtocol#startContainer(StartContainerRequest)} + *

+ * + * @return meta-data for this service that should be made available to + * applications. + */ + public abstract ByteBuffer getMetaData(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/package-info.java new file mode 100644 index 0000000000..dd4cc3da44 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.server.api; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index dc2b100b5b..791239d526 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -160,7 +160,7 @@ message StartContainerRequestProto { } message StartContainerResponseProto { - repeated StringBytesMapProto service_response = 1; + repeated StringBytesMapProto services_meta_data = 1; } message StopContainerRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 89c92a52e0..bef0388249 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -242,7 +242,7 @@ public synchronized Map startContainer( LOG.warn("Container " + containerId + " failed to start", e); throw e; } - return startResponse.getAllServiceResponse(); + return startResponse.getAllServicesMetaData(); } public synchronized void stopContainer() throws YarnException, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 84c47324de..1f46cb21ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -32,9 +32,11 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; public class AuxServices extends AbstractService implements ServiceStateChangeListener, EventHandler { @@ -42,13 +44,13 @@ public class AuxServices extends AbstractService private static final Log LOG = LogFactory.getLog(AuxServices.class); protected final Map serviceMap; - protected final Map serviceMeta; + protected final Map serviceMetaData; public AuxServices() { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap()); - serviceMeta = + serviceMetaData = Collections.synchronizedMap(new HashMap()); // Obtain services from configuration in init() } @@ -69,11 +71,11 @@ Collection getServices() { * If a service has not been started no metadata will be available. The key * is the name of the service as defined in the configuration. */ - public Map getMeta() { + public Map getMetaData() { Map metaClone = new HashMap( - serviceMeta.size()); - synchronized (serviceMeta) { - for (Entry entry : serviceMeta.entrySet()) { + serviceMetaData.size()); + synchronized (serviceMetaData) { + for (Entry entry : serviceMetaData.entrySet()) { metaClone.put(entry.getKey(), entry.getValue().duplicate()); } } @@ -121,9 +123,9 @@ public void serviceStart() throws Exception { String name = entry.getKey(); service.start(); service.registerServiceListener(this); - ByteBuffer meta = service.getMeta(); + ByteBuffer meta = service.getMetaData(); if(meta != null) { - serviceMeta.put(name, meta); + serviceMetaData.put(name, meta); } } super.serviceStart(); @@ -140,7 +142,7 @@ public void serviceStop() throws Exception { } } serviceMap.clear(); - serviceMeta.clear(); + serviceMetaData.clear(); } } finally { super.serviceStop(); @@ -167,12 +169,13 @@ public void handle(AuxServicesEvent event) { // TODO kill all containers waiting on Application return; } - service.initApp(event.getUser(), event.getApplicationID(), - event.getServiceData()); + service.initializeApplication(new ApplicationInitializationContext(event + .getUser(), event.getApplicationID(), event.getServiceData())); break; case APPLICATION_STOP: for (AuxiliaryService serv : serviceMap.values()) { - serv.stopApp(event.getApplicationID()); + serv.stopApplication(new ApplicationTerminationContext(event + .getApplicationID())); } break; default: @@ -180,18 +183,4 @@ public void handle(AuxServicesEvent event) { } } - public interface AuxiliaryService extends Service { - void initApp(String user, ApplicationId appId, ByteBuffer data); - void stopApp(ApplicationId appId); - /** - * Retreive metadata for this service. This is likely going to be contact - * information so that applications can access the service remotely. Ideally - * each service should provide a method to parse out the information to a usable - * class. This will only be called after the services start method has finished. - * the result may be cached. - * @return metadata for this service that should be made avaiable to applications. - */ - ByteBuffer getMeta(); - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java index 8e897be250..4b9c93157b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.Event; public class AuxServicesEvent extends AbstractEvent { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 04b8b90498..69a5006a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -491,8 +491,7 @@ public StartContainerResponse startContainer(StartContainerRequest request) applicationID, containerID); StartContainerResponse response = - recordFactory.newRecordInstance(StartContainerResponse.class); - response.setAllServiceResponse(auxiliaryServices.getMeta()); + StartContainerResponse.newInstance(auxiliaryServices.getMetaData()); // TODO launchedContainer misplaced -> doesn't necessarily mean a container // launch. A finished Application will not launch containers. metrics.launchedContainer(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 556d7b0296..fb4b69a21f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -33,17 +33,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.junit.Test; public class TestAuxServices { private static final Log LOG = LogFactory.getLog(TestAuxServices.class); - static class LightService extends AbstractService - implements AuxServices.AuxiliaryService { + static class LightService extends AuxiliaryService implements Service + { private final char idef; private final int expected_appId; private int remaining_init; @@ -79,17 +81,18 @@ protected void serviceStop() throws Exception { super.serviceStop(); } @Override - public void initApp(String user, ApplicationId appId, ByteBuffer data) { + public void initializeApplication(ApplicationInitializationContext context) { + ByteBuffer data = context.getApplicationDataForService(); assertEquals(idef, data.getChar()); assertEquals(expected_appId, data.getInt()); - assertEquals(expected_appId, appId.getId()); + assertEquals(expected_appId, context.getApplicationId().getId()); } @Override - public void stopApp(ApplicationId appId) { - stoppedApps.add(appId.getId()); + public void stopApplication(ApplicationTerminationContext context) { + stoppedApps.add(context.getApplicationId().getId()); } @Override - public ByteBuffer getMeta() { + public ByteBuffer getMetaData() { return meta; } } @@ -133,8 +136,8 @@ public void testAuxEventDispatch() { AuxServicesEventType.APPLICATION_STOP, "user0", appId2, "Bsrv", null); // verify all services got the stop event aux.handle(event); - Collection servs = aux.getServices(); - for (AuxServices.AuxiliaryService serv: servs) { + Collection servs = aux.getServices(); + for (AuxiliaryService serv: servs) { ArrayList appIds = ((LightService)serv).getAppIdsStopped(); assertEquals("app not properly stopped", 1, appIds.size()); assertTrue("wrong app stopped", appIds.contains((Integer)66)); @@ -196,7 +199,7 @@ public void testAuxServicesMeta() { assertEquals(STARTED, s.getServiceState()); } - Map meta = aux.getMeta(); + Map meta = aux.getMetaData(); assertEquals(2, meta.size()); assertEquals("A", new String(meta.get("Asrv").array())); assertEquals("B", new String(meta.get("Bsrv").array()));