HDFS-12870. Ozone: Service Discovery: REST endpoint in KSM for getServiceList. Contributed by Nanda kumar.
This commit is contained in:
parent
5f10c2d8ce
commit
5766792cad
@ -149,6 +149,11 @@ public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
|
||||
|
||||
public static final int INVALID_PORT = -1;
|
||||
|
||||
|
||||
// The ServiceListJSONServlet context attribute where KeySpaceManager
|
||||
// instance gets stored.
|
||||
public static final String KSM_CONTEXT_ATTRIBUTE = "ozone.ksm";
|
||||
|
||||
private OzoneConsts() {
|
||||
// Never Constructed
|
||||
}
|
||||
|
@ -19,12 +19,18 @@
|
||||
package org.apache.hadoop.ozone.ksm.helpers;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectReader;
|
||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
|
||||
.ServicePort;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -36,35 +42,45 @@
|
||||
*/
|
||||
public final class ServiceInfo {
|
||||
|
||||
private static final ObjectReader READER =
|
||||
new ObjectMapper().readerFor(ServiceInfo.class);
|
||||
private static final ObjectWriter WRITER =
|
||||
new ObjectMapper().writerWithDefaultPrettyPrinter();
|
||||
|
||||
/**
|
||||
* Type of node/service.
|
||||
*/
|
||||
private final NodeType nodeType;
|
||||
private NodeType nodeType;
|
||||
/**
|
||||
* Hostname of the node in which the service is running.
|
||||
*/
|
||||
private final String hostname;
|
||||
private String hostname;
|
||||
|
||||
/**
|
||||
* List of ports the service listens to.
|
||||
*/
|
||||
private final Map<ServicePort.Type, ServicePort> portsMap;
|
||||
private Map<ServicePort.Type, Integer> ports;
|
||||
|
||||
/**
|
||||
* Default constructor for JSON deserialization.
|
||||
*/
|
||||
public ServiceInfo() {}
|
||||
|
||||
/**
|
||||
* Constructs the ServiceInfo for the {@code nodeType}.
|
||||
* @param nodeType type of node/service
|
||||
* @param hostname hostname of the service
|
||||
* @param ports list of ports the service listens to
|
||||
* @param portList list of ports the service listens to
|
||||
*/
|
||||
private ServiceInfo(
|
||||
NodeType nodeType, String hostname, List<ServicePort> ports) {
|
||||
NodeType nodeType, String hostname, List<ServicePort> portList) {
|
||||
Preconditions.checkNotNull(nodeType);
|
||||
Preconditions.checkNotNull(hostname);
|
||||
this.nodeType = nodeType;
|
||||
this.hostname = hostname;
|
||||
this.portsMap = new HashMap<>();
|
||||
for (ServicePort port : ports) {
|
||||
portsMap.put(port.getType(), port);
|
||||
this.ports = new HashMap<>();
|
||||
for (ServicePort port : portList) {
|
||||
ports.put(port.getType(), port.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,11 +101,11 @@ public String getHostname() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the list of port which the service listens to.
|
||||
* @return List<ServicePort>
|
||||
* Returns ServicePort.Type to port mappings.
|
||||
* @return ports
|
||||
*/
|
||||
public List<ServicePort> getPorts() {
|
||||
return portsMap.values().parallelStream().collect(Collectors.toList());
|
||||
public Map<ServicePort.Type, Integer> getPorts() {
|
||||
return ports;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -99,8 +115,9 @@ public List<ServicePort> getPorts() {
|
||||
* @param type the type of port.
|
||||
* ex: RPC, HTTP, HTTPS, etc..
|
||||
*/
|
||||
@JsonIgnore
|
||||
public int getPort(ServicePort.Type type) {
|
||||
return portsMap.get(type).getValue();
|
||||
return ports.get(type);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -108,12 +125,20 @@ public int getPort(ServicePort.Type type) {
|
||||
*
|
||||
* @return KeySpaceManagerProtocolProtos.ServiceInfo
|
||||
*/
|
||||
@JsonIgnore
|
||||
public KeySpaceManagerProtocolProtos.ServiceInfo getProtobuf() {
|
||||
KeySpaceManagerProtocolProtos.ServiceInfo.Builder builder =
|
||||
KeySpaceManagerProtocolProtos.ServiceInfo.newBuilder();
|
||||
builder.setNodeType(nodeType)
|
||||
.setHostname(hostname)
|
||||
.addAllServicePorts(portsMap.values());
|
||||
.addAllServicePorts(
|
||||
ports.entrySet().stream()
|
||||
.map(
|
||||
entry ->
|
||||
ServicePort.newBuilder()
|
||||
.setType(entry.getKey())
|
||||
.setValue(entry.getValue()).build())
|
||||
.collect(Collectors.toList()));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@ -122,6 +147,7 @@ public KeySpaceManagerProtocolProtos.ServiceInfo getProtobuf() {
|
||||
*
|
||||
* @return {@link ServiceInfo}
|
||||
*/
|
||||
@JsonIgnore
|
||||
public static ServiceInfo getFromProtobuf(
|
||||
KeySpaceManagerProtocolProtos.ServiceInfo serviceInfo) {
|
||||
return new ServiceInfo(serviceInfo.getNodeType(),
|
||||
@ -129,6 +155,26 @@ public static ServiceInfo getFromProtobuf(
|
||||
serviceInfo.getServicePortsList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a JSON string of this object.
|
||||
*
|
||||
* @return String - json string
|
||||
* @throws IOException
|
||||
*/
|
||||
public String toJsonString() throws IOException {
|
||||
return WRITER.writeValueAsString(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a JSON string into ServiceInfo Object.
|
||||
*
|
||||
* @param jsonString Json String
|
||||
* @return BucketInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
public static BucketInfo parse(String jsonString) throws IOException {
|
||||
return READER.readValue(jsonString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new builder to build {@link ServiceInfo}.
|
||||
@ -145,7 +191,7 @@ public static class Builder {
|
||||
|
||||
private NodeType node;
|
||||
private String host;
|
||||
private List<ServicePort> ports = new ArrayList<>();
|
||||
private List<ServicePort> portList = new ArrayList<>();
|
||||
|
||||
|
||||
/**
|
||||
@ -174,7 +220,7 @@ public Builder setHostname(String hostname) {
|
||||
* @return the builder
|
||||
*/
|
||||
public Builder addServicePort(ServicePort servicePort) {
|
||||
ports.add(servicePort);
|
||||
portList.add(servicePort);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -184,7 +230,7 @@ public Builder addServicePort(ServicePort servicePort) {
|
||||
* @return {@link ServiceInfo}
|
||||
*/
|
||||
public ServiceInfo build() {
|
||||
return new ServiceInfo(node, host, ports);
|
||||
return new ServiceInfo(node, host, portList);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -173,7 +173,7 @@ private KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
||||
metrics = KSMMetrics.create();
|
||||
keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
|
||||
configuration);
|
||||
httpServer = new KeySpaceManagerHttpServer(configuration);
|
||||
httpServer = new KeySpaceManagerHttpServer(configuration, this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -785,6 +785,11 @@ public String getRpcPort() {
|
||||
return "" + ksmRpcAddress.getPort();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public KeySpaceManagerHttpServer getHttpServer() {
|
||||
return httpServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServiceInfo> getServiceList() throws IOException {
|
||||
// When we implement multi-home this call has to be handled properly.
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.OzoneHttpServer;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -28,8 +29,11 @@
|
||||
*/
|
||||
public class KeySpaceManagerHttpServer extends OzoneHttpServer {
|
||||
|
||||
public KeySpaceManagerHttpServer(Configuration conf) throws IOException {
|
||||
public KeySpaceManagerHttpServer(Configuration conf, KeySpaceManager ksm)
|
||||
throws IOException {
|
||||
super(conf, "ksm");
|
||||
addServlet("serviceList", "/serviceList", ServiceListJSONServlet.class);
|
||||
getWebAppContext().setAttribute(OzoneConsts.KSM_CONTEXT_ATTRIBUTE, ksm);
|
||||
}
|
||||
|
||||
@Override protected String getHttpAddressKey() {
|
||||
|
@ -0,0 +1,103 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
|
||||
|
||||
/**
|
||||
* Provides REST access to Ozone Service List.
|
||||
* <p>
|
||||
* This servlet generally will be placed under the /serviceList URL of
|
||||
* KeySpaceManager HttpServer.
|
||||
*
|
||||
* The return format is of JSON and in the form
|
||||
* <p>
|
||||
* <code><pre>
|
||||
* {
|
||||
* "services" : [
|
||||
* {
|
||||
* "NodeType":"KSM",
|
||||
* "Hostname" "$hostname",
|
||||
* "ports" : {
|
||||
* "$PortType" : "$port",
|
||||
* ...
|
||||
* }
|
||||
* }
|
||||
* ]
|
||||
* }
|
||||
* </pre></code>
|
||||
* <p>
|
||||
*
|
||||
*/
|
||||
public class ServiceListJSONServlet extends HttpServlet {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ServiceListJSONServlet.class);
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private KeySpaceManager ksm;
|
||||
|
||||
public void init() throws ServletException {
|
||||
this.ksm = (KeySpaceManager) getServletContext()
|
||||
.getAttribute(OzoneConsts.KSM_CONTEXT_ATTRIBUTE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a GET request for the specified resource.
|
||||
*
|
||||
* @param request
|
||||
* The servlet request we are processing
|
||||
* @param response
|
||||
* The servlet response we are creating
|
||||
*/
|
||||
@Override
|
||||
public void doGet(HttpServletRequest request, HttpServletResponse response) {
|
||||
try {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||
response.setContentType("application/json; charset=utf8");
|
||||
PrintWriter writer = response.getWriter();
|
||||
try {
|
||||
writer.write(objectMapper.writeValueAsString(ksm.getServiceList()));
|
||||
} finally {
|
||||
if (writer != null) {
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Caught an exception while processing ServiceList request", e);
|
||||
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -25,9 +25,11 @@
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||
import org.eclipse.jetty.webapp.WebAppContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@ -80,6 +82,25 @@ public OzoneHttpServer(Configuration conf, String name) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a servlet to OzoneHttpServer.
|
||||
* @param servletName The name of the servlet
|
||||
* @param pathSpec The path spec for the servlet
|
||||
* @param clazz The servlet class
|
||||
*/
|
||||
protected void addServlet(String servletName, String pathSpec,
|
||||
Class<? extends HttpServlet> clazz) {
|
||||
httpServer.addServlet(servletName, pathSpec, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the WebAppContext associated with this HttpServer.
|
||||
* @return WebAppContext
|
||||
*/
|
||||
protected WebAppContext getWebAppContext() {
|
||||
return httpServer.getWebAppContext();
|
||||
}
|
||||
|
||||
protected InetSocketAddress getBindAddress(String bindHostKey,
|
||||
String addressKey, String bindHostDefault, int bindPortdefault) {
|
||||
final Optional<String> bindHost =
|
||||
|
@ -99,7 +99,7 @@ public TestKeySpaceManagerHttpServer(Policy policy) {
|
||||
InetSocketAddress addr = InetSocketAddress.createUnresolved("localhost", 0);
|
||||
KeySpaceManagerHttpServer server = null;
|
||||
try {
|
||||
server = new KeySpaceManagerHttpServer(conf);
|
||||
server = new KeySpaceManagerHttpServer(conf, null);
|
||||
server.start();
|
||||
|
||||
Assert.assertTrue(implies(policy.isHttpEnabled(),
|
||||
|
@ -0,0 +1,113 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* This class is to test the REST interface exposed by KeySpaceManager.
|
||||
*/
|
||||
public class TestKeySpaceManagerRestInterface {
|
||||
|
||||
private static MiniOzoneCluster cluster;
|
||||
private static OzoneConfiguration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
cluster = new MiniOzoneClassicCluster.Builder(conf)
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
||||
.setClusterId(UUID.randomUUID().toString())
|
||||
.setScmId(UUID.randomUUID().toString())
|
||||
.build();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetServiceList() throws Exception {
|
||||
KeySpaceManagerHttpServer server =
|
||||
cluster.getKeySpaceManager().getHttpServer();
|
||||
HttpClient client = HttpClients.createDefault();
|
||||
String connectionUri = "http://" +
|
||||
NetUtils.getHostPortString(server.getHttpAddress());
|
||||
HttpGet httpGet = new HttpGet(connectionUri + "/serviceList");
|
||||
HttpResponse response = client.execute(httpGet);
|
||||
String serviceListJson = EntityUtils.toString(response.getEntity());
|
||||
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
TypeReference<List<ServiceInfo>> serviceInfoReference =
|
||||
new TypeReference<List<ServiceInfo>>() {};
|
||||
List<ServiceInfo> serviceInfos = objectMapper.readValue(
|
||||
serviceListJson, serviceInfoReference);
|
||||
Map<OzoneProtos.NodeType, ServiceInfo> serviceMap = new HashMap<>();
|
||||
for (ServiceInfo serviceInfo : serviceInfos) {
|
||||
serviceMap.put(serviceInfo.getNodeType(), serviceInfo);
|
||||
}
|
||||
|
||||
InetSocketAddress ksmAddress =
|
||||
OzoneClientUtils.getKsmAddressForClients(conf);
|
||||
ServiceInfo ksmInfo = serviceMap.get(OzoneProtos.NodeType.KSM);
|
||||
|
||||
Assert.assertEquals(ksmAddress.getHostName(), ksmInfo.getHostname());
|
||||
Assert.assertEquals(ksmAddress.getPort(),
|
||||
ksmInfo.getPort(KeySpaceManagerProtocolProtos.ServicePort.Type.RPC));
|
||||
Assert.assertEquals(server.getHttpAddress().getPort(),
|
||||
ksmInfo.getPort(KeySpaceManagerProtocolProtos.ServicePort.Type.HTTP));
|
||||
|
||||
InetSocketAddress scmAddress =
|
||||
OzoneClientUtils.getScmAddressForClients(conf);
|
||||
ServiceInfo scmInfo = serviceMap.get(OzoneProtos.NodeType.SCM);
|
||||
|
||||
Assert.assertEquals(scmAddress.getHostName(), scmInfo.getHostname());
|
||||
Assert.assertEquals(scmAddress.getPort(),
|
||||
scmInfo.getPort(KeySpaceManagerProtocolProtos.ServicePort.Type.RPC));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user