HDFS-11773. Ozone: KSM : add listVolumes. Contributed by Weiwei Yang.

This commit is contained in:
Anu Engineer 2017-06-30 23:44:15 -07:00 committed by Owen O'Malley
parent b49c165d28
commit b7b8511bae
13 changed files with 400 additions and 24 deletions

View File

@ -91,7 +91,7 @@ boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
* @throws IOException
*/
List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix, String
prevKey, long maxKeys) throws IOException;
prevKey, int maxKeys) throws IOException;
/**
* Lists volume all volumes in the cluster.
@ -102,7 +102,7 @@ List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix, String
* @throws IOException
*/
List<KsmVolumeArgs> listAllVolumes(String prefix, String
prevKey, long maxKeys) throws IOException;
prevKey, int maxKeys) throws IOException;
/**
* Creates a bucket.

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ksm.protocolPB;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
@ -86,6 +88,10 @@
.KeySpaceManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
.ListVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
.ListVolumeResponse;
import java.io.Closeable;
import java.io.IOException;
@ -301,9 +307,19 @@ public void deleteVolume(String volume) throws IOException {
*/
@Override
public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
String prevKey, long maxKeys)
String prevKey, int maxKeys)
throws IOException {
return null;
ListVolumeRequest.Builder builder = ListVolumeRequest.newBuilder();
if (!Strings.isNullOrEmpty(prefix)) {
builder.setPrefix(prefix);
}
if (!Strings.isNullOrEmpty(prevKey)) {
builder.setPrevKey(prevKey);
}
builder.setMaxKeys(maxKeys);
builder.setUserName(userName);
builder.setScope(ListVolumeRequest.Scope.VOLUMES_BY_USER);
return listVolume(builder.build());
}
/**
@ -317,9 +333,43 @@ public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
* @throws IOException
*/
@Override
public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, long
maxKeys) throws IOException {
return null;
public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey,
int maxKeys) throws IOException {
ListVolumeRequest.Builder builder = ListVolumeRequest.newBuilder();
if (!Strings.isNullOrEmpty(prefix)) {
builder.setPrefix(prefix);
}
if (!Strings.isNullOrEmpty(prevKey)) {
builder.setPrevKey(prevKey);
}
builder.setMaxKeys(maxKeys);
builder.setScope(ListVolumeRequest.Scope.VOLUMES_BY_CLUSTER);
return listVolume(builder.build());
}
private List<KsmVolumeArgs> listVolume(ListVolumeRequest request)
throws IOException {
final ListVolumeResponse resp;
try {
resp = rpcProxy.listVolumes(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("List volume failed, error: "
+ resp.getStatus());
}
List<KsmVolumeArgs> result = Lists.newArrayList();
for (VolumeInfo volInfo : resp.getVolumeInfoList()) {
KsmVolumeArgs volArgs = KsmVolumeArgs.getFromProtobuf(volInfo);
result.add(volArgs);
}
return resp.getVolumeInfoList().stream()
.map(item -> KsmVolumeArgs.getFromProtobuf(item))
.collect(Collectors.toList());
}
/**

View File

@ -116,6 +116,11 @@ public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
*/
public static final int MAX_LISTKEYS_SIZE = 1024;
/**
* Max number of volumes returned per list volumes operation.
*/
public static final int MAX_LISTVOLUMES_SIZE = 1024;
private OzoneConsts() {
// Never Constructed
}

View File

@ -144,19 +144,13 @@ message ListVolumeRequest {
VOLUMES_BY_CLUSTER = 3; // All volumes in the cluster
}
required Scope scope = 1;
required string volumeName = 2;
optional string userName = 3;
optional string prefix = 4;
optional string prevKey = 5;
optional uint64 maxKeys = 6;
optional string userName = 2;
optional string prefix = 3;
optional string prevKey = 4;
optional uint32 maxKeys = 5;
}
message ListVolumeResponse {
enum Status {
OK = 1;
ACCESS_DENIED = 2;
REQUIRED_ARG_MISSING = 3;
}
required Status status = 1;
repeated VolumeInfo volumeInfo = 2;
}

View File

