YARN-1702. Added kill app functionality to RM web services. Contributed by Varun Vasudev.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1602298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-06-12 21:31:52 +00:00
parent 4bc91b44c9
commit dc7dd1fa19
5 changed files with 1002 additions and 2 deletions

View File

@ -36,6 +36,9 @@ Release 2.5.0 - UNRELEASED
schedulers after ResourceManager Restart so as to preserve running work in
the cluster. (Jian He via vinodkv)
YARN-1702. Added kill app functionality to RM web services. (Varun Vasudev
via vinodkv)
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
@ -31,19 +34,27 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -56,6 +67,8 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -66,10 +79,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
@ -82,7 +95,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@ -584,4 +596,166 @@ public AppAttemptsInfo getAppAttempts(@PathParam("appid") String appId) {
return appAttemptsInfo;
}
@GET
@Path("/apps/{appid}/state")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AppState getAppState(@Context HttpServletRequest hsr,
@PathParam("appid") String appId) throws AuthorizationException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
String userName = "";
if (callerUGI != null) {
userName = callerUGI.getUserName();
}
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService",
"Trying to get state of an absent application " + appId);
throw e;
}
AppState ret = new AppState();
ret.setState(app.getState().toString());
return ret;
}
// can't return POJO because we can't control the status code
// it's always set to 200 when we need to allow it to be set
// to 202
@PUT
@Path("/apps/{appid}/state")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response updateAppState(AppState targetState,
@Context HttpServletRequest hsr, @PathParam("appid") String appId)
throws AuthorizationException, YarnException, InterruptedException,
IOException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
if (callerUGI == null) {
String msg = "Unable to obtain user name, user not authenticated";
throw new AuthorizationException(msg);
}
String userName = callerUGI.getUserName();
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService", "Trying to kill/move an absent application "
+ appId);
throw e;
}
if (!app.getState().toString().equals(targetState.getState())) {
// user is attempting to change state. right we only
// allow users to kill the app
if (targetState.getState().equals(YarnApplicationState.KILLED.toString())) {
return killApp(app, callerUGI, hsr);
}
throw new BadRequestException("Only '"
+ YarnApplicationState.KILLED.toString()
+ "' is allowed as a target state.");
}
AppState ret = new AppState();
ret.setState(app.getState().toString());
return Response.status(Status.OK).entity(ret).build();
}
protected Response killApp(RMApp app, UserGroupInformation callerUGI,
HttpServletRequest hsr) throws IOException, InterruptedException {
if (app == null) {
throw new IllegalArgumentException("app cannot be null");
}
String userName = callerUGI.getUserName();
final ApplicationId appid = app.getApplicationId();
KillApplicationResponse resp = null;
try {
resp =
callerUGI
.doAs(new PrivilegedExceptionAction<KillApplicationResponse>() {
@Override
public KillApplicationResponse run() throws IOException,
YarnException {
KillApplicationRequest req =
KillApplicationRequest.newInstance(appid);
return rm.getClientRMService().forceKillApplication(req);
}
});
} catch (UndeclaredThrowableException ue) {
// if the root cause is a permissions issue
// bubble that up to the user
if (ue.getCause() instanceof YarnException) {
YarnException ye = (YarnException) ue.getCause();
if (ye.getCause() instanceof AccessControlException) {
String appId = app.getApplicationId().toString();
String msg =
"Unauthorized attempt to kill appid " + appId
+ " by remote user " + userName;
return Response.status(Status.FORBIDDEN).entity(msg).build();
} else {
throw ue;
}
} else {
throw ue;
}
}
AppState ret = new AppState();
ret.setState(app.getState().toString());
if (resp.getIsKillCompleted()) {
RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST,
"RMWebService", app.getApplicationId());
} else {
return Response.status(Status.ACCEPTED).entity(ret)
.header(HttpHeaders.LOCATION, hsr.getRequestURL()).build();
}
return Response.status(Status.OK).entity(ret).build();
}
private RMApp getRMAppForAppId(String appId) {
if (appId == null || appId.isEmpty()) {
throw new NotFoundException("appId, " + appId + ", is empty or null");
}
ApplicationId id;
try {
id = ConverterUtils.toApplicationId(recordFactory, appId);
} catch (NumberFormatException e) {
throw new NotFoundException("appId is invalid");
}
if (id == null) {
throw new NotFoundException("appId is invalid");
}
RMApp app = rm.getRMContext().getRMApps().get(id);
if (app == null) {
throw new NotFoundException("app with id: " + appId + " not found");
}
return app;
}
private UserGroupInformation getCallerUserGroupInformation(
HttpServletRequest hsr) {
String remoteUser = hsr.getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
return callerUGI;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "appstate")
@XmlAccessorType(XmlAccessType.FIELD)
public class AppState {
String state;
public AppState() {
}
public AppState(String state) {
this.state = state;
}
public void setState(String state) {
this.state = state;
}
public String getState() {
return this.state;
}
}

