From c5ec7274355da31617399ae221d045b1d01902fd Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Thu, 4 Aug 2022 02:21:48 +0800
Subject: [PATCH] YARN-11230. [Federation] Add getContainer, signalToContainer REST APIs for Router. (#4689)

---
 .../yarn/server/router/RouterServerUtil.java  | 19 ++++
 .../webapp/FederationInterceptorREST.java     | 91 ++++++++++++++++++-
 .../MockDefaultRequestInterceptorREST.java    | 49 ++++++++++
 .../webapp/TestFederationInterceptorREST.java | 19 ++++
 4 files changed, 176 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 85b06c073b..173df28de5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -82,4 +82,23 @@ public static void logAndThrowIOException(String errMsg, Throwable t)
     }
   }
 
+  /**
+   * Logs the error and throws a RuntimeException; never returns normally.
+   *
+   * @param errMsg the error message
+   * @param t the throwable raised in the called class, or null if there is none.
+   * @throws RuntimeException always; wraps {@code t} as the cause when it is non-null
+   */
+  @Public
+  @Unstable
+  public static void logAndThrowRunTimeException(String errMsg, Throwable t)
+      throws RuntimeException {
+    if (t != null) {
+      LOG.error(errMsg, t);
+      throw new RuntimeException(errMsg, t);
+    } else {
+      LOG.error(errMsg);
+      throw new RuntimeException(errMsg);
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 2bc6d60b4e..9415f3dd78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -1415,7 +1416,39 @@ public ContainersInfo getContainers(HttpServletRequest req,
   public ContainerInfo getContainer(HttpServletRequest req,
       HttpServletResponse res, String appId, String appAttemptId,
       String containerId) {
-    throw new NotImplementedException("Code is not implemented");
+    // Validate all identifiers up front so bad requests fail before any sub-cluster lookup.
+    if (appId == null || appId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
+    }
+    if (appAttemptId == null || appAttemptId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
+    }
+    if (containerId == null || containerId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
+    }
+
+    try {
+      ApplicationId applicationId = ApplicationId.fromString(appId);
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
+
+      if (subClusterInfo == null) {
+        RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = "
+            + applicationId, null);
+      }
+
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      return interceptor.getContainer(req, res, appId, appAttemptId, containerId);
+    } catch (IllegalArgumentException e) {
+      String msg = String.format(
+          "Unable to get the container appId: %s, appAttemptId: %s, containerId: %s.", appId,
+          appAttemptId, containerId);
+      RouterServerUtil.logAndThrowRunTimeException(msg, e);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
+    }
+    // Unreachable: every catch block above rethrows; the compiler still requires a return.
+    return null;
   }
 
   @Override
@@ -1442,7 +1475,37 @@ public void setNextInterceptor(RESTRequestInterceptor next) {
   @Override
   public Response signalToContainer(String containerId, String command,
       HttpServletRequest req) {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (containerId == null || containerId.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
+    }
+
+    if (command == null || command.isEmpty()) {
+      throw new IllegalArgumentException("Parameter error, the command is empty or null.");
+    }
+    // The container id embeds the application id, which selects the home sub-cluster.
+    try {
+      ContainerId containerIdObj = ContainerId.fromString(containerId);
+      ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
+
+      SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
+
+      if (subClusterInfo == null) {
+        RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = "
+            + applicationId, null);
+      }
+
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      return interceptor.signalToContainer(containerId, command, req);
+
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowRunTimeException("signalToContainer Failed.", e);
+    } catch (AuthorizationException e) {
+      RouterServerUtil.logAndThrowRunTimeException("signalToContainer Authorization Failed.", e);
+    }
+    // Unreachable: every catch block above rethrows; the compiler still requires a return.
+    return null;
   }
 
   @Override
@@ -1494,4 +1557,28 @@ private Map invokeConcurrent(Collection c
     });
     return results;
   }
+
+  /**
+   * Gets the home SubClusterInfo of the given application.
+   *
+   * @param applicationId id of the application whose home sub-cluster is looked up
+   * @return the SubClusterInfo returned by the federation facade; callers must handle null
+   * @throws YarnException if the home sub-cluster id is missing or the lookup fails
+   */
+  private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
+    SubClusterInfo subClusterInfo = null;
+    SubClusterId subClusterId = null;
+    try {
+      subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
+      if (subClusterId == null) {
+        RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId "
+            + applicationId, null);
+      }
+      subClusterInfo = federationFacade.getSubCluster(subClusterId);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId "
+          + applicationId + " failed.", e);
+    }
+    return subClusterInfo;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index b5ee0bc014..8a6aa6dbb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -35,6 +35,7 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.commons.lang3.EnumUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.StringUtils;
@@ -47,6 +48,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -363,4 +365,51 @@ public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId) thr
       return null;
     }
   }
+
+  @Override
+  public ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
+      String appId, String appAttemptId, String containerId) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ContainerId newContainerId = ContainerId.newContainerId(
+        ApplicationAttemptId.fromString(appAttemptId), Integer.parseInt(containerId));
+
+    Resource allocatedResource = Resource.newInstance(1024, 2);
+    // Derive every remaining field from the sub-cluster id so results are deterministic.
+    int subClusterId = Integer.parseInt(getSubClusterId().getId());
+    NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
+    Priority priority = Priority.newInstance(subClusterId);
+    long creationTime = subClusterId;
+    long finishTime = subClusterId;
+    String diagnosticInfo = "Diagnostic " + subClusterId;
+    String logUrl = "Log " + subClusterId;
+    int containerExitStatus = subClusterId;
+    ContainerState containerState = ContainerState.COMPLETE;
+    String nodeHttpAddress = "HttpAddress " + subClusterId;
+
+    ContainerReport containerReport = ContainerReport.newInstance(
+        newContainerId, allocatedResource, assignedNode, priority,
+        creationTime, finishTime, diagnosticInfo, logUrl,
+        containerExitStatus, containerState, nodeHttpAddress);
+
+    return new ContainerInfo(containerReport);
+  }
+
+  @Override
+  public Response signalToContainer(String containerId, String command,
+      HttpServletRequest req) throws AuthorizationException {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+    String upperCommand = StringUtils.toUpperCase(command); // locale-independent upper-casing
+    if (!EnumUtils.isValidEnum(SignalContainerCommand.class, upperCommand)) {
+      String errMsg = "Invalid command: " + upperCommand + ", valid commands are: "
+          + Arrays.asList(SignalContainerCommand.values());
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+    }
+
+    return Response.status(Status.OK).build();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index a4b60a5eb9..6233300723 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.junit.Assert;
@@ -698,4 +699,22 @@ public void testGetLabelsOnNode() throws Exception {
     Assert.assertNotNull(nodeLabelsInfo2);
     Assert.assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size());
   }
+
+  @Test
+  public void testGetContainer()
+      throws IOException, InterruptedException, YarnException {
+    // Submit application to multiSubCluster
+    ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+    ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Assert.assertNotNull(interceptor.submitApplication(context, null));
+
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    ContainerInfo containerInfo = interceptor.getContainer(null, null,
+        appId.toString(), appAttemptId.toString(), "0");
+    Assert.assertNotNull(containerInfo);
+  }
 }