YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs for Router. (#4695)

This commit is contained in:
slfan1989 2022-08-07 00:36:26 +08:00 committed by GitHub
parent 25ccdc77af
commit 52c2d99889
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 219 additions and 24 deletions

View File

@ -42,6 +42,28 @@ private RouterServerUtil() {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(RouterServerUtil.class); LoggerFactory.getLogger(RouterServerUtil.class);
/**
* Throws an exception due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws YarnException on failure
*/
@Public
@Unstable
public static void logAndThrowException(Throwable t, String errMsgFormat, Object... args)
throws YarnException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new YarnException(msg, t);
} else {
LOG.error(msg);
throw new YarnException(msg);
}
}
/** /**
* Throws an exception due to an error. * Throws an exception due to an error.
* *
@ -101,4 +123,26 @@ public static void logAndThrowRunTimeException(String errMsg, Throwable t)
throw new RuntimeException(errMsg); throw new RuntimeException(errMsg);
} }
} }
/**
* Throws an RunTimeException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws RuntimeException on failure
*/
@Public
@Unstable
public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, Object... args)
throws RuntimeException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new RuntimeException(msg, t);
} else {
LOG.error(msg);
throw new RuntimeException(msg);
}
}
} }

View File

@ -1360,7 +1360,24 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
@Override @Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
throw new NotImplementedException("Code is not implemented"); if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempts(hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the AppAttempt appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
}
return null;
} }
@Override @Override
@ -1372,7 +1389,28 @@ public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
@Override @Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req, public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) { HttpServletResponse res, String appId, String appAttemptId) {
throw new NotImplementedException("Code is not implemented");
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
if (appAttemptId == null || appAttemptId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempt(req, res, appId, appAttemptId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the AppAttempt appId: %s, appAttemptId: %s.", appId, appAttemptId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
}
return null;
} }
@Override @Override
@ -1423,13 +1461,7 @@ public ContainerInfo getContainer(HttpServletRequest req,
} }
try { try {
ApplicationId applicationId = ApplicationId.fromString(appId); SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
}
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
@ -1483,12 +1515,7 @@ public Response signalToContainer(String containerId, String command,
ContainerId containerIdObj = ContainerId.fromString(containerId); ContainerId containerIdObj = ContainerId.fromString(containerId);
ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId(); ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId); SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId.toString());
if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
}
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
@ -1556,24 +1583,26 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
/** /**
* get the HomeSubCluster according to ApplicationId. * get the HomeSubCluster according to ApplicationId.
* *
* @param applicationId applicationId * @param appId applicationId
* @return HomeSubCluster * @return HomeSubCluster
* @throws YarnException on failure * @throws YarnException on failure
*/ */
private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException { private SubClusterInfo getHomeSubClusterInfoByAppId(String appId)
throws YarnException {
SubClusterInfo subClusterInfo = null; SubClusterInfo subClusterInfo = null;
SubClusterId subClusterId = null;
try { try {
subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == null) { if (subClusterId == null) {
RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId " RouterServerUtil.logAndThrowException(null,
+ applicationId, null); "Can't get HomeSubCluster by applicationId %s", applicationId);
} }
subClusterInfo = federationFacade.getSubCluster(subClusterId); subClusterInfo = federationFacade.getSubCluster(subClusterId);
return subClusterInfo;
} catch (YarnException e) { } catch (YarnException e) {
RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId " RouterServerUtil.logAndThrowException(e,
+ applicationId + " failed.", e); "Get HomeSubClusterInfo by applicationId %s failed.", appId);
} }
return subClusterInfo; throw new YarnException("Unable to get subCluster by applicationId = " + appId);
} }
} }

View File

