YARN-11153. Make proxy server support YARN federation. (#4314)
This commit is contained in:
parent
58314cbbf6
commit
59f3a16819
@ -24,6 +24,7 @@
|
||||
import com.sun.jersey.spi.container.servlet.ServletContainer;
|
||||
|
||||
import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
|
||||
import org.apache.hadoop.yarn.server.webproxy.DefaultAppReportFetcher;
|
||||
import org.apache.hadoop.yarn.webapp.WebAppException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -1426,9 +1427,9 @@ protected void startWepApp() {
|
||||
if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
|
||||
equals(proxyHostAndPort)) {
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
fetcher = new AppReportFetcher(conf);
|
||||
fetcher = new DefaultAppReportFetcher(conf);
|
||||
} else {
|
||||
fetcher = new AppReportFetcher(conf, getClientRMService());
|
||||
fetcher = new DefaultAppReportFetcher(conf, getClientRMService());
|
||||
}
|
||||
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
|
||||
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
|
||||
|
@ -19,6 +19,9 @@
|
||||
package org.apache.hadoop.yarn.server.webproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
@ -27,41 +30,45 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.client.AHSProxy;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
/**
|
||||
* This class abstracts away how ApplicationReports are fetched.
|
||||
*/
|
||||
public class AppReportFetcher {
|
||||
enum AppReportSource { RM, AHS }
|
||||
public abstract class AppReportFetcher {
|
||||
|
||||
protected enum AppReportSource {RM, AHS}
|
||||
|
||||
private final Configuration conf;
|
||||
private final ApplicationClientProtocol applicationsManager;
|
||||
private final ApplicationHistoryProtocol historyManager;
|
||||
private ApplicationHistoryProtocol historyManager;
|
||||
private String ahsAppPageUrlBase;
|
||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
private boolean isAHSEnabled;
|
||||
|
||||
/**
|
||||
* Create a new Connection to the RM/Application History Server
|
||||
* to fetch Application reports.
|
||||
* Create a new Connection to the RM/Application History Server to fetch Application reports.
|
||||
*
|
||||
* @param conf the conf to use to know where the RM is.
|
||||
*/
|
||||
public AppReportFetcher(Configuration conf) {
|
||||
this.conf = conf;
|
||||
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
|
||||
isAHSEnabled = true;
|
||||
this.isAHSEnabled = true;
|
||||
String scheme = WebAppUtils.getHttpSchemePrefix(conf);
|
||||
String historyUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
|
||||
this.ahsAppPageUrlBase = StringHelper.pjoin(scheme + historyUrl, "applicationhistory", "app");
|
||||
}
|
||||
this.conf = conf;
|
||||
try {
|
||||
applicationsManager = ClientRMProxy.createRMProxy(conf,
|
||||
ApplicationClientProtocol.class);
|
||||
if (isAHSEnabled) {
|
||||
historyManager = getAHSProxy(conf);
|
||||
if (this.isAHSEnabled) {
|
||||
this.historyManager = getAHSProxy(conf);
|
||||
} else {
|
||||
this.historyManager = null;
|
||||
}
|
||||
@ -69,39 +76,13 @@ public AppReportFetcher(Configuration conf) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a direct connection to RM instead of a remote connection when
|
||||
* the proxy is running as part of the RM. Also create a remote connection to
|
||||
* Application History Server if it is enabled.
|
||||
* @param conf the configuration to use
|
||||
* @param applicationsManager what to use to get the RM reports.
|
||||
*/
|
||||
public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
|
||||
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
|
||||
isAHSEnabled = true;
|
||||
}
|
||||
this.conf = conf;
|
||||
this.applicationsManager = applicationsManager;
|
||||
if (isAHSEnabled) {
|
||||
try {
|
||||
historyManager = getAHSProxy(conf);
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
this.historyManager = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
|
||||
throws IOException {
|
||||
return AHSProxy.createAHSProxy(configuration,
|
||||
ApplicationHistoryProtocol.class,
|
||||
configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
|
||||
InetSocketAddress addr = configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
|
||||
return AHSProxy.createAHSProxy(configuration, ApplicationHistoryProtocol.class, addr);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -112,17 +93,29 @@ protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
|
||||
* @throws YarnException on any error.
|
||||
* @throws IOException
|
||||
*/
|
||||
public FetchedAppReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
GetApplicationReportRequest request = recordFactory
|
||||
.newRecordInstance(GetApplicationReportRequest.class);
|
||||
public abstract FetchedAppReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* Get an application report for the specified application id from the RM and
|
||||
* fall back to the Application History Server if not found in RM.
|
||||
*
|
||||
* @param applicationsManager what to use to get the RM reports.
|
||||
* @param appId id of the application to get.
|
||||
* @return the ApplicationReport for the appId.
|
||||
* @throws YarnException on any error.
|
||||
* @throws IOException connection exception.
|
||||
*/
|
||||
protected FetchedAppReport getApplicationReport(ApplicationClientProtocol applicationsManager,
|
||||
ApplicationId appId) throws YarnException, IOException {
|
||||
GetApplicationReportRequest request =
|
||||
this.recordFactory.newRecordInstance(GetApplicationReportRequest.class);
|
||||
request.setApplicationId(appId);
|
||||
|
||||
ApplicationReport appReport;
|
||||
FetchedAppReport fetchedAppReport;
|
||||
try {
|
||||
appReport = applicationsManager.
|
||||
getApplicationReport(request).getApplicationReport();
|
||||
appReport = applicationsManager.getApplicationReport(request).getApplicationReport();
|
||||
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM);
|
||||
} catch (ApplicationNotFoundException e) {
|
||||
if (!isAHSEnabled) {
|
||||
@ -130,33 +123,43 @@ public FetchedAppReport getApplicationReport(ApplicationId appId)
|
||||
throw e;
|
||||
}
|
||||
//Fetch the application report from AHS
|
||||
appReport = historyManager.
|
||||
getApplicationReport(request).getApplicationReport();
|
||||
appReport = historyManager.getApplicationReport(request).getApplicationReport();
|
||||
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS);
|
||||
}
|
||||
return fetchedAppReport;
|
||||
}
|
||||
|
||||
public abstract String getRmAppPageUrlBase(ApplicationId appId) throws IOException, YarnException;
|
||||
|
||||
public String getAhsAppPageUrlBase() {
|
||||
return this.ahsAppPageUrlBase;
|
||||
}
|
||||
|
||||
protected Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (this.applicationsManager != null) {
|
||||
RPC.stopProxy(this.applicationsManager);
|
||||
}
|
||||
if (this.historyManager != null) {
|
||||
RPC.stopProxy(this.historyManager);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setHistoryManager(ApplicationHistoryProtocol historyManager) {
|
||||
this.historyManager = historyManager;
|
||||
}
|
||||
|
||||
/*
|
||||
* This class creates a bundle of the application report and the source from
|
||||
* where the report was fetched. This allows the WebAppProxyServlet
|
||||
* to make decisions for the application report based on the source.
|
||||
*/
|
||||
static class FetchedAppReport {
|
||||
protected static class FetchedAppReport {
|
||||
private ApplicationReport appReport;
|
||||
private AppReportSource appReportSource;
|
||||
|
||||
public FetchedAppReport(ApplicationReport appReport,
|
||||
AppReportSource appReportSource) {
|
||||
public FetchedAppReport(ApplicationReport appReport, AppReportSource appReportSource) {
|
||||
this.appReport = appReport;
|
||||
this.appReportSource = appReportSource;
|
||||
}
|
||||
|
@ -0,0 +1,95 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
public class DefaultAppReportFetcher extends AppReportFetcher {
|
||||
|
||||
private final ApplicationClientProtocol applicationsManager;
|
||||
private String rmAppPageUrlBase;
|
||||
|
||||
/**
|
||||
* Create a new Connection to the RM/Application History Server
|
||||
* to fetch Application reports.
|
||||
*
|
||||
* @param conf the conf to use to know where the RM is.
|
||||
*/
|
||||
public DefaultAppReportFetcher(Configuration conf) {
|
||||
super(conf);
|
||||
this.rmAppPageUrlBase =
|
||||
StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app");
|
||||
try {
|
||||
this.applicationsManager = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a direct connection to RM instead of a remote connection when
|
||||
* the proxy is running as part of the RM. Also create a remote connection to
|
||||
* Application History Server if it is enabled.
|
||||
*
|
||||
* @param conf the configuration to use
|
||||
* @param applicationsManager what to use to get the RM reports.
|
||||
*/
|
||||
public DefaultAppReportFetcher(Configuration conf,
|
||||
ApplicationClientProtocol applicationsManager) {
|
||||
super(conf);
|
||||
this.rmAppPageUrlBase =
|
||||
StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app");
|
||||
this.applicationsManager = applicationsManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an application report for the specified application id from the RM and
|
||||
* fall back to the Application History Server if not found in RM.
|
||||
*
|
||||
* @param appId id of the application to get.
|
||||
* @return the ApplicationReport for the appId.
|
||||
* @throws YarnException on any error.
|
||||
* @throws IOException connection exception.
|
||||
*/
|
||||
@Override
|
||||
public FetchedAppReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
return super.getApplicationReport(applicationsManager, appId);
|
||||
}
|
||||
|
||||
public String getRmAppPageUrlBase(ApplicationId appId) throws YarnException, IOException {
|
||||
return this.rmAppPageUrlBase;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
super.stop();
|
||||
if (this.applicationsManager != null) {
|
||||
RPC.stopProxy(this.applicationsManager);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
public class FedAppReportFetcher extends AppReportFetcher {
|
||||
|
||||
private final Map<SubClusterId, Pair<SubClusterInfo, ApplicationClientProtocol>> subClusters;
|
||||
private FederationStateStoreFacade federationFacade;
|
||||
|
||||
/**
|
||||
* Create a new Connection to the RM/Application History Server to fetch
|
||||
* Application reports.
|
||||
*
|
||||
* @param conf the conf to use to know where the RM is.
|
||||
*/
|
||||
public FedAppReportFetcher(Configuration conf) {
|
||||
super(conf);
|
||||
subClusters = new ConcurrentHashMap<>();
|
||||
federationFacade = FederationStateStoreFacade.getInstance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an application report for the specified application id from the RM and
|
||||
* fall back to the Application History Server if not found in RM.
|
||||
*
|
||||
* @param appId id of the application to get.
|
||||
* @return the ApplicationReport for the appId.
|
||||
* @throws YarnException on any error.
|
||||
* @throws IOException connection exception.
|
||||
*/
|
||||
@Override
|
||||
public FetchedAppReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
SubClusterId scid = federationFacade.getApplicationHomeSubCluster(appId);
|
||||
createSubclusterIfAbsent(scid);
|
||||
ApplicationClientProtocol applicationsManager = subClusters.get(scid).getRight();
|
||||
|
||||
return super.getApplicationReport(applicationsManager, appId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRmAppPageUrlBase(ApplicationId appId)
|
||||
throws IOException, YarnException {
|
||||
SubClusterId scid = federationFacade.getApplicationHomeSubCluster(appId);
|
||||
createSubclusterIfAbsent(scid);
|
||||
|
||||
SubClusterInfo subClusterInfo = subClusters.get(scid).getLeft();
|
||||
String scheme = WebAppUtils.getHttpSchemePrefix(getConf());
|
||||
return StringHelper.pjoin(scheme + subClusterInfo.getRMWebServiceAddress(), "cluster", "app");
|
||||
}
|
||||
|
||||
private void createSubclusterIfAbsent(SubClusterId scId) throws YarnException, IOException {
|
||||
if (subClusters.containsKey(scId)) {
|
||||
return;
|
||||
}
|
||||
SubClusterInfo subClusterInfo = federationFacade.getSubCluster(scId);
|
||||
Configuration subClusterConf = new Configuration(getConf());
|
||||
FederationProxyProviderUtil
|
||||
.updateConfForFederation(subClusterConf, subClusterInfo.getSubClusterId().toString());
|
||||
ApplicationClientProtocol proxy =
|
||||
ClientRMProxy.createRMProxy(subClusterConf, ApplicationClientProtocol.class);
|
||||
subClusters.put(scId, Pair.of(subClusterInfo, proxy));
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
super.stop();
|
||||
for (Pair pair : this.subClusters.values()) {
|
||||
RPC.stopProxy(pair.getRight());
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void registerSubCluster(SubClusterInfo info, ApplicationClientProtocol proxy) {
|
||||
subClusters.put(info.getSubClusterId(), Pair.of(info, proxy));
|
||||
}
|
||||
}
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
@ -71,7 +72,11 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
String[] proxyParts = proxy.split(":");
|
||||
proxyHost = proxyParts[0];
|
||||
|
||||
fetcher = new AppReportFetcher(conf);
|
||||
if (HAUtil.isFederationEnabled(conf)) {
|
||||
fetcher = new FedAppReportFetcher(conf);
|
||||
} else {
|
||||
fetcher = new DefaultAppReportFetcher(conf);
|
||||
}
|
||||
bindAddress = conf.get(YarnConfiguration.PROXY_ADDRESS);
|
||||
if(bindAddress == null || bindAddress.isEmpty()) {
|
||||
throw new YarnRuntimeException(YarnConfiguration.PROXY_ADDRESS +
|
||||
@ -157,4 +162,9 @@ public void join() {
|
||||
String getBindAddress() {
|
||||
return bindAddress + ":" + port;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public AppReportFetcher getFetcher() {
|
||||
return fetcher;
|
||||
}
|
||||
}
|
||||
|
@ -37,6 +37,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.Cookie;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
@ -95,8 +96,6 @@ public class WebAppProxyServlet extends HttpServlet {
|
||||
public static final String PROXY_USER_COOKIE_NAME = "proxy-user";
|
||||
|
||||
private transient List<TrackingUriPlugin> trackingUriPlugins;
|
||||
private final String rmAppPageUrlBase;
|
||||
private final String ahsAppPageUrlBase;
|
||||
private final String failurePageUrlBase;
|
||||
private transient YarnConfiguration conf;
|
||||
|
||||
@ -134,16 +133,21 @@ public WebAppProxyServlet() {
|
||||
this.trackingUriPlugins =
|
||||
conf.getInstances(YarnConfiguration.YARN_TRACKING_URL_GENERATOR,
|
||||
TrackingUriPlugin.class);
|
||||
this.rmAppPageUrlBase =
|
||||
StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf),
|
||||
"cluster", "app");
|
||||
this.failurePageUrlBase =
|
||||
StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf),
|
||||
"cluster", "failure");
|
||||
this.ahsAppPageUrlBase =
|
||||
StringHelper.pjoin(WebAppUtils.getHttpSchemePrefix(conf)
|
||||
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf),
|
||||
"applicationhistory", "app");
|
||||
}
|
||||
|
||||
private String getRmAppPageUrlBase(ApplicationId id) throws YarnException, IOException {
|
||||
ServletContext context = getServletContext();
|
||||
AppReportFetcher af = (AppReportFetcher) context.getAttribute(WebAppProxy.FETCHER_ATTRIBUTE);
|
||||
return af.getRmAppPageUrlBase(id);
|
||||
}
|
||||
|
||||
private String getAhsAppPageUrlBase() {
|
||||
ServletContext context = getServletContext();
|
||||
AppReportFetcher af = (AppReportFetcher) context.getAttribute(WebAppProxy.FETCHER_ATTRIBUTE);
|
||||
return af.getAhsAppPageUrlBase();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -578,7 +582,7 @@ private URI buildTrackingUrl(URI trackingUri, final HttpServletRequest req,
|
||||
*/
|
||||
private URI getTrackingUri(HttpServletRequest req, HttpServletResponse resp,
|
||||
ApplicationId id, String originalUri, AppReportSource appReportSource)
|
||||
throws IOException, URISyntaxException {
|
||||
throws IOException, URISyntaxException, YarnException {
|
||||
URI trackingUri = null;
|
||||
|
||||
if ((originalUri == null) ||
|
||||
@ -589,15 +593,15 @@ private URI getTrackingUri(HttpServletRequest req, HttpServletResponse resp,
|
||||
// and Application Report was fetched from RM
|
||||
LOG.debug("Original tracking url is '{}'. Redirecting to RM app page",
|
||||
originalUri == null ? "NULL" : originalUri);
|
||||
ProxyUtils.sendRedirect(req, resp,
|
||||
StringHelper.pjoin(rmAppPageUrlBase, id.toString()));
|
||||
ProxyUtils.sendRedirect(req, resp, StringHelper.pjoin(getRmAppPageUrlBase(id),
|
||||
id.toString()));
|
||||
} else if (appReportSource == AppReportSource.AHS) {
|
||||
// fallback to Application History Server app page if the application
|
||||
// report was fetched from AHS
|
||||
LOG.debug("Original tracking url is '{}'. Redirecting to AHS app page",
|
||||
originalUri == null ? "NULL" : originalUri);
|
||||
ProxyUtils.sendRedirect(req, resp,
|
||||
StringHelper.pjoin(ahsAppPageUrlBase, id.toString()));
|
||||
ProxyUtils.sendRedirect(req, resp, StringHelper.pjoin(getAhsAppPageUrlBase(),
|
||||
id.toString()));
|
||||
}
|
||||
} else if (ProxyUriUtils.getSchemeFromUrl(originalUri).isEmpty()) {
|
||||
trackingUri =
|
||||
|
@ -92,7 +92,7 @@ void testFetchReportAHSDisabled() throws YarnException, IOException {
|
||||
}
|
||||
}
|
||||
|
||||
static class AppReportFetcherForTest extends AppReportFetcher {
|
||||
static class AppReportFetcherForTest extends DefaultAppReportFetcher {
|
||||
|
||||
public AppReportFetcherForTest(Configuration conf,
|
||||
ApplicationClientProtocol acp) {
|
||||
|
@ -0,0 +1,169 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestFedAppReportFetcher {
|
||||
|
||||
private Configuration conf;
|
||||
private static ApplicationHistoryProtocol history;
|
||||
|
||||
private SubClusterId subClusterId1 = SubClusterId.newInstance("subCluster1");
|
||||
private SubClusterId subClusterId2 = SubClusterId.newInstance("subCluster2");
|
||||
private SubClusterInfo clusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1000",
|
||||
"10.0.0.1:1000", "10.0.0.1:1000", "10.0.0.1:1000", SubClusterState.SC_RUNNING, 0L, "");
|
||||
private SubClusterInfo clusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1000",
|
||||
"10.0.0.2:1000", "10.0.0.2:1000", "10.0.0.2:1000", SubClusterState.SC_RUNNING, 0L, "");
|
||||
private ApplicationClientProtocol appManager1;
|
||||
private ApplicationClientProtocol appManager2;
|
||||
private ApplicationId appId1 = ApplicationId.newInstance(0, 1);
|
||||
private ApplicationId appId2 = ApplicationId.newInstance(0, 2);
|
||||
|
||||
private static FedAppReportFetcher fetcher;
|
||||
private final String appNotFoundExceptionMsg = "APP NOT FOUND";
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
history = null;
|
||||
fetcher = null;
|
||||
}
|
||||
|
||||
private void testHelper(boolean isAHSEnabled)
|
||||
throws YarnException, IOException {
|
||||
conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, isAHSEnabled);
|
||||
|
||||
FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance();
|
||||
FederationStateStore fss = new MemoryFederationStateStore();
|
||||
fss.init(conf);
|
||||
fedFacade.reinitialize(fss, conf);
|
||||
|
||||
fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo1));
|
||||
fss.registerSubCluster(SubClusterRegisterRequest.newInstance(clusterInfo2));
|
||||
fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
|
||||
.newInstance(ApplicationHomeSubCluster.newInstance(appId1, subClusterId1)));
|
||||
fss.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
|
||||
.newInstance(ApplicationHomeSubCluster.newInstance(appId2, subClusterId2)));
|
||||
|
||||
appManager1 = Mockito.mock(ApplicationClientProtocol.class);
|
||||
Mockito.when(appManager1.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
|
||||
.thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
|
||||
|
||||
appManager2 = Mockito.mock(ApplicationClientProtocol.class);
|
||||
Mockito.when(appManager2.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
|
||||
.thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
|
||||
|
||||
fetcher = new TestFedAppReportFetcher.FedAppReportFetcherForTest(conf);
|
||||
fetcher.registerSubCluster(clusterInfo1, appManager1);
|
||||
fetcher.registerSubCluster(clusterInfo2, appManager2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchReportAHSEnabled() throws YarnException, IOException {
|
||||
testHelper(true);
|
||||
fetcher.getApplicationReport(appId1);
|
||||
fetcher.getApplicationReport(appId2);
|
||||
Mockito.verify(history, Mockito.times(2))
|
||||
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
|
||||
Mockito.verify(appManager1, Mockito.times(1))
|
||||
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
|
||||
Mockito.verify(appManager2, Mockito.times(1))
|
||||
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchReportAHSDisabled() throws Exception {
|
||||
testHelper(false);
|
||||
|
||||
/* RM will not know of the app and Application History Service is disabled
|
||||
* So we will not try to get the report from AHS and RM will throw
|
||||
* ApplicationNotFoundException
|
||||
*/
|
||||
LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg,
|
||||
() -> fetcher.getApplicationReport(appId1));
|
||||
LambdaTestUtils.intercept(ApplicationNotFoundException.class, appNotFoundExceptionMsg,
|
||||
() -> fetcher.getApplicationReport(appId2));
|
||||
|
||||
Mockito.verify(appManager1, Mockito.times(1))
|
||||
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
|
||||
Mockito.verify(appManager2, Mockito.times(1))
|
||||
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
|
||||
Assert.assertNull("HistoryManager should be null as AHS is disabled", history);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRmAppPageUrlBase() throws IOException, YarnException {
|
||||
testHelper(true);
|
||||
String scheme = WebAppUtils.getHttpSchemePrefix(conf);
|
||||
Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId1),
|
||||
StringHelper.pjoin(scheme + clusterInfo1.getRMWebServiceAddress(), "cluster", "app"));
|
||||
Assert.assertEquals(fetcher.getRmAppPageUrlBase(appId2),
|
||||
StringHelper.pjoin(scheme + clusterInfo2.getRMWebServiceAddress(), "cluster", "app"));
|
||||
}
|
||||
|
||||
static class FedAppReportFetcherForTest extends FedAppReportFetcher {
|
||||
|
||||
FedAppReportFetcherForTest(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ApplicationHistoryProtocol getAHSProxy(Configuration conf)
|
||||
throws IOException {
|
||||
GetApplicationReportResponse resp = Mockito.mock(GetApplicationReportResponse.class);
|
||||
history = Mockito.mock(ApplicationHistoryProtocol.class);
|
||||
try {
|
||||
Mockito.when(history.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
|
||||
.thenReturn(resp);
|
||||
} catch (YarnException e) {
|
||||
// This should never happen
|
||||
fail("Found exception when getApplicationReport!");
|
||||
}
|
||||
return history;
|
||||
}
|
||||
}
|
||||
}
|
@ -54,6 +54,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
@ -67,6 +68,7 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.webapp.MimeType;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
||||
@ -110,8 +112,7 @@ public static void start() throws Exception {
|
||||
((ServerConnector)server.getConnectors()[0]).setHost("localhost");
|
||||
server.start();
|
||||
originalPort = ((ServerConnector)server.getConnectors()[0]).getLocalPort();
|
||||
LOG.info("Running embedded servlet container at: http://localhost:"
|
||||
+ originalPort);
|
||||
LOG.info("Running embedded servlet container at: http://localhost:{}", originalPort);
|
||||
// This property needs to be set otherwise CORS Headers will be dropped
|
||||
// by HttpUrlConnection
|
||||
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
|
||||
@ -364,11 +365,13 @@ void testAppReportForEmptyTrackingUrl() throws Exception {
|
||||
String appAddressInRm =
|
||||
WebAppUtils.getResolvedRMWebAppURLWithScheme(configuration) +
|
||||
"/cluster" + "/app/" + app.toString();
|
||||
assertEquals(proxyConn.getURL().toString(), appAddressInRm);
|
||||
assertEquals(proxyConn.getURL().toString(), appAddressInRm,
|
||||
"Webapp proxy servlet should have redirected to RM");
|
||||
|
||||
//set AHS_ENABLED = true to simulate getting the app report from AHS
|
||||
configuration.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
true);
|
||||
proxy.proxy.appReportFetcher.setAhsAppPageUrlBase(configuration);
|
||||
proxyConn = (HttpURLConnection) url.openConnection();
|
||||
proxyConn.connect();
|
||||
try {
|
||||
@ -381,7 +384,8 @@ void testAppReportForEmptyTrackingUrl() throws Exception {
|
||||
String appAddressInAhs =
|
||||
WebAppUtils.getHttpSchemePrefix(configuration) + WebAppUtils.getAHSWebAppURLWithoutScheme(
|
||||
configuration) + "/applicationhistory" + "/app/" + app.toString();
|
||||
assertEquals(proxyConn.getURL().toString(), appAddressInAhs);
|
||||
assertEquals(proxyConn.getURL().toString(), appAddressInAhs,
|
||||
"Webapp proxy servlet should have redirected to AHS");
|
||||
} finally {
|
||||
proxy.close();
|
||||
}
|
||||
@ -607,8 +611,9 @@ protected void serviceStart() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
private class AppReportFetcherForTest extends AppReportFetcher {
|
||||
private class AppReportFetcherForTest extends DefaultAppReportFetcher {
|
||||
int answer = 0;
|
||||
private String ahsAppPageUrlBase = null;
|
||||
|
||||
public AppReportFetcherForTest(Configuration conf) {
|
||||
super(conf);
|
||||
@ -679,5 +684,17 @@ private FetchedAppReport getDefaultApplicationReport(ApplicationId appId,
|
||||
private FetchedAppReport getDefaultApplicationReport(ApplicationId appId) {
|
||||
return getDefaultApplicationReport(appId, true);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getAhsAppPageUrlBase() {
|
||||
return ahsAppPageUrlBase != null ? ahsAppPageUrlBase : super.getAhsAppPageUrlBase();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setAhsAppPageUrlBase(Configuration conf) {
|
||||
this.ahsAppPageUrlBase = StringHelper.pjoin(
|
||||
WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils.getAHSWebAppURLWithoutScheme(conf),
|
||||
"applicationhistory", "app");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,366 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webproxy;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test the WebAppProxyServlet and WebAppProxy. For back end use simple web server.
|
||||
*/
|
||||
public class TestWebAppProxyServletFed {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestWebAppProxyServletFed.class);
|
||||
|
||||
public static final String AM_PREFIX = "AM";
|
||||
public static final String RM_PREFIX = "RM";
|
||||
public static final String AHS_PREFIX = "AHS";
|
||||
|
||||
/*
|
||||
* Mocked Server is used for simulating the web of AppMaster, ResourceMamanger or TimelineServer.
|
||||
* */
|
||||
private static Server mockServer;
|
||||
private static int mockServerPort = 0;
|
||||
|
||||
/**
|
||||
* Simple http server. Server should send answer with status 200
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
mockServer = new Server(0);
|
||||
((QueuedThreadPool) mockServer.getThreadPool()).setMaxThreads(20);
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
context.setContextPath("/");
|
||||
context.addServlet(new ServletHolder(new MockWebServlet(AM_PREFIX)), "/amweb/*");
|
||||
context.addServlet(new ServletHolder(new MockWebServlet(RM_PREFIX)), "/cluster/app/*");
|
||||
context.addServlet(new ServletHolder(new MockWebServlet(AHS_PREFIX)),
|
||||
"/applicationhistory/app/*");
|
||||
mockServer.setHandler(context);
|
||||
|
||||
((ServerConnector) mockServer.getConnectors()[0]).setHost("localhost");
|
||||
mockServer.start();
|
||||
mockServerPort = ((ServerConnector) mockServer.getConnectors()[0]).getLocalPort();
|
||||
LOG.info("Running embedded servlet container at: http://localhost:" + mockServerPort);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
if (mockServer != null) {
|
||||
mockServer.stop();
|
||||
mockServer.destroy();
|
||||
mockServer = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWebServlet() throws IOException {
|
||||
HttpURLConnection conn;
|
||||
// 1. Mocked AppMaster web Test
|
||||
URL url = new URL("http", "localhost", mockServerPort, "/amweb/apptest");
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
assertEquals(AM_PREFIX + "/apptest", readResponse(conn));
|
||||
conn.disconnect();
|
||||
|
||||
// 2. Mocked RM web Test
|
||||
url = new URL("http", "localhost", mockServerPort, "/cluster/app/apptest");
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
assertEquals(RM_PREFIX + "/apptest", readResponse(conn));
|
||||
conn.disconnect();
|
||||
|
||||
// 3. Mocked AHS web Test
|
||||
url = new URL("http", "localhost", mockServerPort, "/applicationhistory/app/apptest");
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
assertEquals(AHS_PREFIX + "/apptest", readResponse(conn));
|
||||
conn.disconnect();
|
||||
}
|
||||
|
||||
@Test(timeout=5000)
|
||||
public void testWebAppProxyServletFed() throws Exception {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090");
|
||||
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:" + mockServerPort);
|
||||
// overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
|
||||
conf.setInt("hadoop.http.max.threads", 10);
|
||||
|
||||
// Create sub cluster information.
|
||||
SubClusterId subClusterId1 = SubClusterId.newInstance("scid1");
|
||||
SubClusterId subClusterId2 = SubClusterId.newInstance("scid2");
|
||||
SubClusterInfo subClusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1",
|
||||
"10.0.0.1:1", "10.0.0.1:1", "localhost:" + mockServerPort, SubClusterState.SC_RUNNING, 0,
|
||||
"");
|
||||
SubClusterInfo subClusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1",
|
||||
"10.0.0.2:1", "10.0.0.2:1", "10.0.0.2:1", SubClusterState.SC_RUNNING, 0, "");
|
||||
|
||||
// App1 and App2 is running applications.
|
||||
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
|
||||
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
|
||||
String appUrl1 = "http://localhost:" + mockServerPort + "/amweb/" + appId1;
|
||||
String appUrl2 = "http://localhost:" + mockServerPort + "/amweb/" + appId2;
|
||||
// App3 is accepted application, has not registered original url to am.
|
||||
ApplicationId appId3 = ApplicationId.newInstance(0, 3);
|
||||
// App4 is finished application, has remove from rm, but not remove from timeline server.
|
||||
ApplicationId appId4 = ApplicationId.newInstance(0, 4);
|
||||
|
||||
// Mock for application
|
||||
ApplicationClientProtocol appManager1 = Mockito.mock(ApplicationClientProtocol.class);
|
||||
Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId1)))
|
||||
.thenReturn(GetApplicationReportResponse
|
||||
.newInstance(newApplicationReport(appId1, YarnApplicationState.RUNNING, appUrl1)));
|
||||
Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId3)))
|
||||
.thenReturn(GetApplicationReportResponse
|
||||
.newInstance(newApplicationReport(appId3, YarnApplicationState.ACCEPTED, null)));
|
||||
|
||||
ApplicationClientProtocol appManager2 = Mockito.mock(ApplicationClientProtocol.class);
|
||||
Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId2)))
|
||||
.thenReturn(GetApplicationReportResponse
|
||||
.newInstance(newApplicationReport(appId2, YarnApplicationState.RUNNING, appUrl2)));
|
||||
Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
|
||||
.thenThrow(new ApplicationNotFoundException("APP NOT FOUND"));
|
||||
|
||||
ApplicationHistoryProtocol historyManager = Mockito.mock(ApplicationHistoryProtocol.class);
|
||||
Mockito
|
||||
.when(historyManager.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
|
||||
.thenReturn(GetApplicationReportResponse
|
||||
.newInstance(newApplicationReport(appId4, YarnApplicationState.FINISHED, null)));
|
||||
|
||||
// Initial federation store.
|
||||
FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
|
||||
facade.getStateStore()
|
||||
.registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo1));
|
||||
facade.getStateStore()
|
||||
.registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo2));
|
||||
facade.addApplicationHomeSubCluster(
|
||||
ApplicationHomeSubCluster.newInstance(appId1, subClusterId1));
|
||||
facade.addApplicationHomeSubCluster(
|
||||
ApplicationHomeSubCluster.newInstance(appId2, subClusterId2));
|
||||
facade.addApplicationHomeSubCluster(
|
||||
ApplicationHomeSubCluster.newInstance(appId3, subClusterId1));
|
||||
facade.addApplicationHomeSubCluster(
|
||||
ApplicationHomeSubCluster.newInstance(appId4, subClusterId2));
|
||||
|
||||
// Start proxy server
|
||||
WebAppProxyServerForTest proxy = new WebAppProxyServerForTest();
|
||||
proxy.init(conf);
|
||||
proxy.start();
|
||||
|
||||
try {
|
||||
// set Mocked rm and timeline
|
||||
int proxyPort = proxy.proxy.proxyServer.getConnectorAddress(0).getPort();
|
||||
FedAppReportFetcher appReportFetcher = proxy.proxy.appReportFetcher;
|
||||
appReportFetcher.registerSubCluster(subClusterInfo1, appManager1);
|
||||
appReportFetcher.registerSubCluster(subClusterInfo2, appManager2);
|
||||
appReportFetcher.setHistoryManager(historyManager);
|
||||
|
||||
// App1 is running in subcluster1, and original url is registered
|
||||
// in rm of subCluster1. So proxy server will get original url from rm by
|
||||
// getApplicationReport. Then proxy server will fetch the webapp directly.
|
||||
URL url = new URL("http", "localhost", proxyPort, "/proxy/" + appId1.toString());
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
assertEquals(AM_PREFIX + "/" + appId1.toString(), readResponse(conn));
|
||||
conn.disconnect();
|
||||
|
||||
// App2 is running in subcluster2, and original url is registered
|
||||
// in rm of subCluster2. So proxy server will get original url from rm by
|
||||
// getApplicationReport. Then proxy server will fetch the webapp directly.
|
||||
url = new URL("http", "localhost", proxyPort, "/proxy/" + appId2.toString());
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
assertEquals(AM_PREFIX + "/" + appId2.toString(), readResponse(conn));
|
||||
conn.disconnect();
|
||||
|
||||
// App3 is accepted in subcluster1, and original url is not registered
|
||||
// yet. So proxy server will fetch the application web from rm.
|
||||
url = new URL("http", "localhost", proxyPort, "/proxy/" + appId3.toString());
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
assertEquals(RM_PREFIX + "/" + appId3.toString(), readResponse(conn));
|
||||
conn.disconnect();
|
||||
|
||||
// App4 is finished in subcluster2, and have removed from rm, but not
|
||||
// removed from timeline server. So proxy server will fetch the
|
||||
// application web from timeline server.
|
||||
url = new URL("http", "localhost", proxyPort, "/proxy/" + appId4.toString());
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
assertEquals(AHS_PREFIX + "/" + appId4.toString(), readResponse(conn));
|
||||
conn.disconnect();
|
||||
} finally {
|
||||
proxy.close();
|
||||
}
|
||||
}
|
||||
|
||||
private ApplicationReport newApplicationReport(ApplicationId appId,
|
||||
YarnApplicationState state, String origTrackingUrl) {
|
||||
return ApplicationReport.newInstance(appId, null, "testuser", null, null, null, 0, null, state,
|
||||
null, null, 0, 0, 0, null, null, origTrackingUrl, 0f, null, null);
|
||||
}
|
||||
|
||||
private String readResponse(HttpURLConnection conn) throws IOException {
|
||||
InputStream input = conn.getInputStream();
|
||||
byte[] bytes = new byte[input.available()];
|
||||
input.read(bytes);
|
||||
return new String(bytes);
|
||||
}
|
||||
|
||||
private class WebAppProxyServerForTest extends CompositeService {
|
||||
|
||||
private WebAppProxyForTest proxy = null;
|
||||
|
||||
WebAppProxyServerForTest() {
|
||||
super(WebAppProxyServer.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
proxy = new WebAppProxyForTest();
|
||||
addService(proxy);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This servlet is used for simulate the web of AppMaster, ResourceManager,
|
||||
* TimelineServer and so on.
|
||||
* */
|
||||
public static class MockWebServlet extends HttpServlet {
|
||||
|
||||
private String role;
|
||||
|
||||
public MockWebServlet(String role) {
|
||||
this.role = role;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
|
||||
throws IOException {
|
||||
if (req.getPathInfo() != null) {
|
||||
resp.getWriter().write(role + req.getPathInfo());
|
||||
}
|
||||
resp.setStatus(HttpServletResponse.SC_OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
|
||||
throws IOException {
|
||||
InputStream is = req.getInputStream();
|
||||
OutputStream os = resp.getOutputStream();
|
||||
int c = is.read();
|
||||
while (c > -1) {
|
||||
os.write(c);
|
||||
c = is.read();
|
||||
}
|
||||
is.close();
|
||||
os.close();
|
||||
resp.setStatus(HttpServletResponse.SC_OK);
|
||||
}
|
||||
}
|
||||
|
||||
private class WebAppProxyForTest extends WebAppProxy {
|
||||
|
||||
private HttpServer2 proxyServer;
|
||||
private FedAppReportFetcher appReportFetcher;
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
Configuration conf = getConfig();
|
||||
String bindAddress = conf.get(YarnConfiguration.PROXY_ADDRESS);
|
||||
bindAddress = StringUtils.split(bindAddress, ':')[0];
|
||||
AccessControlList acl = new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
|
||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||
proxyServer = new HttpServer2.Builder()
|
||||
.setName("proxy")
|
||||
.addEndpoint(URI.create(WebAppUtils.getHttpSchemePrefix(conf) + bindAddress + ":0"))
|
||||
.setFindPort(true)
|
||||
.setConf(conf)
|
||||
.setACL(acl)
|
||||
.build();
|
||||
proxyServer.addServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC,
|
||||
WebAppProxyServlet.class);
|
||||
|
||||
appReportFetcher = new FedAppReportFetcher(conf);
|
||||
proxyServer.setAttribute(FETCHER_ATTRIBUTE, appReportFetcher);
|
||||
proxyServer.setAttribute(IS_SECURITY_ENABLED_ATTRIBUTE, Boolean.FALSE);
|
||||
|
||||
String proxy = WebAppUtils.getProxyHostAndPort(conf);
|
||||
String[] proxyParts = proxy.split(":");
|
||||
String proxyHost = proxyParts[0];
|
||||
|
||||
proxyServer.setAttribute(PROXY_HOST_ATTRIBUTE, proxyHost);
|
||||
proxyServer.start();
|
||||
LOG.info("Proxy server is started at port {}", proxyServer.getConnectorAddress(0).getPort());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user