From 59f3a168199d47bf28777cb6eead6a5de8d073ae Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Sat, 12 Aug 2023 04:02:58 +0800 Subject: [PATCH] YARN-11153. Make proxy server support YARN federation. (#4314) --- .../resourcemanager/ResourceManager.java | 5 +- .../server/webproxy/AppReportFetcher.java | 115 +++--- .../webproxy/DefaultAppReportFetcher.java | 95 +++++ .../server/webproxy/FedAppReportFetcher.java | 110 ++++++ .../yarn/server/webproxy/WebAppProxy.java | 12 +- .../server/webproxy/WebAppProxyServlet.java | 32 +- .../server/webproxy/TestAppReportFetcher.java | 2 +- .../webproxy/TestFedAppReportFetcher.java | 169 ++++++++ .../webproxy/TestWebAppProxyServlet.java | 27 +- .../webproxy/TestWebAppProxyServletFed.java | 366 ++++++++++++++++++ 10 files changed, 854 insertions(+), 79 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/DefaultAppReportFetcher.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/FedAppReportFetcher.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestFedAppReportFetcher.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServletFed.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 52aa466b3f..2730dde72f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -24,6 +24,7 @@ import com.sun.jersey.spi.container.servlet.ServletContainer; import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; +import org.apache.hadoop.yarn.server.webproxy.DefaultAppReportFetcher; import org.apache.hadoop.yarn.webapp.WebAppException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1426,9 +1427,9 @@ protected void startWepApp() { if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). equals(proxyHostAndPort)) { if (HAUtil.isHAEnabled(conf)) { - fetcher = new AppReportFetcher(conf); + fetcher = new DefaultAppReportFetcher(conf); } else { - fetcher = new AppReportFetcher(conf, getClientRMService()); + fetcher = new DefaultAppReportFetcher(conf, getClientRMService()); } builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java index 94c34da031..245e471cdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.webproxy; import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -27,41 +30,45 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.AHSProxy; -import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.StringHelper; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; /** * This class abstracts away how ApplicationReports are fetched. */ -public class AppReportFetcher { - enum AppReportSource { RM, AHS } +public abstract class AppReportFetcher { + + protected enum AppReportSource {RM, AHS} + private final Configuration conf; - private final ApplicationClientProtocol applicationsManager; - private final ApplicationHistoryProtocol historyManager; + private ApplicationHistoryProtocol historyManager; + private String ahsAppPageUrlBase; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private boolean isAHSEnabled; /** - * Create a new Connection to the RM/Application History Server - * to fetch Application reports. + * Create a new Connection to the RM/Application History Server to fetch Application reports. + * * @param conf the conf to use to know where the RM is. */ public AppReportFetcher(Configuration conf) { + this.conf = conf; if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { - isAHSEnabled = true; + this.isAHSEnabled = true; + String scheme = WebAppUtils.getHttpSchemePrefix(conf); + String historyUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf); + this.ahsAppPageUrlBase = StringHelper.pjoin(scheme + historyUrl, "applicationhistory", "app"); } - this.conf = conf; try { - applicationsManager = ClientRMProxy.createRMProxy(conf, - ApplicationClientProtocol.class); - if (isAHSEnabled) { - historyManager = getAHSProxy(conf); + if (this.isAHSEnabled) { + this.historyManager = getAHSProxy(conf); } else { this.historyManager = null; } @@ -69,39 +76,13 @@ public AppReportFetcher(Configuration conf) { throw new YarnRuntimeException(e); } } - - /** - * Create a direct connection to RM instead of a remote connection when - * the proxy is running as part of the RM. Also create a remote connection to - * Application History Server if it is enabled. - * @param conf the configuration to use - * @param applicationsManager what to use to get the RM reports. - */ - public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) { - if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, - YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { - isAHSEnabled = true; - } - this.conf = conf; - this.applicationsManager = applicationsManager; - if (isAHSEnabled) { - try { - historyManager = getAHSProxy(conf); - } catch (IOException e) { - throw new YarnRuntimeException(e); - } - } else { - this.historyManager = null; - } - } protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration) throws IOException { - return AHSProxy.createAHSProxy(configuration, - ApplicationHistoryProtocol.class, - configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, + InetSocketAddress addr = configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT)); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT); + return AHSProxy.createAHSProxy(configuration, ApplicationHistoryProtocol.class, addr); } /** @@ -112,17 +93,29 @@ protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration) * @throws YarnException on any error. * @throws IOException */ - public FetchedAppReport getApplicationReport(ApplicationId appId) - throws YarnException, IOException { - GetApplicationReportRequest request = recordFactory - .newRecordInstance(GetApplicationReportRequest.class); + public abstract FetchedAppReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException; + + /** + * Get an application report for the specified application id from the RM and + * fall back to the Application History Server if not found in RM. + * + * @param applicationsManager what to use to get the RM reports. + * @param appId id of the application to get. + * @return the ApplicationReport for the appId. + * @throws YarnException on any error. + * @throws IOException connection exception. + */ + protected FetchedAppReport getApplicationReport(ApplicationClientProtocol applicationsManager, + ApplicationId appId) throws YarnException, IOException { + GetApplicationReportRequest request = + this.recordFactory.newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(appId); ApplicationReport appReport; FetchedAppReport fetchedAppReport; try { - appReport = applicationsManager. - getApplicationReport(request).getApplicationReport(); + appReport = applicationsManager.getApplicationReport(request).getApplicationReport(); fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM); } catch (ApplicationNotFoundException e) { if (!isAHSEnabled) { @@ -130,33 +123,43 @@ public FetchedAppReport getApplicationReport(ApplicationId appId) throw e; } //Fetch the application report from AHS - appReport = historyManager. - getApplicationReport(request).getApplicationReport(); + appReport = historyManager.getApplicationReport(request).getApplicationReport(); fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS); } return fetchedAppReport; } + public abstract String getRmAppPageUrlBase(ApplicationId appId) throws IOException, YarnException; + + public String getAhsAppPageUrlBase() { + return this.ahsAppPageUrlBase; + } + + protected Configuration getConf() { + return this.conf; + } + public void stop() { - if (this.applicationsManager != null) { - RPC.stopProxy(this.applicationsManager); - } if (this.historyManager != null) { RPC.stopProxy(this.historyManager); } } + @VisibleForTesting + public void setHistoryManager(ApplicationHistoryProtocol historyManager) { + this.historyManager = historyManager; + } + /* * This class creates a bundle of the application report and the source from * where the report was fetched. This allows the WebAppProxyServlet * to make decisions for the application report based on the source. */ - static class FetchedAppReport { + protected static class FetchedAppReport { private ApplicationReport appReport; private AppReportSource appReportSource; - public FetchedAppReport(ApplicationReport appReport, - AppReportSource appReportSource) { + public FetchedAppReport(ApplicationReport appReport, AppReportSource appReportSource) { this.appReport = appReport; this.appReportSource = appReportSource; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/DefaultAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/DefaultAppReportFetcher.java new file mode 100644 index 0000000000..1c2b0c0c34 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/DefaultAppReportFetcher.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.webproxy; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.util.StringHelper; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +public class DefaultAppReportFetcher extends AppReportFetcher { + + private final ApplicationClientProtocol applicationsManager; + private String rmAppPageUrlBase; + + /** + * Create a new Connection to the RM/Application History Server + * to fetch Application reports. + * + * @param conf the conf to use to know where the RM is. + */ + public DefaultAppReportFetcher(Configuration conf) { + super(conf); + this.rmAppPageUrlBase = + StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app"); + try { + this.applicationsManager = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + } + + /** + * Create a direct connection to RM instead of a remote connection when + * the proxy is running as part of the RM. Also create a remote connection to + * Application History Server if it is enabled. + * + * @param conf the configuration to use + * @param applicationsManager what to use to get the RM reports. + */ + public DefaultAppReportFetcher(Configuration conf, + ApplicationClientProtocol applicationsManager) { + super(conf); + this.rmAppPageUrlBase = + StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app"); + this.applicationsManager = applicationsManager; + } + + /** + * Get an application report for the specified application id from the RM and + * fall back to the Application History Server if not found in RM. + * + * @param appId id of the application to get. + * @return the ApplicationReport for the appId. + * @throws YarnException on any error. + * @throws IOException connection exception. + */ + @Override + public FetchedAppReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + return super.getApplicationReport(applicationsManager, appId); + } + + public String getRmAppPageUrlBase(ApplicationId appId) throws YarnException, IOException { + return this.rmAppPageUrlBase; + } + + public void stop() { + super.stop(); + if (this.applicationsManager != null) { + RPC.stopProxy(this.applicationsManager); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/FedAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/FedAppReportFetcher.java new file mode 100644 index 0000000000..24e6751608 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/FedAppReportFetcher.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.webproxy; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.util.StringHelper; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +public class FedAppReportFetcher extends AppReportFetcher { + + private final Map> subClusters; + private FederationStateStoreFacade federationFacade; + + /** + * Create a new Connection to the RM/Application History Server to fetch + * Application reports. + * + * @param conf the conf to use to know where the RM is. + */ + public FedAppReportFetcher(Configuration conf) { + super(conf); + subClusters = new ConcurrentHashMap<>(); + federationFacade = FederationStateStoreFacade.getInstance(); + } + + /** + * Get an application report for the specified application id from the RM and + * fall back to the Application History Server if not found in RM. + * + * @param appId id of the application to get. + * @return the ApplicationReport for the appId. + * @throws YarnException on any error. + * @throws IOException connection exception. + */ + @Override + public FetchedAppReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + SubClusterId scid = federationFacade.getApplicationHomeSubCluster(appId); + createSubclusterIfAbsent(scid); + ApplicationClientProtocol applicationsManager = subClusters.get(scid).getRight(); + + return super.getApplicationReport(applicationsManager, appId); + } + + @Override + public String getRmAppPageUrlBase(ApplicationId appId) + throws IOException, YarnException { + SubClusterId scid = federationFacade.getApplicationHomeSubCluster(appId); + createSubclusterIfAbsent(scid); + + SubClusterInfo subClusterInfo = subClusters.get(scid).getLeft(); + String scheme = WebAppUtils.getHttpSchemePrefix(getConf()); + return StringHelper.pjoin(scheme + subClusterInfo.getRMWebServiceAddress(), "cluster", "app"); + } + + private void createSubclusterIfAbsent(SubClusterId scId) throws YarnException, IOException { + if (subClusters.containsKey(scId)) { + return; + } + SubClusterInfo subClusterInfo = federationFacade.getSubCluster(scId); + Configuration subClusterConf = new Configuration(getConf()); + FederationProxyProviderUtil + .updateConfForFederation(subClusterConf, subClusterInfo.getSubClusterId().toString()); + ApplicationClientProtocol proxy = + ClientRMProxy.createRMProxy(subClusterConf, ApplicationClientProtocol.class); + subClusters.put(scId, Pair.of(subClusterInfo, proxy)); + } + + public void stop() { + super.stop(); + for (Pair pair : this.subClusters.values()) { + RPC.stopProxy(pair.getRight()); + } + } + + @VisibleForTesting + public void registerSubCluster(SubClusterInfo info, ApplicationClientProtocol proxy) { + subClusters.put(info.getSubClusterId(), Pair.of(info, proxy)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java index 0a65c6d34a..bfe469be8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -71,7 +72,11 @@ protected void serviceInit(Configuration conf) throws Exception { String[] proxyParts = proxy.split(":"); proxyHost = proxyParts[0]; - fetcher = new AppReportFetcher(conf); + if (HAUtil.isFederationEnabled(conf)) { + fetcher = new FedAppReportFetcher(conf); + } else { + fetcher = new DefaultAppReportFetcher(conf); + } bindAddress = conf.get(YarnConfiguration.PROXY_ADDRESS); if(bindAddress == null || bindAddress.isEmpty()) { throw new YarnRuntimeException(YarnConfiguration.PROXY_ADDRESS + @@ -157,4 +162,9 @@ public void join() { String getBindAddress() { return bindAddress + ":" + port; } + + @VisibleForTesting + public AppReportFetcher getFetcher() { + return fetcher; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServlet.java index 75b891b720..56adabe8f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServlet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServlet.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Set; +import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServlet; @@ -95,8 +96,6 @@ public class WebAppProxyServlet extends HttpServlet { public static final String PROXY_USER_COOKIE_NAME = "proxy-user"; private transient List trackingUriPlugins; - private final String rmAppPageUrlBase; - private final String ahsAppPageUrlBase; private final String failurePageUrlBase; private transient YarnConfiguration conf; @@ -134,16 +133,21 @@ public WebAppProxyServlet() { this.trackingUriPlugins = conf.getInstances(YarnConfiguration.YARN_TRACKING_URL_GENERATOR, TrackingUriPlugin.class); - this.rmAppPageUrlBase = - StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), - "cluster", "app"); this.failurePageUrlBase = StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "failure"); - this.ahsAppPageUrlBase = - StringHelper.pjoin(WebAppUtils.getHttpSchemePrefix(conf) - + WebAppUtils.getAHSWebAppURLWithoutScheme(conf), - "applicationhistory", "app"); + } + + private String getRmAppPageUrlBase(ApplicationId id) throws YarnException, IOException { + ServletContext context = getServletContext(); + AppReportFetcher af = (AppReportFetcher) context.getAttribute(WebAppProxy.FETCHER_ATTRIBUTE); + return af.getRmAppPageUrlBase(id); + } + + private String getAhsAppPageUrlBase() { + ServletContext context = getServletContext(); + AppReportFetcher af = (AppReportFetcher) context.getAttribute(WebAppProxy.FETCHER_ATTRIBUTE); + return af.getAhsAppPageUrlBase(); } /** @@ -578,7 +582,7 @@ private URI buildTrackingUrl(URI trackingUri, final HttpServletRequest req, */ private URI getTrackingUri(HttpServletRequest req, HttpServletResponse resp, ApplicationId id, String originalUri, AppReportSource appReportSource) - throws IOException, URISyntaxException { + throws IOException, URISyntaxException, YarnException { URI trackingUri = null; if ((originalUri == null) || @@ -589,15 +593,15 @@ private URI getTrackingUri(HttpServletRequest req, HttpServletResponse resp, // and Application Report was fetched from RM LOG.debug("Original tracking url is '{}'. Redirecting to RM app page", originalUri == null ? "NULL" : originalUri); - ProxyUtils.sendRedirect(req, resp, - StringHelper.pjoin(rmAppPageUrlBase, id.toString())); + ProxyUtils.sendRedirect(req, resp, StringHelper.pjoin(getRmAppPageUrlBase(id), + id.toString())); } else if (appReportSource == AppReportSource.AHS) { // fallback to Application History Server app page if the application // report was fetched from AHS LOG.debug("Original tracking url is '{}'. Redirecting to AHS app page", originalUri == null ? "NULL" : originalUri); - ProxyUtils.sendRedirect(req, resp, - StringHelper.pjoin(ahsAppPageUrlBase, id.toString())); + ProxyUtils.sendRedirect(req, resp, StringHelper.pjoin(getAhsAppPageUrlBase(), + id.toString())); } } else if (ProxyUriUtils.getSchemeFromUrl(originalUri).isEmpty()) { trackingUri = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java index cc7542f3e5..fdd0779109 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java @@ -92,7 +92,7 @@ void testFetchReportAHSDisabled() throws YarnException, IOException { } } - static class AppReportFetcherForTest extends AppReportFetcher { + static class AppReportFetcherForTest extends DefaultAppReportFetcher { public AppReportFetcherForTest(Configuration conf, ApplicationClientProtocol acp) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestFedAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestFedAppReportFetcher.java new file mode 100644 index 0000000000..e1aaf7ae69 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestFedAppReportFetcher.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.webproxy; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.util.StringHelper; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.fail; + +public class TestFedAppReportFetcher { + + private Configuration conf; + private static ApplicationHistoryProtocol history; + + private SubClusterId subClusterId1 = SubClusterId.newInstance("subCluster1"); + private SubClusterId subClusterId2 = SubClusterId.newInstance("subCluster2"); + private SubClusterInfo clusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1000", + "10.0.0.1:1000", "10.0.0.1:1000", "10.0.0.1:1000", SubClusterState.SC_RUNNING, 0L, ""); + private SubClusterInfo clusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1000", + "10.0.0.2:1000", "10.0.0.2:1000", "10.0.0.2:1000", SubClusterState.SC_RUNNING, 0L, ""); + private ApplicationClientProtocol appManager1; + private ApplicationClientProtocol appManager2; + private ApplicationId appId1 = ApplicationId.newInstance(0, 1); + private ApplicationId appId2 = ApplicationId.newInstance(0, 2); + + private static FedAppReportFetcher fetcher; + private final String appNotFoundExceptionMsg = "APP NOT FOUND"; + + @After + public void cleanUp() { + history = null; + fetcher = null; + } + + private void testHelper(boolean isAHSEnabled) + throws YarnException, IOException { + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, isAHSEnabled); + + FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance(); + FederationStateStore fss = new MemoryFederationStateStore(); + fss.init(conf); + fedFacade.reinitialize(fss, conf); + + fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo1)); + fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo2)); + fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest + .newInstance(ApplicationHomeSubCluster.newInstance(appId1, subClusterId1))); + fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest + .newInstance(ApplicationHomeSubCluster.newInstance(appId2, subClusterId2))); + + appManager1 = Mockito.mock(ApplicationClientProtocol.class); + Mockito.when(appManager1.getApplicationReport(Mockito.any(GetApplicationReportRequest.class))) + .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg)); + + appManager2 = Mockito.mock(ApplicationClientProtocol.class); + Mockito.when(appManager2.getApplicationReport(Mockito.any(GetApplicationReportRequest.class))) + .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg)); + + fetcher = new TestFedAppReportFetcher.FedAppReportFetcherForTest(conf); + fetcher.registerSubCluster(clusterInfo1, appManager1); + fetcher.registerSubCluster(clusterInfo2, appManager2); + } + + @Test + public void testFetchReportAHSEnabled() throws YarnException, IOException { + testHelper(true); + fetcher.getApplicationReport(appId1); + fetcher.getApplicationReport(appId2); + Mockito.verify(history, Mockito.times(2)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + Mockito.verify(appManager1, Mockito.times(1)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + Mockito.verify(appManager2, Mockito.times(1)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + } + + @Test + public void testFetchReportAHSDisabled() throws Exception { + testHelper(false); + + /* RM will not know of the app and Application History Service is disabled + * So we will not try to get the report from AHS and RM will throw + * ApplicationNotFoundException + */ + LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg, + () -> fetcher.getApplicationReport(appId1)); + LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg, + () -> fetcher.getApplicationReport(appId2)); + + Mockito.verify(appManager1, Mockito.times(1)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + Mockito.verify(appManager2, Mockito.times(1)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + Assert.assertNull("HistoryManager should be null as AHS is disabled", history); + } + + @Test + public void testGetRmAppPageUrlBase() throws IOException, YarnException { + testHelper(true); + String scheme = WebAppUtils.getHttpSchemePrefix(conf); + Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId1), + StringHelper.pjoin(scheme + clusterInfo1.getRMWebServiceAddress(), "cluster", "app")); + Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId2), + StringHelper.pjoin(scheme + clusterInfo2.getRMWebServiceAddress(), "cluster", "app")); + } + + static class FedAppReportFetcherForTest extends FedAppReportFetcher { + + FedAppReportFetcherForTest(Configuration conf) { + super(conf); + } + + @Override + protected ApplicationHistoryProtocol getAHSProxy(Configuration conf) + throws IOException { + GetApplicationReportResponse resp = Mockito.mock(GetApplicationReportResponse.class); + history = Mockito.mock(ApplicationHistoryProtocol.class); + try { + Mockito.when(history.getApplicationReport(Mockito.any(GetApplicationReportRequest.class))) + .thenReturn(resp); + } catch (YarnException e) { + // This should never happen + fail("Found exception when getApplicationReport!"); + } + return history; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java index 39699ae7d9..1d0ca00e7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java @@ -54,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.http.HttpServer2; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.StringHelper; import org.apache.hadoop.yarn.webapp.MimeType; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -110,8 +112,7 @@ public static void start() throws Exception { ((ServerConnector)server.getConnectors()[0]).setHost("localhost"); server.start(); originalPort = ((ServerConnector)server.getConnectors()[0]).getLocalPort(); - LOG.info("Running embedded servlet container at: http://localhost:" - + originalPort); + LOG.info("Running embedded servlet container at: http://localhost:{}", originalPort); // This property needs to be set otherwise CORS Headers will be dropped // by HttpUrlConnection System.setProperty("sun.net.http.allowRestrictedHeaders", "true"); @@ -364,11 +365,13 @@ void testAppReportForEmptyTrackingUrl() throws Exception { String appAddressInRm = WebAppUtils.getResolvedRMWebAppURLWithScheme(configuration) + "/cluster" + "/app/" + app.toString(); - assertEquals(proxyConn.getURL().toString(), appAddressInRm); + assertEquals(proxyConn.getURL().toString(), appAddressInRm, + "Webapp proxy servlet should have redirected to RM"); //set AHS_ENABLED = true to simulate getting the app report from AHS configuration.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true); + proxy.proxy.appReportFetcher.setAhsAppPageUrlBase(configuration); proxyConn = (HttpURLConnection) url.openConnection(); proxyConn.connect(); try { @@ -381,7 +384,8 @@ void testAppReportForEmptyTrackingUrl() throws Exception { String appAddressInAhs = WebAppUtils.getHttpSchemePrefix(configuration) + WebAppUtils.getAHSWebAppURLWithoutScheme( configuration) + "/applicationhistory" + "/app/" + app.toString(); - assertEquals(proxyConn.getURL().toString(), appAddressInAhs); + assertEquals(proxyConn.getURL().toString(), appAddressInAhs, + "Webapp proxy servlet should have redirected to AHS"); } finally { proxy.close(); } @@ -607,8 +611,9 @@ protected void serviceStart() throws Exception { } - private class AppReportFetcherForTest extends AppReportFetcher { + private class AppReportFetcherForTest extends DefaultAppReportFetcher { int answer = 0; + private String ahsAppPageUrlBase = null; public AppReportFetcherForTest(Configuration conf) { super(conf); @@ -679,5 +684,17 @@ private FetchedAppReport getDefaultApplicationReport(ApplicationId appId, private FetchedAppReport getDefaultApplicationReport(ApplicationId appId) { return getDefaultApplicationReport(appId, true); } + + @VisibleForTesting + public String getAhsAppPageUrlBase() { + return ahsAppPageUrlBase != null ? ahsAppPageUrlBase : super.getAhsAppPageUrlBase(); + } + + @VisibleForTesting + public void setAhsAppPageUrlBase(Configuration conf) { + this.ahsAppPageUrlBase = StringHelper.pjoin( + WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils.getAHSWebAppURLWithoutScheme(conf), + "applicationhistory", "app"); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServletFed.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServletFed.java new file mode 100644 index 0000000000..8931bc577a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServletFed.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.webproxy; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the WebAppProxyServlet and WebAppProxy. For back end use simple web server. + */ +public class TestWebAppProxyServletFed { + + private static final Logger LOG = LoggerFactory.getLogger(TestWebAppProxyServletFed.class); + + public static final String AM_PREFIX = "AM"; + public static final String RM_PREFIX = "RM"; + public static final String AHS_PREFIX = "AHS"; + + /* + * Mocked Server is used for simulating the web of AppMaster, ResourceMamanger or TimelineServer. + * */ + private static Server mockServer; + private static int mockServerPort = 0; + + /** + * Simple http server. Server should send answer with status 200 + */ + @BeforeClass + public static void setUp() throws Exception { + mockServer = new Server(0); + ((QueuedThreadPool) mockServer.getThreadPool()).setMaxThreads(20); + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + context.addServlet(new ServletHolder(new MockWebServlet(AM_PREFIX)), "/amweb/*"); + context.addServlet(new ServletHolder(new MockWebServlet(RM_PREFIX)), "/cluster/app/*"); + context.addServlet(new ServletHolder(new MockWebServlet(AHS_PREFIX)), + "/applicationhistory/app/*"); + mockServer.setHandler(context); + + ((ServerConnector) mockServer.getConnectors()[0]).setHost("localhost"); + mockServer.start(); + mockServerPort = ((ServerConnector) mockServer.getConnectors()[0]).getLocalPort(); + LOG.info("Running embedded servlet container at: http://localhost:" + mockServerPort); + } + + @AfterClass + public static void tearDown() throws Exception { + if (mockServer != null) { + mockServer.stop(); + mockServer.destroy(); + mockServer = null; + } + } + + @Test + public void testWebServlet() throws IOException { + HttpURLConnection conn; + // 1. Mocked AppMaster web Test + URL url = new URL("http", "localhost", mockServerPort, "/amweb/apptest"); + conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(AM_PREFIX + "/apptest", readResponse(conn)); + conn.disconnect(); + + // 2. Mocked RM web Test + url = new URL("http", "localhost", mockServerPort, "/cluster/app/apptest"); + conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(RM_PREFIX + "/apptest", readResponse(conn)); + conn.disconnect(); + + // 3. Mocked AHS web Test + url = new URL("http", "localhost", mockServerPort, "/applicationhistory/app/apptest"); + conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(AHS_PREFIX + "/apptest", readResponse(conn)); + conn.disconnect(); + } + + @Test(timeout=5000) + public void testWebAppProxyServletFed() throws Exception { + + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090"); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true); + conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:" + mockServerPort); + // overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS + conf.setInt("hadoop.http.max.threads", 10); + + // Create sub cluster information. + SubClusterId subClusterId1 = SubClusterId.newInstance("scid1"); + SubClusterId subClusterId2 = SubClusterId.newInstance("scid2"); + SubClusterInfo subClusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1", + "10.0.0.1:1", "10.0.0.1:1", "localhost:" + mockServerPort, SubClusterState.SC_RUNNING, 0, + ""); + SubClusterInfo subClusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1", + "10.0.0.2:1", "10.0.0.2:1", "10.0.0.2:1", SubClusterState.SC_RUNNING, 0, ""); + + // App1 and App2 is running applications. + ApplicationId appId1 = ApplicationId.newInstance(0, 1); + ApplicationId appId2 = ApplicationId.newInstance(0, 2); + String appUrl1 = "http://localhost:" + mockServerPort + "/amweb/" + appId1; + String appUrl2 = "http://localhost:" + mockServerPort + "/amweb/" + appId2; + // App3 is accepted application, has not registered original url to am. + ApplicationId appId3 = ApplicationId.newInstance(0, 3); + // App4 is finished application, has remove from rm, but not remove from timeline server. + ApplicationId appId4 = ApplicationId.newInstance(0, 4); + + // Mock for application + ApplicationClientProtocol appManager1 = Mockito.mock(ApplicationClientProtocol.class); + Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId1))) + .thenReturn(GetApplicationReportResponse + .newInstance(newApplicationReport(appId1, YarnApplicationState.RUNNING, appUrl1))); + Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId3))) + .thenReturn(GetApplicationReportResponse + .newInstance(newApplicationReport(appId3, YarnApplicationState.ACCEPTED, null))); + + ApplicationClientProtocol appManager2 = Mockito.mock(ApplicationClientProtocol.class); + Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId2))) + .thenReturn(GetApplicationReportResponse + .newInstance(newApplicationReport(appId2, YarnApplicationState.RUNNING, appUrl2))); + Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId4))) + .thenThrow(new ApplicationNotFoundException("APP NOT FOUND")); + + ApplicationHistoryProtocol historyManager = Mockito.mock(ApplicationHistoryProtocol.class); + Mockito + .when(historyManager.getApplicationReport(GetApplicationReportRequest.newInstance(appId4))) + .thenReturn(GetApplicationReportResponse + .newInstance(newApplicationReport(appId4, YarnApplicationState.FINISHED, null))); + + // Initial federation store. + FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(); + facade.getStateStore() + .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo1)); + facade.getStateStore() + .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo2)); + facade.addApplicationHomeSubCluster( + ApplicationHomeSubCluster.newInstance(appId1, subClusterId1)); + facade.addApplicationHomeSubCluster( + ApplicationHomeSubCluster.newInstance(appId2, subClusterId2)); + facade.addApplicationHomeSubCluster( + ApplicationHomeSubCluster.newInstance(appId3, subClusterId1)); + facade.addApplicationHomeSubCluster( + ApplicationHomeSubCluster.newInstance(appId4, subClusterId2)); + + // Start proxy server + WebAppProxyServerForTest proxy = new WebAppProxyServerForTest(); + proxy.init(conf); + proxy.start(); + + try { + // set Mocked rm and timeline + int proxyPort = proxy.proxy.proxyServer.getConnectorAddress(0).getPort(); + FedAppReportFetcher appReportFetcher = proxy.proxy.appReportFetcher; + appReportFetcher.registerSubCluster(subClusterInfo1, appManager1); + appReportFetcher.registerSubCluster(subClusterInfo2, appManager2); + appReportFetcher.setHistoryManager(historyManager); + + // App1 is running in subcluster1, and original url is registered + // in rm of subCluster1. So proxy server will get original url from rm by + // getApplicationReport. Then proxy server will fetch the webapp directly. + URL url = new URL("http", "localhost", proxyPort, "/proxy/" + appId1.toString()); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(AM_PREFIX + "/" + appId1.toString(), readResponse(conn)); + conn.disconnect(); + + // App2 is running in subcluster2, and original url is registered + // in rm of subCluster2. So proxy server will get original url from rm by + // getApplicationReport. Then proxy server will fetch the webapp directly. + url = new URL("http", "localhost", proxyPort, "/proxy/" + appId2.toString()); + conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(AM_PREFIX + "/" + appId2.toString(), readResponse(conn)); + conn.disconnect(); + + // App3 is accepted in subcluster1, and original url is not registered + // yet. So proxy server will fetch the application web from rm. + url = new URL("http", "localhost", proxyPort, "/proxy/" + appId3.toString()); + conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(RM_PREFIX + "/" + appId3.toString(), readResponse(conn)); + conn.disconnect(); + + // App4 is finished in subcluster2, and have removed from rm, but not + // removed from timeline server. So proxy server will fetch the + // application web from timeline server. + url = new URL("http", "localhost", proxyPort, "/proxy/" + appId4.toString()); + conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(AHS_PREFIX + "/" + appId4.toString(), readResponse(conn)); + conn.disconnect(); + } finally { + proxy.close(); + } + } + + private ApplicationReport newApplicationReport(ApplicationId appId, + YarnApplicationState state, String origTrackingUrl) { + return ApplicationReport.newInstance(appId, null, "testuser", null, null, null, 0, null, state, + null, null, 0, 0, 0, null, null, origTrackingUrl, 0f, null, null); + } + + private String readResponse(HttpURLConnection conn) throws IOException { + InputStream input = conn.getInputStream(); + byte[] bytes = new byte[input.available()]; + input.read(bytes); + return new String(bytes); + } + + private class WebAppProxyServerForTest extends CompositeService { + + private WebAppProxyForTest proxy = null; + + WebAppProxyServerForTest() { + super(WebAppProxyServer.class.getName()); + } + + @Override + protected synchronized void serviceInit(Configuration conf) throws Exception { + proxy = new WebAppProxyForTest(); + addService(proxy); + super.serviceInit(conf); + } + } + + /* + * This servlet is used for simulate the web of AppMaster, ResourceManager, + * TimelineServer and so on. + * */ + public static class MockWebServlet extends HttpServlet { + + private String role; + + public MockWebServlet(String role) { + this.role = role; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws IOException { + if (req.getPathInfo() != null) { + resp.getWriter().write(role + req.getPathInfo()); + } + resp.setStatus(HttpServletResponse.SC_OK); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) + throws IOException { + InputStream is = req.getInputStream(); + OutputStream os = resp.getOutputStream(); + int c = is.read(); + while (c > -1) { + os.write(c); + c = is.read(); + } + is.close(); + os.close(); + resp.setStatus(HttpServletResponse.SC_OK); + } + } + + private class WebAppProxyForTest extends WebAppProxy { + + private HttpServer2 proxyServer; + private FedAppReportFetcher appReportFetcher; + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + String bindAddress = conf.get(YarnConfiguration.PROXY_ADDRESS); + bindAddress = StringUtils.split(bindAddress, ':')[0]; + AccessControlList acl = new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + proxyServer = new HttpServer2.Builder() + .setName("proxy") + .addEndpoint(URI.create(WebAppUtils.getHttpSchemePrefix(conf) + bindAddress + ":0")) + .setFindPort(true) + .setConf(conf) + .setACL(acl) + .build(); + proxyServer.addServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC, + WebAppProxyServlet.class); + + appReportFetcher = new FedAppReportFetcher(conf); + proxyServer.setAttribute(FETCHER_ATTRIBUTE, appReportFetcher); + proxyServer.setAttribute(IS_SECURITY_ENABLED_ATTRIBUTE, Boolean.FALSE); + + String proxy = WebAppUtils.getProxyHostAndPort(conf); + String[] proxyParts = proxy.split(":"); + String proxyHost = proxyParts[0]; + + proxyServer.setAttribute(PROXY_HOST_ATTRIBUTE, proxyHost); + proxyServer.start(); + LOG.info("Proxy server is started at port {}", proxyServer.getConnectorAddress(0).getPort()); + } + } +}