YARN-11227. [Federation] Add getAppTimeout, getAppTimeouts, updateApplicationTimeout REST APIs for Router. (#4715)

This commit is contained in:
slfan1989 2022-08-11 05:53:46 +08:00 committed by GitHub
parent ffa9ed93a4
commit 133e8aabf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 281 additions and 23 deletions

View File

@ -22,6 +22,7 @@
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
/**
@ -45,6 +46,12 @@ public AppTimeoutInfo() {
remainingTimeInSec = -1;
}
public AppTimeoutInfo(ApplicationTimeout applicationTimeout) {
this.expiryTime = applicationTimeout.getExpiryTime();
this.remainingTimeInSec = applicationTimeout.getRemainingTime();
this.timeoutType = applicationTimeout.getTimeoutType();
}
public ApplicationTimeoutType getTimeoutType() {
return timeoutType;
}

View File

@ -1342,31 +1342,87 @@ public Response listReservation(String queue, String reservationId,
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
String type) throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
if (type == null || type.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the type is empty or null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppTimeout(hsr, appId, type);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppTimeout appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppTimeout Failed.", e);
}
return null;
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppTimeouts(hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppTimeouts appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppTimeouts Failed.", e);
}
return null;
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
if (appTimeout == null) {
throw new IllegalArgumentException("Parameter error, the appTimeout is null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateApplicationTimeout(appTimeout, hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the updateApplicationTimeout appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateApplicationTimeout Failed.", e);
}
return null;
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempts(hsr, appId);
@ -1374,9 +1430,8 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the AppAttempt appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
RouterServerUtil.logAndThrowRunTimeException("getAppAttempts Failed.", e);
}
return null;
}

View File

@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
@ -54,6 +53,8 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -73,6 +74,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@ -93,7 +96,7 @@ public class MockDefaultRequestInterceptorREST
// This property allows us to write tests for specific scenario as YARN RM
// down e.g. network issue, failover.
private boolean isRunning = true;
private HashSet<ApplicationId> applicationMap = new HashSet<>();
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";
private void validateRunning() throws ConnectException {
@ -123,7 +126,22 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
LOG.info("Application submitted: " + appId);
applicationMap.add(appId);
// Initialize appReport
ApplicationReport appReport = ApplicationReport.newInstance(
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0,
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null,
false, Priority.newInstance(newApp.getPriority()), null, null);
// Initialize appTimeoutsMap
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>();
ApplicationTimeoutType timeoutType = ApplicationTimeoutType.LIFETIME;
ApplicationTimeout appTimeOut =
ApplicationTimeout.newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", 10);
appTimeoutsMap.put(timeoutType, appTimeOut);
appReport.setApplicationTimeouts(appTimeoutsMap);
applicationMap.put(appId, appReport);
return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "")
.entity(getSubClusterId()).build();
}
@ -136,7 +154,7 @@ public AppInfo getApp(HttpServletRequest hsr, String appId,
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@ -171,7 +189,7 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr,
validateRunning();
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.remove(applicationId)) {
if (applicationMap.remove(applicationId) == null) {
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
@ -244,7 +262,7 @@ public AppState getAppState(HttpServletRequest hsr, String appId)
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@ -428,7 +446,7 @@ public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@ -454,7 +472,7 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
@ -463,4 +481,102 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1));
return infos;
}
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr,
String appId, String type) throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
ApplicationTimeoutType paramType = ApplicationTimeoutType.valueOf(type);
if (paramType == null) {
throw new NotFoundException("application timeout type not found");
}
if (!timeouts.containsKey(paramType)) {
throw new NotFoundException("timeout with id: " + appId + " not found");
}
ApplicationTimeout applicationTimeout = timeouts.get(paramType);
AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
timeoutInfo.setExpiryTime(applicationTimeout.getExpiryTime());
timeoutInfo.setTimeoutType(applicationTimeout.getTimeoutType());
timeoutInfo.setRemainingTime(applicationTimeout.getRemainingTime());
return timeoutInfo;
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
AppTimeoutsInfo timeoutsInfo = new AppTimeoutsInfo();
for (ApplicationTimeout timeout : timeouts.values()) {
AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
timeoutInfo.setExpiryTime(timeout.getExpiryTime());
timeoutInfo.setTimeoutType(timeout.getTimeoutType());
timeoutInfo.setRemainingTime(timeout.getRemainingTime());
timeoutsInfo.add(timeoutInfo);
}
return timeoutsInfo;
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, HttpServletRequest hsr,
String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
ApplicationTimeoutType paramTimeoutType = appTimeout.getTimeoutType();
if (!timeouts.containsKey(paramTimeoutType)) {
throw new NotFoundException("TimeOutType with id: " + appId + " not found");
}
ApplicationTimeout applicationTimeout = timeouts.get(paramTimeoutType);
applicationTimeout.setTimeoutType(appTimeout.getTimeoutType());
applicationTimeout.setExpiryTime(appTimeout.getExpireTime());
applicationTimeout.setRemainingTime(appTimeout.getRemainingTimeInSec());
AppTimeoutInfo result = new AppTimeoutInfo(applicationTimeout);
return Response.status(Status.OK).entity(result).build();
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@ -58,10 +59,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.Times;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@ -757,7 +761,7 @@ public void testGetAppAttempt()
throws IOException, InterruptedException, YarnException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@ -775,4 +779,80 @@ public void testGetAppAttempt()
Assert.assertEquals(124, appAttemptInfo.getRpcPort());
Assert.assertEquals("host", appAttemptInfo.getHost());
}
@Test
public void testGetAppTimeout() throws IOException, InterruptedException, YarnException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
Assert.assertNotNull(interceptor.submitApplication(context, null));
ApplicationTimeoutType appTimeoutType = ApplicationTimeoutType.LIFETIME;
AppTimeoutInfo appTimeoutInfo =
interceptor.getAppTimeout(null, appId.toString(), appTimeoutType.toString());
Assert.assertNotNull(appTimeoutInfo);
Assert.assertEquals(10, appTimeoutInfo.getRemainingTimeInSec());
Assert.assertEquals("UNLIMITED", appTimeoutInfo.getExpireTime());
Assert.assertEquals(appTimeoutType, appTimeoutInfo.getTimeoutType());
}
@Test
public void testGetAppTimeouts() throws IOException, InterruptedException, YarnException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
Assert.assertNotNull(interceptor.submitApplication(context, null));
AppTimeoutsInfo appTimeoutsInfo = interceptor.getAppTimeouts(null, appId.toString());
Assert.assertNotNull(appTimeoutsInfo);
List<AppTimeoutInfo> timeouts = appTimeoutsInfo.getAppTimeouts();
Assert.assertNotNull(timeouts);
Assert.assertEquals(1, timeouts.size());
AppTimeoutInfo resultAppTimeout = timeouts.get(0);
Assert.assertNotNull(resultAppTimeout);
Assert.assertEquals(10, resultAppTimeout.getRemainingTimeInSec());
Assert.assertEquals("UNLIMITED", resultAppTimeout.getExpireTime());
Assert.assertEquals(ApplicationTimeoutType.LIFETIME, resultAppTimeout.getTimeoutType());
}
@Test
public void testUpdateApplicationTimeout() throws IOException, InterruptedException,
YarnException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
Assert.assertNotNull(interceptor.submitApplication(context, null));
long newLifetime = 10L;
// update 10L seconds more to timeout
String timeout = Times.formatISO8601(Time.now() + newLifetime * 1000);
AppTimeoutInfo paramAppTimeOut = new AppTimeoutInfo();
paramAppTimeOut.setExpiryTime(timeout);
// RemainingTime = Math.max((timeoutInMillis - System.currentTimeMillis()) / 1000, 0))
paramAppTimeOut.setRemainingTime(newLifetime);
paramAppTimeOut.setTimeoutType(ApplicationTimeoutType.LIFETIME);
Response response =
interceptor.updateApplicationTimeout(paramAppTimeOut, null, appId.toString());
Assert.assertNotNull(response);
AppTimeoutInfo entity = (AppTimeoutInfo) response.getEntity();
Assert.assertNotNull(entity);
Assert.assertEquals(paramAppTimeOut.getExpireTime(), entity.getExpireTime());
Assert.assertEquals(paramAppTimeOut.getTimeoutType(), entity.getTimeoutType());
Assert.assertEquals(paramAppTimeOut.getRemainingTimeInSec(), entity.getRemainingTimeInSec());
}
}