diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 8398e50a69..f23adc7bd0 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -118,6 +118,12 @@ ${project.version} + + org.apache.hadoop + hadoop-yarn-client + ${project.version} + + org.apache.hadoop hadoop-mapreduce-client-core diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ca1810ed50..734e0e1a7c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -33,6 +33,8 @@ Release 2.1.0-alpha - Unreleased IMPROVEMENTS + YARN-29. Add a yarn-client module. (Vinod Kumar Vavilapalli via sseth) + BUG FIXES YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index babaa34a83..d29edc1be2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -41,6 +41,10 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-yarn-client + org.apache.hadoop hadoop-yarn-server-nodemanager diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 1d598d8310..c5e04a8c4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -44,20 +43,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -73,12 +60,12 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; - +import org.hadoop.yarn.client.YarnClientImpl; /** * Client for Distributed Shell application submission to YARN. @@ -113,19 +100,13 @@ */ @InterfaceAudience.Public @InterfaceStability.Unstable -public class Client { +public class Client extends YarnClientImpl { private static final Log LOG = LogFactory.getLog(Client.class); // Configuration private Configuration conf; - // RPC to communicate to RM - private YarnRPC rpc; - - // Handle to talk to the Resource Manager/Applications Manager - private ClientRMProtocol applicationsManager; - // Application master specific info to register a new Application with RM/ASM private String appName = ""; // App master priority @@ -196,9 +177,9 @@ public static void main(String[] args) { /** */ public Client(Configuration conf) throws Exception { - // Set up the configuration and RPC + super(); this.conf = conf; - rpc = YarnRPC.create(conf); + init(conf); } /** @@ -328,22 +309,17 @@ public boolean init(String[] args) throws ParseException { * @throws IOException */ public boolean run() throws IOException { - LOG.info("Starting Client"); - // Connect to ResourceManager - connectToASM(); - assert(applicationsManager != null); + LOG.info("Running Client"); + start(); - // Use ClientRMProtocol handle to general cluster information - GetClusterMetricsRequest clusterMetricsReq = Records.newRecord(GetClusterMetricsRequest.class); - GetClusterMetricsResponse clusterMetricsResp = applicationsManager.getClusterMetrics(clusterMetricsReq); + YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics(); LOG.info("Got Cluster metric info from ASM" - + ", numNodeManagers=" + clusterMetricsResp.getClusterMetrics().getNumNodeManagers()); + + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); - GetClusterNodesRequest clusterNodesReq = Records.newRecord(GetClusterNodesRequest.class); - GetClusterNodesResponse clusterNodesResp = applicationsManager.getClusterNodes(clusterNodesReq); + List clusterNodeReports = super.getNodeReports(); LOG.info("Got Cluster node info from ASM"); - for (NodeReport node : clusterNodesResp.getNodeReports()) { + for (NodeReport node : clusterNodeReports) { LOG.info("Got node report from ASM for" + ", nodeId=" + node.getNodeId() + ", nodeAddress" + node.getHttpAddress() @@ -352,10 +328,7 @@ public boolean run() throws IOException { + ", nodeHealthStatus" + node.getNodeHealthStatus()); } - GetQueueInfoRequest queueInfoReq = Records.newRecord(GetQueueInfoRequest.class); - queueInfoReq.setQueueName(this.amQueue); - GetQueueInfoResponse queueInfoResp = applicationsManager.getQueueInfo(queueInfoReq); - QueueInfo queueInfo = queueInfoResp.getQueueInfo(); + QueueInfo queueInfo = super.getQueueInfo(this.amQueue); LOG.info("Queue info" + ", queueName=" + queueInfo.getQueueName() + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() @@ -363,9 +336,7 @@ public boolean run() throws IOException { + ", queueApplicationCount=" + queueInfo.getApplications().size() + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); - GetQueueUserAclsInfoRequest queueUserAclsReq = Records.newRecord(GetQueueUserAclsInfoRequest.class); - GetQueueUserAclsInfoResponse queueUserAclsResp = applicationsManager.getQueueUserAcls(queueUserAclsReq); - List listAclInfo = queueUserAclsResp.getUserAclsInfoList(); + List listAclInfo = super.getQueueAclsInfo(); for (QueueUserACLInfo aclInfo : listAclInfo) { for (QueueACL userAcl : aclInfo.getUserAcls()) { LOG.info("User ACL Info for Queue" @@ -375,7 +346,7 @@ public boolean run() throws IOException { } // Get a new application id - GetNewApplicationResponse newApp = getApplication(); + GetNewApplicationResponse newApp = super.getNewApplication(); ApplicationId appId = newApp.getApplicationId(); // TODO get min/max resource capabilities from RM and change memory ask if needed @@ -590,16 +561,12 @@ else if (amMemory > maxMem) { // Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue); - // Create the request to send to the applications manager - SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); - // Submit the application to the applications manager // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); // Ignore the response as either a valid response object is returned on success // or an exception thrown to denote some form of a failure LOG.info("Submitting application to ASM"); - applicationsManager.submitApplication(appRequest); + super.submitApplication(appContext); // TODO // Try submitting the same request again @@ -629,10 +596,7 @@ private boolean monitorApplication(ApplicationId appId) throws YarnRemoteExcepti } // Get application report for the appId we are interested in - GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class); - reportRequest.setApplicationId(appId); - GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest); - ApplicationReport report = reportResponse.getApplicationReport(); + ApplicationReport report = super.getApplicationReport(appId); LOG.info("Got application report from ASM for" + ", appId=" + appId.getId() @@ -671,7 +635,7 @@ else if (YarnApplicationState.KILLED == state if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { LOG.info("Reached client specified timeout for application. Killing application"); - killApplication(appId); + forceKillApplication(appId); return false; } } @@ -683,61 +647,14 @@ else if (YarnApplicationState.KILLED == state * @param appId Application Id to be killed. * @throws YarnRemoteException */ - private void killApplication(ApplicationId appId) throws YarnRemoteException { - KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class); + private void forceKillApplication(ApplicationId appId) throws YarnRemoteException { // TODO clarify whether multiple jobs with the same app id can be submitted and be running at // the same time. // If yes, can we kill a particular attempt only? - request.setApplicationId(appId); - // KillApplicationResponse response = applicationsManager.forceKillApplication(request); + // Response can be ignored as it is non-null on success or // throws an exception in case of failures - applicationsManager.forceKillApplication(request); - } - - /** - * Connect to the Resource Manager/Applications Manager - * @return Handle to communicate with the ASM - * @throws IOException - */ - private void connectToASM() throws IOException { - - /* - UserGroupInformation user = UserGroupInformation.getCurrentUser(); - applicationsManager = user.doAs(new PrivilegedAction() { - public ClientRMProtocol run() { - InetSocketAddress rmAddress = NetUtils.createSocketAddr(conf.get( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); - LOG.info("Connecting to ResourceManager at " + rmAddress); - Configuration appsManagerServerConf = new Configuration(conf); - appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO, - ClientRMSecurityInfo.class, SecurityInfo.class); - ClientRMProtocol asm = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf)); - return asm; - } - }); - */ - YarnConfiguration yarnConf = new YarnConfiguration(conf); - InetSocketAddress rmAddress = yarnConf.getSocketAddr( - YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); - LOG.info("Connecting to ResourceManager at " + rmAddress); - applicationsManager = ((ClientRMProtocol) rpc.getProxy( - ClientRMProtocol.class, rmAddress, conf)); - } - - /** - * Get a new application from the ASM - * @return New Application - * @throws YarnRemoteException - */ - private GetNewApplicationResponse getApplication() throws YarnRemoteException { - GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse response = applicationsManager.getNewApplication(request); - LOG.info("Got new application id=" + response.getApplicationId()); - return response; + super.killApplication(appId); } private static String getTestRuntimeClasspath() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/dev-support/findbugs-exclude.xml new file mode 100644 index 0000000000..0e037a2ad0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/dev-support/findbugs-exclude.xml @@ -0,0 +1,19 @@ + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml new file mode 100644 index 0000000000..2646f6d0dc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + hadoop-yarn + org.apache.hadoop + 3.0.0-SNAPSHOT + + org.apache.hadoop + hadoop-yarn-client + 3.0.0-SNAPSHOT + hadoop-yarn-client + + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/hadoop/yarn/client/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/hadoop/yarn/client/YarnClient.java new file mode 100644 index 0000000000..eca80c919f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/hadoop/yarn/client/YarnClient.java @@ -0,0 +1,234 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.hadoop.yarn.client; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.service.Service; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface YarnClient extends Service { + + /** + *

+ * Obtain a new {@link ApplicationId} for submitting new applications. + *

+ * + *

+ * Returns a response which contains {@link ApplicationId} that can be used to + * submit a new application. See + * {@link #submitApplication(ApplicationSubmissionContext)}. + *

+ * + *

+ * See {@link GetNewApplicationResponse} for other information that is + * returned. + *

+ * + * @return response containing the new ApplicationId to be used + * to submit an application + * @throws YarnRemoteException + */ + GetNewApplicationResponse getNewApplication() throws YarnRemoteException; + + /** + *

+ * Submit a new application to YARN. + *

+ * + * @param appContext + * {@link ApplicationSubmissionContext} containing all the details + * needed to submit a new application + * @return {@link ApplicationId} of the accepted application + * @throws YarnRemoteException + * @see #getNewApplication() + */ + ApplicationId submitApplication(ApplicationSubmissionContext appContext) + throws YarnRemoteException; + + /** + *

+ * Kill an application identified by given ID. + *

+ * + * @param applicationId + * {@link ApplicationId} of the application that needs to be killed + * @throws YarnRemoteException + * in case of errors or if YARN rejects the request due to + * access-control restrictions. + * @see #getQueueAclsInfo() + */ + void killApplication(ApplicationId applicationId) throws YarnRemoteException; + + /** + *

+ * Get a report of the given Application. + *

+ * + *

+ * In secure mode, YARN verifies access to the application, queue + * etc. before accepting the request. + *

+ * + *

+ * If the user does not have VIEW_APP access then the following + * fields in the report will be set to stubbed values: + *

    + *
  • host - set to "N/A"
  • + *
  • RPC port - set to -1
  • + *
  • client token - set to "N/A"
  • + *
  • diagnostics - set to "N/A"
  • + *
  • tracking URL - set to "N/A"
  • + *
  • original tracking URL - set to "N/A"
  • + *
  • resource usage report - all values are -1
  • + *
+ *

+ * + * @param appId + * {@link ApplicationId} of the application that needs a report + * @return application report + * @throws YarnRemoteException + */ + ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnRemoteException; + + /** + *

+ * Get a report (ApplicationReport) of all Applications in the cluster. + *

+ * + *

+ * If the user does not have VIEW_APP access for an application + * then the corresponding report will be filtered as described in + * {@link #getApplicationReport(ApplicationId)}. + *

+ * + * @return a list of reports of all running applications + * @throws YarnRemoteException + */ + List getApplicationList() throws YarnRemoteException; + + /** + *

+ * Get metrics ({@link YarnClusterMetrics}) about the cluster. + *

+ * + * @return cluster metrics + * @throws YarnRemoteException + */ + YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException; + + /** + *

+ * Get a report of all nodes ({@link NodeReport}) in the cluster. + *

+ * + * @return A list of report of all nodes + * @throws YarnRemoteException + */ + List getNodeReports() throws YarnRemoteException; + + /** + *

+ * Get a delegation token so as to be able to talk to YARN using those tokens. + * + * @param renewer + * Address of the renewer who can renew these tokens when needed by + * securely talking to YARN. + * @return a delegation token ({@link DelegationToken}) that can be used to + * talk to YARN + * @throws YarnRemoteException + */ + DelegationToken getRMDelegationToken(Text renewer) throws YarnRemoteException; + + /** + *

+ * Get information ({@link QueueInfo}) about a given queue. + *

+ * + * @param queueName + * Name of the queue whose information is needed + * @return queue information + * @throws YarnRemoteException + * in case of errors or if YARN rejects the request due to + * access-control restrictions. + */ + QueueInfo getQueueInfo(String queueName) throws YarnRemoteException; + + /** + *

+ * Get information ({@link QueueInfo}) about all queues, recursively if there + * is a hierarchy + *

+ * + * @return a list of queue-information for all queues + * @throws YarnRemoteException + */ + List getAllQueues() throws YarnRemoteException; + + /** + *

+ * Get information ({@link QueueInfo}) about top level queues. + *

+ * + * @return a list of queue-information for all the top-level queues + * @throws YarnRemoteException + */ + List getRootQueueInfos() throws YarnRemoteException; + + /** + *

+ * Get information ({@link QueueInfo}) about all the immediate children queues + * of the given queue + *

+ * + * @param parent + * Name of the queue whose child-queues' information is needed + * @return a list of queue-information for all queues who are direct children + * of the given parent queue. + * @throws YarnRemoteException + */ + List getChildQueueInfos(String parent) throws YarnRemoteException; + + /** + *

+ * Get information about acls for current user on all the + * existing queues. + *

+ * + * @return a list of queue acls ({@link QueueUserACLInfo}) for + * current user + * @throws YarnRemoteException + */ + List getQueueAclsInfo() throws YarnRemoteException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/hadoop/yarn/client/YarnClientImpl.java new file mode 100644 index 0000000000..927a261e75 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/hadoop/yarn/client/YarnClientImpl.java @@ -0,0 +1,258 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.hadoop.yarn.client; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.Records; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class YarnClientImpl extends AbstractService implements YarnClient { + + private static final Log LOG = LogFactory.getLog(YarnClientImpl.class); + + protected ClientRMProtocol rmClient; + protected InetSocketAddress rmAddress; + + private static final String ROOT = "root"; + + public YarnClientImpl() { + super(YarnClientImpl.class.getName()); + } + + private static InetSocketAddress getRmAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); + } + + @Override + public synchronized void init(Configuration conf) { + this.rmAddress = getRmAddress(conf); + super.init(conf); + } + + @Override + public synchronized void start() { + YarnRPC rpc = YarnRPC.create(getConfig()); + + this.rmClient = + (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, + getConfig()); + LOG.debug("Connecting to ResourceManager at " + rmAddress); + super.start(); + } + + @Override + public synchronized void stop() { + RPC.stopProxy(this.rmClient); + super.stop(); + } + + @Override + public GetNewApplicationResponse getNewApplication() + throws YarnRemoteException { + GetNewApplicationRequest request = + Records.newRecord(GetNewApplicationRequest.class); + return rmClient.getNewApplication(request); + } + + @Override + public ApplicationId + submitApplication(ApplicationSubmissionContext appContext) + throws YarnRemoteException { + ApplicationId applicationId = appContext.getApplicationId(); + appContext.setApplicationId(applicationId); + SubmitApplicationRequest request = + Records.newRecord(SubmitApplicationRequest.class); + request.setApplicationSubmissionContext(appContext); + rmClient.submitApplication(request); + LOG.info("Submitted application " + applicationId + " to ResourceManager" + + " at " + rmAddress); + return applicationId; + } + + @Override + public void killApplication(ApplicationId applicationId) + throws YarnRemoteException { + LOG.info("Killing application " + applicationId); + KillApplicationRequest request = + Records.newRecord(KillApplicationRequest.class); + request.setApplicationId(applicationId); + rmClient.forceKillApplication(request); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnRemoteException { + GetApplicationReportRequest request = + Records.newRecord(GetApplicationReportRequest.class); + request.setApplicationId(appId); + GetApplicationReportResponse response = + rmClient.getApplicationReport(request); + return response.getApplicationReport(); + } + + @Override + public List getApplicationList() + throws YarnRemoteException { + GetAllApplicationsRequest request = + Records.newRecord(GetAllApplicationsRequest.class); + GetAllApplicationsResponse response = rmClient.getAllApplications(request); + return response.getApplicationList(); + } + + @Override + public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException { + GetClusterMetricsRequest request = + Records.newRecord(GetClusterMetricsRequest.class); + GetClusterMetricsResponse response = rmClient.getClusterMetrics(request); + return response.getClusterMetrics(); + } + + @Override + public List getNodeReports() throws YarnRemoteException { + GetClusterNodesRequest request = + Records.newRecord(GetClusterNodesRequest.class); + GetClusterNodesResponse response = rmClient.getClusterNodes(request); + return response.getNodeReports(); + } + + @Override + public DelegationToken getRMDelegationToken(Text renewer) + throws YarnRemoteException { + /* get the token from RM */ + GetDelegationTokenRequest rmDTRequest = + Records.newRecord(GetDelegationTokenRequest.class); + rmDTRequest.setRenewer(renewer.toString()); + GetDelegationTokenResponse response = + rmClient.getDelegationToken(rmDTRequest); + return response.getRMDelegationToken(); + } + + private GetQueueInfoRequest + getQueueInfoRequest(String queueName, boolean includeApplications, + boolean includeChildQueues, boolean recursive) { + GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class); + request.setQueueName(queueName); + request.setIncludeApplications(includeApplications); + request.setIncludeChildQueues(includeChildQueues); + request.setRecursive(recursive); + return request; + } + + @Override + public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException { + GetQueueInfoRequest request = + getQueueInfoRequest(queueName, true, false, false); + Records.newRecord(GetQueueInfoRequest.class); + return rmClient.getQueueInfo(request).getQueueInfo(); + } + + @Override + public List getQueueAclsInfo() throws YarnRemoteException { + GetQueueUserAclsInfoRequest request = + Records.newRecord(GetQueueUserAclsInfoRequest.class); + return rmClient.getQueueUserAcls(request).getUserAclsInfoList(); + } + + @Override + public List getAllQueues() throws YarnRemoteException { + List queues = new ArrayList(); + + QueueInfo rootQueue = + rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true)) + .getQueueInfo(); + getChildQueues(rootQueue, queues, true); + return queues; + } + + @Override + public List getRootQueueInfos() throws YarnRemoteException { + List queues = new ArrayList(); + + QueueInfo rootQueue = + rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true)) + .getQueueInfo(); + getChildQueues(rootQueue, queues, false); + return queues; + } + + @Override + public List getChildQueueInfos(String parent) + throws YarnRemoteException { + List queues = new ArrayList(); + + QueueInfo parentQueue = + rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false)) + .getQueueInfo(); + getChildQueues(parentQueue, queues, true); + return queues; + } + + private void getChildQueues(QueueInfo parent, List queues, + boolean recursive) { + List childQueues = parent.getChildQueues(); + + for (QueueInfo child : childQueues) { + queues.add(child); + if (recursive) { + getChildQueues(child, queues, recursive); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/hadoop/yarn/client/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/hadoop/yarn/client/TestYarnClient.java new file mode 100644 index 0000000000..58737da1f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/hadoop/yarn/client/TestYarnClient.java @@ -0,0 +1,30 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.hadoop.yarn.client; + +import org.junit.Test; + +public class TestYarnClient { + + @Test + public void test() { + // More to come later. + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/pom.xml b/hadoop-yarn-project/hadoop-yarn/pom.xml index d6db3815b0..718c5ffea8 100644 --- a/hadoop-yarn-project/hadoop-yarn/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/pom.xml @@ -11,8 +11,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. See accompanying LICENSE file. ---> - +--> 4.0.0 org.apache.hadoop @@ -186,5 +185,6 @@ hadoop-yarn-server hadoop-yarn-applications hadoop-yarn-site + hadoop-yarn-client - + \ No newline at end of file