YARN-3970. Add REST api support for Application Priority. Contributed by Naganarasimha G R.

This commit is contained in:
Varun Vasudev 2015-09-03 16:35:10 +05:30
parent c92e31bd65
commit b469ac531a
8 changed files with 415 additions and 12 deletions

View File

@ -187,6 +187,9 @@ Release 2.8.0 - UNRELEASED
YARN-3250. Support admin cli interface in for Application Priority.
(Rohith Sharma K S via jianhe)
YARN-3970. Add REST api support for Application Priority.
(Naganarasimha G R via vvasudev)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -645,7 +645,7 @@ private void updateApplicationPriority(String applicationId, String priority)
Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority));
sysout.println("Updating priority of an aplication " + applicationId);
client.updateApplicationPriority(appId, newAppPriority);
sysout.println("Successfully updated the priority of any application "
+ applicationId);
sysout.println("Successfully updated the application with id "
+ applicationId + " with priority '" + priority + "'");
}
}

View File

@ -1832,7 +1832,7 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
LOG.info("Application '" + applicationId
+ "' is submitted without priority "
+ "hence considering default queue/cluster priority:"
+ "hence considering default queue/cluster priority: "
+ priorityFromContext.getPriority());
}
@ -1846,8 +1846,8 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
appPriority = priorityFromContext;
LOG.info("Priority '" + appPriority.getPriority()
+ "' is acceptable in queue :" + queueName + "for application:"
+ applicationId + "for the user: " + user);
+ "' is acceptable in queue : " + queueName + " for application: "
+ applicationId + " for the user: " + user);
return appPriority;
}
@ -1876,14 +1876,14 @@ public void updateApplicationPriority(Priority newPriority,
+ "' is not present, hence could not change priority.");
}
if (application.getPriority().equals(newPriority)) {
return;
}
RMApp rmApp = rmContext.getRMApps().get(applicationId);
appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(),
rmApp.getQueue(), applicationId);
if (application.getPriority().equals(appPriority)) {
return;
}
// Update new priority in Submission Context to keep track in HA
rmApp.getApplicationSubmissionContext().setPriority(appPriority);
@ -1909,7 +1909,7 @@ public void updateApplicationPriority(Priority newPriority,
}
LOG.info("Priority '" + appPriority + "' is updated in queue :"
+ rmApp.getQueue() + "for application:" + applicationId
+ "for the user: " + rmApp.getUser());
+ rmApp.getQueue() + " for application: " + applicationId
+ " for the user: " + rmApp.getUser());
}
}

View File

@ -59,7 +59,7 @@ public JAXBContextResolver() throws Exception {
final Class[] rootUnwrappedTypes =
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
ContainerLaunchContextInfo.class, LocalResourceInfo.class,
DelegationToken.class, AppQueue.class };
DelegationToken.class, AppQueue.class, AppPriority.class };
this.typesContextMap = new HashMap<Class, JAXBContext>();
context =

View File