@ -42,6 +42,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numKeyDeletes;
private @Metric MutableCounterLong numBucketLists;
private @Metric MutableCounterLong numKeyLists;
private @Metric MutableCounterLong numVolumeLists;
// Failure Metrics
private @Metric MutableCounterLong numVolumeCreateFails;
@ -58,6 +59,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numKeyDeleteFails;
private @Metric MutableCounterLong numBucketListFails;
private @Metric MutableCounterLong numKeyListFails;
private @Metric MutableCounterLong numVolumeListFails;
public KSMMetrics() {
}
@ -113,6 +115,10 @@ public void incNumKeyLists() {
numKeyLists.incr();
}
public void incNumVolumeLists() {
numVolumeLists.incr();
}
public void incNumVolumeCreateFails() {
numVolumeCreateFails.incr();
}
@ -181,6 +187,10 @@ public void incNumKeyListFails() {
numKeyListFails.incr();
}
public void incNumVolumeListFails() {
numVolumeListFails.incr();
}
@VisibleForTesting
public long getNumVolumeCreates() {
return numVolumeCreates.value();
@ -231,6 +241,11 @@ public long getNumBucketLists() {
return numBucketLists.value();
}
@VisibleForTesting
public long getNumVolumeLists() {
return numVolumeLists.value();
}
@VisibleForTesting
public long getNumKeyLists() {
return numKeyLists.value();
@ -320,4 +335,9 @@ public long getNumBucketListFails() {
public long getNumKeyListFails() {
return numKeyListFails.value();
}
@VisibleForTesting
public long getNumVolumeListFails() {
return numVolumeListFails.value();
}
}

View File

@ -350,8 +350,14 @@ public void deleteVolume(String volume) throws IOException {
*/
@Override
public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
String prevKey, long maxKeys) throws IOException {
return null;
String prevKey, int maxKeys) throws IOException {
try {
metrics.incNumVolumeLists();
return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
} catch (Exception ex) {
metrics.incNumVolumeListFails();
throw ex;
}
}
/**
@ -365,9 +371,15 @@ public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
* @throws IOException
*/
@Override
public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, long
public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, int
maxKeys) throws IOException {
return null;
try {
metrics.incNumVolumeLists();
return volumeManager.listVolumes(null, prefix, prevKey, maxKeys);
} catch (Exception ex) {
metrics.incNumVolumeListFails();
throw ex;
}
}
/**
@ -523,5 +535,4 @@ public void deleteBucket(String volume, String bucket) throws IOException {
throw ex;
}
}
}

View File

@ -18,6 +18,7 @@
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import java.io.IOException;
import java.util.List;
@ -182,4 +183,22 @@ List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
List<KsmKeyInfo> listKeys(String volumeName,
String bucketName, String startKey, String keyPrefix, int maxKeys)
throws IOException;
/**
* Returns a list of volumes owned by a given user; if user is null,
* returns all volumes.
*
* @param userName
* volume owner
* @param prefix
* the volume prefix used to filter the listing result.
* @param startKey
* the start volume name determines where to start listing from.
* @param maxKeys
* the maximum number of volumes to return.
* @return a list of {@link KsmVolumeArgs}
* @throws IOException
*/
List<KsmVolumeArgs> listVolumes(String userName, String prefix,
String startKey, int maxKeys) throws IOException;
}

View File

@ -17,15 +17,20 @@
package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
@ -351,4 +356,91 @@ public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
}
return result;
}
@Override
public List<KsmVolumeArgs> listVolumes(String userName,
String prefix, String startKey, int maxKeys) throws IOException {
List<KsmVolumeArgs> result = Lists.newArrayList();
VolumeList volumes;
if (Strings.isNullOrEmpty(userName)) {
volumes = getAllVolumes();
} else {
volumes = getVolumesByUser(userName);
}
if (volumes == null || volumes.getVolumeNamesCount() == 0) {
return result;
}
boolean startKeyFound = Strings.isNullOrEmpty(startKey);
for (String volumeName : volumes.getVolumeNamesList()) {
if (!Strings.isNullOrEmpty(prefix)) {
if (!volumeName.startsWith(prefix)) {
continue;
}
}
if (!startKeyFound && volumeName.equals(startKey)) {
startKeyFound = true;
}
if (startKeyFound && result.size() < maxKeys) {
byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
if (volumeInfo == null) {
// Could not get volume info by given volume name,
// since the volume name is loaded from db,
// this probably means ksm db is corrupted or some entries are
// accidentally removed.
throw new KSMException("Volume info not found for " + volumeName,
ResultCodes.FAILED_INTERNAL_ERROR);
}
VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(info);
result.add(volumeArgs);
}
}
return result;
}
private VolumeList getVolumesByUser(String userName)
throws KSMException {
return getVolumesByUser(getUserKey(userName));
}
private VolumeList getVolumesByUser(byte[] userNameKey)
throws KSMException {
VolumeList volumes = null;
byte[] volumesInBytes = store.get(userNameKey);
if (volumesInBytes == null) {
// No volume found for this user, return an empty list
return VolumeList.newBuilder().build();
}
try {
volumes = VolumeList.parseFrom(volumesInBytes);
} catch (InvalidProtocolBufferException e) {
throw new KSMException("Unable to get volumes info by the given user, "
+ "metadata might be corrupted",
e, ResultCodes.FAILED_INTERNAL_ERROR);
}
return volumes;
}
private VolumeList getAllVolumes() throws IOException {
// Scan all users in database
KeyPrefixFilter filter = new KeyPrefixFilter(OzoneConsts.KSM_USER_PREFIX);
// We are not expecting a huge number of users per cluster,
// it should be fine to scan all users in db and return us a
// list of volume names in string per user.
List<Map.Entry<byte[], byte[]>> rangeKVs = store
.getRangeKVs(null, Integer.MAX_VALUE, filter);
VolumeList.Builder builder = VolumeList.newBuilder();
for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
VolumeList volumes = this.getVolumesByUser(entry.getKey());
builder.addAllVolumeNames(volumes.getVolumeNamesList());
}
return builder.build();
}
}

