YARN-11224. [Federation] Add getAppQueue, updateAppQueue REST APIs for Router. (#4747)
This commit is contained in:
parent
e40b3a3089
commit
cd72f7e042
@ -1312,14 +1312,52 @@ public Response updateApplicationPriority(AppPriority targetPriority,
|
|||||||
@Override
|
@Override
|
||||||
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
|
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
|
||||||
throws AuthorizationException {
|
throws AuthorizationException {
|
||||||
throw new NotImplementedException("Code is not implemented");
|
|
||||||
|
if (appId == null || appId.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
|
||||||
|
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
|
||||||
|
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
|
||||||
|
return interceptor.getAppQueue(hsr, appId);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
RouterServerUtil.logAndThrowRunTimeException(e,
|
||||||
|
"Unable to get queue by appId: %s.", appId);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
RouterServerUtil.logAndThrowRunTimeException("getAppQueue Failed.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
|
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
|
||||||
String appId) throws AuthorizationException, YarnException,
|
String appId) throws AuthorizationException, YarnException,
|
||||||
InterruptedException, IOException {
|
InterruptedException, IOException {
|
||||||
throw new NotImplementedException("Code is not implemented");
|
|
||||||
|
if (appId == null || appId.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (targetQueue == null) {
|
||||||
|
throw new IllegalArgumentException("Parameter error, the targetQueue is null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
|
||||||
|
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
|
||||||
|
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
|
||||||
|
return interceptor.updateAppQueue(targetQueue, hsr, appId);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
RouterServerUtil.logAndThrowRunTimeException(e,
|
||||||
|
"Unable to update app queue by appId: %s.", appId);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
RouterServerUtil.logAndThrowRunTimeException("updateAppQueue Failed.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -35,9 +35,9 @@
|
|||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
import org.apache.commons.lang3.EnumUtils;
|
import org.apache.commons.lang3.EnumUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.util.Sets;
|
import org.apache.hadoop.util.Sets;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
@ -77,6 +77,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
@ -621,4 +622,43 @@ public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
|
|||||||
|
|
||||||
return new AppPriority(priority.getPriority());
|
return new AppPriority(priority.getPriority());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
|
||||||
|
throws AuthorizationException {
|
||||||
|
if (!isRunning) {
|
||||||
|
throw new RuntimeException("RM is stopped");
|
||||||
|
}
|
||||||
|
ApplicationId applicationId = ApplicationId.fromString(appId);
|
||||||
|
if (!applicationMap.containsKey(applicationId)) {
|
||||||
|
throw new NotFoundException("app with id: " + appId + " not found");
|
||||||
|
}
|
||||||
|
String queue = applicationMap.get(applicationId).getQueue();
|
||||||
|
return new AppQueue(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId)
|
||||||
|
throws AuthorizationException, YarnException, InterruptedException, IOException {
|
||||||
|
if (!isRunning) {
|
||||||
|
throw new RuntimeException("RM is stopped");
|
||||||
|
}
|
||||||
|
ApplicationId applicationId = ApplicationId.fromString(appId);
|
||||||
|
if (!applicationMap.containsKey(applicationId)) {
|
||||||
|
throw new NotFoundException("app with id: " + appId + " not found");
|
||||||
|
}
|
||||||
|
if (targetQueue == null || StringUtils.isBlank(targetQueue.getQueue())) {
|
||||||
|
return Response.status(Status.BAD_REQUEST).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
ApplicationReport appReport = applicationMap.get(applicationId);
|
||||||
|
String originalQueue = appReport.getQueue();
|
||||||
|
appReport.setQueue(targetQueue.getQueue());
|
||||||
|
applicationMap.put(applicationId, appReport);
|
||||||
|
LOG.info("Update applicationId = {} from originalQueue = {} to targetQueue = {}.",
|
||||||
|
appId, originalQueue, targetQueue);
|
||||||
|
|
||||||
|
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
|
||||||
|
return Response.status(Status.OK).entity(targetAppQueue).build();
|
||||||
|
}
|
||||||
}
|
}
|
@ -62,6 +62,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
@ -900,4 +901,52 @@ public void testGetAppPriority() throws IOException, InterruptedException,
|
|||||||
Assert.assertNotNull(appPriority);
|
Assert.assertNotNull(appPriority);
|
||||||
Assert.assertEquals(priority, appPriority.getPriority());
|
Assert.assertEquals(priority, appPriority.getPriority());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateAppQueue() throws IOException, InterruptedException,
|
||||||
|
YarnException {
|
||||||
|
|
||||||
|
String oldQueue = "oldQueue";
|
||||||
|
String newQueue = "newQueue";
|
||||||
|
|
||||||
|
// Submit application to multiSubCluster
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
|
||||||
|
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
|
||||||
|
context.setApplicationId(appId.toString());
|
||||||
|
context.setQueue(oldQueue);
|
||||||
|
|
||||||
|
// Submit the application
|
||||||
|
Assert.assertNotNull(interceptor.submitApplication(context, null));
|
||||||
|
|
||||||
|
// Set New Queue for application
|
||||||
|
Response response = interceptor.updateAppQueue(new AppQueue(newQueue),
|
||||||
|
null, appId.toString());
|
||||||
|
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
AppQueue appQueue = (AppQueue) response.getEntity();
|
||||||
|
Assert.assertEquals(newQueue, appQueue.getQueue());
|
||||||
|
|
||||||
|
// Get AppQueue by application
|
||||||
|
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
|
||||||
|
Assert.assertNotNull(queue);
|
||||||
|
Assert.assertEquals(newQueue, queue.getQueue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAppQueue() throws IOException, InterruptedException, YarnException {
|
||||||
|
String queueName = "queueName";
|
||||||
|
|
||||||
|
// Submit application to multiSubCluster
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
|
||||||
|
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
|
||||||
|
context.setApplicationId(appId.toString());
|
||||||
|
context.setQueue(queueName);
|
||||||
|
|
||||||
|
Assert.assertNotNull(interceptor.submitApplication(context, null));
|
||||||
|
|
||||||
|
// Get Queue by application
|
||||||
|
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
|
||||||
|
Assert.assertNotNull(queue);
|
||||||
|
Assert.assertEquals(queueName, queue.getQueue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user