@ -85,6 +85,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -122,6 +123,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
@ -1047,6 +1049,120 @@ public KillApplicationResponse run() throws IOException,
return Response.status(Status.OK).entity(ret).build();
}
@GET
@Path("/apps/{appid}/priority")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AppPriority getAppPriority(@Context HttpServletRequest hsr,
@PathParam("appid") String appId) throws AuthorizationException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
String userName = "UNKNOWN-USER";
if (callerUGI != null) {
userName = callerUGI.getUserName();
}
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService",
"Trying to get state of an absent application " + appId);
throw e;
}
AppPriority ret = new AppPriority();
ret.setPriority(
app.getApplicationSubmissionContext().getPriority().getPriority());
return ret;
}
@PUT
@Path("/apps/{appid}/priority")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response updateApplicationPriority(AppPriority targetPriority,
@Context HttpServletRequest hsr, @PathParam("appid") String appId)
throws AuthorizationException, YarnException, InterruptedException,
IOException {
init();
if (targetPriority == null) {
throw new YarnException("Target Priority cannot be null");
}
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
throw new AuthorizationException(
"Unable to obtain user name, user not authenticated");
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
return Response.status(Status.FORBIDDEN)
.entity("The default static user cannot carry out this operation.")
.build();
}
String userName = callerUGI.getUserName();
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService",
"Trying to move an absent application " + appId);
throw e;
}
Priority priority = app.getApplicationSubmissionContext().getPriority();
if (priority == null
|| priority.getPriority() != targetPriority.getPriority()) {
return modifyApplicationPriority(app, callerUGI,
targetPriority.getPriority());
}
return Response.status(Status.OK).entity(targetPriority).build();
}
private Response modifyApplicationPriority(final RMApp app,
UserGroupInformation callerUGI, final int appPriority)
throws IOException, InterruptedException {
String userName = callerUGI.getUserName();
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException, YarnException {
Priority priority = Priority.newInstance(appPriority);
UpdateApplicationPriorityRequest request =
UpdateApplicationPriorityRequest
.newInstance(app.getApplicationId(), priority);
rm.getClientRMService().updateApplicationPriority(request);
return null;
}
});
} catch (UndeclaredThrowableException ue) {
// if the root cause is a permissions issue
// bubble that up to the user
if (ue.getCause() instanceof YarnException) {
YarnException ye = (YarnException) ue.getCause();
if (ye.getCause() instanceof AccessControlException) {
String appId = app.getApplicationId().toString();
String msg = "Unauthorized attempt to change priority of appid "
+ appId + " by remote user " + userName;
return Response.status(Status.FORBIDDEN).entity(msg).build();
} else if (ye.getMessage().startsWith("Application in")
&& ye.getMessage().endsWith("state cannot be update priority.")) {
return Response.status(Status.BAD_REQUEST).entity(ye.getMessage())
.build();
} else {
throw ue;
}
} else {
throw ue;
}
}
AppPriority ret = new AppPriority(
app.getApplicationSubmissionContext().getPriority().getPriority());
return Response.status(Status.OK).entity(ret).build();
}
@GET
@Path("/apps/{appid}/queue")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "applicationpriority")
@XmlAccessorType(XmlAccessType.FIELD)
public class AppPriority {
private int priority;
public AppPriority() {
}
public AppPriority(int priority) {
this.priority = priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public int getPriority() {
return this.priority;
}
}

View File

@ -969,6 +969,92 @@ public void testGetAppQueue() throws Exception {
rm.stop();
}
@Test(timeout = 90000)
public void testUpdateAppPriority() throws Exception {
client().addFilter(new LoggingFilter(System.out));
if (!(rm.getResourceScheduler() instanceof CapacityScheduler)) {
// till the fair scheduler modifications for priority is completed
return;
}
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
cs.setClusterMaxPriority(conf);
// default root queue allows anyone to have admin acl
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
String[] queues = { "default", "test" };
csconf.setQueues("root", queues);
csconf.setCapacity("root.default", 50.0f);
csconf.setCapacity("root.test", 50.0f);
csconf.setAcl("root", QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl("root.test", QueueACL.ADMINISTER_QUEUE, "someuser");
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
amNodeManager.nodeHeartbeat(true);
int modifiedPriority = 8;
AppPriority priority = new AppPriority(modifiedPriority);
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
entity = appPriorityToJSON(priority);
} else {
entity = priority;
}
ClientResponse response = this
.constructWebResource("apps", app.getApplicationId().toString(),
"priority")
.entity(entity, contentType).accept(mediaType)
.put(ClientResponse.class);
if (!isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
continue;
}
assertEquals(Status.OK, response.getClientResponseStatus());
if (mediaType.equals(MediaType.APPLICATION_JSON)) {
verifyAppPriorityJson(response, modifiedPriority);
} else {
verifyAppPriorityXML(response, modifiedPriority);
}
response = this
.constructWebResource("apps", app.getApplicationId().toString(),
"priority")
.accept(mediaType).get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
if (mediaType.equals(MediaType.APPLICATION_JSON)) {
verifyAppPriorityJson(response, modifiedPriority);
} else {
verifyAppPriorityXML(response, modifiedPriority);
}
// check unauthorized
app = rm.submitApp(CONTAINER_MB, "", "someuser");
amNodeManager.nodeHeartbeat(true);
response = this
.constructWebResource("apps", app.getApplicationId().toString(),
"priority")
.entity(entity, contentType).accept(mediaType)
.put(ClientResponse.class);
assertEquals(Status.FORBIDDEN, response.getClientResponseStatus());
}
}
rm.stop();
}
@Test(timeout = 90000)
public void testAppMove() throws Exception {
@ -1049,6 +1135,15 @@ public void testAppMove() throws Exception {
rm.stop();
}
protected static String appPriorityToJSON(AppPriority targetPriority)
throws Exception {
StringWriter sw = new StringWriter();
JSONJAXBContext ctx = new JSONJAXBContext(AppPriority.class);
JSONMarshaller jm = ctx.createJSONMarshaller();
jm.marshallToJSON(targetPriority, sw);
return sw.toString();
}
protected static String appQueueToJSON(AppQueue targetQueue) throws Exception {
StringWriter sw = new StringWriter();
JSONJAXBContext ctx = new JSONJAXBContext(AppQueue.class);
@ -1056,6 +1151,31 @@ protected static String appQueueToJSON(AppQueue targetQueue) throws Exception {
jm.marshallToJSON(targetQueue, sw);
return sw.toString();
}
protected static void verifyAppPriorityJson(ClientResponse response,
int expectedPriority) throws JSONException {
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
int responsePriority = json.getInt("priority");
assertEquals(expectedPriority, responsePriority);
}
protected static void verifyAppPriorityXML(ClientResponse response,
int expectedPriority)
throws ParserConfigurationException, IOException, SAXException {
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String xml = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("applicationpriority");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
int responsePriority = WebServicesTestUtils.getXmlInt(element, "priority");
assertEquals(expectedPriority, responsePriority);
}
protected static void
verifyAppQueueJson(ClientResponse response, String queue)

View File

@ -30,6 +30,7 @@ ResourceManager REST API's.
* [Cluster Applications API(Submit Application)](#Cluster_Applications_APISubmit_Application)
* [Cluster Application State API](#Cluster_Application_State_API)
* [Cluster Application Queue API](#Cluster_Application_Queue_API)
* [Cluster Application Priority API](#Cluster_Application_Priority_API)
* [Cluster Delegation Tokens API](#Cluster_Delegation_Tokens_API)
Overview
@ -2770,6 +2771,125 @@ Response Body:
<queue>test</queue>
</appqueue>
Cluster Application Priority API
-----------------------------
With the application priority API, you can query the priority of a submitted app as well update priority of a running or accepted app using a PUT request specifying the target priority. To perform the PUT operation, authentication has to be setup for the RM web services. In addition, you must be authorized to update the app priority. Currently you can only update the app priority if you're using the Capacity scheduler.
Please note that in order to update priority of an app, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response.
This feature is currently in the alpha stage and may change in the future.
### URI
* http://<rm http address:port>/ws/v1/cluster/apps/{appid}/priority
### HTTP Operations Supported
* GET
* PUT
### Query Parameters Supported
None
### Elements of *apppriority* object
When you make a request for the state of an app, the information returned has the following fields
| Item | Data Type | Description |
|:---- |:---- |:---- |
| priority | int | The application priority |
### Response Examples
**JSON responses**
HTTP Request
GET http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/priority
Response Header:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
Response Body:
{
"priority":0
}
HTTP Request
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/priority
Request Body:
{
"priority":8
}
Response Header:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
Response Body:
{
"priority":8
}
**XML responses**
HTTP Request
GET http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/priority
Response Header:
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 98
Server: Jetty(6.1.26)
Response Body:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<applicationpriority>
<priority>0</priority>
</applicationpriority>
HTTP Request
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/priority
Request Body:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<applicationpriority>
<priority>8</priority>
</applicationpriority>
Response Header:
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 95
Server: Jetty(6.1.26)
Response Body:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<applicationpriority>
<priority>8</priority>
</applicationpriority>
Cluster Delegation Tokens API
-----------------------------