View File

@ -21,6 +21,7 @@
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
import java.io.IOException;
import java.util.List;
/**
* KSM volume manager interface.
@ -77,4 +78,22 @@ public interface VolumeManager {
*/
boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
throws IOException;
/**
* Returns a list of volumes owned by a given user; if user is null,
* returns all volumes.
*
* @param userName
* volume owner
* @param prefix
* the volume prefix used to filter the listing result.
* @param startKey
* the start volume name determines where to start listing from.
* @param maxKeys
* the maximum number of volumes to return.
* @return a list of {@link KsmVolumeArgs}
* @throws IOException
*/
List<KsmVolumeArgs> listVolumes(String userName, String prefix,
String startKey, int maxKeys) throws IOException;
}

View File

@ -345,4 +345,19 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
metadataManager.readLock().unlock();
}
}
/**
* {@inheritDoc}
*/
@Override
public List<KsmVolumeArgs> listVolumes(String userName,
String prefix, String startKey, int maxKeys) throws IOException {
metadataManager.readLock().lock();
try {
return metadataManager.listVolumes(
userName, prefix, startKey, maxKeys);
} finally {
metadataManager.readLock().unlock();
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.protocolPB;
import com.google.common.collect.Lists;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
@ -229,7 +230,30 @@ public DeleteVolumeResponse deleteVolume(
public ListVolumeResponse listVolumes(
RpcController controller, ListVolumeRequest request)
throws ServiceException {
return null;
ListVolumeResponse.Builder resp = ListVolumeResponse.newBuilder();
List<KsmVolumeArgs> result = Lists.newArrayList();
try {
if (request.getScope()
== ListVolumeRequest.Scope.VOLUMES_BY_USER) {
result = impl.listVolumeByUser(request.getUserName(),
request.getPrefix(), request.getPrevKey(), request.getMaxKeys());
} else if (request.getScope()
== ListVolumeRequest.Scope.VOLUMES_BY_CLUSTER) {
result = impl.listAllVolumes(request.getPrefix(), request.getPrevKey(),
request.getMaxKeys());
}
if (result == null) {
throw new ServiceException("Failed to get volumes for given scope "
+ request.getScope());
}
result.forEach(item -> resp.addVolumeInfo(item.getProtobuf()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
@Override

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.OzoneAcl;
@ -171,7 +172,43 @@ public boolean checkVolumeAccess(String volume, OzoneAcl acl)
@Override
public ListVolumes listVolumes(ListArgs args)
throws IOException, OzoneException {
throw new UnsupportedOperationException("listVolumes not implemented");
int maxNumOfKeys = args.getMaxKeys();
if (maxNumOfKeys <= 0 ||
maxNumOfKeys > OzoneConsts.MAX_LISTVOLUMES_SIZE) {
throw new IllegalArgumentException(
String.format("Illegal max number of keys specified,"
+ " the value must be in range (0, %d], actual : %d.",
OzoneConsts.MAX_LISTVOLUMES_SIZE, maxNumOfKeys));
}
List<KsmVolumeArgs> listResult;
if (args.isRootScan()) {
listResult = keySpaceManagerClient.listAllVolumes(args.getPrefix(),
args.getPrevKey(), args.getMaxKeys());
} else {
UserArgs userArgs = args.getArgs();
if (userArgs == null || userArgs.getUserName() == null) {
throw new IllegalArgumentException("Illegal argument,"
+ " missing user argument.");
}
listResult = keySpaceManagerClient.listVolumeByUser(
args.getArgs().getUserName(), args.getPrefix(), args.getPrevKey(),
args.getMaxKeys());
}
// TODO Add missing fields createdOn, createdBy, bucketCount and bytesUsed
ListVolumes result = new ListVolumes();
for (KsmVolumeArgs volumeArgs : listResult) {
VolumeInfo info = new VolumeInfo();
KeySpaceManagerProtocolProtos.VolumeInfo
infoProto = volumeArgs.getProtobuf();
info.setOwner(new VolumeOwner(infoProto.getOwnerName()));
info.setQuota(OzoneQuota.getOzoneQuota(infoProto.getQuotaInBytes()));
info.setVolumeName(infoProto.getVolume());
result.addVolume(info);
}
return result;
}
@Override

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -840,4 +841,93 @@ public void testListKeys() throws IOException, OzoneException {
Status.BUCKET_NOT_FOUND.name(), e);
}
}
@Test
public void testListVolumes() throws IOException, OzoneException {
String user0 = "testListVolumes-user-0";
String user1 = "testListVolumes-user-1";
String adminUser = "testListVolumes-admin";
ListArgs listVolumeArgs;
ListVolumes volumes;
// Create 10 volumes by user0 and user1
String[] user0vols = new String[10];
String[] user1vols = new String[10];
for (int i =0; i<10; i++) {
VolumeArgs createVolumeArgs;
String user0VolName = "Vol-" + user0 + "-" + i;
user0vols[i] = user0VolName;
createVolumeArgs = new VolumeArgs(user0VolName, userArgs);
createVolumeArgs.setUserName(user0);
createVolumeArgs.setAdminName(adminUser);
createVolumeArgs.setQuota(new OzoneQuota(i, OzoneQuota.Units.GB));
storageHandler.createVolume(createVolumeArgs);
String user1VolName = "Vol-" + user1 + "-" + i;
user1vols[i] = user1VolName;
createVolumeArgs = new VolumeArgs(user1VolName, userArgs);
createVolumeArgs.setUserName(user1);
createVolumeArgs.setAdminName(adminUser);
createVolumeArgs.setQuota(new OzoneQuota(i, OzoneQuota.Units.GB));
storageHandler.createVolume(createVolumeArgs);
}
// Test list all volumes
UserArgs userArgs0 = new UserArgs(user0, OzoneUtils.getRequestID(),
null, null, null, null);
listVolumeArgs = new ListArgs(userArgs0, "Vol-testListVolumes", 100, null);
listVolumeArgs.setRootScan(true);
volumes = storageHandler.listVolumes(listVolumeArgs);
Assert.assertEquals(20, volumes.getVolumes().size());
// Test list all volumes belongs to an user
listVolumeArgs = new ListArgs(userArgs0, null, 100, null);
listVolumeArgs.setRootScan(false);
volumes = storageHandler.listVolumes(listVolumeArgs);
Assert.assertEquals(10, volumes.getVolumes().size());
// Test prefix
listVolumeArgs = new ListArgs(userArgs0,
"Vol-" + user0 + "-3", 100, null);
volumes = storageHandler.listVolumes(listVolumeArgs);
Assert.assertEquals(1, volumes.getVolumes().size());
Assert.assertEquals(user0vols[3],
volumes.getVolumes().get(0).getVolumeName());
Assert.assertEquals(user0,
volumes.getVolumes().get(0).getOwner().getName());
// Test list volumes by user
UserArgs userArgs1 = new UserArgs(user1, OzoneUtils.getRequestID(),
null, null, null, null);
listVolumeArgs = new ListArgs(userArgs1, null, 100, null);
listVolumeArgs.setRootScan(false);
volumes = storageHandler.listVolumes(listVolumeArgs);
Assert.assertEquals(10, volumes.getVolumes().size());
Assert.assertEquals(user1,
volumes.getVolumes().get(3).getOwner().getName());
// Make sure all available fields are returned
final String user0vol5 = "Vol-" + user0 + "-5";
listVolumeArgs = new ListArgs(userArgs0, null, 1, user0vol5);
listVolumeArgs.setRootScan(false);
volumes = storageHandler.listVolumes(listVolumeArgs);
Assert.assertEquals(1, volumes.getVolumes().size());
Assert.assertEquals(user0,
volumes.getVolumes().get(0).getOwner().getName());
Assert.assertEquals(user0vol5,
volumes.getVolumes().get(0).getVolumeName());
Assert.assertEquals(5,
volumes.getVolumes().get(0).getQuota().getSize());
Assert.assertEquals(OzoneQuota.Units.GB,
volumes.getVolumes().get(0).getQuota().getUnit());
// User doesn't have volumes
UserArgs userArgsX = new UserArgs("unknwonUser", OzoneUtils.getRequestID(),
null, null, null, null);
listVolumeArgs = new ListArgs(userArgsX, null, 100, null);
listVolumeArgs.setRootScan(false);
volumes = storageHandler.listVolumes(listVolumeArgs);
Assert.assertEquals(0, volumes.getVolumes().size());
}
}