View File

@ -0,0 +1,496 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Properties;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.JerseyTest;
import com.sun.jersey.test.framework.WebAppDescriptor;
@RunWith(Parameterized.class)
public class TestRMWebServicesAppsModification extends JerseyTest {
private static MockRM rm;
private static final int CONTAINER_MB = 1024;
private Injector injector;
private String webserviceUserName = "testuser";
public class GuiceServletConfig extends GuiceServletContextListener {
@Override
protected Injector getInjector() {
return injector;
}
}
/*
* Helper class to allow testing of RM web services which require
* authorization Add this class as a filter in the Guice injector for the
* MockRM
*/
@Singleton
public static class TestRMCustomAuthFilter extends AuthenticationFilter {
@Override
protected Properties getConfiguration(String configPrefix,
FilterConfig filterConfig) throws ServletException {
Properties props = new Properties();
Enumeration<?> names = filterConfig.getInitParameterNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
if (name.startsWith(configPrefix)) {
String value = filterConfig.getInitParameter(name);
props.put(name.substring(configPrefix.length()), value);
}
}
props.put(AuthenticationFilter.AUTH_TYPE, "simple");
props.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "false");
return props;
}
}
private class TestServletModule extends ServletModule {
public Configuration conf = new Configuration();
boolean setAuthFilter = false;
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
bind(RMContext.class).toInstance(rm.getRMContext());
bind(ApplicationACLsManager.class).toInstance(
rm.getApplicationACLsManager());
bind(QueueACLsManager.class).toInstance(rm.getQueueACLsManager());
if (setAuthFilter) {
filter("/*").through(TestRMCustomAuthFilter.class);
}
serve("/*").with(GuiceContainer.class);
}
}
private Injector getNoAuthInjector() {
return Guice.createInjector(new TestServletModule() {
@Override
protected void configureServlets() {
super.configureServlets();
}
});
}
private Injector getSimpleAuthInjector() {
return Guice.createInjector(new TestServletModule() {
@Override
protected void configureServlets() {
setAuthFilter = true;
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
// set the admin acls otherwise all users are considered admins
// and we can't test authorization
conf.setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
super.configureServlets();
}
});
}
@Parameters
public static Collection<Object[]> guiceConfigs() {
return Arrays.asList(new Object[][] { { 0 }, { 1 } });
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
public TestRMWebServicesAppsModification(int run) {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
switch (run) {
case 0:
default:
injector = getNoAuthInjector();
break;
case 1:
injector = getSimpleAuthInjector();
break;
}
}
private boolean isAuthorizationEnabled() {
return rm.getConfig().getBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
}
private WebResource constructWebResource(WebResource r, String... paths) {
WebResource rt = r;
for (String path : paths) {
rt = rt.path(path);
}
if (isAuthorizationEnabled()) {
rt = rt.queryParam("user.name", webserviceUserName);
}
return rt;
}
private WebResource constructWebResource(String... paths) {
WebResource r = resource();
WebResource ws = r.path("ws").path("v1").path("cluster");
return this.constructWebResource(ws, paths);
}
@Test
public void testSingleAppState() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String mediaType : mediaTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
amNodeManager.nodeHeartbeat(true);
ClientResponse response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").accept(mediaType).get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
if (mediaType == MediaType.APPLICATION_JSON) {
verifyAppStateJson(response, RMAppState.ACCEPTED);
} else if (mediaType == MediaType.APPLICATION_XML) {
verifyAppStateXML(response, RMAppState.ACCEPTED);
}
}
rm.stop();
}
@Test(timeout = 90000)
public void testSingleAppKill() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
amNodeManager.nodeHeartbeat(true);
ClientResponse response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").accept(mediaType).get(ClientResponse.class);
AppState targetState =
new AppState(YarnApplicationState.KILLED.toString());
Object entity;
if (contentType == MediaType.APPLICATION_JSON_TYPE) {
entity = appStateToJSON(targetState);
} else {
entity = targetState;
}
response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").entity(entity, contentType).accept(mediaType)
.put(ClientResponse.class);
if (!isAuthorizationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
continue;
}
assertEquals(Status.ACCEPTED, response.getClientResponseStatus());
if (mediaType == MediaType.APPLICATION_JSON) {
verifyAppStateJson(response, RMAppState.KILLING, RMAppState.ACCEPTED);
} else {
verifyAppStateXML(response, RMAppState.KILLING, RMAppState.ACCEPTED);
}
String locationHeaderValue =
response.getHeaders().getFirst(HttpHeaders.LOCATION);
Client c = Client.create();
WebResource tmp = c.resource(locationHeaderValue);
if (isAuthorizationEnabled()) {
tmp = tmp.queryParam("user.name", webserviceUserName);
}
response = tmp.get(ClientResponse.class);
assertEquals(Status.OK, response.getClientResponseStatus());
assertTrue(locationHeaderValue.endsWith("/ws/v1/cluster/apps/"
+ app.getApplicationId().toString() + "/state"));
while (true) {
Thread.sleep(100);
response =
this
.constructWebResource("apps",
app.getApplicationId().toString(), "state").accept(mediaType)
.entity(entity, contentType).put(ClientResponse.class);
assertTrue((response.getClientResponseStatus() == Status.ACCEPTED)
|| (response.getClientResponseStatus() == Status.OK));
if (response.getClientResponseStatus() == Status.OK) {
assertEquals(RMAppState.KILLED, app.getState());
if (mediaType == MediaType.APPLICATION_JSON) {
verifyAppStateJson(response, RMAppState.KILLED);
} else {
verifyAppStateXML(response, RMAppState.KILLED);
}
break;
}
}
}
}
rm.stop();
return;
}
@Test
public void testSingleAppKillInvalidState() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
String[] targetStates =
{ YarnApplicationState.FINISHED.toString(), "blah" };
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
for (String targetStateString : targetStates) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
amNodeManager.nodeHeartbeat(true);
ClientResponse response;
AppState targetState = new AppState(targetStateString);
Object entity;
if (contentType == MediaType.APPLICATION_JSON_TYPE) {
entity = appStateToJSON(targetState);
} else {
entity = targetState;
}
response =
this
.constructWebResource("apps",
app.getApplicationId().toString(), "state")
.entity(entity, contentType).accept(mediaType)
.put(ClientResponse.class);
if (!isAuthorizationEnabled()) {
assertEquals(Status.UNAUTHORIZED,
response.getClientResponseStatus());
continue;
}
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
}
}
}
rm.stop();
return;
}
private static String appStateToJSON(AppState state) throws Exception {
StringWriter sw = new StringWriter();
JSONJAXBContext ctx = new JSONJAXBContext(AppState.class);
JSONMarshaller jm = ctx.createJSONMarshaller();
jm.marshallToJSON(state, sw);
return sw.toString();
}
protected static void verifyAppStateJson(ClientResponse response,
RMAppState... states) throws JSONException {
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
boolean valid = false;
for (RMAppState state : states) {
if (state.toString().equals(json.getString("state"))) {
valid = true;
}
}
assertTrue("app state incorrect", valid);
return;
}
protected static void verifyAppStateXML(ClientResponse response,
RMAppState... appStates) throws ParserConfigurationException,
IOException, SAXException {
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String xml = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("appstate");
assertEquals("incorrect number of elements", 1, nodes.getLength());
Element element = (Element) nodes.item(0);
String state = WebServicesTestUtils.getXmlString(element, "state");
boolean valid = false;
for (RMAppState appState : appStates) {
if (appState.toString().equals(state)) {
valid = true;
}
}
assertTrue("app state incorrect", valid);
return;
}
@Test(timeout = 30000)
public void testSingleAppKillUnauthorized() throws Exception {
// default root queue allows anyone to have admin acl
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setAcl("root", QueueACL.ADMINISTER_QUEUE, "someuser");
csconf.setAcl("root.default", QueueACL.ADMINISTER_QUEUE, "someuser");
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
String[] mediaTypes =
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
for (String mediaType : mediaTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "test", "someuser");
amNodeManager.nodeHeartbeat(true);
ClientResponse response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").accept(mediaType).get(ClientResponse.class);
AppState info = response.getEntity(AppState.class);
info.setState(YarnApplicationState.KILLED.toString());
response =
this
.constructWebResource("apps", app.getApplicationId().toString(),
"state").accept(mediaType)
.entity(info, MediaType.APPLICATION_XML).put(ClientResponse.class);
if (!isAuthorizationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
} else {
assertEquals(Status.FORBIDDEN, response.getClientResponseStatus());
}
}
rm.stop();
return;
}
@Test
public void testSingleAppKillInvalidId() throws Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
amNodeManager.nodeHeartbeat(true);
String[] testAppIds = { "application_1391705042196_0001", "random_string" };
for (String testAppId : testAppIds) {
AppState info = new AppState("KILLED");
ClientResponse response =
this.constructWebResource("apps", testAppId, "state")
.accept(MediaType.APPLICATION_XML)
.entity(info, MediaType.APPLICATION_XML).put(ClientResponse.class);
if (!isAuthorizationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
continue;
}
assertEquals(Status.NOT_FOUND, response.getClientResponseStatus());
}
rm.stop();
return;
}
@After
@Override
public void tearDown() throws Exception {
if (rm != null) {
rm.stop();
}
super.tearDown();
}
}

