HDFS-11776. Ozone: KSM: add SetBucketProperty. Contributed by Nandakumar Vadivelu.
This commit is contained in:
parent
72b228a9e6
commit
236c410881
@ -68,7 +68,7 @@ public final class KsmBucketArgs {
|
|||||||
*/
|
*/
|
||||||
private KsmBucketArgs(String volumeName, String bucketName,
|
private KsmBucketArgs(String volumeName, String bucketName,
|
||||||
List<OzoneAclInfo> addAcls, List<OzoneAclInfo> removeAcls,
|
List<OzoneAclInfo> addAcls, List<OzoneAclInfo> removeAcls,
|
||||||
boolean isVersionEnabled, StorageTypeProto storageType) {
|
Boolean isVersionEnabled, StorageTypeProto storageType) {
|
||||||
this.volumeName = volumeName;
|
this.volumeName = volumeName;
|
||||||
this.bucketName = bucketName;
|
this.bucketName = bucketName;
|
||||||
this.addAcls = addAcls;
|
this.addAcls = addAcls;
|
||||||
@ -113,7 +113,7 @@ public List<OzoneAclInfo> getRemoveAcls() {
|
|||||||
* Returns true if bucket version is enabled, else false.
|
* Returns true if bucket version is enabled, else false.
|
||||||
* @return isVersionEnabled
|
* @return isVersionEnabled
|
||||||
*/
|
*/
|
||||||
public boolean getIsVersionEnabled() {
|
public Boolean getIsVersionEnabled() {
|
||||||
return isVersionEnabled;
|
return isVersionEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -219,7 +219,8 @@ public static KsmBucketArgs getFromProtobuf(BucketArgs bucketArgs) {
|
|||||||
bucketArgs.getBucketName(),
|
bucketArgs.getBucketName(),
|
||||||
bucketArgs.getAddAclsList(),
|
bucketArgs.getAddAclsList(),
|
||||||
bucketArgs.getRemoveAclsList(),
|
bucketArgs.getRemoveAclsList(),
|
||||||
bucketArgs.getIsVersionEnabled(),
|
bucketArgs.hasIsVersionEnabled() ? bucketArgs.getIsVersionEnabled() :
|
||||||
bucketArgs.getStorageType());
|
null,
|
||||||
|
bucketArgs.hasStorageType() ? bucketArgs.getStorageType() : null);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ksm.protocol;
|
package org.apache.hadoop.ksm.protocol;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
@ -115,6 +116,13 @@ List<KsmVolumeArgs> listAllVolumes(String prefix, String
|
|||||||
KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets bucket property from args.
|
||||||
|
* @param args - BucketArgs.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void setBucketProperty(KsmBucketArgs args) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a block to a container, the block is returned to the client.
|
* Allocate a block to a container, the block is returned to the client.
|
||||||
*
|
*
|
||||||
|
@ -22,11 +22,14 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.BucketArgs;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.BucketInfo;
|
.KeySpaceManagerProtocolProtos.BucketInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
@ -37,6 +40,10 @@
|
|||||||
.KeySpaceManagerProtocolProtos.InfoBucketRequest;
|
.KeySpaceManagerProtocolProtos.InfoBucketRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.InfoBucketResponse;
|
.KeySpaceManagerProtocolProtos.InfoBucketResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.SetBucketPropertyRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.SetBucketPropertyResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
@ -320,6 +327,30 @@ public KsmBucketInfo getBucketInfo(String volume, String bucket)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets bucket property from args.
|
||||||
|
* @param args - BucketArgs.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setBucketProperty(KsmBucketArgs args)
|
||||||
|
throws IOException {
|
||||||
|
SetBucketPropertyRequest.Builder req =
|
||||||
|
SetBucketPropertyRequest.newBuilder();
|
||||||
|
BucketArgs bucketArgs = args.getProtobuf();
|
||||||
|
req.setBucketArgs(bucketArgs);
|
||||||
|
final SetBucketPropertyResponse resp;
|
||||||
|
try {
|
||||||
|
resp = rpcProxy.setBucketProperty(NULL_RPC_CONTROLLER,
|
||||||
|
req.build());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
if (resp.getStatus() != Status.OK) {
|
||||||
|
throw new IOException("Setting bucket property failed, error: "
|
||||||
|
+ resp.getStatus());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a block for a key, then use the returned meta info to talk to data
|
* Allocate a block for a key, then use the returned meta info to talk to data
|
||||||
|
@ -239,6 +239,14 @@ message LocateKeyResponse {
|
|||||||
optional KeyInfo keyInfo = 2;
|
optional KeyInfo keyInfo = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message SetBucketPropertyRequest {
|
||||||
|
required BucketArgs bucketArgs = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SetBucketPropertyResponse {
|
||||||
|
required Status status = 1;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
The KSM service that takes care of Ozone namespace.
|
The KSM service that takes care of Ozone namespace.
|
||||||
*/
|
*/
|
||||||
@ -291,6 +299,12 @@ service KeySpaceManagerService {
|
|||||||
rpc infoBucket(InfoBucketRequest)
|
rpc infoBucket(InfoBucketRequest)
|
||||||
returns(InfoBucketResponse);
|
returns(InfoBucketResponse);
|
||||||
|
|
||||||
|
/**
|
||||||
|
Sets bucket properties.
|
||||||
|
*/
|
||||||
|
rpc setBucketProperty(SetBucketPropertyRequest)
|
||||||
|
returns(SetBucketPropertyResponse);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Get key.
|
Get key.
|
||||||
*/
|
*/
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -36,4 +37,11 @@ public interface BucketManager {
|
|||||||
*/
|
*/
|
||||||
KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets bucket property from args.
|
||||||
|
* @param args - BucketArgs.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void setBucketProperty(KsmBucketArgs args) throws IOException;
|
||||||
}
|
}
|
||||||
|
@ -17,15 +17,24 @@
|
|||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto
|
||||||
|
.HdfsProtos.StorageTypeProto;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.BucketInfo;
|
.KeySpaceManagerProtocolProtos.BucketInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
|
||||||
|
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||||
import org.iq80.leveldb.DBException;
|
import org.iq80.leveldb.DBException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* KSM bucket manager.
|
* KSM bucket manager.
|
||||||
@ -134,4 +143,108 @@ public KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
|||||||
metadataManager.readLock().unlock();
|
metadataManager.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets bucket property from args.
|
||||||
|
* @param args - BucketArgs.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setBucketProperty(KsmBucketArgs args) throws IOException {
|
||||||
|
Preconditions.checkNotNull(args);
|
||||||
|
metadataManager.writeLock().lock();
|
||||||
|
String volumeName = args.getVolumeName();
|
||||||
|
String bucketName = args.getBucketName();
|
||||||
|
try {
|
||||||
|
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||||
|
//Check if volume exists
|
||||||
|
if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) ==
|
||||||
|
null) {
|
||||||
|
LOG.error("volume: {} not found ", volumeName);
|
||||||
|
throw new KSMException("Volume doesn't exist",
|
||||||
|
KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||||
|
}
|
||||||
|
byte[] value = metadataManager.get(bucketKey);
|
||||||
|
//Check if bucket exist
|
||||||
|
if(value == null) {
|
||||||
|
LOG.error("bucket: {} not found ", bucketName);
|
||||||
|
throw new KSMException("Bucket doesn't exist",
|
||||||
|
KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
|
||||||
|
}
|
||||||
|
KsmBucketInfo oldBucketInfo = KsmBucketInfo.getFromProtobuf(
|
||||||
|
BucketInfo.parseFrom(value));
|
||||||
|
KsmBucketInfo.Builder bucketInfoBuilder = KsmBucketInfo.newBuilder();
|
||||||
|
bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName())
|
||||||
|
.setBucketName(oldBucketInfo.getBucketName());
|
||||||
|
|
||||||
|
//Check ACLs to update
|
||||||
|
if(args.getAddAcls() != null || args.getRemoveAcls() != null) {
|
||||||
|
List<OzoneAcl> acls = getUpdatedAclList(oldBucketInfo.getAcls(),
|
||||||
|
args.getRemoveAcls(), args.getAddAcls());
|
||||||
|
bucketInfoBuilder.setAcls(acls.stream().map(
|
||||||
|
KSMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
|
||||||
|
LOG.debug("Updating ACLs for bucket: {} in volume: {}",
|
||||||
|
bucketName, volumeName);
|
||||||
|
} else {
|
||||||
|
bucketInfoBuilder.setAcls(oldBucketInfo.getAcls());
|
||||||
|
}
|
||||||
|
|
||||||
|
//Check StorageType to update
|
||||||
|
StorageTypeProto storageTypeProto = args.getStorageType();
|
||||||
|
if(storageTypeProto != null) {
|
||||||
|
bucketInfoBuilder.setStorageType(storageTypeProto);
|
||||||
|
LOG.debug("Updating bucket storage type for bucket: {} in volume: {}",
|
||||||
|
bucketName, volumeName);
|
||||||
|
} else {
|
||||||
|
bucketInfoBuilder.setStorageType(oldBucketInfo.getStorageType());
|
||||||
|
}
|
||||||
|
|
||||||
|
//Check Versioning to update
|
||||||
|
Boolean versioning = args.getIsVersionEnabled();
|
||||||
|
if(versioning != null) {
|
||||||
|
bucketInfoBuilder.setIsVersionEnabled(versioning);
|
||||||
|
LOG.debug("Updating bucket versioning for bucket: {} in volume: {}",
|
||||||
|
bucketName, volumeName);
|
||||||
|
} else {
|
||||||
|
bucketInfoBuilder.setIsVersionEnabled(
|
||||||
|
oldBucketInfo.getIsVersionEnabled());
|
||||||
|
}
|
||||||
|
|
||||||
|
metadataManager.put(bucketKey, bucketInfoBuilder.build()
|
||||||
|
.getProtobuf().toByteArray());
|
||||||
|
} catch (IOException | DBException ex) {
|
||||||
|
LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
|
||||||
|
bucketName, volumeName, ex);
|
||||||
|
throw ex;
|
||||||
|
} finally {
|
||||||
|
metadataManager.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the existing ACL list with remove and add ACLs that are passed.
|
||||||
|
* Remove is done before Add.
|
||||||
|
*
|
||||||
|
* @param existingAcls - old ACL list.
|
||||||
|
* @param removeAclInfos - ACLs to be removed.
|
||||||
|
* @param addAclInfos - ACLs to be added.
|
||||||
|
* @return updated ACL list.
|
||||||
|
*/
|
||||||
|
private List<OzoneAcl> getUpdatedAclList(List<OzoneAclInfo> existingAcls,
|
||||||
|
List<OzoneAclInfo> removeAclInfos, List<OzoneAclInfo> addAclInfos) {
|
||||||
|
List<OzoneAcl> acls = existingAcls.stream().map(
|
||||||
|
KSMPBHelper::convertOzoneAcl).collect(Collectors.toList());
|
||||||
|
if(removeAclInfos != null && !removeAclInfos.isEmpty()) {
|
||||||
|
List<OzoneAcl> removeAcls = removeAclInfos.stream().map(
|
||||||
|
KSMPBHelper::convertOzoneAcl).collect(Collectors.toList());
|
||||||
|
acls.removeAll(removeAcls);
|
||||||
|
}
|
||||||
|
if(addAclInfos != null && !addAclInfos.isEmpty()) {
|
||||||
|
List<OzoneAcl> addAcls = addAclInfos.stream().map(
|
||||||
|
KSMPBHelper::convertOzoneAcl).collect(Collectors.toList());
|
||||||
|
addAcls.stream().filter(acl -> !acls.contains(acl)).forEach(
|
||||||
|
acls::add);
|
||||||
|
}
|
||||||
|
return acls;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ public class KSMMetrics {
|
|||||||
private @Metric MutableCounterLong numVolumeInfos;
|
private @Metric MutableCounterLong numVolumeInfos;
|
||||||
private @Metric MutableCounterLong numBucketCreates;
|
private @Metric MutableCounterLong numBucketCreates;
|
||||||
private @Metric MutableCounterLong numBucketInfos;
|
private @Metric MutableCounterLong numBucketInfos;
|
||||||
|
private @Metric MutableCounterLong numBucketModifies;
|
||||||
private @Metric MutableCounterLong numKeyAllocate;
|
private @Metric MutableCounterLong numKeyAllocate;
|
||||||
private @Metric MutableCounterLong numKeyLookup;
|
private @Metric MutableCounterLong numKeyLookup;
|
||||||
|
|
||||||
@ -42,6 +43,7 @@ public class KSMMetrics {
|
|||||||
private @Metric MutableCounterLong numVolumeInfoFails;
|
private @Metric MutableCounterLong numVolumeInfoFails;
|
||||||
private @Metric MutableCounterLong numBucketCreateFails;
|
private @Metric MutableCounterLong numBucketCreateFails;
|
||||||
private @Metric MutableCounterLong numBucketInfoFails;
|
private @Metric MutableCounterLong numBucketInfoFails;
|
||||||
|
private @Metric MutableCounterLong numBucketModifyFails;
|
||||||
private @Metric MutableCounterLong numKeyAllocateFails;
|
private @Metric MutableCounterLong numKeyAllocateFails;
|
||||||
private @Metric MutableCounterLong numKeyLookupFails;
|
private @Metric MutableCounterLong numKeyLookupFails;
|
||||||
|
|
||||||
@ -75,8 +77,12 @@ public void incNumBucketInfos() {
|
|||||||
numBucketInfos.incr();
|
numBucketInfos.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumBucketModifies() {
|
||||||
|
numBucketModifies.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumVolumeCreateFails() {
|
public void incNumVolumeCreateFails() {
|
||||||
numVolumeCreates.incr();
|
numVolumeCreateFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void incNumVolumeModifyFails() {
|
public void incNumVolumeModifyFails() {
|
||||||
@ -95,6 +101,10 @@ public void incNumBucketInfoFails() {
|
|||||||
numBucketInfoFails.incr();
|
numBucketInfoFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumBucketModifyFails() {
|
||||||
|
numBucketModifyFails.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumKeyAllocates() {
|
public void incNumKeyAllocates() {
|
||||||
numKeyAllocate.incr();
|
numKeyAllocate.incr();
|
||||||
}
|
}
|
||||||
@ -136,6 +146,11 @@ public long getNumBucketInfos() {
|
|||||||
return numBucketInfos.value();
|
return numBucketInfos.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBucketModifies() {
|
||||||
|
return numBucketModifies.value();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreateFails() {
|
public long getNumVolumeCreateFails() {
|
||||||
return numVolumeCreateFails.value();
|
return numVolumeCreateFails.value();
|
||||||
@ -161,6 +176,11 @@ public long getNumBucketInfoFails() {
|
|||||||
return numBucketInfoFails.value();
|
return numBucketInfoFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBucketModifyFails() {
|
||||||
|
return numBucketModifyFails.value();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumKeyAllocates() {
|
public long getNumKeyAllocates() {
|
||||||
return numKeyAllocate.value();
|
return numKeyAllocate.value();
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
@ -442,4 +443,23 @@ public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
|
|||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets bucket property from args.
|
||||||
|
* @param args - BucketArgs.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setBucketProperty(KsmBucketArgs args)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
metrics.incNumBucketModifies();
|
||||||
|
bucketManager.setBucketProperty(args);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
metrics.incNumBucketModifyFails();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
@ -33,6 +34,10 @@
|
|||||||
.KeySpaceManagerProtocolProtos.InfoBucketRequest;
|
.KeySpaceManagerProtocolProtos.InfoBucketRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.InfoBucketResponse;
|
.KeySpaceManagerProtocolProtos.InfoBucketResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.SetBucketPropertyRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.SetBucketPropertyResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
@ -269,4 +274,20 @@ public LocateKeyResponse lookupKey(
|
|||||||
}
|
}
|
||||||
return resp.build();
|
return resp.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SetBucketPropertyResponse setBucketProperty(
|
||||||
|
RpcController controller, SetBucketPropertyRequest request)
|
||||||
|
throws ServiceException {
|
||||||
|
SetBucketPropertyResponse.Builder resp =
|
||||||
|
SetBucketPropertyResponse.newBuilder();
|
||||||
|
try {
|
||||||
|
impl.setBucketProperty(KsmBucketArgs.getFromProtobuf(
|
||||||
|
request.getBucketArgs()));
|
||||||
|
resp.setStatus(Status.OK);
|
||||||
|
} catch(IOException e) {
|
||||||
|
resp.setStatus(exceptionToResponseStatus(e));
|
||||||
|
}
|
||||||
|
return resp.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset
|
||||||
.LengthInputStream;
|
.LengthInputStream;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
@ -38,6 +39,7 @@
|
|||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
|
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
|
||||||
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
|
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
|
||||||
|
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||||
@ -222,21 +224,44 @@ private boolean getBucketVersioningProtobuf(
|
|||||||
@Override
|
@Override
|
||||||
public void setBucketAcls(BucketArgs args)
|
public void setBucketAcls(BucketArgs args)
|
||||||
throws IOException, OzoneException {
|
throws IOException, OzoneException {
|
||||||
throw new UnsupportedOperationException("setBucketAcls not implemented");
|
List<OzoneAcl> removeAcls = args.getRemoveAcls();
|
||||||
|
List<OzoneAcl> addAcls = args.getAddAcls();
|
||||||
|
if(removeAcls != null || addAcls != null) {
|
||||||
|
KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
|
||||||
|
builder.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName());
|
||||||
|
if(removeAcls != null && !removeAcls.isEmpty()) {
|
||||||
|
builder.setRemoveAcls(args.getRemoveAcls().stream().map(
|
||||||
|
KSMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
if(addAcls != null && !addAcls.isEmpty()) {
|
||||||
|
builder.setAddAcls(args.getAddAcls().stream().map(
|
||||||
|
KSMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
keySpaceManagerClient.setBucketProperty(builder.build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBucketVersioning(BucketArgs args)
|
public void setBucketVersioning(BucketArgs args)
|
||||||
throws IOException, OzoneException {
|
throws IOException, OzoneException {
|
||||||
throw new UnsupportedOperationException(
|
KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
|
||||||
"setBucketVersioning not implemented");
|
builder.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName())
|
||||||
|
.setIsVersionEnabled(getBucketVersioningProtobuf(
|
||||||
|
args.getVersioning()));
|
||||||
|
keySpaceManagerClient.setBucketProperty(builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBucketStorageClass(BucketArgs args)
|
public void setBucketStorageClass(BucketArgs args)
|
||||||
throws IOException, OzoneException {
|
throws IOException, OzoneException {
|
||||||
throw new UnsupportedOperationException(
|
KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
|
||||||
"setBucketStorageClass not implemented");
|
builder.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName())
|
||||||
|
.setStorageType(PBHelperClient.convertStorageType(
|
||||||
|
args.getStorageType()));
|
||||||
|
keySpaceManagerClient.setBucketProperty(builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -18,11 +18,16 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions
|
import org.apache.hadoop.ozone.ksm.exceptions
|
||||||
.KSMException.ResultCodes;
|
.KSMException.ResultCodes;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
|
||||||
|
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -34,8 +39,10 @@
|
|||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
@ -176,4 +183,129 @@ public void testGetBucketInfo() throws IOException {
|
|||||||
result.getStorageType());
|
result.getStorageType());
|
||||||
Assert.assertEquals(false, result.getIsVersionEnabled());
|
Assert.assertEquals(false, result.getIsVersionEnabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetBucketPropertyAddACL() throws IOException {
|
||||||
|
MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
|
||||||
|
List<OzoneAclInfo> acls = new LinkedList<>();
|
||||||
|
OzoneAcl ozoneAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
|
||||||
|
"root", OzoneAcl.OzoneACLRights.READ);
|
||||||
|
acls.add(KSMPBHelper.convertOzoneAcl(ozoneAcl));
|
||||||
|
BucketManager bucketManager = new BucketManagerImpl(metaMgr);
|
||||||
|
KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setAcls(acls)
|
||||||
|
.setStorageType(HdfsProtos.StorageTypeProto.DISK)
|
||||||
|
.setIsVersionEnabled(false)
|
||||||
|
.build();
|
||||||
|
bucketManager.createBucket(bucketInfo);
|
||||||
|
KsmBucketInfo result = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertEquals("sampleVol", result.getVolumeName());
|
||||||
|
Assert.assertEquals("bucketOne", result.getBucketName());
|
||||||
|
Assert.assertEquals(1, result.getAcls().size());
|
||||||
|
List<OzoneAclInfo> addAcls = new LinkedList<>();
|
||||||
|
OzoneAcl newAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
|
||||||
|
"ozone", OzoneAcl.OzoneACLRights.READ);
|
||||||
|
addAcls.add(KSMPBHelper.convertOzoneAcl(newAcl));
|
||||||
|
KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setAddAcls(addAcls)
|
||||||
|
.build();
|
||||||
|
bucketManager.setBucketProperty(bucketArgs);
|
||||||
|
KsmBucketInfo updatedResult = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertEquals(2, updatedResult.getAcls().size());
|
||||||
|
Assert.assertTrue(updatedResult.getAcls().contains(
|
||||||
|
KSMPBHelper.convertOzoneAcl(newAcl)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetBucketPropertyRemoveACL() throws IOException {
|
||||||
|
MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
|
||||||
|
List<OzoneAclInfo> acls = new LinkedList<>();
|
||||||
|
OzoneAcl aclOne = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
|
||||||
|
"root", OzoneAcl.OzoneACLRights.READ);
|
||||||
|
OzoneAcl aclTwo = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
|
||||||
|
"ozone", OzoneAcl.OzoneACLRights.READ);
|
||||||
|
acls.add(KSMPBHelper.convertOzoneAcl(aclOne));
|
||||||
|
acls.add(KSMPBHelper.convertOzoneAcl(aclTwo));
|
||||||
|
BucketManager bucketManager = new BucketManagerImpl(metaMgr);
|
||||||
|
KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setAcls(acls)
|
||||||
|
.setStorageType(HdfsProtos.StorageTypeProto.DISK)
|
||||||
|
.setIsVersionEnabled(false)
|
||||||
|
.build();
|
||||||
|
bucketManager.createBucket(bucketInfo);
|
||||||
|
KsmBucketInfo result = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertEquals(2, result.getAcls().size());
|
||||||
|
List<OzoneAclInfo> removeAcls = new LinkedList<>();
|
||||||
|
removeAcls.add(KSMPBHelper.convertOzoneAcl(aclTwo));
|
||||||
|
KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setRemoveAcls(removeAcls)
|
||||||
|
.build();
|
||||||
|
bucketManager.setBucketProperty(bucketArgs);
|
||||||
|
KsmBucketInfo updatedResult = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertEquals(1, updatedResult.getAcls().size());
|
||||||
|
Assert.assertFalse(updatedResult.getAcls().contains(
|
||||||
|
KSMPBHelper.convertOzoneAcl(aclTwo)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetBucketPropertyChangeStorageType() throws IOException {
|
||||||
|
MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
|
||||||
|
BucketManager bucketManager = new BucketManagerImpl(metaMgr);
|
||||||
|
KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setStorageType(HdfsProtos.StorageTypeProto.DISK)
|
||||||
|
.build();
|
||||||
|
bucketManager.createBucket(bucketInfo);
|
||||||
|
KsmBucketInfo result = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertEquals(HdfsProtos.StorageTypeProto.DISK,
|
||||||
|
result.getStorageType());
|
||||||
|
KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setStorageType(HdfsProtos.StorageTypeProto.SSD)
|
||||||
|
.build();
|
||||||
|
bucketManager.setBucketProperty(bucketArgs);
|
||||||
|
KsmBucketInfo updatedResult = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertEquals(HdfsProtos.StorageTypeProto.SSD,
|
||||||
|
updatedResult.getStorageType());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetBucketPropertyChangeVersioning() throws IOException {
|
||||||
|
MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
|
||||||
|
BucketManager bucketManager = new BucketManagerImpl(metaMgr);
|
||||||
|
KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setIsVersionEnabled(false)
|
||||||
|
.build();
|
||||||
|
bucketManager.createBucket(bucketInfo);
|
||||||
|
KsmBucketInfo result = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertFalse(result.getIsVersionEnabled());
|
||||||
|
KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setIsVersionEnabled(true)
|
||||||
|
.build();
|
||||||
|
bucketManager.setBucketProperty(bucketArgs);
|
||||||
|
KsmBucketInfo updatedResult = bucketManager.getBucketInfo(
|
||||||
|
"sampleVol", "bucketOne");
|
||||||
|
Assert.assertTrue(updatedResult.getIsVersionEnabled());
|
||||||
|
}
|
||||||
}
|
}
|
@ -31,6 +31,7 @@
|
|||||||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
||||||
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
||||||
|
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||||
@ -48,6 +49,7 @@
|
|||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Key Space Manager operation in distributed handler scenario.
|
* Test Key Space Manager operation in distributed handler scenario.
|
||||||
|
Loading…
Reference in New Issue
Block a user