@ -49,6 +49,11 @@
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -67,6 +72,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
@ -412,4 +419,48 @@ public Response signalToContainer(String containerId, String command,
return Response.status(Status.OK).build(); return Response.status(Status.OK).build();
} }
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
"user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
"host", 124, "url", "oUrl", "diagnostics",
YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId(
newApplicationReport.getCurrentApplicationAttemptId(), 1));
return new AppAttemptInfo(attempt);
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
AppAttemptsInfo infos = new AppAttemptsInfo();
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(0));
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1));
return infos;
}
} }

View File

@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@ -717,4 +719,60 @@ public void testGetContainer()
appId.toString(), appAttemptId.toString(), "0"); appId.toString(), appAttemptId.toString(), "0");
Assert.assertNotNull(containerInfo); Assert.assertNotNull(containerInfo);
} }
@Test
public void testGetAppAttempts()
throws IOException, InterruptedException, YarnException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Assert.assertNotNull(interceptor.submitApplication(context, null));
AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString());
Assert.assertNotNull(appAttemptsInfo);
ArrayList<AppAttemptInfo> attemptLists = appAttemptsInfo.getAttempts();
Assert.assertNotNull(appAttemptsInfo);
Assert.assertEquals(2, attemptLists.size());
AppAttemptInfo attemptInfo1 = attemptLists.get(0);
Assert.assertNotNull(attemptInfo1);
Assert.assertEquals(0, attemptInfo1.getAttemptId());
Assert.assertEquals("AppAttemptId_0", attemptInfo1.getAppAttemptId());
Assert.assertEquals("LogLink_0", attemptInfo1.getLogsLink());
Assert.assertEquals(1659621705L, attemptInfo1.getFinishedTime());
AppAttemptInfo attemptInfo2 = attemptLists.get(1);
Assert.assertNotNull(attemptInfo2);
Assert.assertEquals(0, attemptInfo2.getAttemptId());
Assert.assertEquals("AppAttemptId_1", attemptInfo2.getAppAttemptId());
Assert.assertEquals("LogLink_1", attemptInfo2.getLogsLink());
Assert.assertEquals(1659621705L, attemptInfo2.getFinishedTime());
}
@Test
public void testGetAppAttempt()
throws IOException, InterruptedException, YarnException {
// Generate ApplicationId information
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
// Generate ApplicationAttemptId information
Assert.assertNotNull(interceptor.submitApplication(context, null));
ApplicationAttemptId expectAppAttemptId = ApplicationAttemptId.newInstance(appId, 1);
org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo
appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), "1");
Assert.assertNotNull(appAttemptInfo);
Assert.assertEquals(expectAppAttemptId.toString(), appAttemptInfo.getAppAttemptId());
Assert.assertEquals("url", appAttemptInfo.getTrackingUrl());
Assert.assertEquals("oUrl", appAttemptInfo.getOriginalTrackingUrl());
Assert.assertEquals(124, appAttemptInfo.getRpcPort());
Assert.assertEquals("host", appAttemptInfo.getHost());
}
} }

View File

@ -30,12 +30,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** /**
* Test class to validate RouterWebServiceUtil methods. * Test class to validate RouterWebServiceUtil methods.
*/ */
@ -579,4 +583,13 @@ private void setUpClusterMetrics(ClusterMetricsInfo metrics, long seed) {
metrics.setActiveNodes(rand.nextInt(1000)); metrics.setActiveNodes(rand.nextInt(1000));
metrics.setShutdownNodes(rand.nextInt(1000)); metrics.setShutdownNodes(rand.nextInt(1000));
} }
public static AppAttemptInfo generateAppAttemptInfo(int attemptId) {
AppAttemptInfo appAttemptInfo = mock(AppAttemptInfo.class);
when(appAttemptInfo.getAppAttemptId()).thenReturn("AppAttemptId_" + attemptId);
when(appAttemptInfo.getAttemptId()).thenReturn(0);
when(appAttemptInfo.getFinishedTime()).thenReturn(1659621705L);
when(appAttemptInfo.getLogsLink()).thenReturn("LogLink_" + attemptId);
return appAttemptInfo;
}
} }