From da5bcf5f7d40913de2981731e951d662a3279562 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Thu, 19 Apr 2018 16:49:29 -0700 Subject: [PATCH] YARN-8186. [Router] Federation: routing getAppState REST invocations transparently to multiple RMs. Contributed by Giovanni Matteo Fumarola. --- .../webapp/FederationInterceptorREST.java | 52 +++++++++++++++-- .../MockDefaultRequestInterceptorREST.java | 16 ++++++ .../webapp/TestFederationInterceptorREST.java | 56 +++++++++++++++++++ 3 files changed, 118 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index fad5d966ff..d1d49ec4e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -987,6 +987,52 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { return metrics; } + /** + * The YARN Router will forward to the respective YARN RM in which the AM is + * running. + *

+ * Possible failure: + *

+ * Client: identical behavior as {@code RMWebServices}. + *

+ * Router: the Client will timeout and resubmit the request. + *

+ * ResourceManager: the Router will timeout and the call will fail. + *

+ * State Store: the Router will timeout and it will retry depending on the + * FederationFacade settings - if the failure happened before the select + * operation. + */ + @Override + public AppState getAppState(HttpServletRequest hsr, String appId) + throws AuthorizationException { + + ApplicationId applicationId = null; + try { + applicationId = ApplicationId.fromString(appId); + } catch (IllegalArgumentException e) { + return null; + } + + SubClusterInfo subClusterInfo = null; + SubClusterId subClusterId = null; + try { + subClusterId = + federationFacade.getApplicationHomeSubCluster(applicationId); + if (subClusterId == null) { + return null; + } + subClusterInfo = federationFacade.getSubCluster(subClusterId); + } catch (YarnException e) { + return null; + } + + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster(subClusterId, + subClusterInfo.getRMWebServiceAddress()); + return interceptor.getAppState(hsr, appId); + } + @Override public ClusterInfo get() { return getClusterInfo(); @@ -1025,12 +1071,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { throw new NotImplementedException(); } - @Override - public AppState getAppState(HttpServletRequest hsr, String appId) - throws AuthorizationException { - throw new NotImplementedException(); - } - @Override public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index 9f54582650..63de9ac18b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -61,6 +61,7 @@ public class MockDefaultRequestInterceptorREST // down e.g. network issue, failover. private boolean isRunning = true; private HashSet applicationMap = new HashSet<>(); + public static final String APP_STATE_RUNNING = "RUNNING"; private void validateRunning() throws ConnectException { if (!isRunning) { @@ -192,6 +193,21 @@ public class MockDefaultRequestInterceptorREST return metrics; } + @Override + public AppState getAppState(HttpServletRequest hsr, String appId) + throws AuthorizationException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.contains(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + + return new AppState(APP_STATE_RUNNING); + } + public void setSubClusterId(int subClusterId) { setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId))); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index fae4ecf3cd..dc60043eed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -443,4 +443,60 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { // The merge operations is tested in TestRouterWebServiceUtil } + /** + * This test validates the correctness of GetApplicationState in case the + * application exists in the cluster. + */ + @Test + public void testGetApplicationState() + throws YarnException, IOException, InterruptedException { + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContextInfo context = + new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + + // Submit the application we want the report later + Response response = interceptor.submitApplication(context, null); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + AppState responseGet = interceptor.getAppState(null, appId.toString()); + + Assert.assertNotNull(responseGet); + Assert.assertEquals(MockDefaultRequestInterceptorREST.APP_STATE_RUNNING, + responseGet.getState()); + } + + /** + * This test validates the correctness of GetApplicationState in case the + * application does not exist in StateStore. + */ + @Test + public void testGetApplicationStateNotExists() + throws YarnException, IOException, InterruptedException { + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + AppState response = interceptor.getAppState(null, appId.toString()); + + Assert.assertNull(response); + } + + /** + * This test validates the correctness of GetApplicationState in case of + * application in wrong format. + */ + @Test + public void testGetApplicationStateWrongFormat() + throws YarnException, IOException, InterruptedException { + + AppState response = interceptor.getAppState(null, "Application_wrong_id"); + + Assert.assertNull(response); + } + }