YARN-1811. Fixed AMFilters in YARN to correctly accept requests from either web-app proxy or the RMs when HA is enabled. Contributed by Robert Kanter.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1579877 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c49cfbeb7
commit
4224e61340
@ -533,6 +533,9 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1859. Fixed WebAppProxyServlet to correctly handle applications absent
|
||||
on the ResourceManager. (Zhijie Shen via vinodkv)
|
||||
|
||||
YARN-1811. Fixed AMFilters in YARN to correctly accept requests from either
|
||||
web-app proxy or the RMs when HA is enabled. (Robert Kanter via vinodkv)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -264,7 +264,6 @@ public static String getConfValueForRMInstance(
|
||||
}
|
||||
|
||||
/** Add non empty and non null suffix to a key */
|
||||
@VisibleForTesting
|
||||
public static String addSuffix(String key, String suffix) {
|
||||
if (suffix == null || suffix.isEmpty()) {
|
||||
return key;
|
||||
|
@ -16,10 +16,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
package org.apache.hadoop.yarn.util;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
@ -67,4 +68,28 @@ private static HAServiceState getHAState(YarnConfiguration yarnConf)
|
||||
HAServiceState haState = proto.getServiceStatus().getState();
|
||||
return haState;
|
||||
}
|
||||
|
||||
public static List<String> getRMHAWebappAddresses(
|
||||
final YarnConfiguration conf) {
|
||||
Collection<String> rmIds =
|
||||
conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
|
||||
List<String> addrs = new ArrayList<String>();
|
||||
if (YarnConfiguration.useHttps(conf)) {
|
||||
for (String id : rmIds) {
|
||||
String addr = conf.get(
|
||||
YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + "." + id);
|
||||
if (addr != null) {
|
||||
addrs.add(addr);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (String id : rmIds) {
|
||||
String addr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS + "." + id);
|
||||
if (addr != null) {
|
||||
addrs.add(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
return addrs;
|
||||
}
|
||||
}
|
@ -22,6 +22,9 @@
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
@ -30,6 +33,8 @@
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.util.RMHAUtils;
|
||||
|
||||
@Private
|
||||
@Evolving
|
||||
@ -79,6 +84,36 @@ public static String getRMWebAppURLWithoutScheme(Configuration conf) {
|
||||
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> getProxyHostsAndPortsForAmFilter(
|
||||
Configuration conf) {
|
||||
List<String> addrs = new ArrayList<String>();
|
||||
String proxyAddr = conf.get(YarnConfiguration.PROXY_ADDRESS);
|
||||
// If PROXY_ADDRESS isn't set, fallback to RM_WEBAPP(_HTTPS)_ADDRESS
|
||||
// There could be multiple if using RM HA
|
||||
if (proxyAddr == null || proxyAddr.isEmpty()) {
|
||||
// If RM HA is enabled, try getting those addresses
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
List<String> haAddrs =
|
||||
RMHAUtils.getRMHAWebappAddresses(new YarnConfiguration(conf));
|
||||
for (String addr : haAddrs) {
|
||||
try {
|
||||
InetSocketAddress socketAddr = NetUtils.createSocketAddr(addr);
|
||||
addrs.add(getResolvedAddress(socketAddr));
|
||||
} catch(IllegalArgumentException e) {
|
||||
// skip if can't resolve
|
||||
}
|
||||
}
|
||||
}
|
||||
// If couldn't resolve any of the addresses or not using RM HA, fallback
|
||||
if (addrs.isEmpty()) {
|
||||
addrs.add(getResolvedRMWebAppURLWithoutScheme(conf));
|
||||
}
|
||||
} else {
|
||||
addrs.add(proxyAddr);
|
||||
}
|
||||
return addrs;
|
||||
}
|
||||
|
||||
public static String getProxyHostAndPort(Configuration conf) {
|
||||
String addr = conf.get(YarnConfiguration.PROXY_ADDRESS);
|
||||
@ -112,10 +147,14 @@ public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf,
|
||||
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
|
||||
}
|
||||
return getResolvedAddress(address);
|
||||
}
|
||||
|
||||
private static String getResolvedAddress(InetSocketAddress address) {
|
||||
address = NetUtils.getConnectAddress(address);
|
||||
StringBuffer sb = new StringBuffer();
|
||||
StringBuilder sb = new StringBuilder();
|
||||
InetAddress resolved = address.getAddress();
|
||||
if (resolved == null || resolved.isAnyLocalAddress() ||
|
||||
if (resolved == null || resolved.isAnyLocalAddress() ||
|
||||
resolved.isLoopbackAddress()) {
|
||||
String lh = address.getHostName();
|
||||
try {
|
||||
|
@ -25,7 +25,7 @@
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMHAUtils;
|
||||
import org.apache.hadoop.yarn.util.RMHAUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.webproxy.amfilter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -36,11 +37,23 @@ public class AmFilterInitializer extends FilterInitializer {
|
||||
@Override
|
||||
public void initFilter(FilterContainer container, Configuration conf) {
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
String proxy = WebAppUtils.getProxyHostAndPort(conf);
|
||||
String[] parts = proxy.split(":");
|
||||
params.put(AmIpFilter.PROXY_HOST, parts[0]);
|
||||
params.put(AmIpFilter.PROXY_URI_BASE, WebAppUtils.getHttpSchemePrefix(conf)
|
||||
+ proxy + getApplicationWebProxyBase());
|
||||
List<String> proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (String proxy : proxies) {
|
||||
sb.append(proxy.split(":")[0]).append(AmIpFilter.PROXY_HOSTS_DELIMITER);
|
||||
}
|
||||
sb.setLength(sb.length() - 1);
|
||||
params.put(AmIpFilter.PROXY_HOSTS, sb.toString());
|
||||
|
||||
String prefix = WebAppUtils.getHttpSchemePrefix(conf);
|
||||
String proxyBase = getApplicationWebProxyBase();
|
||||
sb = new StringBuilder();
|
||||
for (String proxy : proxies) {
|
||||
sb.append(prefix).append(proxy).append(proxyBase)
|
||||
.append(AmIpFilter.PROXY_HOSTS_DELIMITER);
|
||||
}
|
||||
sb.setLength(sb.length() - 1);
|
||||
params.put(AmIpFilter.PROXY_URI_BASES, sb.toString());
|
||||
container.addFilter(FILTER_NAME, FILTER_CLASS, params);
|
||||
}
|
||||
|
||||
|
@ -20,8 +20,12 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.Filter;
|
||||
@ -36,42 +40,78 @@
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
|
||||
import org.apache.hadoop.yarn.util.RMHAUtils;
|
||||
|
||||
@Public
|
||||
public class AmIpFilter implements Filter {
|
||||
private static final Log LOG = LogFactory.getLog(AmIpFilter.class);
|
||||
|
||||
|
||||
@Deprecated
|
||||
public static final String PROXY_HOST = "PROXY_HOST";
|
||||
@Deprecated
|
||||
public static final String PROXY_URI_BASE = "PROXY_URI_BASE";
|
||||
static final String PROXY_HOSTS = "PROXY_HOSTS";
|
||||
static final String PROXY_HOSTS_DELIMITER = ",";
|
||||
static final String PROXY_URI_BASES = "PROXY_URI_BASES";
|
||||
static final String PROXY_URI_BASES_DELIMITER = ",";
|
||||
//update the proxy IP list about every 5 min
|
||||
private static final long updateInterval = 5 * 60 * 1000;
|
||||
|
||||
private String proxyHost;
|
||||
|
||||
private String[] proxyHosts;
|
||||
private Set<String> proxyAddresses = null;
|
||||
private long lastUpdate;
|
||||
private String proxyUriBase;
|
||||
|
||||
private Map<String, String> proxyUriBases;
|
||||
|
||||
@Override
|
||||
public void init(FilterConfig conf) throws ServletException {
|
||||
proxyHost = conf.getInitParameter(PROXY_HOST);
|
||||
proxyUriBase = conf.getInitParameter(PROXY_URI_BASE);
|
||||
// Maintain for backwards compatibility
|
||||
if (conf.getInitParameter(PROXY_HOST) != null
|
||||
&& conf.getInitParameter(PROXY_URI_BASE) != null) {
|
||||
proxyHosts = new String[]{conf.getInitParameter(PROXY_HOST)};
|
||||
proxyUriBases = new HashMap<String, String>(1);
|
||||
proxyUriBases.put("dummy", conf.getInitParameter(PROXY_URI_BASE));
|
||||
} else {
|
||||
proxyHosts = conf.getInitParameter(PROXY_HOSTS)
|
||||
.split(PROXY_HOSTS_DELIMITER);
|
||||
|
||||
String[] proxyUriBasesArr = conf.getInitParameter(PROXY_URI_BASES)
|
||||
.split(PROXY_URI_BASES_DELIMITER);
|
||||
proxyUriBases = new HashMap<String, String>();
|
||||
for (String proxyUriBase : proxyUriBasesArr) {
|
||||
try {
|
||||
URL url = new URL(proxyUriBase);
|
||||
proxyUriBases.put(url.getHost() + ":" + url.getPort(), proxyUriBase);
|
||||
} catch(MalformedURLException e) {
|
||||
LOG.warn(proxyUriBase + " does not appear to be a valid URL", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected Set<String> getProxyAddresses() throws ServletException {
|
||||
long now = System.currentTimeMillis();
|
||||
synchronized(this) {
|
||||
if(proxyAddresses == null || (lastUpdate + updateInterval) >= now) {
|
||||
try {
|
||||
proxyAddresses = new HashSet<String>();
|
||||
for(InetAddress add : InetAddress.getAllByName(proxyHost)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("proxy address is: " + add.getHostAddress());
|
||||
proxyAddresses = new HashSet<String>();
|
||||
for (String proxyHost : proxyHosts) {
|
||||
try {
|
||||
for(InetAddress add : InetAddress.getAllByName(proxyHost)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("proxy address is: " + add.getHostAddress());
|
||||
}
|
||||
proxyAddresses.add(add.getHostAddress());
|
||||
}
|
||||
lastUpdate = now;
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.warn("Could not locate " + proxyHost + " - skipping", e);
|
||||
}
|
||||
proxyAddresses.add(add.getHostAddress());
|
||||
}
|
||||
lastUpdate = now;
|
||||
} catch (UnknownHostException e) {
|
||||
throw new ServletException("Could not locate "+proxyHost, e);
|
||||
if (proxyAddresses.isEmpty()) {
|
||||
throw new ServletException("Could not locate any of the proxy hosts");
|
||||
}
|
||||
}
|
||||
return proxyAddresses;
|
||||
@ -89,21 +129,22 @@ public void doFilter(ServletRequest req, ServletResponse resp,
|
||||
if(!(req instanceof HttpServletRequest)) {
|
||||
throw new ServletException("This filter only works for HTTP/HTTPS");
|
||||
}
|
||||
|
||||
|
||||
HttpServletRequest httpReq = (HttpServletRequest)req;
|
||||
HttpServletResponse httpResp = (HttpServletResponse)resp;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Remote address for request is: " + httpReq.getRemoteAddr());
|
||||
}
|
||||
if(!getProxyAddresses().contains(httpReq.getRemoteAddr())) {
|
||||
String redirectUrl = httpResp.encodeRedirectURL(proxyUriBase +
|
||||
String redirectUrl = findRedirectUrl();
|
||||
redirectUrl = httpResp.encodeRedirectURL(redirectUrl +
|
||||
httpReq.getRequestURI());
|
||||
httpResp.sendRedirect(redirectUrl);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
String user = null;
|
||||
|
||||
|
||||
if (httpReq.getCookies() != null) {
|
||||
for(Cookie c: httpReq.getCookies()) {
|
||||
if(WebAppProxyServlet.PROXY_USER_COOKIE_NAME.equals(c.getName())){
|
||||
@ -118,9 +159,30 @@ public void doFilter(ServletRequest req, ServletResponse resp,
|
||||
chain.doFilter(req, resp);
|
||||
} else {
|
||||
final AmIpPrincipal principal = new AmIpPrincipal(user);
|
||||
ServletRequest requestWrapper = new AmIpServletRequestWrapper(httpReq,
|
||||
ServletRequest requestWrapper = new AmIpServletRequestWrapper(httpReq,
|
||||
principal);
|
||||
chain.doFilter(requestWrapper, resp);
|
||||
}
|
||||
}
|
||||
|
||||
protected String findRedirectUrl() throws ServletException {
|
||||
String addr;
|
||||
if (proxyUriBases.size() == 1) { // external proxy or not RM HA
|
||||
addr = proxyUriBases.values().iterator().next();
|
||||
} else { // RM HA
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
String activeRMId = RMHAUtils.findActiveRMHAId(conf);
|
||||
String addressPropertyPrefix = YarnConfiguration.useHttps(conf)
|
||||
? YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS
|
||||
: YarnConfiguration.RM_WEBAPP_ADDRESS;
|
||||
String host = conf.get(
|
||||
HAUtil.addSuffix(addressPropertyPrefix, activeRMId));
|
||||
addr = proxyUriBases.get(host);
|
||||
}
|
||||
if (addr == null) {
|
||||
throw new ServletException(
|
||||
"Could not determine the proxy server for redirection");
|
||||
}
|
||||
return addr;
|
||||
}
|
||||
}
|
||||
|
@ -88,6 +88,7 @@ public ServletContext getServletContext() {
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
@SuppressWarnings("deprecation")
|
||||
public void filterNullCookies() throws Exception {
|
||||
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
|
||||
|
||||
@ -120,6 +121,7 @@ public void doFilter(ServletRequest servletRequest,
|
||||
* Test AmIpFilter
|
||||
*/
|
||||
@Test(timeout = 1000)
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testFilter() throws Exception {
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
params.put(AmIpFilter.PROXY_HOST, proxyHost);
|
||||
|
@ -0,0 +1,214 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webproxy.amfilter;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.FilterContainer;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestAmFilterInitializer extends TestCase {
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
NetUtils.addStaticResolution("host1", "172.0.0.1");
|
||||
NetUtils.addStaticResolution("host2", "172.0.0.1");
|
||||
NetUtils.addStaticResolution("host3", "172.0.0.1");
|
||||
NetUtils.addStaticResolution("host4", "172.0.0.1");
|
||||
NetUtils.addStaticResolution("host5", "172.0.0.1");
|
||||
NetUtils.addStaticResolution("host6", "172.0.0.1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitFilter() {
|
||||
// Check PROXY_ADDRESS
|
||||
MockFilterContainer con = new MockFilterContainer();
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.set(YarnConfiguration.PROXY_ADDRESS, "host1:1000");
|
||||
AmFilterInitializer afi = new MockAmFilterInitializer();
|
||||
assertNull(con.givenParameters);
|
||||
afi.initFilter(con, conf);
|
||||
assertEquals(2, con.givenParameters.size());
|
||||
assertEquals("host1", con.givenParameters.get(AmIpFilter.PROXY_HOSTS));
|
||||
assertEquals("http://host1:1000/foo",
|
||||
con.givenParameters.get(AmIpFilter.PROXY_URI_BASES));
|
||||
|
||||
// Check a single RM_WEBAPP_ADDRESS
|
||||
con = new MockFilterContainer();
|
||||
conf = new Configuration(false);
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "host2:2000");
|
||||
afi = new MockAmFilterInitializer();
|
||||
assertNull(con.givenParameters);
|
||||
afi.initFilter(con, conf);
|
||||
assertEquals(2, con.givenParameters.size());
|
||||
assertEquals("host2", con.givenParameters.get(AmIpFilter.PROXY_HOSTS));
|
||||
assertEquals("http://host2:2000/foo",
|
||||
con.givenParameters.get(AmIpFilter.PROXY_URI_BASES));
|
||||
|
||||
// Check multiple RM_WEBAPP_ADDRESSes (RM HA)
|
||||
con = new MockFilterContainer();
|
||||
conf = new Configuration(false);
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm1", "host2:2000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm2", "host3:3000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm3", "host4:4000");
|
||||
afi = new MockAmFilterInitializer();
|
||||
assertNull(con.givenParameters);
|
||||
afi.initFilter(con, conf);
|
||||
assertEquals(2, con.givenParameters.size());
|
||||
String[] proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS)
|
||||
.split(AmIpFilter.PROXY_HOSTS_DELIMITER);
|
||||
assertEquals(3, proxyHosts.length);
|
||||
Arrays.sort(proxyHosts);
|
||||
assertEquals("host2", proxyHosts[0]);
|
||||
assertEquals("host3", proxyHosts[1]);
|
||||
assertEquals("host4", proxyHosts[2]);
|
||||
String[] proxyBases = con.givenParameters.get(AmIpFilter.PROXY_URI_BASES)
|
||||
.split(AmIpFilter.PROXY_URI_BASES_DELIMITER);
|
||||
assertEquals(3, proxyBases.length);
|
||||
Arrays.sort(proxyBases);
|
||||
assertEquals("http://host2:2000/foo", proxyBases[0]);
|
||||
assertEquals("http://host3:3000/foo", proxyBases[1]);
|
||||
assertEquals("http://host4:4000/foo", proxyBases[2]);
|
||||
|
||||
// Check multiple RM_WEBAPP_ADDRESSes (RM HA) with HTTPS
|
||||
con = new MockFilterContainer();
|
||||
conf = new Configuration(false);
|
||||
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||
HttpConfig.Policy.HTTPS_ONLY.toString());
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm1", "host5:5000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm2", "host6:6000");
|
||||
afi = new MockAmFilterInitializer();
|
||||
assertNull(con.givenParameters);
|
||||
afi.initFilter(con, conf);
|
||||
assertEquals(2, con.givenParameters.size());
|
||||
proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS)
|
||||
.split(AmIpFilter.PROXY_HOSTS_DELIMITER);
|
||||
assertEquals(2, proxyHosts.length);
|
||||
Arrays.sort(proxyHosts);
|
||||
assertEquals("host5", proxyHosts[0]);
|
||||
assertEquals("host6", proxyHosts[1]);
|
||||
proxyBases = con.givenParameters.get(AmIpFilter.PROXY_URI_BASES)
|
||||
.split(AmIpFilter.PROXY_URI_BASES_DELIMITER);
|
||||
assertEquals(2, proxyBases.length);
|
||||
Arrays.sort(proxyBases);
|
||||
assertEquals("https://host5:5000/foo", proxyBases[0]);
|
||||
assertEquals("https://host6:6000/foo", proxyBases[1]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetProxyHostsAndPortsForAmFilter() {
|
||||
|
||||
// Check no configs given
|
||||
Configuration conf = new Configuration(false);
|
||||
List<String> proxyHosts =
|
||||
WebAppUtils.getProxyHostsAndPortsForAmFilter(conf);
|
||||
assertEquals(1, proxyHosts.size());
|
||||
assertEquals(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
|
||||
proxyHosts.get(0));
|
||||
|
||||
// Check PROXY_ADDRESS has priority
|
||||
conf = new Configuration(false);
|
||||
conf.set(YarnConfiguration.PROXY_ADDRESS, "host1:1000");
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm1", "host2:2000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm2", "host3:3000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm3", "host4:4000");
|
||||
proxyHosts = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf);
|
||||
assertEquals(1, proxyHosts.size());
|
||||
assertEquals("host1:1000", proxyHosts.get(0));
|
||||
|
||||
// Check getting a single RM_WEBAPP_ADDRESS
|
||||
conf = new Configuration(false);
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "host2:2000");
|
||||
proxyHosts = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf);
|
||||
assertEquals(1, proxyHosts.size());
|
||||
Collections.sort(proxyHosts);
|
||||
assertEquals("host2:2000", proxyHosts.get(0));
|
||||
|
||||
// Check getting multiple RM_WEBAPP_ADDRESSes (RM HA)
|
||||
conf = new Configuration(false);
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm1", "host2:2000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm2", "host3:3000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm3", "host4:4000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm4", "dummy");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm1", "host5:5000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm2", "host6:6000");
|
||||
proxyHosts = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf);
|
||||
assertEquals(3, proxyHosts.size());
|
||||
Collections.sort(proxyHosts);
|
||||
assertEquals("host2:2000", proxyHosts.get(0));
|
||||
assertEquals("host3:3000", proxyHosts.get(1));
|
||||
assertEquals("host4:4000", proxyHosts.get(2));
|
||||
|
||||
// Check getting multiple RM_WEBAPP_ADDRESSes (RM HA) with HTTPS
|
||||
conf = new Configuration(false);
|
||||
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||
HttpConfig.Policy.HTTPS_ONLY.toString());
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,dummy");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm1", "host2:2000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm2", "host3:3000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + ".rm3", "host4:4000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm1", "host5:5000");
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm2", "host6:6000");
|
||||
proxyHosts = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf);
|
||||
assertEquals(2, proxyHosts.size());
|
||||
Collections.sort(proxyHosts);
|
||||
assertEquals("host5:5000", proxyHosts.get(0));
|
||||
assertEquals("host6:6000", proxyHosts.get(1));
|
||||
}
|
||||
|
||||
class MockAmFilterInitializer extends AmFilterInitializer {
|
||||
@Override
|
||||
protected String getApplicationWebProxyBase() {
|
||||
return "/foo";
|
||||
}
|
||||
}
|
||||
|
||||
class MockFilterContainer implements FilterContainer {
|
||||
Map<String, String> givenParameters;
|
||||
|
||||
@Override
|
||||
public void addFilter(String name, String classname, Map<String,
|
||||
String> parameters) {
|
||||
givenParameters = parameters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addGlobalFilter(String name, String classname,
|
||||
Map<String, String> parameters) {
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user