HDFS-12868. Ozone: Service Discovery API. Contributed by Nanda Kumar.
This commit is contained in:
parent
fb09d75379
commit
6b03fca37d
@ -124,10 +124,6 @@ public final class OzoneConfigKeys {
|
|||||||
public static final Class<? extends ClientProtocol>
|
public static final Class<? extends ClientProtocol>
|
||||||
OZONE_CLIENT_PROTOCOL_REST = RestClient.class;
|
OZONE_CLIENT_PROTOCOL_REST = RestClient.class;
|
||||||
|
|
||||||
public static final String OZONE_REST_SERVERS = "ozone.rest.servers";
|
|
||||||
public static final String OZONE_REST_CLIENT_PORT = "ozone.rest.client.port";
|
|
||||||
public static final int OZONE_REST_CLIENT_PORT_DEFAULT = 9864;
|
|
||||||
|
|
||||||
// This defines the overall connection limit for the connection pool used in
|
// This defines the overall connection limit for the connection pool used in
|
||||||
// RestClient.
|
// RestClient.
|
||||||
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
|
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.ozone.client;
|
package org.apache.hadoop.ozone.client;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
||||||
@ -34,6 +35,12 @@
|
|||||||
.OZONE_CLIENT_PROTOCOL_REST;
|
.OZONE_CLIENT_PROTOCOL_REST;
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||||
.OZONE_CLIENT_PROTOCOL_RPC;
|
.OZONE_CLIENT_PROTOCOL_RPC;
|
||||||
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
|
.OZONE_KSM_HTTP_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
|
.OZONE_KSM_HTTP_BIND_PORT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory class to create different types of OzoneClients.
|
* Factory class to create different types of OzoneClients.
|
||||||
@ -49,112 +56,240 @@
|
|||||||
*/
|
*/
|
||||||
public final class OzoneClientFactory {
|
public final class OzoneClientFactory {
|
||||||
|
|
||||||
private enum ClientType {
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
RPC, REST
|
OzoneClientFactory.class);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Private constructor, class is not meant to be initialized.
|
* Private constructor, class is not meant to be initialized.
|
||||||
*/
|
*/
|
||||||
private OzoneClientFactory(){}
|
private OzoneClientFactory(){}
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
|
||||||
OzoneClientFactory.class);
|
|
||||||
|
|
||||||
private static Configuration configuration;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an OzoneClient which will use protocol defined through
|
* Constructs and return an OzoneClient with default configuration.
|
||||||
* <code>ozone.client.protocol</code> to perform client operations.
|
*
|
||||||
* @return OzoneClient
|
* @return OzoneClient
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static OzoneClient getClient() throws IOException {
|
public static OzoneClient getClient() throws IOException {
|
||||||
return getClient(null);
|
LOG.info("Creating OzoneClient with default configuration.");
|
||||||
|
return getClient(new OzoneConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an OzoneClient which will use RPC protocol to perform
|
* Constructs and return an OzoneClient based on the configuration object.
|
||||||
* client operations.
|
* Protocol type is decided by <code>ozone.client.protocol</code>.
|
||||||
|
*
|
||||||
|
* @param config
|
||||||
|
* Configuration to be used for OzoneClient creation
|
||||||
|
*
|
||||||
* @return OzoneClient
|
* @return OzoneClient
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static OzoneClient getRpcClient() throws IOException {
|
public static OzoneClient getClient(Configuration config)
|
||||||
return getClient(ClientType.RPC);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an OzoneClient which will use REST protocol to perform
|
|
||||||
* client operations.
|
|
||||||
* @return OzoneClient
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public static OzoneClient getRestClient() throws IOException {
|
|
||||||
return getClient(ClientType.REST);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns OzoneClient with protocol type set base on ClientType.
|
|
||||||
* @param clientType
|
|
||||||
* @return OzoneClient
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private static OzoneClient getClient(ClientType clientType)
|
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(config);
|
||||||
|
Class<? extends ClientProtocol> clazz = (Class<? extends ClientProtocol>)
|
||||||
|
config.getClass(OZONE_CLIENT_PROTOCOL, OZONE_CLIENT_PROTOCOL_RPC);
|
||||||
|
return getClient(getClientProtocol(clazz, config), config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use RPC protocol.
|
||||||
|
*
|
||||||
|
* @param ksmHost
|
||||||
|
* hostname of KeySpaceManager to connect.
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRpcClient(String ksmHost)
|
||||||
|
throws IOException {
|
||||||
|
return getRpcClient(ksmHost, OZONE_KSM_PORT_DEFAULT,
|
||||||
|
new OzoneConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use RPC protocol.
|
||||||
|
*
|
||||||
|
* @param ksmHost
|
||||||
|
* hostname of KeySpaceManager to connect.
|
||||||
|
*
|
||||||
|
* @param ksmRpcPort
|
||||||
|
* RPC port of KeySpaceManager.
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort)
|
||||||
|
throws IOException {
|
||||||
|
return getRpcClient(ksmHost, ksmRpcPort, new OzoneConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use RPC protocol.
|
||||||
|
*
|
||||||
|
* @param ksmHost
|
||||||
|
* hostname of KeySpaceManager to connect.
|
||||||
|
*
|
||||||
|
* @param ksmRpcPort
|
||||||
|
* RPC port of KeySpaceManager.
|
||||||
|
*
|
||||||
|
* @param config
|
||||||
|
* Configuration to be used for OzoneClient creation
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort,
|
||||||
|
Configuration config)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(ksmHost);
|
||||||
|
Preconditions.checkNotNull(ksmRpcPort);
|
||||||
|
Preconditions.checkNotNull(config);
|
||||||
|
config.set(OZONE_KSM_ADDRESS_KEY, ksmHost + ":" + ksmRpcPort);
|
||||||
|
return getRpcClient(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use RPC protocol.
|
||||||
|
*
|
||||||
|
* @param config
|
||||||
|
* used for OzoneClient creation
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRpcClient(Configuration config)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(config);
|
||||||
|
return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_RPC, config),
|
||||||
|
config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use REST protocol.
|
||||||
|
*
|
||||||
|
* @param ksmHost
|
||||||
|
* hostname of KeySpaceManager to connect.
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRestClient(String ksmHost)
|
||||||
|
throws IOException {
|
||||||
|
return getRestClient(ksmHost, OZONE_KSM_HTTP_BIND_PORT_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use REST protocol.
|
||||||
|
*
|
||||||
|
* @param ksmHost
|
||||||
|
* hostname of KeySpaceManager to connect.
|
||||||
|
*
|
||||||
|
* @param ksmHttpPort
|
||||||
|
* HTTP port of KeySpaceManager.
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort)
|
||||||
|
throws IOException {
|
||||||
|
return getRestClient(ksmHost, ksmHttpPort, new OzoneConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use REST protocol.
|
||||||
|
*
|
||||||
|
* @param ksmHost
|
||||||
|
* hostname of KeySpaceManager to connect.
|
||||||
|
*
|
||||||
|
* @param ksmHttpPort
|
||||||
|
* HTTP port of KeySpaceManager.
|
||||||
|
*
|
||||||
|
* @param config
|
||||||
|
* Configuration to be used for OzoneClient creation
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort,
|
||||||
|
Configuration config)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(ksmHost);
|
||||||
|
Preconditions.checkNotNull(ksmHttpPort);
|
||||||
|
Preconditions.checkNotNull(config);
|
||||||
|
config.set(OZONE_KSM_HTTP_ADDRESS_KEY, ksmHost + ":" + ksmHttpPort);
|
||||||
|
return getRestClient(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an OzoneClient which will use REST protocol.
|
||||||
|
*
|
||||||
|
* @param config
|
||||||
|
* Configuration to be used for OzoneClient creation
|
||||||
|
*
|
||||||
|
* @return OzoneClient
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static OzoneClient getRestClient(Configuration config)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(config);
|
||||||
|
return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_REST, config),
|
||||||
|
config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates OzoneClient with the given ClientProtocol and Configuration.
|
||||||
|
*
|
||||||
|
* @param clientProtocol
|
||||||
|
* Protocol to be used by the OzoneClient
|
||||||
|
*
|
||||||
|
* @param config
|
||||||
|
* Configuration to be used for OzoneClient creation
|
||||||
|
*/
|
||||||
|
private static OzoneClient getClient(ClientProtocol clientProtocol,
|
||||||
|
Configuration config) {
|
||||||
OzoneClientInvocationHandler clientHandler =
|
OzoneClientInvocationHandler clientHandler =
|
||||||
new OzoneClientInvocationHandler(getProtocolClass(clientType));
|
new OzoneClientInvocationHandler(clientProtocol);
|
||||||
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
|
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
|
||||||
OzoneClientInvocationHandler.class.getClassLoader(),
|
OzoneClientInvocationHandler.class.getClassLoader(),
|
||||||
new Class<?>[]{ClientProtocol.class}, clientHandler);
|
new Class<?>[]{ClientProtocol.class}, clientHandler);
|
||||||
return new OzoneClient(configuration, proxy);
|
return new OzoneClient(config, proxy);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the configuration if it's already set, else creates a new
|
* Returns an instance of Protocol class.
|
||||||
* {@link OzoneConfiguration} and returns it.
|
*
|
||||||
|
* @param protocolClass
|
||||||
|
* Class object of the ClientProtocol.
|
||||||
|
*
|
||||||
|
* @param config
|
||||||
|
* Configuration used to initialize ClientProtocol.
|
||||||
|
*
|
||||||
|
* @return ClientProtocol
|
||||||
*
|
*
|
||||||
* @return Configuration
|
|
||||||
*/
|
|
||||||
private static synchronized Configuration getConfiguration() {
|
|
||||||
if(configuration == null) {
|
|
||||||
setConfiguration(new OzoneConfiguration());
|
|
||||||
}
|
|
||||||
return configuration;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Based on the clientType, client protocol instance is created.
|
|
||||||
* If clientType is null, <code>ozone.client.protocol</code> property
|
|
||||||
* will be used to decide the protocol to be used.
|
|
||||||
* @param clientType type of client protocol to be created
|
|
||||||
* @return ClientProtocol implementation
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private static ClientProtocol getProtocolClass(ClientType clientType)
|
private static ClientProtocol getClientProtocol(
|
||||||
|
Class<? extends ClientProtocol> protocolClass, Configuration config)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Class<? extends ClientProtocol> protocolClass = null;
|
|
||||||
if(clientType != null) {
|
|
||||||
switch (clientType) {
|
|
||||||
case RPC:
|
|
||||||
protocolClass = OZONE_CLIENT_PROTOCOL_RPC;
|
|
||||||
break;
|
|
||||||
case REST:
|
|
||||||
protocolClass = OZONE_CLIENT_PROTOCOL_REST;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
LOG.warn("Invalid ClientProtocol type, falling back to RPC.");
|
|
||||||
protocolClass = OZONE_CLIENT_PROTOCOL_RPC;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
protocolClass = (Class<ClientProtocol>)
|
|
||||||
getConfiguration().getClass(
|
|
||||||
OZONE_CLIENT_PROTOCOL, OZONE_CLIENT_PROTOCOL_RPC);
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
|
LOG.info("Using {} as client protocol.",
|
||||||
|
protocolClass.getCanonicalName());
|
||||||
Constructor<? extends ClientProtocol> ctor =
|
Constructor<? extends ClientProtocol> ctor =
|
||||||
protocolClass.getConstructor(Configuration.class);
|
protocolClass.getConstructor(Configuration.class);
|
||||||
return ctor.newInstance(getConfiguration());
|
return ctor.newInstance(config);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
final String message = "Couldn't create protocol " + protocolClass;
|
final String message = "Couldn't create protocol " + protocolClass;
|
||||||
if (e.getCause() instanceof IOException) {
|
if (e.getCause() instanceof IOException) {
|
||||||
@ -165,13 +300,4 @@ private static ClientProtocol getProtocolClass(ClientType clientType)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the configuration, which will be used while creating OzoneClient.
|
|
||||||
*
|
|
||||||
* @param conf
|
|
||||||
*/
|
|
||||||
public static void setConfiguration(Configuration conf) {
|
|
||||||
configuration = conf;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,36 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.client.rest;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default selector randomly picks one of the REST Server from the list.
|
||||||
|
*/
|
||||||
|
public class DefaultRestServerSelector implements RestServerSelector {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServiceInfo getRestServer(List<ServiceInfo> restServices) {
|
||||||
|
return restServices.get(
|
||||||
|
new Random().nextInt(restServices.size()));
|
||||||
|
}
|
||||||
|
}
|
@ -18,9 +18,12 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.ozone.client.rest;
|
package org.apache.hadoop.ozone.client.rest;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneAcl;
|
import org.apache.hadoop.ozone.OzoneAcl;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
@ -36,14 +39,16 @@
|
|||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.hadoop.ozone.client.rest.headers.Header;
|
import org.apache.hadoop.ozone.client.rest.headers.Header;
|
||||||
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
|
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
|
||||||
import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
|
import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
|
||||||
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
|
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
|
||||||
import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
||||||
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
|
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServicePort;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.http.HttpEntity;
|
import org.apache.http.HttpEntity;
|
||||||
@ -64,19 +69,20 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.PipedInputStream;
|
import java.io.PipedInputStream;
|
||||||
import java.io.PipedOutputStream;
|
import java.io.PipedOutputStream;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.FutureTask;
|
import java.util.concurrent.FutureTask;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.net.HttpURLConnection.HTTP_CREATED;
|
import static java.net.HttpURLConnection.HTTP_CREATED;
|
||||||
import static java.net.HttpURLConnection.HTTP_OK;
|
import static java.net.HttpURLConnection.HTTP_OK;
|
||||||
@ -108,13 +114,7 @@ public RestClient(Configuration conf)
|
|||||||
try {
|
try {
|
||||||
Preconditions.checkNotNull(conf);
|
Preconditions.checkNotNull(conf);
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
int port = conf.getInt(OzoneConfigKeys.OZONE_REST_CLIENT_PORT,
|
|
||||||
OzoneConfigKeys.OZONE_REST_CLIENT_PORT_DEFAULT);
|
|
||||||
URIBuilder uriBuilder = new URIBuilder()
|
|
||||||
.setScheme("http")
|
|
||||||
.setHost(getOzoneRestHandlerHost())
|
|
||||||
.setPort(port);
|
|
||||||
this.ozoneRestUri = uriBuilder.build();
|
|
||||||
long socketTimeout = conf.getTimeDuration(
|
long socketTimeout = conf.getTimeDuration(
|
||||||
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
|
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
|
||||||
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
|
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
|
||||||
@ -129,7 +129,8 @@ public RestClient(Configuration conf)
|
|||||||
|
|
||||||
int maxConnectionPerRoute = conf.getInt(
|
int maxConnectionPerRoute = conf.getInt(
|
||||||
OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX,
|
OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX,
|
||||||
OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
|
OzoneConfigKeys
|
||||||
|
.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
|
||||||
);
|
);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -152,26 +153,55 @@ public RestClient(Configuration conf)
|
|||||||
this.ugi = UserGroupInformation.getCurrentUser();
|
this.ugi = UserGroupInformation.getCurrentUser();
|
||||||
this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
|
this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
|
||||||
KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
|
KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
|
||||||
|
|
||||||
|
// TODO: Add new configuration parameter to configure RestServerSelector.
|
||||||
|
RestServerSelector defaultSelector = new DefaultRestServerSelector();
|
||||||
|
InetSocketAddress restServer = getOzoneRestServerAddress(defaultSelector);
|
||||||
|
URIBuilder uriBuilder = new URIBuilder()
|
||||||
|
.setScheme("http")
|
||||||
|
.setHost(restServer.getHostName())
|
||||||
|
.setPort(restServer.getPort());
|
||||||
|
this.ozoneRestUri = uriBuilder.build();
|
||||||
|
|
||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private InetSocketAddress getOzoneRestServerAddress(
|
||||||
* Returns the REST server host to connect to.
|
RestServerSelector selector) throws IOException {
|
||||||
*
|
String httpAddress = conf.get(KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY);
|
||||||
* @return hostname of REST server
|
|
||||||
*/
|
if (httpAddress == null) {
|
||||||
private String getOzoneRestHandlerHost() {
|
throw new IllegalArgumentException(
|
||||||
List<String> servers = new ArrayList<>(conf.getTrimmedStringCollection(
|
KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY + " must be defined. See" +
|
||||||
OzoneConfigKeys.OZONE_REST_SERVERS));
|
|
||||||
if(servers.isEmpty()) {
|
|
||||||
throw new IllegalArgumentException(OzoneConfigKeys.OZONE_REST_SERVERS +
|
|
||||||
" must be defined. See" +
|
|
||||||
" https://wiki.apache.org/hadoop/Ozone#Configuration for" +
|
" https://wiki.apache.org/hadoop/Ozone#Configuration for" +
|
||||||
" details on configuring Ozone.");
|
" details on configuring Ozone.");
|
||||||
}
|
}
|
||||||
return servers.get(new Random().nextInt(servers.size()));
|
|
||||||
|
HttpGet httpGet = new HttpGet("http://" + httpAddress + "/serviceList");
|
||||||
|
HttpEntity entity = executeHttpRequest(httpGet);
|
||||||
|
try {
|
||||||
|
String serviceListJson = EntityUtils.toString(entity);
|
||||||
|
|
||||||
|
ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
TypeReference<List<ServiceInfo>> serviceInfoReference =
|
||||||
|
new TypeReference<List<ServiceInfo>>() {
|
||||||
|
};
|
||||||
|
List<ServiceInfo> services = objectMapper.readValue(
|
||||||
|
serviceListJson, serviceInfoReference);
|
||||||
|
|
||||||
|
List<ServiceInfo> dataNodeInfos = services.stream().filter(
|
||||||
|
a -> a.getNodeType().equals(OzoneProtos.NodeType.DATANODE))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
ServiceInfo restServer = selector.getRestServer(dataNodeInfos);
|
||||||
|
|
||||||
|
return NetUtils.createSocketAddr(restServer.getHostname() + ":" +
|
||||||
|
restServer.getPort(ServicePort.Type.HTTP));
|
||||||
|
} finally {
|
||||||
|
EntityUtils.consume(entity);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -0,0 +1,40 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.client.rest;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The implementor of this interface should select the REST server which will
|
||||||
|
* be used by the client to connect to Ozone Cluster, given list of
|
||||||
|
* REST Servers/DataNodes (DataNodes are the ones which hosts REST Service).
|
||||||
|
*/
|
||||||
|
public interface RestServerSelector {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the REST Service which will be used by the client for connection.
|
||||||
|
*
|
||||||
|
* @param restServices list of available REST servers
|
||||||
|
* @return ServiceInfo
|
||||||
|
*/
|
||||||
|
ServiceInfo getRestServer(List<ServiceInfo> restServices);
|
||||||
|
|
||||||
|
}
|
@ -46,6 +46,7 @@
|
|||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
|
||||||
|
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.protocolPB
|
import org.apache.hadoop.ozone.ksm.protocolPB
|
||||||
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.ozone.ksm.protocolPB
|
import org.apache.hadoop.ozone.ksm.protocolPB
|
||||||
@ -54,6 +55,8 @@
|
|||||||
import org.apache.hadoop.ozone.OzoneAcl;
|
import org.apache.hadoop.ozone.OzoneAcl;
|
||||||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||||
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
|
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ServicePort;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||||
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
|
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
|
||||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||||
@ -123,8 +126,7 @@ public RpcClient(Configuration conf) throws IOException {
|
|||||||
|
|
||||||
long scmVersion =
|
long scmVersion =
|
||||||
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
|
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
|
||||||
InetSocketAddress scmAddress =
|
InetSocketAddress scmAddress = getScmAddressForClient();
|
||||||
OzoneClientUtils.getScmAddressForClients(conf);
|
|
||||||
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
|
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
|
||||||
ProtobufRpcEngine.class);
|
ProtobufRpcEngine.class);
|
||||||
this.storageContainerLocationClient =
|
this.storageContainerLocationClient =
|
||||||
@ -150,6 +152,15 @@ public RpcClient(Configuration conf) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private InetSocketAddress getScmAddressForClient() throws IOException {
|
||||||
|
List<ServiceInfo> services = keySpaceManagerClient.getServiceList();
|
||||||
|
ServiceInfo scmInfo = services.stream().filter(
|
||||||
|
a -> a.getNodeType().equals(OzoneProtos.NodeType.SCM))
|
||||||
|
.collect(Collectors.toList()).get(0);
|
||||||
|
return NetUtils.createSocketAddr(scmInfo.getHostname()+ ":" +
|
||||||
|
scmInfo.getPort(ServicePort.Type.RPC));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createVolume(String volumeName) throws IOException {
|
public void createVolume(String volumeName) throws IOException {
|
||||||
createVolume(volumeName, VolumeArgs.newBuilder().build());
|
createVolume(volumeName, VolumeArgs.newBuilder().build());
|
||||||
|
@ -187,8 +187,7 @@ enum CoronaOps {
|
|||||||
numberOfVolumesCreated = new AtomicInteger();
|
numberOfVolumesCreated = new AtomicInteger();
|
||||||
numberOfBucketsCreated = new AtomicInteger();
|
numberOfBucketsCreated = new AtomicInteger();
|
||||||
numberOfKeysAdded = new AtomicLong();
|
numberOfKeysAdded = new AtomicLong();
|
||||||
OzoneClientFactory.setConfiguration(conf);
|
ozoneClient = OzoneClientFactory.getClient(conf);
|
||||||
ozoneClient = OzoneClientFactory.getClient();
|
|
||||||
objectStore = ozoneClient.getObjectStore();
|
objectStore = ozoneClient.getObjectStore();
|
||||||
for (CoronaOps ops : CoronaOps.values()) {
|
for (CoronaOps ops : CoronaOps.values()) {
|
||||||
histograms.add(ops.ordinal(), new Histogram(new UniformReservoir()));
|
histograms.add(ops.ordinal(), new Histogram(new UniformReservoir()));
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
||||||
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
||||||
import org.apache.hadoop.ozone.OzoneAcl;
|
import org.apache.hadoop.ozone.OzoneAcl;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
@ -46,6 +45,7 @@
|
|||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
@ -77,15 +77,11 @@ public static void init() throws Exception {
|
|||||||
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
||||||
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
|
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
|
||||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||||
DataNode datanode = cluster.getDataNodes().get(0);
|
|
||||||
conf.set(OzoneConfigKeys.OZONE_CLIENT_PROTOCOL,
|
InetSocketAddress ksmHttpAddress = cluster.getKeySpaceManager()
|
||||||
"org.apache.hadoop.ozone.client.rest.RestClient");
|
.getHttpServer().getHttpAddress();
|
||||||
conf.set(OzoneConfigKeys.OZONE_REST_SERVERS,
|
ozClient = OzoneClientFactory.getRestClient(ksmHttpAddress.getHostName(),
|
||||||
datanode.getDatanodeHostname());
|
ksmHttpAddress.getPort(), conf);
|
||||||
conf.set(OzoneConfigKeys.OZONE_REST_CLIENT_PORT,
|
|
||||||
Integer.toString(datanode.getInfoPort()));
|
|
||||||
OzoneClientFactory.setConfiguration(conf);
|
|
||||||
ozClient = OzoneClientFactory.getClient();
|
|
||||||
store = ozClient.getObjectStore();
|
store = ozClient.getObjectStore();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,10 +93,7 @@ public static void init() throws Exception {
|
|||||||
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
|
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
|
||||||
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(10)
|
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(10)
|
||||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||||
conf.set("ozone.client.protocol",
|
ozClient = OzoneClientFactory.getRpcClient(conf);
|
||||||
"org.apache.hadoop.ozone.client.rpc.RpcClient");
|
|
||||||
OzoneClientFactory.setConfiguration(conf);
|
|
||||||
ozClient = OzoneClientFactory.getClient();
|
|
||||||
store = ozClient.getObjectStore();
|
store = ozClient.getObjectStore();
|
||||||
storageContainerLocationClient =
|
storageContainerLocationClient =
|
||||||
cluster.createStorageContainerLocationClient();
|
cluster.createStorageContainerLocationClient();
|
||||||
|
@ -59,8 +59,7 @@ public void test() throws IOException, TimeoutException, InterruptedException,
|
|||||||
cluster.waitOzoneReady();
|
cluster.waitOzoneReady();
|
||||||
|
|
||||||
//the easiest way to create an open container is creating a key
|
//the easiest way to create an open container is creating a key
|
||||||
OzoneClientFactory.setConfiguration(conf);
|
OzoneClient client = OzoneClientFactory.getClient(conf);
|
||||||
OzoneClient client = OzoneClientFactory.getClient();
|
|
||||||
ObjectStore objectStore = client.getObjectStore();
|
ObjectStore objectStore = client.getObjectStore();
|
||||||
objectStore.createVolume("test");
|
objectStore.createVolume("test");
|
||||||
objectStore.getVolume("test").createBucket("test");
|
objectStore.getVolume("test").createBucket("test");
|
||||||
|
Loading…
Reference in New Issue
Block a user