YARN-10218. [GPG] Support HTTPS in GPG. (#5945) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
parent
136111314d
commit
ff1570acfa
@ -19,13 +19,20 @@
|
|||||||
package org.apache.hadoop.yarn.server.globalpolicygenerator;
|
package org.apache.hadoop.yarn.server.globalpolicygenerator;
|
||||||
|
|
||||||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
|
||||||
|
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTPS_PREFIX;
|
||||||
|
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTP_PREFIX;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
||||||
@ -51,16 +58,23 @@ public final class GPGUtils {
|
|||||||
* @param webAddr WebAddress.
|
* @param webAddr WebAddress.
|
||||||
* @param path url path.
|
* @param path url path.
|
||||||
* @param returnType return type.
|
* @param returnType return type.
|
||||||
|
* @param conf configuration.
|
||||||
* @return response entity.
|
* @return response entity.
|
||||||
*/
|
*/
|
||||||
public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType) {
|
public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType,
|
||||||
|
Configuration conf) {
|
||||||
Client client = Client.create();
|
Client client = Client.create();
|
||||||
T obj;
|
T obj;
|
||||||
|
|
||||||
WebResource webResource = client.resource(webAddr);
|
// webAddr stores the form of host:port in subClusterInfo
|
||||||
|
InetSocketAddress socketAddress = NetUtils
|
||||||
|
.getConnectAddress(NetUtils.createSocketAddr(webAddr));
|
||||||
|
String scheme = YarnConfiguration.useHttps(conf) ? HTTPS_PREFIX : HTTP_PREFIX;
|
||||||
|
String webAddress = scheme + socketAddress.getHostName() + ":" + socketAddress.getPort();
|
||||||
|
WebResource webResource = client.resource(webAddress);
|
||||||
ClientResponse response = null;
|
ClientResponse response = null;
|
||||||
try {
|
try {
|
||||||
response = webResource.path("ws/v1/cluster").path(path)
|
response = webResource.path(RM_WEB_SERVICE_PATH).path(path)
|
||||||
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
|
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
|
||||||
if (response.getStatus() == SC_OK) {
|
if (response.getStatus() == SC_OK) {
|
||||||
obj = response.getEntity(returnType);
|
obj = response.getEntity(returnType);
|
||||||
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
|
|||||||
import org.apache.hadoop.yarn.webapp.WebApp;
|
import org.apache.hadoop.yarn.webapp.WebApp;
|
||||||
import org.apache.hadoop.yarn.webapp.WebApps;
|
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
import org.apache.hadoop.yarn.webapp.util.WebServiceClient;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -131,6 +132,7 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|||||||
|
|
||||||
// super.serviceInit after all services are added
|
// super.serviceInit after all services are added
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
|
WebServiceClient.initialize(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -207,6 +209,7 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|||||||
}
|
}
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
|
WebServiceClient.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
|
@ -159,7 +159,7 @@ public class PolicyGenerator implements Runnable, Configurable {
|
|||||||
clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
|
clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
|
||||||
}
|
}
|
||||||
Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
|
Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
|
||||||
e.getValue(), e.getKey());
|
e.getValue(), e.getKey(), getConf());
|
||||||
clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
|
clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -181,12 +181,11 @@ public class PolicyGenerator implements Runnable, Configurable {
|
|||||||
for (SubClusterInfo sci : activeSubClusters.values()) {
|
for (SubClusterInfo sci : activeSubClusters.values()) {
|
||||||
SchedulerTypeInfo sti = GPGUtils
|
SchedulerTypeInfo sti = GPGUtils
|
||||||
.invokeRMWebService(sci.getRMWebServiceAddress(),
|
.invokeRMWebService(sci.getRMWebServiceAddress(),
|
||||||
RMWSConsts.SCHEDULER, SchedulerTypeInfo.class);
|
RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, getConf());
|
||||||
if(sti != null){
|
if(sti != null){
|
||||||
schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
|
schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Skipped null scheduler info from SubCluster " + sci
|
LOG.warn("Skipped null scheduler info from SubCluster {}.", sci.getSubClusterId());
|
||||||
.getSubClusterId().toString());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return schedInfo;
|
return schedInfo;
|
||||||
|
@ -22,6 +22,7 @@ import com.sun.jersey.api.json.JSONConfiguration;
|
|||||||
import com.sun.jersey.api.json.JSONJAXBContext;
|
import com.sun.jersey.api.json.JSONJAXBContext;
|
||||||
import com.sun.jersey.api.json.JSONUnmarshaller;
|
import com.sun.jersey.api.json.JSONUnmarshaller;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
|
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
|
||||||
@ -60,6 +61,7 @@ import org.mockito.ArgumentCaptor;
|
|||||||
import javax.xml.bind.JAXBException;
|
import javax.xml.bind.JAXBException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -294,8 +296,10 @@ public class TestPolicyGenerator {
|
|||||||
resourceManager.start();
|
resourceManager.start();
|
||||||
|
|
||||||
String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
|
String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
|
||||||
SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER,
|
String webAppAddress = getServiceAddress(NetUtils.createSocketAddr(rmAddress));
|
||||||
SchedulerTypeInfo.class);
|
|
||||||
|
SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.SCHEDULER,
|
||||||
|
SchedulerTypeInfo.class, this.conf);
|
||||||
|
|
||||||
Assert.assertNotNull(sti);
|
Assert.assertNotNull(sti);
|
||||||
SchedulerInfo schedulerInfo = sti.getSchedulerInfo();
|
SchedulerInfo schedulerInfo = sti.getSchedulerInfo();
|
||||||
@ -346,6 +350,11 @@ public class TestPolicyGenerator {
|
|||||||
Assert.assertEquals(20f, queueB3.getCapacity(), 0.00001);
|
Assert.assertEquals(20f, queueB3.getCapacity(), 0.00001);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getServiceAddress(InetSocketAddress address) {
|
||||||
|
InetSocketAddress socketAddress = NetUtils.getConnectAddress(address);
|
||||||
|
return socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Testable policy generator overrides the methods that communicate
|
* Testable policy generator overrides the methods that communicate
|
||||||
* with the RM REST endpoint, allowing us to inject faked responses.
|
* with the RM REST endpoint, allowing us to inject faked responses.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user