From 52c2d99889dc39a4f385cbb91570027ef97ec7df Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sun, 7 Aug 2022 00:36:26 +0800 Subject: [PATCH] YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs for Router. (#4695) --- .../yarn/server/router/RouterServerUtil.java | 44 +++++++++++ .../webapp/FederationInterceptorREST.java | 77 +++++++++++++------ .../MockDefaultRequestInterceptorREST.java | 51 ++++++++++++ .../webapp/TestFederationInterceptorREST.java | 58 ++++++++++++++ .../webapp/TestRouterWebServiceUtil.java | 13 ++++ 5 files changed, 219 insertions(+), 24 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 173df28de5..65f2b68bfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -42,6 +42,28 @@ private RouterServerUtil() { public static final Logger LOG = LoggerFactory.getLogger(RouterServerUtil.class); + /** + * Throws an exception due to an error. + * + * @param t the throwable raised in the called class. + * @param errMsgFormat the error message format string. + * @param args referenced by the format specifiers in the format string. + * @throws YarnException on failure + */ + @Public + @Unstable + public static void logAndThrowException(Throwable t, String errMsgFormat, Object... args) + throws YarnException { + String msg = String.format(errMsgFormat, args); + if (t != null) { + LOG.error(msg, t); + throw new YarnException(msg, t); + } else { + LOG.error(msg); + throw new YarnException(msg); + } + } + /** * Throws an exception due to an error. * @@ -101,4 +123,26 @@ public static void logAndThrowRunTimeException(String errMsg, Throwable t) throw new RuntimeException(errMsg); } } + + /** + * Throws an RunTimeException due to an error. + * + * @param t the throwable raised in the called class. + * @param errMsgFormat the error message format string. + * @param args referenced by the format specifiers in the format string. + * @throws RuntimeException on failure + */ + @Public + @Unstable + public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, Object... args) + throws RuntimeException { + String msg = String.format(errMsgFormat, args); + if (t != null) { + LOG.error(msg, t); + throw new RuntimeException(msg, t); + } else { + LOG.error(msg); + throw new RuntimeException(msg); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 474d5fac15..d2fda2be52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -1360,7 +1360,24 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, @Override public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { - throw new NotImplementedException("Code is not implemented"); + if (appId == null || appId.isEmpty()) { + throw new IllegalArgumentException("Parameter error, the appId is empty or null."); + } + + try { + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); + + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppAttempts(hsr, appId); + } catch (IllegalArgumentException e) { + RouterServerUtil.logAndThrowRunTimeException(e, + "Unable to get the AppAttempt appId: %s.", appId); + } catch (YarnException e) { + RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e); + } + + return null; } @Override @@ -1372,7 +1389,28 @@ public RMQueueAclInfo checkUserAccessToQueue(String queue, String username, @Override public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res, String appId, String appAttemptId) { - throw new NotImplementedException("Code is not implemented"); + + if (appId == null || appId.isEmpty()) { + throw new IllegalArgumentException("Parameter error, the appId is empty or null."); + } + if (appAttemptId == null || appAttemptId.isEmpty()) { + throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null."); + } + + try { + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); + + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppAttempt(req, res, appId, appAttemptId); + } catch (IllegalArgumentException e) { + RouterServerUtil.logAndThrowRunTimeException(e, + "Unable to get the AppAttempt appId: %s, appAttemptId: %s.", appId, appAttemptId); + } catch (YarnException e) { + RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e); + } + + return null; } @Override @@ -1423,13 +1461,7 @@ public ContainerInfo getContainer(HttpServletRequest req, } try { - ApplicationId applicationId = ApplicationId.fromString(appId); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId); - - if (subClusterInfo == null) { - RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " + - applicationId, null); - } + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); @@ -1483,12 +1515,7 @@ public Response signalToContainer(String containerId, String command, ContainerId containerIdObj = ContainerId.fromString(containerId); ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId(); - SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId); - - if (subClusterInfo == null) { - RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " + - applicationId, null); - } + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId.toString()); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); @@ -1556,24 +1583,26 @@ private Map invokeConcurrent(Collection c /** * get the HomeSubCluster according to ApplicationId. * - * @param applicationId applicationId + * @param appId applicationId * @return HomeSubCluster * @throws YarnException on failure */ - private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException { + private SubClusterInfo getHomeSubClusterInfoByAppId(String appId) + throws YarnException { SubClusterInfo subClusterInfo = null; - SubClusterId subClusterId = null; try { - subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); + ApplicationId applicationId = ApplicationId.fromString(appId); + SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); if (subClusterId == null) { - RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId " - + applicationId, null); + RouterServerUtil.logAndThrowException(null, + "Can't get HomeSubCluster by applicationId %s", applicationId); } subClusterInfo = federationFacade.getSubCluster(subClusterId); + return subClusterInfo; } catch (YarnException e) { - RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId " - + applicationId + " failed.", e); + RouterServerUtil.logAndThrowException(e, + "Get HomeSubClusterInfo by applicationId %s failed.", appId); } - return subClusterInfo; + throw new YarnException("Unable to get subCluster by applicationId = " + appId); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index 8a6aa6dbb1..cedcae6937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -49,6 +49,11 @@ import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -67,6 +72,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; +import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -412,4 +419,48 @@ public Response signalToContainer(String containerId, String command, return Response.status(Status.OK).build(); } + + @Override + public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res, + String appId, String appAttemptId) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.contains(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + + ApplicationReport newApplicationReport = ApplicationReport.newInstance( + applicationId, ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)), + "user", "queue", "appname", "host", 124, null, + YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4, + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); + + ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance( + ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)), + "host", 124, "url", "oUrl", "diagnostics", + YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId( + newApplicationReport.getCurrentApplicationAttemptId(), 1)); + + return new AppAttemptInfo(attempt); + } + + @Override + public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.contains(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + + AppAttemptsInfo infos = new AppAttemptsInfo(); + infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(0)); + infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1)); + return infos; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 6233300723..d3625ff089 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; @@ -717,4 +719,60 @@ public void testGetContainer() appId.toString(), appAttemptId.toString(), "0"); Assert.assertNotNull(containerInfo); } + + @Test + public void testGetAppAttempts() + throws IOException, InterruptedException, YarnException { + // Submit application to multiSubCluster + ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); + ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString()); + Assert.assertNotNull(appAttemptsInfo); + + ArrayList attemptLists = appAttemptsInfo.getAttempts(); + Assert.assertNotNull(appAttemptsInfo); + Assert.assertEquals(2, attemptLists.size()); + + AppAttemptInfo attemptInfo1 = attemptLists.get(0); + Assert.assertNotNull(attemptInfo1); + Assert.assertEquals(0, attemptInfo1.getAttemptId()); + Assert.assertEquals("AppAttemptId_0", attemptInfo1.getAppAttemptId()); + Assert.assertEquals("LogLink_0", attemptInfo1.getLogsLink()); + Assert.assertEquals(1659621705L, attemptInfo1.getFinishedTime()); + + AppAttemptInfo attemptInfo2 = attemptLists.get(1); + Assert.assertNotNull(attemptInfo2); + Assert.assertEquals(0, attemptInfo2.getAttemptId()); + Assert.assertEquals("AppAttemptId_1", attemptInfo2.getAppAttemptId()); + Assert.assertEquals("LogLink_1", attemptInfo2.getLogsLink()); + Assert.assertEquals(1659621705L, attemptInfo2.getFinishedTime()); + } + + @Test + public void testGetAppAttempt() + throws IOException, InterruptedException, YarnException { + + // Generate ApplicationId information + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + + // Generate ApplicationAttemptId information + Assert.assertNotNull(interceptor.submitApplication(context, null)); + ApplicationAttemptId expectAppAttemptId = ApplicationAttemptId.newInstance(appId, 1); + + org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo + appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), "1"); + + Assert.assertNotNull(appAttemptInfo); + Assert.assertEquals(expectAppAttemptId.toString(), appAttemptInfo.getAppAttemptId()); + Assert.assertEquals("url", appAttemptInfo.getTrackingUrl()); + Assert.assertEquals("oUrl", appAttemptInfo.getOriginalTrackingUrl()); + Assert.assertEquals(124, appAttemptInfo.getRpcPort()); + Assert.assertEquals("host", appAttemptInfo.getHost()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java index edf3804d82..565074b61f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java @@ -30,12 +30,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Test class to validate RouterWebServiceUtil methods. */ @@ -579,4 +583,13 @@ private void setUpClusterMetrics(ClusterMetricsInfo metrics, long seed) { metrics.setActiveNodes(rand.nextInt(1000)); metrics.setShutdownNodes(rand.nextInt(1000)); } + + public static AppAttemptInfo generateAppAttemptInfo(int attemptId) { + AppAttemptInfo appAttemptInfo = mock(AppAttemptInfo.class); + when(appAttemptInfo.getAppAttemptId()).thenReturn("AppAttemptId_" + attemptId); + when(appAttemptInfo.getAttemptId()).thenReturn(0); + when(appAttemptInfo.getFinishedTime()).thenReturn(1659621705L); + when(appAttemptInfo.getLogsLink()).thenReturn("LogLink_" + attemptId); + return appAttemptInfo; + } } \ No newline at end of file