HDFS-12060. Ozone: OzoneClient: Add list calls in RpcClient. Contributed by Nandakumar.

This commit is contained in:
Xiaoyu Yao 2017-10-02 11:50:59 -07:00 committed by Owen O'Malley
parent 7846636a5d
commit 05246e2b32
11 changed files with 686 additions and 79 deletions

View File

@ -120,6 +120,13 @@ public final class OzoneConfigKeys {
"ozone.client.connection.timeout.ms"; "ozone.client.connection.timeout.ms";
public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000; public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000;
/**
* Configuration property to configure the cache size of client list calls.
*/
public static final String OZONE_CLIENT_LIST_CACHE_SIZE =
"ozone.client.list.cache";
public static final int OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT = 1000;
/** /**
* Configuration properties for Ozone Block Deleting Service. * Configuration properties for Ozone Block Deleting Service.
*/ */

View File

@ -19,9 +19,15 @@
package org.apache.hadoop.ozone.client; package org.apache.hadoop.ozone.client;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/** /**
* ObjectStore class is responsible for the client operations that can be * ObjectStore class is responsible for the client operations that can be
@ -36,11 +42,18 @@ public class ObjectStore {
private final ClientProtocol proxy; private final ClientProtocol proxy;
/** /**
* Creates an instance of ObjectStore with the proxy. * Cache size to be used for listVolume calls.
* @param proxy ClientProtocol proxy
*/ */
public ObjectStore(ClientProtocol proxy) { private int listCacheSize;
/**
* Creates an instance of ObjectStore.
* @param conf Configuration object.
* @param proxy ClientProtocol proxy.
*/
public ObjectStore(Configuration conf, ClientProtocol proxy) {
this.proxy = proxy; this.proxy = proxy;
this.listCacheSize = OzoneClientUtils.getListCacheSize(conf);
} }
/** /**
@ -78,10 +91,41 @@ public OzoneVolume getVolume(String volumeName) throws IOException {
Preconditions.checkNotNull(volumeName); Preconditions.checkNotNull(volumeName);
OzoneClientUtils.verifyResourceName(volumeName); OzoneClientUtils.verifyResourceName(volumeName);
OzoneVolume volume = proxy.getVolumeDetails(volumeName); OzoneVolume volume = proxy.getVolumeDetails(volumeName);
volume.setClientProxy(proxy);
return volume; return volume;
} }
/**
* Returns Iterator to iterate over all the volumes in object store.
* The result can be restricted using volume prefix, will return all
* volumes if volume prefix is null.
*
* @param volumePrefix Volume prefix to match
* @return {@code Iterator<OzoneVolume>}
*/
public Iterator<OzoneVolume> listVolumes(String volumePrefix)
throws IOException {
return new VolumeIterator(volumePrefix);
}
/**
* Returns Iterator to iterate over the List of volumes owned by a specific
* user. The result can be restricted using volume prefix, will return all
* volumes if volume prefix is null. If user is null, returns the volume of
* current user.
*
* @param user User Name
* @param volumePrefix Volume prefix to match
* @return {@code Iterator<OzoneVolume>}
*/
public Iterator<OzoneVolume> listVolumes(String user, String volumePrefix)
throws IOException {
if(Strings.isNullOrEmpty(user)) {
user = UserGroupInformation.getCurrentUser().getShortUserName();
}
return new VolumeIterator(user, volumePrefix);
}
/** /**
* Deletes the volume. * Deletes the volume.
* @param volumeName Name of the volume. * @param volumeName Name of the volume.
@ -92,4 +136,74 @@ public void deleteVolume(String volumeName) throws IOException {
OzoneClientUtils.verifyResourceName(volumeName); OzoneClientUtils.verifyResourceName(volumeName);
proxy.deleteVolume(volumeName); proxy.deleteVolume(volumeName);
} }
/**
* An Iterator to iterate over {@link OzoneVolume} list.
*/
private class VolumeIterator implements Iterator<OzoneVolume> {
private String user = null;
private String volPrefix = null;
private Iterator<OzoneVolume> currentIterator;
private OzoneVolume currentValue;
/**
* Creates an Iterator to iterate over all volumes in the cluster,
* which matches the volume prefix.
* @param volPrefix prefix to match
*/
VolumeIterator(String volPrefix) {
this(null, volPrefix);
}
/**
* Creates an Iterator to iterate over all volumes of the user,
* which matches volume prefix.
* @param user user name
* @param volPrefix volume prefix to match
*/
VolumeIterator(String user, String volPrefix) {
this.user = user;
this.volPrefix = volPrefix;
this.currentValue = null;
this.currentIterator = getNextListOfVolumes(null).iterator();
}
@Override
public boolean hasNext() {
if(!currentIterator.hasNext()) {
currentIterator = getNextListOfVolumes(
currentValue.getName()).iterator();
}
return currentIterator.hasNext();
}
@Override
public OzoneVolume next() {
if(hasNext()) {
currentValue = currentIterator.next();
return currentValue;
}
throw new NoSuchElementException();
}
/**
* Returns the next set of volume list using proxy.
* @param prevVolume previous volume, this will be excluded from the result
* @return {@code List<OzoneVolume>}
*/
private List<OzoneVolume> getNextListOfVolumes(String prevVolume) {
try {
//if user is null, we do list of all volumes.
if(user != null) {
return proxy.listVolumes(user, volPrefix, prevVolume, listCacheSize);
}
return proxy.listVolumes(volPrefix, prevVolume, listCacheSize);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
} }

View File

@ -20,6 +20,7 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@ -27,13 +28,20 @@
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException;
/** /**
* A class that encapsulates OzoneBucket. * A class that encapsulates OzoneBucket.
*/ */
public class OzoneBucket { public class OzoneBucket {
/**
* The proxy used for connecting to the cluster and perform
* client operations.
*/
private final ClientProtocol proxy;
/** /**
* Name of the volume in which the bucket belongs to. * Name of the volume in which the bucket belongs to.
*/ */
@ -59,35 +67,31 @@ public class OzoneBucket {
private Boolean versioning; private Boolean versioning;
/** /**
* The proxy used for connecting to the cluster and perform * Cache size to be used for listKey calls.
* client operations.
*/ */
private ClientProtocol proxy; private int listCacheSize;
/** /**
* Constructs OzoneBucket instance. * Constructs OzoneBucket instance.
* @param conf Configuration object.
* @param proxy ClientProtocol proxy.
* @param volumeName Name of the volume the bucket belongs to. * @param volumeName Name of the volume the bucket belongs to.
* @param bucketName Name of the bucket. * @param bucketName Name of the bucket.
* @param acls ACLs associated with the bucket. * @param acls ACLs associated with the bucket.
* @param storageType StorageType of the bucket. * @param storageType StorageType of the bucket.
* @param versioning versioning status of the bucket. * @param versioning versioning status of the bucket.
*/ */
public OzoneBucket(String volumeName, String bucketName, public OzoneBucket(Configuration conf, ClientProtocol proxy,
String volumeName, String bucketName,
List<OzoneAcl> acls, StorageType storageType, List<OzoneAcl> acls, StorageType storageType,
Boolean versioning) { Boolean versioning) {
this.proxy = proxy;
this.volumeName = volumeName; this.volumeName = volumeName;
this.name = bucketName; this.name = bucketName;
this.acls = acls; this.acls = acls;
this.storageType = storageType; this.storageType = storageType;
this.versioning = versioning; this.versioning = versioning;
} this.listCacheSize = OzoneClientUtils.getListCacheSize(conf);
/**
* Sets the proxy using which client operations are performed.
* @param clientProxy
*/
public void setClientProxy(ClientProtocol clientProxy) {
this.proxy = clientProxy;
} }
/** /**
@ -221,6 +225,18 @@ public OzoneKey getKey(String key) throws IOException {
return proxy.getKeyDetails(volumeName, name, key); return proxy.getKeyDetails(volumeName, name, key);
} }
/**
* Returns Iterator to iterate over all keys in the bucket.
* The result can be restricted using key prefix, will return all
* keys if key prefix is null.
*
* @param keyPrefix Bucket prefix to match
* @return {@code Iterator<OzoneKey>}
*/
public Iterator<OzoneKey> listKeys(String keyPrefix) {
return new KeyIterator(keyPrefix);
}
/** /**
* Deletes key from the bucket. * Deletes key from the bucket.
* @param key Name of the key to be deleted. * @param key Name of the key to be deleted.
@ -231,4 +247,59 @@ public void deleteKey(String key) throws IOException {
Preconditions.checkNotNull(key); Preconditions.checkNotNull(key);
proxy.deleteKey(volumeName, name, key); proxy.deleteKey(volumeName, name, key);
} }
/**
* An Iterator to iterate over {@link OzoneKey} list.
*/
private class KeyIterator implements Iterator<OzoneKey> {
private String keyPrefix = null;
private Iterator<OzoneKey> currentIterator;
private OzoneKey currentValue;
/**
* Creates an Iterator to iterate over all keys in the bucket,
* which matches volume prefix.
* @param keyPrefix
*/
KeyIterator(String keyPrefix) {
this.keyPrefix = keyPrefix;
this.currentValue = null;
this.currentIterator = getNextListOfKeys(null).iterator();
}
@Override
public boolean hasNext() {
if(!currentIterator.hasNext()) {
currentIterator = getNextListOfKeys(
currentValue.getName()).iterator();
}
return currentIterator.hasNext();
}
@Override
public OzoneKey next() {
if(hasNext()) {
currentValue = currentIterator.next();
return currentValue;
}
throw new NoSuchElementException();
}
/**
* Gets the next set of key list using proxy.
* @param prevKey
* @return {@code List<OzoneVolume>}
*/
private List<OzoneKey> getNextListOfKeys(String prevKey) {
try {
return proxy.listKeys(volumeName, name, keyPrefix, prevKey,
listCacheSize);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client; package org.apache.hadoop.ozone.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import java.io.Closeable; import java.io.Closeable;
@ -75,11 +76,12 @@ public class OzoneClient implements Closeable {
/** /**
* Creates a new OzoneClient object, generally constructed * Creates a new OzoneClient object, generally constructed
* using {@link OzoneClientFactory}. * using {@link OzoneClientFactory}.
* @param proxy * @param conf Configuration object
* @param proxy ClientProtocol proxy instance
*/ */
public OzoneClient(ClientProtocol proxy) { public OzoneClient(Configuration conf, ClientProtocol proxy) {
this.proxy = proxy; this.proxy = proxy;
this.objectStore = new ObjectStore(this.proxy); this.objectStore = new ObjectStore(conf, this.proxy);
} }
/** /**

View File

@ -24,7 +24,6 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -107,7 +106,7 @@ private static OzoneClient getClient(ClientType clientType)
ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance( ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
OzoneClientInvocationHandler.class.getClassLoader(), OzoneClientInvocationHandler.class.getClassLoader(),
new Class<?>[]{ClientProtocol.class}, clientHandler); new Class<?>[]{ClientProtocol.class}, clientHandler);
return new OzoneClient(proxy); return new OzoneClient(configuration, proxy);
} }
/** /**

View File

@ -54,11 +54,12 @@
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSI_PORT_DEFAULT; .DFS_CBLOCK_JSCSI_PORT_DEFAULT;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY; .OZONE_KSM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_BIND_HOST_DEFAULT; .OZONE_KSM_BIND_HOST_DEFAULT;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT; import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_PORT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys import static org.apache.hadoop.scm.ScmConfigKeys
@ -439,6 +440,16 @@ public static Optional<Integer> getHostPort(String value) {
} }
} }
/**
* Returns the cache value to be used for list calls.
* @param conf Configuration object
* @return list cache size
*/
public static int getListCacheSize(Configuration conf) {
return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE,
OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT);
}
/** /**
* Retrieve the port number, trying the supplied config keys in order. * Retrieve the port number, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format * Each config value may be absent, or if present in the format

View File

@ -19,17 +19,26 @@
package org.apache.hadoop.ozone.client; package org.apache.hadoop.ozone.client;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException;
/** /**
* A class that encapsulates OzoneVolume. * A class that encapsulates OzoneVolume.
*/ */
public class OzoneVolume { public class OzoneVolume {
/**
* The proxy used for connecting to the cluster and perform
* client operations.
*/
private final ClientProtocol proxy;
/** /**
* Name of the Volume. * Name of the Volume.
*/ */
@ -52,22 +61,28 @@ public class OzoneVolume {
*/ */
private List<OzoneAcl> acls; private List<OzoneAcl> acls;
private ClientProtocol proxy; private int listCacheSize;
/** /**
* Constructs OzoneVolume. * Constructs OzoneVolume instance.
* @param conf Configuration object.
* @param proxy ClientProtocol proxy.
* @param name Name of the volume.
* @param admin Volume admin.
* @param owner Volume owner.
* @param quotaInBytes Volume quota in bytes.
* @param acls ACLs associated with the volume.
*/ */
public OzoneVolume(String name, String admin, String owner, public OzoneVolume(Configuration conf, ClientProtocol proxy, String name,
long quotaInBytes, List<OzoneAcl> acls) { String admin, String owner, long quotaInBytes,
List<OzoneAcl> acls) {
this.proxy = proxy;
this.name = name; this.name = name;
this.admin = admin; this.admin = admin;
this.owner = owner; this.owner = owner;
this.quotaInBytes = quotaInBytes; this.quotaInBytes = quotaInBytes;
this.acls = acls; this.acls = acls;
} this.listCacheSize = OzoneClientUtils.getListCacheSize(conf);
public void setClientProxy(ClientProtocol clientProxy) {
this.proxy = clientProxy;
} }
/** /**
@ -178,10 +193,21 @@ public OzoneBucket getBucket(String bucketName) throws IOException {
Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(bucketName);
OzoneClientUtils.verifyResourceName(bucketName); OzoneClientUtils.verifyResourceName(bucketName);
OzoneBucket bucket = proxy.getBucketDetails(name, bucketName); OzoneBucket bucket = proxy.getBucketDetails(name, bucketName);
bucket.setClientProxy(proxy);
return bucket; return bucket;
} }
/**
* Returns Iterator to iterate over all buckets in the volume.
* The result can be restricted using bucket prefix, will return all
* buckets if bucket prefix is null.
*
* @param bucketPrefix Bucket prefix to match
* @return {@code Iterator<OzoneBucket>}
*/
public Iterator<OzoneBucket> listBuckets(String bucketPrefix) {
return new BucketIterator(bucketPrefix);
}
/** /**
* Deletes the Bucket from this Volume. * Deletes the Bucket from this Volume.
* @param bucketName Name of the Bucket * @param bucketName Name of the Bucket
@ -193,4 +219,59 @@ public void deleteBucket(String bucketName) throws IOException {
OzoneClientUtils.verifyResourceName(bucketName); OzoneClientUtils.verifyResourceName(bucketName);
proxy.deleteBucket(name, bucketName); proxy.deleteBucket(name, bucketName);
} }
/**
* An Iterator to iterate over {@link OzoneBucket} list.
*/
private class BucketIterator implements Iterator<OzoneBucket> {
private String bucketPrefix = null;
private Iterator<OzoneBucket> currentIterator;
private OzoneBucket currentValue;
/**
* Creates an Iterator to iterate over all buckets in the volume,
* which matches volume prefix.
* @param bucketPrefix
*/
BucketIterator(String bucketPrefix) {
this.bucketPrefix = bucketPrefix;
this.currentValue = null;
this.currentIterator = getNextListOfBuckets(null).iterator();
}
@Override
public boolean hasNext() {
if(!currentIterator.hasNext()) {
currentIterator = getNextListOfBuckets(
currentValue.getName()).iterator();
}
return currentIterator.hasNext();
}
@Override
public OzoneBucket next() {
if(hasNext()) {
currentValue = currentIterator.next();
return currentValue;
}
throw new NoSuchElementException();
}
/**
* Gets the next set of bucket list using proxy.
* @param prevBucket
* @return {@code List<OzoneVolume>}
*/
private List<OzoneBucket> getNextListOfBuckets(String prevBucket) {
try {
return proxy.listBuckets(name, bucketPrefix, prevBucket, listCacheSize);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
} }

View File

@ -30,7 +30,6 @@
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.List; import java.util.List;
/** /**
@ -106,22 +105,37 @@ boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
void deleteVolume(String volumeName) throws IOException; void deleteVolume(String volumeName) throws IOException;
/** /**
* Returns the List of Volumes owned by current user. * Lists all volumes in the cluster that matches the volumePrefix,
* size of the returned list depends on maxListResult. If volume prefix
* is null, returns all the volumes. The caller has to make multiple calls
* to read all volumes.
*
* @param volumePrefix Volume prefix to match * @param volumePrefix Volume prefix to match
* @return {@link OzoneVolume} Iterator * @param prevVolume Starting point of the list, this volume is excluded
* @param maxListResult Max number of volumes to return.
* @return {@code List<OzoneVolume>}
* @throws IOException * @throws IOException
*/ */
Iterator<OzoneVolume> listVolumes(String volumePrefix) List<OzoneVolume> listVolumes(String volumePrefix, String prevVolume,
int maxListResult)
throws IOException; throws IOException;
/** /**
* Returns the List of Volumes owned by the specific user. * Lists all volumes in the cluster that are owned by the specified
* @param volumePrefix Volume prefix to match * user and matches the volumePrefix, size of the returned list depends on
* maxListResult. If the user is null, return volumes owned by current user.
* If volume prefix is null, returns all the volumes. The caller has to make
* multiple calls to read all volumes.
*
* @param user User Name * @param user User Name
* @return {@link OzoneVolume} Iterator * @param volumePrefix Volume prefix to match
* @param prevVolume Starting point of the list, this volume is excluded
* @param maxListResult Max number of volumes to return.
* @return {@code List<OzoneVolume>}
* @throws IOException * @throws IOException
*/ */
Iterator<OzoneVolume> listVolumes(String volumePrefix, String user) List<OzoneVolume> listVolumes(String user, String volumePrefix,
String prevVolume, int maxListResult)
throws IOException; throws IOException;
/** /**
@ -219,13 +233,18 @@ OzoneBucket getBucketDetails(String volumeName, String bucketName)
throws IOException; throws IOException;
/** /**
* Returns the List of Buckets in the Volume. * Returns the List of Buckets in the Volume that matches the bucketPrefix,
* size of the returned list depends on maxListResult. The caller has to make
* multiple calls to read all volumes.
* @param volumeName Name of the Volume * @param volumeName Name of the Volume
* @param bucketPrefix Bucket prefix to match * @param bucketPrefix Bucket prefix to match
* @return {@link OzoneBucket} Iterator * @param prevBucket Starting point of the list, this bucket is excluded
* @param maxListResult Max number of buckets to return.
* @return {@code List<OzoneBucket>}
* @throws IOException * @throws IOException
*/ */
Iterator<OzoneBucket> listBuckets(String volumeName, String bucketPrefix) List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix,
String prevBucket, int maxListResult)
throws IOException; throws IOException;
/** /**
@ -265,14 +284,19 @@ void deleteKey(String volumeName, String bucketName, String keyName)
/** /**
* Returns list of {@link OzoneKey} in {Volume/Bucket}. * Returns list of Keys in {Volume/Bucket} that matches the keyPrefix,
* size of the returned list depends on maxListResult. The caller has
* to make multiple calls to read all keys.
* @param volumeName Name of the Volume * @param volumeName Name of the Volume
* @param bucketName Name of the Bucket * @param bucketName Name of the Bucket
* @return {@link OzoneKey} Iterator * @param keyPrefix Bucket prefix to match
* @param prevKey Starting point of the list, this key is excluded
* @param maxListResult Max number of buckets to return.
* @return {@code List<OzoneKey>}
* @throws IOException * @throws IOException
*/ */
Iterator<OzoneKey> listKeys(String volumeName, String bucketName, List<OzoneKey> listKeys(String volumeName, String bucketName,
String keyPrefix) String keyPrefix, String prevKey, int maxListResult)
throws IOException; throws IOException;

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.List; import java.util.List;
/** /**
@ -93,13 +92,15 @@ public void deleteVolume(String volumeName) throws IOException {
} }
@Override @Override
public Iterator<OzoneVolume> listVolumes(String volumePrefix) public List<OzoneVolume> listVolumes(String volumePrefix, String prevKey,
int maxListResult)
throws IOException { throws IOException {
throw new UnsupportedOperationException("Not yet implemented."); throw new UnsupportedOperationException("Not yet implemented.");
} }
@Override @Override
public Iterator<OzoneVolume> listVolumes(String volumePrefix, String user) public List<OzoneVolume> listVolumes(String user, String volumePrefix,
String prevKey, int maxListResult)
throws IOException { throws IOException {
throw new UnsupportedOperationException("Not yet implemented."); throw new UnsupportedOperationException("Not yet implemented.");
} }
@ -164,8 +165,9 @@ public OzoneBucket getBucketDetails(String volumeName, String bucketName)
} }
@Override @Override
public Iterator<OzoneBucket> listBuckets( public List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix,
String volumeName, String bucketPrefix) throws IOException { String prevBucket, int maxListResult)
throws IOException {
throw new UnsupportedOperationException("Not yet implemented."); throw new UnsupportedOperationException("Not yet implemented.");
} }
@ -190,8 +192,9 @@ public void deleteKey(String volumeName, String bucketName, String keyName)
} }
@Override @Override
public Iterator<OzoneKey> listKeys( public List<OzoneKey> listKeys(String volumeName, String bucketName,
String volumeName, String bucketName, String keyPrefix) String keyPrefix, String prevKey,
int maxListResult)
throws IOException { throws IOException {
throw new UnsupportedOperationException("Not yet implemented."); throw new UnsupportedOperationException("Not yet implemented.");
} }

View File

@ -66,7 +66,6 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -81,14 +80,13 @@ public class RpcClient implements ClientProtocol {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(RpcClient.class); LoggerFactory.getLogger(RpcClient.class);
private final Configuration conf;
private final StorageContainerLocationProtocolClientSideTranslatorPB private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient; storageContainerLocationClient;
private final KeySpaceManagerProtocolClientSideTranslatorPB private final KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient; keySpaceManagerClient;
private final XceiverClientManager xceiverClientManager; private final XceiverClientManager xceiverClientManager;
private final int chunkSize; private final int chunkSize;
private final UserGroupInformation ugi; private final UserGroupInformation ugi;
private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights userRights;
private final OzoneAcl.OzoneACLRights groupRights; private final OzoneAcl.OzoneACLRights groupRights;
@ -100,12 +98,12 @@ public class RpcClient implements ClientProtocol {
*/ */
public RpcClient(Configuration conf) throws IOException { public RpcClient(Configuration conf) throws IOException {
Preconditions.checkNotNull(conf); Preconditions.checkNotNull(conf);
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser(); this.ugi = UserGroupInformation.getCurrentUser();
this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS, this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT); KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS, this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT); KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
long ksmVersion = long ksmVersion =
RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class); RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
InetSocketAddress ksmAddress = OzoneClientUtils InetSocketAddress ksmAddress = OzoneClientUtils
@ -219,10 +217,15 @@ public OzoneVolume getVolumeDetails(String volumeName)
throws IOException { throws IOException {
Preconditions.checkNotNull(volumeName); Preconditions.checkNotNull(volumeName);
KsmVolumeArgs volume = keySpaceManagerClient.getVolumeInfo(volumeName); KsmVolumeArgs volume = keySpaceManagerClient.getVolumeInfo(volumeName);
return new OzoneVolume(volume.getVolume(), volume.getAdminName(), return new OzoneVolume(
volume.getOwnerName(), volume.getQuotaInBytes(), conf,
this,
volume.getVolume(),
volume.getAdminName(),
volume.getOwnerName(),
volume.getQuotaInBytes(),
volume.getAclMap().ozoneAclGetProtobuf().stream(). volume.getAclMap().ozoneAclGetProtobuf().stream().
map(KSMPBHelper::convertOzoneAcl).collect(Collectors.toList())); map(KSMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
} }
@Override @Override
@ -238,15 +241,41 @@ public void deleteVolume(String volumeName) throws IOException {
} }
@Override @Override
public Iterator<OzoneVolume> listVolumes(String volumePrefix) public List<OzoneVolume> listVolumes(String volumePrefix, String prevVolume,
int maxListResult)
throws IOException { throws IOException {
throw new UnsupportedOperationException("Not yet implemented."); List<KsmVolumeArgs> volumes = keySpaceManagerClient.listAllVolumes(
volumePrefix, prevVolume, maxListResult);
return volumes.stream().map(volume -> new OzoneVolume(
conf,
this,
volume.getVolume(),
volume.getAdminName(),
volume.getOwnerName(),
volume.getQuotaInBytes(),
volume.getAclMap().ozoneAclGetProtobuf().stream().
map(KSMPBHelper::convertOzoneAcl).collect(Collectors.toList())))
.collect(Collectors.toList());
} }
@Override @Override
public Iterator<OzoneVolume> listVolumes(String volumePrefix, String user) public List<OzoneVolume> listVolumes(String user, String volumePrefix,
String prevVolume, int maxListResult)
throws IOException { throws IOException {
throw new UnsupportedOperationException("Not yet implemented."); List<KsmVolumeArgs> volumes = keySpaceManagerClient.listVolumeByUser(
user, volumePrefix, prevVolume, maxListResult);
return volumes.stream().map(volume -> new OzoneVolume(
conf,
this,
volume.getVolume(),
volume.getAdminName(),
volume.getOwnerName(),
volume.getQuotaInBytes(),
volume.getAclMap().ozoneAclGetProtobuf().stream().
map(KSMPBHelper::convertOzoneAcl).collect(Collectors.toList())))
.collect(Collectors.toList());
} }
@Override @Override
@ -371,17 +400,32 @@ public OzoneBucket getBucketDetails(
Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(bucketName);
KsmBucketInfo bucketArgs = KsmBucketInfo bucketArgs =
keySpaceManagerClient.getBucketInfo(volumeName, bucketName); keySpaceManagerClient.getBucketInfo(volumeName, bucketName);
return new OzoneBucket(bucketArgs.getVolumeName(), return new OzoneBucket(
bucketArgs.getBucketName(), conf,
bucketArgs.getAcls(), this,
bucketArgs.getStorageType(), bucketArgs.getVolumeName(),
bucketArgs.getIsVersionEnabled()); bucketArgs.getBucketName(),
bucketArgs.getAcls(),
bucketArgs.getStorageType(),
bucketArgs.getIsVersionEnabled());
} }
@Override @Override
public Iterator<OzoneBucket> listBuckets( public List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix,
String volumeName, String bucketPrefix) throws IOException { String prevBucket, int maxListResult)
throw new UnsupportedOperationException("Not yet implemented."); throws IOException {
List<KsmBucketInfo> buckets = keySpaceManagerClient.listBuckets(
volumeName, prevBucket, bucketPrefix, maxListResult);
return buckets.stream().map(bucket -> new OzoneBucket(
conf,
this,
bucket.getVolumeName(),
bucket.getBucketName(),
bucket.getAcls(),
bucket.getStorageType(),
bucket.getIsVersionEnabled()))
.collect(Collectors.toList());
} }
@Override @Override
@ -441,10 +485,19 @@ public void deleteKey(
} }
@Override @Override
public Iterator<OzoneKey> listKeys( public List<OzoneKey> listKeys(String volumeName, String bucketName,
String volumeName, String bucketName, String keyPrefix) String keyPrefix, String prevKey,
int maxListResult)
throws IOException { throws IOException {
throw new UnsupportedOperationException("Not yet implemented."); List<KsmKeyInfo> keys = keySpaceManagerClient.listKeys(
volumeName, bucketName, prevKey, keyPrefix, maxListResult);
return keys.stream().map(key -> new OzoneKey(
key.getVolumeName(),
key.getBucketName(),
key.getKeyName(),
key.getDataSize()))
.collect(Collectors.toList());
} }
@Override @Override

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client.rpc; package org.apache.hadoop.ozone.client.rpc;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
@ -45,6 +46,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -61,7 +63,7 @@ public class TestOzoneRpcClient {
private static ObjectStore store = null; private static ObjectStore store = null;
/** /**
* Create a MiniDFSCluster for testing. * Create a MiniOzoneCluster for testing.
* <p> * <p>
* Ozone is made active by setting OZONE_ENABLED = true and * Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "distributed" * OZONE_HANDLER_TYPE_KEY = "distributed"
@ -403,8 +405,248 @@ public void testDeleteKey()
bucket.getKey(keyName); bucket.getKey(keyName);
} }
@Test
public void listVolumeTest() throws IOException, OzoneException {
String volBase = "vol-" + RandomStringUtils.randomNumeric(3);
//Create 10 volume vol-<random>-a-0-<random> to vol-<random>-a-9-<random>
String volBaseNameA = volBase + "-a-";
for(int i = 0; i < 10; i++) {
store.createVolume(
volBaseNameA + i + "-" + RandomStringUtils.randomNumeric(5));
}
//Create 10 volume vol-<random>-b-0-<random> to vol-<random>-b-9-<random>
String volBaseNameB = volBase + "-b-";
for(int i = 0; i < 10; i++) {
store.createVolume(
volBaseNameB + i + "-" + RandomStringUtils.randomNumeric(5));
}
Iterator<OzoneVolume> volIterator = store.listVolumes(volBase);
int totalVolumeCount = 0;
while(volIterator.hasNext()) {
volIterator.next();
totalVolumeCount++;
}
Assert.assertEquals(20, totalVolumeCount);
Iterator<OzoneVolume> volAIterator = store.listVolumes(volBaseNameA);
for(int i = 0; i < 10; i++) {
Assert.assertTrue(volAIterator.next().getName()
.startsWith(volBaseNameA + i + "-"));
}
Assert.assertFalse(volAIterator.hasNext());
Iterator<OzoneVolume> volBIterator = store.listVolumes(volBaseNameB);
for(int i = 0; i < 10; i++) {
Assert.assertTrue(volBIterator.next().getName()
.startsWith(volBaseNameB + i + "-"));
}
Assert.assertFalse(volBIterator.hasNext());
Iterator<OzoneVolume> iter = store.listVolumes(volBaseNameA + "1-");
Assert.assertTrue(iter.next().getName().startsWith(volBaseNameA + "1-"));
Assert.assertFalse(iter.hasNext());
}
@Test
public void listBucketTest()
throws IOException, OzoneException {
String volumeA = "vol-a-" + RandomStringUtils.randomNumeric(5);
String volumeB = "vol-b-" + RandomStringUtils.randomNumeric(5);
store.createVolume(volumeA);
store.createVolume(volumeB);
OzoneVolume volA = store.getVolume(volumeA);
OzoneVolume volB = store.getVolume(volumeB);
//Create 10 buckets in vol-a-<random> and 10 in vol-b-<random>
String bucketBaseNameA = "bucket-a-";
for(int i = 0; i < 10; i++) {
volA.createBucket(
bucketBaseNameA + i + "-" + RandomStringUtils.randomNumeric(5));
volB.createBucket(
bucketBaseNameA + i + "-" + RandomStringUtils.randomNumeric(5));
}
//Create 10 buckets in vol-a-<random> and 10 in vol-b-<random>
String bucketBaseNameB = "bucket-b-";
for(int i = 0; i < 10; i++) {
volA.createBucket(
bucketBaseNameB + i + "-" + RandomStringUtils.randomNumeric(5));
volB.createBucket(
bucketBaseNameB + i + "-" + RandomStringUtils.randomNumeric(5));
}
Iterator<OzoneBucket> volABucketIter =
volA.listBuckets("bucket-");
int volABucketCount = 0;
while(volABucketIter.hasNext()) {
volABucketIter.next();
volABucketCount++;
}
Assert.assertEquals(20, volABucketCount);
Iterator<OzoneBucket> volBBucketIter =
volA.listBuckets("bucket-");
int volBBucketCount = 0;
while(volBBucketIter.hasNext()) {
volBBucketIter.next();
volBBucketCount++;
}
Assert.assertEquals(20, volBBucketCount);
Iterator<OzoneBucket> volABucketAIter =
volA.listBuckets("bucket-a-");
int volABucketACount = 0;
while(volABucketAIter.hasNext()) {
volABucketAIter.next();
volABucketACount++;
}
Assert.assertEquals(10, volABucketACount);
Iterator<OzoneBucket> volBBucketBIter =
volA.listBuckets("bucket-b-");
int volBBucketBCount = 0;
while(volBBucketBIter.hasNext()) {
volBBucketBIter.next();
volBBucketBCount++;
}
Assert.assertEquals(10, volBBucketBCount);
Iterator<OzoneBucket> volABucketBIter = volA.listBuckets("bucket-b-");
for(int i = 0; i < 10; i++) {
Assert.assertTrue(volABucketBIter.next().getName()
.startsWith(bucketBaseNameB + i + "-"));
}
Assert.assertFalse(volABucketBIter.hasNext());
Iterator<OzoneBucket> volBBucketAIter = volB.listBuckets("bucket-a-");
for(int i = 0; i < 10; i++) {
Assert.assertTrue(volBBucketAIter.next().getName()
.startsWith(bucketBaseNameA + i + "-"));
}
Assert.assertFalse(volBBucketAIter.hasNext());
}
@Test
public void listKeyTest()
throws IOException, OzoneException {
String volumeA = "vol-a-" + RandomStringUtils.randomNumeric(5);
String volumeB = "vol-b-" + RandomStringUtils.randomNumeric(5);
String bucketA = "buc-a-" + RandomStringUtils.randomNumeric(5);
String bucketB = "buc-b-" + RandomStringUtils.randomNumeric(5);
store.createVolume(volumeA);
store.createVolume(volumeB);
OzoneVolume volA = store.getVolume(volumeA);
OzoneVolume volB = store.getVolume(volumeB);
volA.createBucket(bucketA);
volA.createBucket(bucketB);
volB.createBucket(bucketA);
volB.createBucket(bucketB);
OzoneBucket volAbucketA = volA.getBucket(bucketA);
OzoneBucket volAbucketB = volA.getBucket(bucketB);
OzoneBucket volBbucketA = volB.getBucket(bucketA);
OzoneBucket volBbucketB = volB.getBucket(bucketB);
/*
Create 10 keys in vol-a-<random>/buc-a-<random>,
vol-a-<random>/buc-b-<random>, vol-b-<random>/buc-a-<random> and
vol-b-<random>/buc-b-<random>
*/
String keyBaseA = "key-a-";
for (int i = 0; i < 10; i++) {
byte[] value = RandomStringUtils.randomAscii(10240).getBytes();
OzoneOutputStream one = volAbucketA.createKey(
keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
one.write(value);
one.close();
OzoneOutputStream two = volAbucketB.createKey(
keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
two.write(value);
two.close();
OzoneOutputStream three = volBbucketA.createKey(
keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
three.write(value);
three.close();
OzoneOutputStream four = volBbucketB.createKey(
keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
four.write(value);
four.close();
}
/*
Create 10 keys in vol-a-<random>/buc-a-<random>,
vol-a-<random>/buc-b-<random>, vol-b-<random>/buc-a-<random> and
vol-b-<random>/buc-b-<random>
*/
String keyBaseB = "key-b-";
for (int i = 0; i < 10; i++) {
byte[] value = RandomStringUtils.randomAscii(10240).getBytes();
OzoneOutputStream one = volAbucketA.createKey(
keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
one.write(value);
one.close();
OzoneOutputStream two = volAbucketB.createKey(
keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
two.write(value);
two.close();
OzoneOutputStream three = volBbucketA.createKey(
keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
three.write(value);
three.close();
OzoneOutputStream four = volBbucketB.createKey(
keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
value.length);
four.write(value);
four.close();
}
Iterator<OzoneKey> volABucketAIter =
volAbucketA.listKeys("key-");
int volABucketAKeyCount = 0;
while(volABucketAIter.hasNext()) {
volABucketAIter.next();
volABucketAKeyCount++;
}
Assert.assertEquals(20, volABucketAKeyCount);
Iterator<OzoneKey> volABucketBIter =
volAbucketB.listKeys("key-");
int volABucketBKeyCount = 0;
while(volABucketBIter.hasNext()) {
volABucketBIter.next();
volABucketBKeyCount++;
}
Assert.assertEquals(20, volABucketBKeyCount);
Iterator<OzoneKey> volBBucketAIter =
volBbucketA.listKeys("key-");
int volBBucketAKeyCount = 0;
while(volBBucketAIter.hasNext()) {
volBBucketAIter.next();
volBBucketAKeyCount++;
}
Assert.assertEquals(20, volBBucketAKeyCount);
Iterator<OzoneKey> volBBucketBIter =
volBbucketB.listKeys("key-");
int volBBucketBKeyCount = 0;
while(volBBucketBIter.hasNext()) {
volBBucketBIter.next();
volBBucketBKeyCount++;
}
Assert.assertEquals(20, volBBucketBKeyCount);
Iterator<OzoneKey> volABucketAKeyAIter =
volAbucketA.listKeys("key-a-");
int volABucketAKeyACount = 0;
while(volABucketAKeyAIter.hasNext()) {
volABucketAKeyAIter.next();
volABucketAKeyACount++;
}
Assert.assertEquals(10, volABucketAKeyACount);
Iterator<OzoneKey> volABucketAKeyBIter =
volAbucketA.listKeys("key-b-");
for(int i = 0; i < 10; i++) {
Assert.assertTrue(volABucketAKeyBIter.next().getName()
.startsWith("key-b-" + i + "-"));
}
Assert.assertFalse(volABucketBIter.hasNext());
}
/** /**
* Close OzoneClient and shutdown MiniDFSCluster. * Close OzoneClient and shutdown MiniOzoneCluster.
*/ */
@AfterClass @AfterClass
public static void shutdown() throws IOException { public static void shutdown() throws IOException {