diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 52aa466b3f..2730dde72f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -24,6 +24,7 @@ import com.sun.jersey.spi.container.servlet.ServletContainer; import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; +import org.apache.hadoop.yarn.server.webproxy.DefaultAppReportFetcher; import org.apache.hadoop.yarn.webapp.WebAppException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1426,9 +1427,9 @@ protected void startWepApp() { if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). equals(proxyHostAndPort)) { if (HAUtil.isHAEnabled(conf)) { - fetcher = new AppReportFetcher(conf); + fetcher = new DefaultAppReportFetcher(conf); } else { - fetcher = new AppReportFetcher(conf, getClientRMService()); + fetcher = new DefaultAppReportFetcher(conf, getClientRMService()); } builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java index 94c34da031..245e471cdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.webproxy; import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -27,41 +30,45 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.AHSProxy; -import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.StringHelper; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; /** * This class abstracts away how ApplicationReports are fetched. */ -public class AppReportFetcher { - enum AppReportSource { RM, AHS } +public abstract class AppReportFetcher { + + protected enum AppReportSource {RM, AHS} + private final Configuration conf; - private final ApplicationClientProtocol applicationsManager; - private final ApplicationHistoryProtocol historyManager; + private ApplicationHistoryProtocol historyManager; + private String ahsAppPageUrlBase; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private boolean isAHSEnabled; /** - * Create a new Connection to the RM/Application History Server - * to fetch Application reports. + * Create a new Connection to the RM/Application History Server to fetch Application reports. + * * @param conf the conf to use to know where the RM is. */ public AppReportFetcher(Configuration conf) { + this.conf = conf; if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { - isAHSEnabled = true; + this.isAHSEnabled = true; + String scheme = WebAppUtils.getHttpSchemePrefix(conf); + String historyUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf); + this.ahsAppPageUrlBase = StringHelper.pjoin(scheme + historyUrl, "applicationhistory", "app"); } - this.conf = conf; try { - applicationsManager = ClientRMProxy.createRMProxy(conf, - ApplicationClientProtocol.class); - if (isAHSEnabled) { - historyManager = getAHSProxy(conf); + if (this.isAHSEnabled) { + this.historyManager = getAHSProxy(conf); } else { this.historyManager = null; } @@ -69,39 +76,13 @@ public AppReportFetcher(Configuration conf) { throw new YarnRuntimeException(e); } } - - /** - * Create a direct connection to RM instead of a remote connection when - * the proxy is running as part of the RM. Also create a remote connection to - * Application History Server if it is enabled. - * @param conf the configuration to use - * @param applicationsManager what to use to get the RM reports. - */ - public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) { - if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, - YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { - isAHSEnabled = true; - } - this.conf = conf; - this.applicationsManager = applicationsManager; - if (isAHSEnabled) { - try { - historyManager = getAHSProxy(conf); - } catch (IOException e) { - throw new YarnRuntimeException(e); - } - } else { - this.historyManager = null; - } - } protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration) throws IOException { - return AHSProxy.createAHSProxy(configuration, - ApplicationHistoryProtocol.class, - configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, + InetSocketAddress addr = configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT)); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT); + return AHSProxy.createAHSProxy(configuration, ApplicationHistoryProtocol.class, addr); } /** @@ -112,17 +93,29 @@ protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration) * @throws YarnException on any error. * @throws IOException */ - public FetchedAppReport getApplicationReport(ApplicationId appId) - throws YarnException, IOException { - GetApplicationReportRequest request = recordFactory - .newRecordInstance(GetApplicationReportRequest.class); + public abstract FetchedAppReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException; + + /** + * Get an application report for the specified application id from the RM and + * fall back to the Application History Server if not found in RM. + * + * @param applicationsManager what to use to get the RM reports. + * @param appId id of the application to get. + * @return the ApplicationReport for the appId. + * @throws YarnException on any error. + * @throws IOException connection exception. + */ + protected FetchedAppReport getApplicationReport(ApplicationClientProtocol applicationsManager, + ApplicationId appId) throws YarnException, IOException { + GetApplicationReportRequest request = + this.recordFactory.newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(appId); ApplicationReport appReport; FetchedAppReport fetchedAppReport; try { - appReport = applicationsManager. - getApplicationReport(request).getApplicationReport(); + appReport = applicationsManager.getApplicationReport(request).getApplicationReport(); fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM); } catch (ApplicationNotFoundException e) { if (!isAHSEnabled) { @@ -130,33 +123,43 @@ public FetchedAppReport getApplicationReport(ApplicationId appId) throw e; } //Fetch the application report from AHS - appReport = historyManager. - getApplicationReport(request).getApplicationReport(); + appReport = historyManager.getApplicationReport(request).getApplicationReport(); fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS); } return fetchedAppReport; } + public abstract String getRmAppPageUrlBase(ApplicationId appId) throws IOException, YarnException; + + public String getAhsAppPageUrlBase() { + return this.ahsAppPageUrlBase; + } + + protected Configuration getConf() { + return this.conf; + } + public void stop() { - if (this.applicationsManager != null) { - RPC.stopProxy(this.applicationsManager); - } if (this.historyManager != null) { RPC.stopProxy(this.historyManager); } } + @VisibleForTesting + public void setHistoryManager(ApplicationHistoryProtocol historyManager) { + this.historyManager = historyManager; + } + /* * This class creates a bundle of the application report and the source from * where the report was fetched. This allows the WebAppProxyServlet * to make decisions for the application report based on the source. */ - static class FetchedAppReport { + protected static class FetchedAppReport { private ApplicationReport appReport; private AppReportSource appReportSource; - public FetchedAppReport(ApplicationReport appReport, - AppReportSource appReportSource) { + public FetchedAppReport(ApplicationReport appReport, AppReportSource appReportSource) { this.appReport = appReport; this.appReportSource = appReportSource; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/DefaultAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/DefaultAppReportFetcher.java new file mode 100644 index 0000000000..1c2b0c0c34 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/DefaultAppReportFetcher.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.webproxy; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.util.StringHelper; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +public class DefaultAppReportFetcher extends AppReportFetcher { + + private final ApplicationClientProtocol applicationsManager; + private String rmAppPageUrlBase; + + /** + * Create a new Connection to the RM/Application History Server + * to fetch Application reports. + * + * @param conf the conf to use to know where the RM is. + */ + public DefaultAppReportFetcher(Configuration conf) { + super(conf); + this.rmAppPageUrlBase = + StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app"); + try { + this.applicationsManager = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + } + + /** + * Create a direct connection to RM instead of a remote connection when + * the proxy is running as part of the RM. Also create a remote connection to + * Application History Server if it is enabled. + * + * @param conf the configuration to use + * @param applicationsManager what to use to get the RM reports. + */ + public DefaultAppReportFetcher(Configuration conf, + ApplicationClientProtocol applicationsManager) { + super(conf); + this.rmAppPageUrlBase = + StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app"); + this.applicationsManager = applicationsManager; + } + + /** + * Get an application report for the specified application id from the RM and + * fall back to the Application History Server if not found in RM. + * + * @param appId id of the application to get. + * @return the ApplicationReport for the appId. + * @throws YarnException on any error. + * @throws IOException connection exception. + */ + @Override + public FetchedAppReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + return super.getApplicationReport(applicationsManager, appId); + } + + public String getRmAppPageUrlBase(ApplicationId appId) throws YarnException, IOException { + return this.rmAppPageUrlBase; + } + + public void stop() { + super.stop(); + if (this.applicationsManager != null) { + RPC.stopProxy(this.applicationsManager); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/FedAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/FedAppReportFetcher.java new file mode 100644 index 0000000000..24e6751608 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/FedAppReportFetcher.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.webproxy;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.util.StringHelper;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+public class FedAppReportFetcher extends AppReportFetcher {
+
+ private final Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.webproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.util.StringHelper;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.fail;
+
+public class TestFedAppReportFetcher {
+
+ private Configuration conf;
+ private static ApplicationHistoryProtocol history;
+
+ private SubClusterId subClusterId1 = SubClusterId.newInstance("subCluster1");
+ private SubClusterId subClusterId2 = SubClusterId.newInstance("subCluster2");
+ private SubClusterInfo clusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1000",
+ "10.0.0.1:1000", "10.0.0.1:1000", "10.0.0.1:1000", SubClusterState.SC_RUNNING, 0L, "");
+ private SubClusterInfo clusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1000",
+ "10.0.0.2:1000", "10.0.0.2:1000", "10.0.0.2:1000", SubClusterState.SC_RUNNING, 0L, "");
+ private ApplicationClientProtocol appManager1;
+ private ApplicationClientProtocol appManager2;
+ private ApplicationId appId1 = ApplicationId.newInstance(0, 1);
+ private ApplicationId appId2 = ApplicationId.newInstance(0, 2);
+
+ private static FedAppReportFetcher fetcher;
+ private final String appNotFoundExceptionMsg = "APP NOT FOUND";
+
+ @After
+ public void cleanUp() {
+ history = null;
+ fetcher = null;
+ }
+
+ private void testHelper(boolean isAHSEnabled)
+ throws YarnException, IOException {
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, isAHSEnabled);
+
+ FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance();
+ FederationStateStore fss = new MemoryFederationStateStore();
+ fss.init(conf);
+ fedFacade.reinitialize(fss, conf);
+
+ fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo1));
+ fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo2));
+ fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
+ .newInstance(ApplicationHomeSubCluster.newInstance(appId1, subClusterId1)));
+ fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
+ .newInstance(ApplicationHomeSubCluster.newInstance(appId2, subClusterId2)));
+
+ appManager1 = Mockito.mock(ApplicationClientProtocol.class);
+ Mockito.when(appManager1.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
+ .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
+
+ appManager2 = Mockito.mock(ApplicationClientProtocol.class);
+ Mockito.when(appManager2.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
+ .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
+
+ fetcher = new TestFedAppReportFetcher.FedAppReportFetcherForTest(conf);
+ fetcher.registerSubCluster(clusterInfo1, appManager1);
+ fetcher.registerSubCluster(clusterInfo2, appManager2);
+ }
+
+ @Test
+ public void testFetchReportAHSEnabled() throws YarnException, IOException {
+ testHelper(true);
+ fetcher.getApplicationReport(appId1);
+ fetcher.getApplicationReport(appId2);
+ Mockito.verify(history, Mockito.times(2))
+ .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+ Mockito.verify(appManager1, Mockito.times(1))
+ .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+ Mockito.verify(appManager2, Mockito.times(1))
+ .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+ }
+
+ @Test
+ public void testFetchReportAHSDisabled() throws Exception {
+ testHelper(false);
+
+ /* RM will not know of the app and Application History Service is disabled
+ * So we will not try to get the report from AHS and RM will throw
+ * ApplicationNotFoundException
+ */
+ LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg,
+ () -> fetcher.getApplicationReport(appId1));
+ LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg,
+ () -> fetcher.getApplicationReport(appId2));
+
+ Mockito.verify(appManager1, Mockito.times(1))
+ .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+ Mockito.verify(appManager2, Mockito.times(1))
+ .getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
+ Assert.assertNull("HistoryManager should be null as AHS is disabled", history);
+ }
+
+ @Test
+ public void testGetRmAppPageUrlBase() throws IOException, YarnException {
+ testHelper(true);
+ String scheme = WebAppUtils.getHttpSchemePrefix(conf);
+ Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId1),
+ StringHelper.pjoin(scheme + clusterInfo1.getRMWebServiceAddress(), "cluster", "app"));
+ Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId2),
+ StringHelper.pjoin(scheme + clusterInfo2.getRMWebServiceAddress(), "cluster", "app"));
+ }
+
+ static class FedAppReportFetcherForTest extends FedAppReportFetcher {
+
+ FedAppReportFetcherForTest(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected ApplicationHistoryProtocol getAHSProxy(Configuration conf)
+ throws IOException {
+ GetApplicationReportResponse resp = Mockito.mock(GetApplicationReportResponse.class);
+ history = Mockito.mock(ApplicationHistoryProtocol.class);
+ try {
+ Mockito.when(history.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
+ .thenReturn(resp);
+ } catch (YarnException e) {
+ // This should never happen
+ fail("Found exception when getApplicationReport!");
+ }
+ return history;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java
index 39699ae7d9..1d0ca00e7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java
@@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.http.HttpServer2;
@@ -67,6 +68,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -110,8 +112,7 @@ public static void start() throws Exception {
((ServerConnector)server.getConnectors()[0]).setHost("localhost");
server.start();
originalPort = ((ServerConnector)server.getConnectors()[0]).getLocalPort();
- LOG.info("Running embedded servlet container at: http://localhost:"
- + originalPort);
+ LOG.info("Running embedded servlet container at: http://localhost:{}", originalPort);
// This property needs to be set otherwise CORS Headers will be dropped
// by HttpUrlConnection
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
@@ -364,11 +365,13 @@ void testAppReportForEmptyTrackingUrl() throws Exception {
String appAddressInRm =
WebAppUtils.getResolvedRMWebAppURLWithScheme(configuration) +
"/cluster" + "/app/" + app.toString();
- assertEquals(proxyConn.getURL().toString(), appAddressInRm);
+ assertEquals(proxyConn.getURL().toString(), appAddressInRm,
+ "Webapp proxy servlet should have redirected to RM");
//set AHS_ENABLED = true to simulate getting the app report from AHS
configuration.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
true);
+ proxy.proxy.appReportFetcher.setAhsAppPageUrlBase(configuration);
proxyConn = (HttpURLConnection) url.openConnection();
proxyConn.connect();
try {
@@ -381,7 +384,8 @@ void testAppReportForEmptyTrackingUrl() throws Exception {
String appAddressInAhs =
WebAppUtils.getHttpSchemePrefix(configuration) + WebAppUtils.getAHSWebAppURLWithoutScheme(
configuration) + "/applicationhistory" + "/app/" + app.toString();
- assertEquals(proxyConn.getURL().toString(), appAddressInAhs);
+ assertEquals(proxyConn.getURL().toString(), appAddressInAhs,
+ "Webapp proxy servlet should have redirected to AHS");
} finally {
proxy.close();
}
@@ -607,8 +611,9 @@ protected void serviceStart() throws Exception {
}
- private class AppReportFetcherForTest extends AppReportFetcher {
+ private class AppReportFetcherForTest extends DefaultAppReportFetcher {
int answer = 0;
+ private String ahsAppPageUrlBase = null;
public AppReportFetcherForTest(Configuration conf) {
super(conf);
@@ -679,5 +684,17 @@ private FetchedAppReport getDefaultApplicationReport(ApplicationId appId,
private FetchedAppReport getDefaultApplicationReport(ApplicationId appId) {
return getDefaultApplicationReport(appId, true);
}
+
+ @VisibleForTesting
+ public String getAhsAppPageUrlBase() {
+ return ahsAppPageUrlBase != null ? ahsAppPageUrlBase : super.getAhsAppPageUrlBase();
+ }
+
+ @VisibleForTesting
+ public void setAhsAppPageUrlBase(Configuration conf) {
+ this.ahsAppPageUrlBase = StringHelper.pjoin(
+ WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils.getAHSWebAppURLWithoutScheme(conf),
+ "applicationhistory", "app");
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServletFed.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServletFed.java
new file mode 100644
index 0000000000..8931bc577a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServletFed.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.webproxy;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the WebAppProxyServlet and WebAppProxy. For back end use simple web server.
+ */
+public class TestWebAppProxyServletFed {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestWebAppProxyServletFed.class);
+
+ public static final String AM_PREFIX = "AM";
+ public static final String RM_PREFIX = "RM";
+ public static final String AHS_PREFIX = "AHS";
+
+ /*
+ * Mocked Server is used for simulating the web of AppMaster, ResourceMamanger or TimelineServer.
+ * */
+ private static Server mockServer;
+ private static int mockServerPort = 0;
+
+ /**
+ * Simple http server. Server should send answer with status 200
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ mockServer = new Server(0);
+ ((QueuedThreadPool) mockServer.getThreadPool()).setMaxThreads(20);
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ context.addServlet(new ServletHolder(new MockWebServlet(AM_PREFIX)), "/amweb/*");
+ context.addServlet(new ServletHolder(new MockWebServlet(RM_PREFIX)), "/cluster/app/*");
+ context.addServlet(new ServletHolder(new MockWebServlet(AHS_PREFIX)),
+ "/applicationhistory/app/*");
+ mockServer.setHandler(context);
+
+ ((ServerConnector) mockServer.getConnectors()[0]).setHost("localhost");
+ mockServer.start();
+ mockServerPort = ((ServerConnector) mockServer.getConnectors()[0]).getLocalPort();
+ LOG.info("Running embedded servlet container at: http://localhost:" + mockServerPort);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (mockServer != null) {
+ mockServer.stop();
+ mockServer.destroy();
+ mockServer = null;
+ }
+ }
+
+ @Test
+ public void testWebServlet() throws IOException {
+ HttpURLConnection conn;
+ // 1. Mocked AppMaster web Test
+ URL url = new URL("http", "localhost", mockServerPort, "/amweb/apptest");
+ conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ assertEquals(AM_PREFIX + "/apptest", readResponse(conn));
+ conn.disconnect();
+
+ // 2. Mocked RM web Test
+ url = new URL("http", "localhost", mockServerPort, "/cluster/app/apptest");
+ conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ assertEquals(RM_PREFIX + "/apptest", readResponse(conn));
+ conn.disconnect();
+
+ // 3. Mocked AHS web Test
+ url = new URL("http", "localhost", mockServerPort, "/applicationhistory/app/apptest");
+ conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ assertEquals(AHS_PREFIX + "/apptest", readResponse(conn));
+ conn.disconnect();
+ }
+
+ @Test(timeout=5000)
+ public void testWebAppProxyServletFed() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090");
+ conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:" + mockServerPort);
+ // overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
+ conf.setInt("hadoop.http.max.threads", 10);
+
+ // Create sub cluster information.
+ SubClusterId subClusterId1 = SubClusterId.newInstance("scid1");
+ SubClusterId subClusterId2 = SubClusterId.newInstance("scid2");
+ SubClusterInfo subClusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1",
+ "10.0.0.1:1", "10.0.0.1:1", "localhost:" + mockServerPort, SubClusterState.SC_RUNNING, 0,
+ "");
+ SubClusterInfo subClusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1",
+ "10.0.0.2:1", "10.0.0.2:1", "10.0.0.2:1", SubClusterState.SC_RUNNING, 0, "");
+
+ // App1 and App2 is running applications.
+ ApplicationId appId1 = ApplicationId.newInstance(0, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(0, 2);
+ String appUrl1 = "http://localhost:" + mockServerPort + "/amweb/" + appId1;
+ String appUrl2 = "http://localhost:" + mockServerPort + "/amweb/" + appId2;
+ // App3 is accepted application, has not registered original url to am.
+ ApplicationId appId3 = ApplicationId.newInstance(0, 3);
+ // App4 is finished application, has remove from rm, but not remove from timeline server.
+ ApplicationId appId4 = ApplicationId.newInstance(0, 4);
+
+ // Mock for application
+ ApplicationClientProtocol appManager1 = Mockito.mock(ApplicationClientProtocol.class);
+ Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId1)))
+ .thenReturn(GetApplicationReportResponse
+ .newInstance(newApplicationReport(appId1, YarnApplicationState.RUNNING, appUrl1)));
+ Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId3)))
+ .thenReturn(GetApplicationReportResponse
+ .newInstance(newApplicationReport(appId3, YarnApplicationState.ACCEPTED, null)));
+
+ ApplicationClientProtocol appManager2 = Mockito.mock(ApplicationClientProtocol.class);
+ Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId2)))
+ .thenReturn(GetApplicationReportResponse
+ .newInstance(newApplicationReport(appId2, YarnApplicationState.RUNNING, appUrl2)));
+ Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
+ .thenThrow(new ApplicationNotFoundException("APP NOT FOUND"));
+
+ ApplicationHistoryProtocol historyManager = Mockito.mock(ApplicationHistoryProtocol.class);
+ Mockito
+ .when(historyManager.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
+ .thenReturn(GetApplicationReportResponse
+ .newInstance(newApplicationReport(appId4, YarnApplicationState.FINISHED, null)));
+
+ // Initial federation store.
+ FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
+ facade.getStateStore()
+ .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo1));
+ facade.getStateStore()
+ .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo2));
+ facade.addApplicationHomeSubCluster(
+ ApplicationHomeSubCluster.newInstance(appId1, subClusterId1));
+ facade.addApplicationHomeSubCluster(
+ ApplicationHomeSubCluster.newInstance(appId2, subClusterId2));
+ facade.addApplicationHomeSubCluster(
+ ApplicationHomeSubCluster.newInstance(appId3, subClusterId1));
+ facade.addApplicationHomeSubCluster(
+ ApplicationHomeSubCluster.newInstance(appId4, subClusterId2));
+
+ // Start proxy server
+ WebAppProxyServerForTest proxy = new WebAppProxyServerForTest();
+ proxy.init(conf);
+ proxy.start();
+
+ try {
+ // set Mocked rm and timeline
+ int proxyPort = proxy.proxy.proxyServer.getConnectorAddress(0).getPort();
+ FedAppReportFetcher appReportFetcher = proxy.proxy.appReportFetcher;
+ appReportFetcher.registerSubCluster(subClusterInfo1, appManager1);
+ appReportFetcher.registerSubCluster(subClusterInfo2, appManager2);
+ appReportFetcher.setHistoryManager(historyManager);
+
+ // App1 is running in subcluster1, and original url is registered
+ // in rm of subCluster1. So proxy server will get original url from rm by
+ // getApplicationReport. Then proxy server will fetch the webapp directly.
+ URL url = new URL("http", "localhost", proxyPort, "/proxy/" + appId1.toString());
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ assertEquals(AM_PREFIX + "/" + appId1.toString(), readResponse(conn));
+ conn.disconnect();
+
+ // App2 is running in subcluster2, and original url is registered
+ // in rm of subCluster2. So proxy server will get original url from rm by
+ // getApplicationReport. Then proxy server will fetch the webapp directly.
+ url = new URL("http", "localhost", proxyPort, "/proxy/" + appId2.toString());
+ conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ assertEquals(AM_PREFIX + "/" + appId2.toString(), readResponse(conn));
+ conn.disconnect();
+
+ // App3 is accepted in subcluster1, and original url is not registered
+ // yet. So proxy server will fetch the application web from rm.
+ url = new URL("http", "localhost", proxyPort, "/proxy/" + appId3.toString());
+ conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ assertEquals(RM_PREFIX + "/" + appId3.toString(), readResponse(conn));
+ conn.disconnect();
+
+ // App4 is finished in subcluster2, and have removed from rm, but not
+ // removed from timeline server. So proxy server will fetch the
+ // application web from timeline server.
+ url = new URL("http", "localhost", proxyPort, "/proxy/" + appId4.toString());
+ conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ assertEquals(AHS_PREFIX + "/" + appId4.toString(), readResponse(conn));
+ conn.disconnect();
+ } finally {
+ proxy.close();
+ }
+ }
+
+ private ApplicationReport newApplicationReport(ApplicationId appId,
+ YarnApplicationState state, String origTrackingUrl) {
+ return ApplicationReport.newInstance(appId, null, "testuser", null, null, null, 0, null, state,
+ null, null, 0, 0, 0, null, null, origTrackingUrl, 0f, null, null);
+ }
+
+ private String readResponse(HttpURLConnection conn) throws IOException {
+ InputStream input = conn.getInputStream();
+ byte[] bytes = new byte[input.available()];
+ input.read(bytes);
+ return new String(bytes);
+ }
+
+ private class WebAppProxyServerForTest extends CompositeService {
+
+ private WebAppProxyForTest proxy = null;
+
+ WebAppProxyServerForTest() {
+ super(WebAppProxyServer.class.getName());
+ }
+
+ @Override
+ protected synchronized void serviceInit(Configuration conf) throws Exception {
+ proxy = new WebAppProxyForTest();
+ addService(proxy);
+ super.serviceInit(conf);
+ }
+ }
+
+ /*
+ * This servlet is used for simulate the web of AppMaster, ResourceManager,
+ * TimelineServer and so on.
+ * */
+ public static class MockWebServlet extends HttpServlet {
+
+ private String role;
+
+ public MockWebServlet(String role) {
+ this.role = role;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws IOException {
+ if (req.getPathInfo() != null) {
+ resp.getWriter().write(role + req.getPathInfo());
+ }
+ resp.setStatus(HttpServletResponse.SC_OK);
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws IOException {
+ InputStream is = req.getInputStream();
+ OutputStream os = resp.getOutputStream();
+ int c = is.read();
+ while (c > -1) {
+ os.write(c);
+ c = is.read();
+ }
+ is.close();
+ os.close();
+ resp.setStatus(HttpServletResponse.SC_OK);
+ }
+ }
+
+ private class WebAppProxyForTest extends WebAppProxy {
+
+ private HttpServer2 proxyServer;
+ private FedAppReportFetcher appReportFetcher;
+
+ @Override
+ protected void serviceStart() throws Exception {
+ Configuration conf = getConfig();
+ String bindAddress = conf.get(YarnConfiguration.PROXY_ADDRESS);
+ bindAddress = StringUtils.split(bindAddress, ':')[0];
+ AccessControlList acl = new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
+ YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
+ proxyServer = new HttpServer2.Builder()
+ .setName("proxy")
+ .addEndpoint(URI.create(WebAppUtils.getHttpSchemePrefix(conf) + bindAddress + ":0"))
+ .setFindPort(true)
+ .setConf(conf)
+ .setACL(acl)
+ .build();
+ proxyServer.addServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC,
+ WebAppProxyServlet.class);
+
+ appReportFetcher = new FedAppReportFetcher(conf);
+ proxyServer.setAttribute(FETCHER_ATTRIBUTE, appReportFetcher);
+ proxyServer.setAttribute(IS_SECURITY_ENABLED_ATTRIBUTE, Boolean.FALSE);
+
+ String proxy = WebAppUtils.getProxyHostAndPort(conf);
+ String[] proxyParts = proxy.split(":");
+ String proxyHost = proxyParts[0];
+
+ proxyServer.setAttribute(PROXY_HOST_ATTRIBUTE, proxyHost);
+ proxyServer.start();
+ LOG.info("Proxy server is started at port {}", proxyServer.getConnectorAddress(0).getPort());
+ }
+ }
+}