View File

@ -1564,6 +1564,287 @@ _01_000001</amContainerLogs>
</app>
+---+
* Cluster Application State API
With the application state API, you can query the state of a submitted app as well kill a running app by modifying the state of a running app using a PUT request with the state set to "KILLED". To perform the PUT operation, authentication has to be setup for the RM web services. In addition, you must be authorized to kill the app. Currently you can only change the state to "KILLED"; an attempt to change the state to any other results in a 400 error response. Examples of the unauthorized and bad request errors are below. When you carry out a successful PUT, the iniital response may be a 202. You can confirm that the app is killed by repeating the PUT request until you get a 200, querying the state using the GET method or querying for app information and checking the state. In the examples below, we repeat the PUT request and get a 200 response.
Please note that in order to kill an app, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response.
** URI
-----
* http://<rm http address:port>/ws/v1/cluster/apps/{appid}/state
-----
** HTTP Operations Supported
------
* GET
* PUT
------
** Query Parameters Supported
------
None
------
** Elements of <appstate> object
When you make a request for the state of an app, the information returned has the following fields
*---------------+--------------+-------------------------------+
|| Item || Data Type || Description |
*---------------+--------------+-------------------------------+
| state | string | The application state - can be one of "NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED" |
*---------------+--------------+--------------------------------+
** Response Examples
<<JSON responses>>
HTTP Request
-----
GET http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
-----
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
{
"state":"ACCEPTED"
}
+---+
HTTP Request
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
----
Request Body:
+---+
{
"state":"KILLED"
}
+---+
Response Header:
+---+
HTTP/1.1 202 Accepted
Content-Type: application/json
Transfer-Encoding: chunked
Location: http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
{
"state":"ACCEPTED"
}
+---+
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
----
Request Body:
+---+
{
"state":"KILLED"
}
+---+
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
{
"state":"KILLED"
}
+---+
<<XML responses>>
HTTP Request
-----
GET http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
-----
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 99
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appstate>
<state>ACCEPTED</state>
</appstate>
+---+
HTTP Request
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
----
Request Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appstate>
<state>KILLED</state>
</appstate>
+---+
Response Header:
+---+
HTTP/1.1 202 Accepted
Content-Type: application/json
Content-Length: 794
Location: http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appstate>
<state>ACCEPTED</state>
</appstate>
+---+
HTTP Request
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
----
Request Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appstate>
<state>KILLED</state>
</appstate>
+---+
Response Header:
+---+
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 917
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appstate>
<state>KILLED</state>
</appstate>
+---+
<<Unauthorized Error Response>>
HTTP Request
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
----
Request Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appstate>
<state>KILLED</state>
</appstate>
+---+
Response Header:
+---+
HTTP/1.1 403 Unauthorized
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
+---+
<<Bad Request Error Response>>
HTTP Request
-----
PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state
----
Request Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<appstate>
<state>RUNNING</state>
</appstate>
+---+
Response Header:
+---+
HTTP/1.1 400
Content-Length: 295
Content-Type: application/xml
Server: Jetty(6.1.26)
+---+
Response Body:
+---+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<RemoteException>
<exception>BadRequestException</exception>
<message>java.lang.Exception: Only 'KILLED' is allowed as a target state.</message>
<javaClassName>org.apache.hadoop.yarn.webapp.BadRequestException</javaClassName>
</RemoteException>
+---+
* Cluster Application Attempts API
With the application attempts API, you can obtain a collection of resources that represent an application attempt. When you run a GET operation on this resource, you obtain a collection of App Attempt Objects.